April 8

Асинхронное программирование в Python: от простого к сложному

Введение

Привет! Сегодня мы разберемся с асинхронным программированием в Python. Если вы пишете на Python и еще не разобрались с асинхронностью - эта статья для вас. Разберем все базовые концепты, популярные инструменты и реальные примеры использования.

Часть 1: Основные концепты

Давайте для начала разберемся, что такое асинхронное программирование и зачем оно нужно.

Представьте, что вы разрабатываете бэкенд для e-commerce сайта. В синхронном подходе при оформлении заказа вы бы:

  1. Проверили товар на складе
  2. Дождались ответа от склада
  3. Отправили запрос в платежную систему
  4. Дождались ответа по оплате
  5. Отправили уведомление пользователю

Но ведь можно эффективнее:

  • Отправили запрос на склад
  • Пока ждем ответ, начали проверять платежные данные
  • Параллельно подготовили шаблон уведомления
  • К моменту получения всех ответов, уже можем сразу отправить результат

Вот это и есть асинхронность - возможность не простаивать в ожидании медленных операций, а в это время делать что-то полезное.

В Python для этого есть специальные инструменты - корутины. Выглядит это так:

async def process_order(order_id: int):
    # Запускаем несколько задач параллельно
    stock_info, payment_info, notification = await asyncio.gather(
        check_stock(order_id),
        process_payment(order_id),
        prepare_notification(order_id)
    )

Ключевые слова здесь:

  • async - говорит что функция асинхронная
  • await - указывает места, где мы можем "переключиться" на другую задачу
  • asyncio.gather() - запускает несколько корутин параллельно

Но как это работает внутри? За всем этим следит event loop (цикл событий). Он как умный менеджер проекта:

  • Знает все задачи, которые нужно выполнить
  • Следит за их состоянием
  • Переключается между ними в нужные моменты
  • Раздает процессорное время тем, кто готов его использовать

Когда использовать асинхронность

  • Много операций ввода/вывода
  • Нужно обрабатывать множество параллельных соединений
  • Есть длительные операции, которые часто "ждут" чего-то

Когда НЕ стоит использовать

  • Много CPU-интенсивных вычислений
  • Простые скрипты без параллельных задач
  • Когда код должен выполняться строго последовательно

Операции ввода/вывода (I/O operations)

Давайте подробнее разберем, что такое операции ввода/вывода. Это не просто print() или input() - это любые операции, где программа ждет данные из внешнего источника. Примеры I/O операций:

  • Запросы к базе данных (SELECT, INSERT, UPDATE)
  • HTTP запросы к внешним API
  • Чтение/запись файлов на диск
  • Получение данных из очередей (Redis, RabbitMQ)
  • Работа с сокетами

Реальный пример

Давайте посмотрим на простой, но реальный пример - получение данных из нескольких API:

