January 26, 2025

AsyncIO и синхронные вызовы

С чего все началось

Недавно на работе столкнулся с интересным багом связанным с асинхронностью во время написания тестов. Мне нужно было залогиниться в сервисе несколько раз с неправильными данными, чтобы получить блокировку аккаунта (собственно механизм блокировки я и тестировал).

Немного про технологический стек: Python 3.11, FastAPI, Redis, PostgreSQL. Тесты — синхронные на PyTest. Есть своя обвязка для запуска интеграционных тестов с запуском необходимого окружения с помощью Docker и самого тестируемого сервиса в контейнере.

Довольно типичная задача. Пишу тест — делаем несколько POST запросов по роуту входа с правильным юзернеймом и неправильным паролем и потом пробуем сделать запрос с полностью корректными данными.

План теста

Внезапная проблема

При запуске тестов в первый раз все нормально - тест отработал за 20 сек, вроде кажется долго, но там специфическая система подготовки окружения, спишем на нее эту задержку. Готовлю Pull Request, прогоняю все тесты вместе с моим новым и получаю ошибку TimeoutError — запрос к роуту логина завершился неудачно, т. к. сервер не ответил в течении 60 сек на запрос, то клиент автоматически его завершил по таймауту.

Как тест работал реально

Ну, может баг или какой-то side effect сработал. Возможно тесты локально плохо работают в целом. Тем более на созвоне говорили, что с интеграционными тестами какая-то беда и они падают регулярно.

Можно было забить и списать это все на плохой проект, среду, организацию тестов и т д, но что-то здесь было не так...

Локализация проблемы

Основную информацию о работе любого приложения можно получить с помощью логирования. Добавляем дополнительных сообщений после каждой инструкции, понижаем уровень отображения до DEBUG и смотрим логи.

Внезапно сервис на самом деле нормально отвечал и воспринимал запрос как вполне обычный. Да, он отвечал на него 67 сек., но статус был правильный — 401 Unauthorized (не смог войти). Почему он не мог мне ответить в тесте быстро?

Первая мысль, т. к. речь шла о веб-сервисе — где-то происходит блокировка сервиса, а потом его отпускает. Смотрим в код, а код роута логина полностью синхронный, там что-то типа:

@router.get("/login")
def login(...):
    ...
    settings = settings_provider.get_settings()
    if settings.count_failed_login_attempts:
        account_locker = account_locker_factory.get_or_create(...)
        # processing
    ...

Хоть FastAPI и полностью асинхронный фреймворк, но если роут описан как синхронная функция, то FastAPI его оборачивает в асинхронную корутину и выполняет в отдельном потоке, чтобы не блокировать работу веб-сервиса. Вроде все продумано, но почему-то сервис не отвечал.

Дополнительные логи и принты показали, что конкретно в ручке происходит зависание на этапе обращения к другому сервису. Получалась такая картина: тестовый клиент ходит в сервис авторизации, а сервис авторизации в сервис конфигурации за актуальными настройками. И зависает вот этот дополнительный запрос.

В сервисе конфигурации тоже ничего криминального не было обнаружено, просто запрос в БД, который сам по себе выполнялся за миллисекунды. Но весь ответ сервера работал либо быстро (до секунды), либо зависал на неопределенный срок до минуты.

Цель обнаружена

Было понятно, что где-то на сервисе конфигурации происходит процесс, который на минуту блокирует сервис, а потом его отпускает, но не было понятно, где его искать.

Через пару десятков запусков было понятно, что время задержки ответа строго варьируется от 0 до 60 сек. Довольно не случайное число, скорее всего где-то стоит таймаут выполнения задачи, которая должна выполняться не более 60 сек и потом она или выполняется успешно, или ее завершают извне. Внезапно помог обычный поиск числа 60 по проекту.

Нашелся таймер, который используется в библиотеке PortChecker для проверки доступности сервисов через отправку healthcheck-запросов: если сервис не отвечает, то пробуем минуту до него достучаться. В интеграционных тестах поднимались только необходимые сервисы для запуска конкретных тестов, но общее поведение не менялось, поэтому данные запросы уходили в пустоту и просто каждый раз возвращали ошибку.

Полная схема запросов

Но это совсем другой роут, другая функция-обработчик отвечающя за проверку доступов сервиса /healthchecks. Эти запросы напрямую никак не участвовали в логине.

Так, код синхронный обнаружен, но ведь я уже выше писал: "FastAPI его оборачивает в асинхронную корутину и выполняет в отдельном потоке". В чем же может быть проблема? Обертка написана криво? Может быть неправильно определено место?

Ошибка закралась в сам роут отвечающий за проверку доступов сервисов: функция была обозначена как асинхронная, но под капотом выполняла синхронные блокирующие запросы. Соответственно для фреймворка "снаружи" это выглядит как неблокирующая функция из-за async, а то, что внутри она была написана именно в синхронной логике, не беспокоит ни сам фреймворк, ни, видимо, программиста, который ее написал.

@route.get("/healthcheck")
async def healthcheck_services(...):
    ...
    PortChecker.check(...) # 60 сек пытаемся получить ответ и на это время блокируем весь сервер
    ...

Это один из немногих случаев, когда решение бага состоит не в написании нового кода, а только в удалении старого. Если FastAPI роут написан как синхронная функция, то он его вызывает в потоке и во время этого вызова не блокируется работа всего сервера. (ссылка на исходники)

async def run_endpoint_function(
   *, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
) -> Any:
    ...
    if is_coroutine:
       return await dependant.call(**values)
    else:
       return await run_in_threadpool(dependant.call, **values)
После исправления

Когда я сделал функцию синхронной, тесты стали стабильно работать и проблем больше не было.

Так что далеко не всё становится быстрее, когда бездумно используется асинхронный код.

Полезные материалы

  1. [DEV] Turn sync function to async - Python Tips
  2. [Python Docs] Coroutines and Tasks - asyncio.to_thread
  3. [Советы разработчикам] Потокобезопасность и конкурентный доступ