Асинхронное программирование в Python: от простого к сложному
Введение
Привет! Сегодня мы разберемся с асинхронным программированием в Python. Если вы пишете на Python и еще не разобрались с асинхронностью - эта статья для вас. Разберем все базовые концепты, популярные инструменты и реальные примеры использования.
Часть 1: Основные концепты
Давайте для начала разберемся, что такое асинхронное программирование и зачем оно нужно.
Представьте, что вы разрабатываете бэкенд для e-commerce сайта. В синхронном подходе при оформлении заказа вы бы:
- Проверили товар на складе
- Дождались ответа от склада
- Отправили запрос в платежную систему
- Дождались ответа по оплате
- Отправили уведомление пользователю
- Отправили запрос на склад
- Пока ждем ответ, начали проверять платежные данные
- Параллельно подготовили шаблон уведомления
- К моменту получения всех ответов, уже можем сразу отправить результат
Вот это и есть асинхронность - возможность не простаивать в ожидании медленных операций, а в это время делать что-то полезное.
В Python для этого есть специальные инструменты - корутины. Выглядит это так:
async def process_order(order_id: int):
# Запускаем несколько задач параллельно
stock_info, payment_info, notification = await asyncio.gather(
check_stock(order_id),
process_payment(order_id),
prepare_notification(order_id)
)
async- говорит что функция асинхроннаяawait- указывает места, где мы можем "переключиться" на другую задачуasyncio.gather()- запускает несколько корутин параллельно
Но как это работает внутри? За всем этим следит event loop (цикл событий). Он как умный менеджер проекта:
- Знает все задачи, которые нужно выполнить
- Следит за их состоянием
- Переключается между ними в нужные моменты
- Раздает процессорное время тем, кто готов его использовать
Когда использовать асинхронность
- Много операций ввода/вывода
- Нужно обрабатывать множество параллельных соединений
- Есть длительные операции, которые часто "ждут" чего-то
Когда НЕ стоит использовать
- Много CPU-интенсивных вычислений
- Простые скрипты без параллельных задач
- Когда код должен выполняться строго последовательно
Операции ввода/вывода (I/O operations)
Давайте подробнее разберем, что такое операции ввода/вывода. Это не просто print() или input() - это любые операции, где программа ждет данные из внешнего источника. Примеры I/O операций:
- Запросы к базе данных (SELECT, INSERT, UPDATE)
- HTTP запросы к внешним API
- Чтение/запись файлов на диск
- Получение данных из очередей (Redis, RabbitMQ)
- Работа с сокетами
Реальный пример
Давайте посмотрим на простой, но реальный пример - получение данных из нескольких API:
async def get_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def fetch_all_data():
urls = [
'https://api1.example.com/data',
'https://api2.example.com/data',
'https://api3.example.com/data'
]
# Запускаем все запросы параллельно
results = await asyncio.gather(
*[get_data(url) for url in urls]
)
return results
# Запуск
asyncio.run(fetch_all_data())
В синхронном варианте мы бы ждали каждый запрос по очереди. С асинхронностью все запросы идут параллельно - пока ждем ответ от первого API, уже делаем запросы к остальным.
Часть 2: Задачи (Task) и инструменты управления ими
Теперь поговорим о задачах (Task) и главных инструментах для управления ими.
Что такое Task?
Task - это "обертка" вокруг корутины, которая позволяет:
- Отслеживать состояние выполнения (запущена, завершена, отменена)
- Получать результат выполнения
- Управлять жизненным циклом (запуск, отмена, ожидание)
- Обрабатывать ошибки
У Task есть несколько полезных свойств и методов:
task = asyncio.create_task(some_coro())
# Свойства таски
print(task.done()) # Завершена ли задача
print(task.cancelled()) # Была ли отменена
print(task.get_name()) # Имя задачи
print(task.get_coro()) # Получить корутину
print(task._state) # Текущее состояние (PENDING, RUNNING, FINISHED, CANCELLED)
# Методы управления
task.cancel() # Отменить задачу
task.set_name("my_task") # Установить имя
result = await task # Дождаться результата
Инструменты для работы с несколькими задачами
1. gather() - запуск всех задач параллельно
# Запускаем все задачи параллельно
results = await asyncio.gather(
task1(),
task2(),
task3(),
return_exceptions=True # Не падаем при ошибках
)
# Результаты в том же порядке, что и задачи
for result in results:
if isinstance(result, Exception):
print(f"Ошибка: {result}")
else:
process_result(result)
2. wait() - гибкий контроль выполнения
# Создаем задачи
tasks = [
asyncio.create_task(long_operation1()),
asyncio.create_task(long_operation2()),
asyncio.create_task(long_operation3())
]
# Варианты return_when:
# FIRST_COMPLETED - дождаться первой завершенной задачи
# FIRST_EXCEPTION - дождаться первой ошибки или завершения всех задач
# ALL_COMPLETED - дождаться завершения всех задач
# Пример: ждем первую завершенную
done, pending = await asyncio.wait(
tasks,
timeout=10,
return_when=asyncio.FIRST_COMPLETED
)
# Пример: ждем все с таймаутом
done, pending = await asyncio.wait(
tasks,
timeout=5,
return_when=asyncio.ALL_COMPLETED
)
# Обработка результатов
for task in done:
try:
result = task.result()
process_result(result)
except Exception as e:
handle_error(e)
# Отменяем незавершенные
for task in pending:
task.cancel()
Отмена задач
CancelledError возникает в трех случаях:
Вот пример правильной обработки отмены:
async def process_queue():
# Создаем подключение к базе
db = await create_db_connection()
# Создаем подзадачи для обработки
worker_tasks = []
try:
while True:
# Получаем новые задания
messages = await db.fetch_messages(limit=10)
# Создаем задачи для обработки
for msg in messages:
task = asyncio.create_task(
process_message(msg, db),
name=f"worker-{msg.id}"
)
worker_tasks.append(task)
# Ждем немного перед следующей проверкой
await asyncio.sleep(1)
except asyncio.CancelledError:
# 1. Отменяем все рабочие задачи
for task in worker_tasks:
if not task.done():
task.cancel()
# 2. Ждем их завершение
if worker_tasks:
await asyncio.wait(worker_tasks)
# 3. Закрываем соединение с БД
await db.close()
# 4. Пробрасываем отмену дальше
raise
Часть 3: Примитивы синхронизации
Теперь разберем интересную часть асинхронного программирования - примитивы синхронизации. Это инструменты, которые помогают нам управлять доступом к ресурсам и координировать работу корутин.
Semaphore - управление пулом ресурсов
async def process_data(semaphore, url):
async with semaphore: # Ждем доступный слот в пуле
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
async def main():
# Максимум 10 одновременных подключений
semaphore = asyncio.Semaphore(10)
urls = [f"https://api.example.com/data/{i}" for i in range(100)]
tasks = [process_data(semaphore, url) for url in urls]
results = await asyncio.gather(*tasks)
Практические применения Semaphore:
- Ограничение количества одновременных запросов к API (избегаем бана)
- Контроль количества одновременных подключений к базе данных
- Ограничение параллельной загрузки файлов на сервер
- Управление количеством одновременно обрабатываемых задач в пуле воркеров
Lock - механизм взаимного исключения
class AsyncCache:
def __init__(self):
self.lock = asyncio.Lock()
self.cache = {}
async def get_data(self, key):
# Чтение можно делать без блокировки
if key in self.cache:
return self.cache[key]
# Обновление кэша требует взаимного исключения
async with self.lock:
# Проверяем еще раз после получения блокировки
if key in self.cache:
return self.cache[key]
data = await fetch_expensive_data(key)
self.cache[key] = data
return data
Event - механизм сигнализации
async def worker(event, worker_id):
print(f"Процесс {worker_id} ожидает инициализации системы")
await event.wait()
print(f"Процесс {worker_id} начал обработку")
async def main():
# Создаем событие
system_ready = asyncio.Event()
# Запускаем обработчики
workers = [asyncio.create_task(worker(system_ready, i))
for i in range(5)]
# Инициализируем систему
print("Загрузка конфигурации...")
await load_config()
print("Подключение к базе данных...")
await init_database()
# Сигнализируем о готовности системы
system_ready.set()
await asyncio.gather(*workers)
Практические применения Event:
Condition - комбинация Lock и Event
class ResourcePool:
def __init__(self):
self.condition = asyncio.Condition()
self.available_resources = []
async def acquire_resource(self):
async with self.condition:
while not self.available_resources:
# Ждем появления свободных ресурсов
await self.condition.wait()
return self.available_resources.pop()
async def release_resource(self, resource):
async with self.condition:
self.available_resources.append(resource)
# Уведомляем ожидающие корутины
self.condition.notify()
Практические применения Condition:
Часть 4: Продвинутая работа с Task и TaskGroup
Теперь поговорим подробнее о Task и рассмотрим TaskGroup (новинка Python 3.11), групповую обработку ошибок и полезные приемы при работе с задачами.
TaskGroup - современный способ управления группой задач
async def process_user(user_id: int):
# Имитация запросов к разным сервисам
if user_id % 3 == 0:
raise ValueError(f"Ошибка получения данных для пользователя {user_id}")
if user_id % 5 == 0:
raise ConnectionError(f"Ошибка соединения для пользователя {user_id}")
await asyncio.sleep(1)
return f"Данные пользователя {user_id}"
async def main():
try:
async with asyncio.TaskGroup() as tg:
# Создаем группу задач
tasks = [
tg.create_task(process_user(i), name=f"user_{i}")
for i in range(10)
]
except* ValueError as eg:
# Ловим все ошибки типа ValueError
print(f"Ошибки получения данных: {eg.exceptions}")
except* ConnectionError as eg:
# Отдельно обрабатываем ошибки соединения
print(f"Ошибки соединения: {eg.exceptions}")
except* Exception as eg:
# Все остальные ошибки
print(f"Непредвиденные ошибки: {eg.exceptions}")
Что такое except* и как это работает:
- Обычный except ловит только одно исключение
- except* может поймать несколько исключений одного типа
- Исключения группируются в ExceptionGroup
- При выходе из контекста TaskGroup все задачи автоматически отменяются
Полезные приемы при работе с задачами
1. Добавление callback для реакции на завершение
def log_result(task):
try:
result = task.result()
print(f"✅ Задача {task.get_name()} вернула: {result}")
except Exception as e:
print(f"❌ Задача {task.get_name()} упала с ошибкой: {e}")
async def main():
async with asyncio.TaskGroup() as tg:
task = tg.create_task(process_user(1), name="user_processor")
task.add_done_callback(log_result)
2. Получение информации о задаче
# Создаем задачу task = asyncio.create_task(some_coro(), name="important_task") # Доступная информация print(task.get_name()) # Имя задачи print(task.done()) # Завершена ли print(task.cancelled()) # Отменена ли print(task.exception()) # Исключение (если есть)
3. Таймаут для группы задач
async def main():
try:
async with asyncio.timeout(5):
async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(process_user(i))
for i in range(10)
]
except asyncio.TimeoutError:
print("Превышено время выполнения группы задач")
4. Ожидание первого успешного результата
async def fetch_with_fallback(urls: list[str]):
async def fetch_url(url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
raise ValueError(f"Ошибка получения данных: {response.status}")
# Создаем задачи для всех URL
tasks = [asyncio.create_task(fetch_url(url)) for url in urls]
try:
# Ждем первую успешно завершенную задачу
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
# Проверяем результаты завершенных задач
for task in done:
try:
return task.result()
except ValueError:
continue
# Отменяем оставшиеся задачи
for task in pending:
task.cancel()
raise ValueError("Не удалось получить данные ни с одного URL")
except Exception:
# Отменяем все задачи при ошибке
for task in tasks:
if not task.done():
task.cancel()
raise
Практические кейсы использования TaskGroup:
Часть 5: Очереди и потоки данных
Завершаем наш обзор асинхронного программирования очередями (Queue). Это инструменты, которые помогают организовать поток данных между корутинами.
Обработка данных через очередь
async def download_images(queue: asyncio.Queue, urls: list[str]):
"""Загружает изображения и кладет их в очередь"""
async with aiohttp.ClientSession() as session:
for url in urls:
try:
async with session.get(url) as response:
image_data = await response.read()
await queue.put((url, image_data))
print(f"Загружено: {url}")
except Exception as e:
print(f"Ошибка загрузки {url}: {e}")
async def process_images(queue: asyncio.Queue):
"""Обрабатывает изображения из очереди"""
while True:
try:
url, image_data = await queue.get()
try:
# Какая-то обработка изображения
await process_image(image_data)
print(f"Обработано: {url}")
finally:
# Важно! Отмечаем задачу как выполненную
queue.task_done()
except asyncio.CancelledError:
break
async def main():
# Создаем очередь с ограничением
queue = asyncio.Queue(maxsize=10)
# Список URL для загрузки
urls = [
"https://example.com/image1.jpg",
"https://example.com/image2.jpg",
"https://example.com/image3.jpg"
]
# Запускаем загрузчик и обработчики
downloader = asyncio.create_task(download_images(queue, urls))
processors = [
asyncio.create_task(process_images(queue))
for _ in range(3) # 3 обработчика
]
# Ждем завершения загрузки
await downloader
# Ждем обработки всех изображений
await queue.join()
# Отменяем обработчики
for proc in processors:
proc.cancel()
Виды очередей в asyncio
Queue в asyncio бывает трёх видов:
- Queue - обычная очередь, первым пришел - первым вышел
- PriorityQueue - элементы обрабатываются по приоритету
- LifoQueue - последним пришел - первым вышел
Важные моменты при работе с очередями
try:
item = await queue.get()
await process_item(item)
finally:
queue.task_done() # Выполняем всегда!
# Очередь не больше 1000 элементов queue = asyncio.Queue(maxsize=1000)
# Положили все элементы
await producer()
# Дождались обработки всех
await queue.join()
# Теперь можно останавливать обработчики
for consumer in consumers:
consumer.cancel()
Практические сценарии использования очередей
- Загрузка и обработка файлов
- Парсинг данных с сайтов
- Обработка сообщений из брокера
- Параллельная обработка задач с ограничением нагрузки
Заключение
В этой статье мы прошли путь от простых концепций асинхронного программирования к продвинутым техникам и инструментам. Мы разобрали:
- Основные принципы асинхронности в Python
- Работу с корутинами и event loop
- Управление задачами и их группами
- Примитивы синхронизации для координации корутин
- Очереди для организации потоков данных
Асинхронное программирование - мощный инструмент, который позволяет создавать эффективные и масштабируемые приложения. Главное - правильно определить, когда его использовать, и соблюдать хорошие практики при написании кода.
Надеюсь, эта статья помогла вам лучше понять, как работает асинхронное программирование в Python и как применять его в своих проектах. Удачного кодинга!