async def get_data(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

async def fetch_all_data():
    urls = [
        'https://api1.example.com/data',
        'https://api2.example.com/data',
        'https://api3.example.com/data'
    ]
    
    # Запускаем все запросы параллельно
    results = await asyncio.gather(
        *[get_data(url) for url in urls]
    )
    return results

# Запуск
asyncio.run(fetch_all_data())

В синхронном варианте мы бы ждали каждый запрос по очереди. С асинхронностью все запросы идут параллельно - пока ждем ответ от первого API, уже делаем запросы к остальным.

Часть 2: Задачи (Task) и инструменты управления ими

Теперь поговорим о задачах (Task) и главных инструментах для управления ими.

Что такое Task?

Task - это "обертка" вокруг корутины, которая позволяет:

  • Отслеживать состояние выполнения (запущена, завершена, отменена)
  • Получать результат выполнения
  • Управлять жизненным циклом (запуск, отмена, ожидание)
  • Обрабатывать ошибки

У Task есть несколько полезных свойств и методов:

task = asyncio.create_task(some_coro())

# Свойства таски
print(task.done())  # Завершена ли задача
print(task.cancelled())  # Была ли отменена
print(task.get_name())  # Имя задачи
print(task.get_coro())  # Получить корутину
print(task._state)  # Текущее состояние (PENDING, RUNNING, FINISHED, CANCELLED)

# Методы управления
task.cancel()  # Отменить задачу
task.set_name("my_task")  # Установить имя
result = await task  # Дождаться результата

Инструменты для работы с несколькими задачами

1. gather() - запуск всех задач параллельно

# Запускаем все задачи параллельно
results = await asyncio.gather(
    task1(),
    task2(),
    task3(),
    return_exceptions=True  # Не падаем при ошибках
)

# Результаты в том же порядке, что и задачи
for result in results:
    if isinstance(result, Exception):
        print(f"Ошибка: {result}")
    else:
        process_result(result)

2. wait() - гибкий контроль выполнения

# Создаем задачи
tasks = [
    asyncio.create_task(long_operation1()),
    asyncio.create_task(long_operation2()),
    asyncio.create_task(long_operation3())
]

# Варианты return_when:
# FIRST_COMPLETED - дождаться первой завершенной задачи
# FIRST_EXCEPTION - дождаться первой ошибки или завершения всех задач
# ALL_COMPLETED - дождаться завершения всех задач

# Пример: ждем первую завершенную
done, pending = await asyncio.wait(
    tasks,
    timeout=10,
    return_when=asyncio.FIRST_COMPLETED
)

# Пример: ждем все с таймаутом
done, pending = await asyncio.wait(
    tasks,
    timeout=5,
    return_when=asyncio.ALL_COMPLETED
)

# Обработка результатов
for task in done:
    try:
        result = task.result()
        process_result(result)
    except Exception as e:
        handle_error(e)

# Отменяем незавершенные
for task in pending:
    task.cancel()

Отмена задач

CancelledError возникает в трех случаях:

  • Явный вызов task.cancel()
  • Таймаут в wait()
  • Отмена родительской задачи

Вот пример правильной обработки отмены:

async def process_queue():
    # Создаем подключение к базе
    db = await create_db_connection()
    # Создаем подзадачи для обработки
    worker_tasks = []
    
    try:
        while True:
            # Получаем новые задания
            messages = await db.fetch_messages(limit=10)
            
            # Создаем задачи для обработки
            for msg in messages:
                task = asyncio.create_task(
                    process_message(msg, db),
                    name=f"worker-{msg.id}"
                )
                worker_tasks.append(task)
            
            # Ждем немного перед следующей проверкой
            await asyncio.sleep(1)
            
    except asyncio.CancelledError:
        # 1. Отменяем все рабочие задачи
        for task in worker_tasks:
            if not task.done():
                task.cancel()
        
        # 2. Ждем их завершение
        if worker_tasks:
            await asyncio.wait(worker_tasks)
            
        # 3. Закрываем соединение с БД
        await db.close()
        
        # 4. Пробрасываем отмену дальше
        raise

Часть 3: Примитивы синхронизации

Теперь разберем интересную часть асинхронного программирования - примитивы синхронизации. Это инструменты, которые помогают нам управлять доступом к ресурсам и координировать работу корутин.

Рассмотрим:

  • Semaphore
  • Lock
  • Event
  • Condition

Semaphore - управление пулом ресурсов

async def process_data(semaphore, url):
    async with semaphore:  # Ждем доступный слот в пуле
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                return await response.json()

async def main():
    # Максимум 10 одновременных подключений
    semaphore = asyncio.Semaphore(10)
    
    urls = [f"https://api.example.com/data/{i}" for i in range(100)]
    tasks = [process_data(semaphore, url) for url in urls]
    
    results = await asyncio.gather(*tasks)

Практические применения Semaphore:

  • Ограничение количества одновременных запросов к API (избегаем бана)
  • Контроль количества одновременных подключений к базе данных
  • Ограничение параллельной загрузки файлов на сервер
  • Управление количеством одновременно обрабатываемых задач в пуле воркеров

Lock - механизм взаимного исключения

class AsyncCache:
    def __init__(self):
        self.lock = asyncio.Lock()
        self.cache = {}
        
    async def get_data(self, key):
        # Чтение можно делать без блокировки
        if key in self.cache:
            return self.cache[key]
            
        # Обновление кэша требует взаимного исключения
        async with self.lock:
            # Проверяем еще раз после получения блокировки
            if key in self.cache:
                return self.cache[key]
                
            data = await fetch_expensive_data(key)
            self.cache[key] = data
            return data

Практические применения Lock:

  • Защита кэша при конкурентных обновлениях
  • Атомарное обновление счетчиков (например, статистики)

Event - механизм сигнализации

async def worker(event, worker_id):
    print(f"Процесс {worker_id} ожидает инициализации системы")
    await event.wait()
    print(f"Процесс {worker_id} начал обработку")
    
async def main():
    # Создаем событие
    system_ready = asyncio.Event()
    
    # Запускаем обработчики
    workers = [asyncio.create_task(worker(system_ready, i)) 
              for i in range(5)]
    
    # Инициализируем систему
    print("Загрузка конфигурации...")
    await load_config()
    print("Подключение к базе данных...")
    await init_database()
    
    # Сигнализируем о готовности системы
    system_ready.set()
    
    await asyncio.gather(*workers)

Практические применения Event:

  • Синхронизация старта нескольких сервисов
  • Реализация паттерна publish-subscribe

Condition - комбинация Lock и Event

class ResourcePool:
    def __init__(self):
        self.condition = asyncio.Condition()
        self.available_resources = []
        
    async def acquire_resource(self):
        async with self.condition:
            while not self.available_resources:
                # Ждем появления свободных ресурсов
                await self.condition.wait()
            
            return self.available_resources.pop()
            
    async def release_resource(self, resource):
        async with self.condition:
            self.available_resources.append(resource)
            # Уведомляем ожидающие корутины
            self.condition.notify()

Практические применения Condition:

  • Управление пулом подключений к базе данных
  • Координация процессов обработки и подготовки данных

Часть 4: Продвинутая работа с Task и TaskGroup

Теперь поговорим подробнее о Task и рассмотрим TaskGroup (новинка Python 3.11), групповую обработку ошибок и полезные приемы при работе с задачами.

TaskGroup - современный способ управления группой задач

async def process_user(user_id: int):
    # Имитация запросов к разным сервисам
    if user_id % 3 == 0:
        raise ValueError(f"Ошибка получения данных для пользователя {user_id}")
    if user_id % 5 == 0:
        raise ConnectionError(f"Ошибка соединения для пользователя {user_id}")
    await asyncio.sleep(1)
    return f"Данные пользователя {user_id}"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            # Создаем группу задач
            tasks = [
                tg.create_task(process_user(i), name=f"user_{i}")
                for i in range(10)
            ]
            
    except* ValueError as eg:
        # Ловим все ошибки типа ValueError
        print(f"Ошибки получения данных: {eg.exceptions}")
        
    except* ConnectionError as eg:
        # Отдельно обрабатываем ошибки соединения
        print(f"Ошибки соединения: {eg.exceptions}")
        
    except* Exception as eg:
        # Все остальные ошибки
        print(f"Непредвиденные ошибки: {eg.exceptions}")

Что такое except* и как это работает:

  • Обычный except ловит только одно исключение
  • except* может поймать несколько исключений одного типа
  • Исключения группируются в ExceptionGroup
  • При выходе из контекста TaskGroup все задачи автоматически отменяются

Преимущества TaskGroup:

  • Автоматическая отмена всех задач при ошибке
  • Группировка исключений

Полезные приемы при работе с задачами

1. Добавление callback для реакции на завершение

def log_result(task):
    try:
        result = task.result()
        print(f"✅ Задача {task.get_name()} вернула: {result}")
    except Exception as e:
        print(f"❌ Задача {task.get_name()} упала с ошибкой: {e}")

async def main():
    async with asyncio.TaskGroup() as tg:
        task = tg.create_task(process_user(1), name="user_processor")
        task.add_done_callback(log_result)

2. Получение информации о задаче

# Создаем задачу
task = asyncio.create_task(some_coro(), name="important_task")

# Доступная информация
print(task.get_name())  # Имя задачи
print(task.done())      # Завершена ли
print(task.cancelled()) # Отменена ли
print(task.exception()) # Исключение (если есть)

3. Таймаут для группы задач

async def main():
    try:
        async with asyncio.timeout(5):
            async with asyncio.TaskGroup() as tg:
                tasks = [
                    tg.create_task(process_user(i))
                    for i in range(10)
                ]
    except asyncio.TimeoutError:
        print("Превышено время выполнения группы задач")

4. Ожидание первого успешного результата

async def fetch_with_fallback(urls: list[str]):
    async def fetch_url(url: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    return await response.json()
                raise ValueError(f"Ошибка получения данных: {response.status}")

    # Создаем задачи для всех URL
    tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
    
    try:
        # Ждем первую успешно завершенную задачу
        done, pending = await asyncio.wait(
            tasks, 
            return_when=asyncio.FIRST_COMPLETED
        )
        
        # Проверяем результаты завершенных задач
        for task in done:
            try:
                return task.result()
            except ValueError:
                continue
                
        # Отменяем оставшиеся задачи
        for task in pending:
            task.cancel()
            
        raise ValueError("Не удалось получить данные ни с одного URL")
            
    except Exception:
        # Отменяем все задачи при ошибке
        for task in tasks:
            if not task.done():
                task.cancel()
        raise

Практические кейсы использования TaskGroup:

  • Параллельные запросы к нескольким API
  • Выполнение связанных операций с разными сервисами

Часть 5: Очереди и потоки данных

Завершаем наш обзор асинхронного программирования очередями (Queue). Это инструменты, которые помогают организовать поток данных между корутинами.

Обработка данных через очередь

async def download_images(queue: asyncio.Queue, urls: list[str]):
    """Загружает изображения и кладет их в очередь"""
    async with aiohttp.ClientSession() as session:
        for url in urls:
            try:
                async with session.get(url) as response:
                    image_data = await response.read()
                    await queue.put((url, image_data))
                    print(f"Загружено: {url}")
            except Exception as e:
                print(f"Ошибка загрузки {url}: {e}")
                
async def process_images(queue: asyncio.Queue):
    """Обрабатывает изображения из очереди"""
    while True:
        try:
            url, image_data = await queue.get()
            try:
                # Какая-то обработка изображения
                await process_image(image_data)
                print(f"Обработано: {url}")
            finally:
                # Важно! Отмечаем задачу как выполненную
                queue.task_done()
        except asyncio.CancelledError:
            break

async def main():
    # Создаем очередь с ограничением
    queue = asyncio.Queue(maxsize=10)
    
    # Список URL для загрузки
    urls = [
        "https://example.com/image1.jpg",
        "https://example.com/image2.jpg",
        "https://example.com/image3.jpg"
    ]
    
    # Запускаем загрузчик и обработчики
    downloader = asyncio.create_task(download_images(queue, urls))
    processors = [
        asyncio.create_task(process_images(queue))
        for _ in range(3)  # 3 обработчика
    ]
    
    # Ждем завершения загрузки
    await downloader
    
    # Ждем обработки всех изображений
    await queue.join()
    
    # Отменяем обработчики
    for proc in processors:
        proc.cancel()

Виды очередей в asyncio

Queue в asyncio бывает трёх видов:

  • Queue - обычная очередь, первым пришел - первым вышел
  • PriorityQueue - элементы обрабатываются по приоритету
  • LifoQueue - последним пришел - первым вышел

Важные моменты при работе с очередями

  1. Не забывайте про queue.task_done():
try:
    item = await queue.get()
    await process_item(item)
finally:
    queue.task_done()  # Выполняем всегда!
  1. Используйте maxsize чтобы контролировать память:
# Очередь не больше 1000 элементов
queue = asyncio.Queue(maxsize=1000)
  1. Дожидайтесь обработки всех элементов через join():
# Положили все элементы
await producer()

# Дождались обработки всех
await queue.join()

# Теперь можно останавливать обработчики
for consumer in consumers:
    consumer.cancel()

Практические сценарии использования очередей

  • Загрузка и обработка файлов
  • Парсинг данных с сайтов
  • Обработка сообщений из брокера
  • Параллельная обработка задач с ограничением нагрузки

Заключение

В этой статье мы прошли путь от простых концепций асинхронного программирования к продвинутым техникам и инструментам. Мы разобрали:

  • Основные принципы асинхронности в Python
  • Работу с корутинами и event loop
  • Управление задачами и их группами
  • Примитивы синхронизации для координации корутин
  • Очереди для организации потоков данных

Асинхронное программирование - мощный инструмент, который позволяет создавать эффективные и масштабируемые приложения. Главное - правильно определить, когда его использовать, и соблюдать хорошие практики при написании кода.

Надеюсь, эта статья помогла вам лучше понять, как работает асинхронное программирование в Python и как применять его в своих проектах. Удачного кодинга!

Поддержать на Boosty
Посмотреть на Youtube
Почитать в TG