AsyncIO и синхронные вызовы
С чего все началось
Недавно на работе столкнулся с интересным багом связанным с асинхронностью во время написания тестов. Мне нужно было залогиниться в сервисе несколько раз с неправильными данными, чтобы получить блокировку аккаунта (собственно механизм блокировки я и тестировал).
Немного про технологический стек: Python 3.11, FastAPI, Redis, PostgreSQL. Тесты — синхронные на PyTest. Есть своя обвязка для запуска интеграционных тестов с запуском необходимого окружения с помощью Docker и самого тестируемого сервиса в контейнере.
Довольно типичная задача. Пишу тест — делаем несколько POST запросов по роуту входа с правильным юзернеймом и неправильным паролем и потом пробуем сделать запрос с полностью корректными данными.
Внезапная проблема
При запуске тестов в первый раз все нормально - тест отработал за 20 сек, вроде кажется долго, но там специфическая система подготовки окружения, спишем на нее эту задержку. Готовлю Pull Request, прогоняю все тесты вместе с моим новым и получаю ошибку TimeoutError — запрос к роуту логина завершился неудачно, т. к. сервер не ответил в течении 60 сек на запрос, то клиент автоматически его завершил по таймауту.
Ну, может баг или какой-то side effect сработал. Возможно тесты локально плохо работают в целом. Тем более на созвоне говорили, что с интеграционными тестами какая-то беда и они падают регулярно.
Можно было забить и списать это все на плохой проект, среду, организацию тестов и т д, но что-то здесь было не так...
Локализация проблемы
Основную информацию о работе любого приложения можно получить с помощью логирования. Добавляем дополнительных сообщений после каждой инструкции, понижаем уровень отображения до DEBUG и смотрим логи.
Внезапно сервис на самом деле нормально отвечал и воспринимал запрос как вполне обычный. Да, он отвечал на него 67 сек., но статус был правильный — 401 Unauthorized (не смог войти). Почему он не мог мне ответить в тесте быстро?
Первая мысль, т. к. речь шла о веб-сервисе — где-то происходит блокировка сервиса, а потом его отпускает. Смотрим в код, а код роута логина полностью синхронный, там что-то типа:
@router.get("/login")
def login(...):
...
settings = settings_provider.get_settings()
if settings.count_failed_login_attempts:
account_locker = account_locker_factory.get_or_create(...)
# processing
...
Хоть FastAPI и полностью асинхронный фреймворк, но если роут описан как синхронная функция, то FastAPI его оборачивает в асинхронную корутину и выполняет в отдельном потоке, чтобы не блокировать работу веб-сервиса. Вроде все продумано, но почему-то сервис не отвечал.
Дополнительные логи и принты показали, что конкретно в ручке происходит зависание на этапе обращения к другому сервису. Получалась такая картина: тестовый клиент ходит в сервис авторизации, а сервис авторизации в сервис конфигурации за актуальными настройками. И зависает вот этот дополнительный запрос.
В сервисе конфигурации тоже ничего криминального не было обнаружено, просто запрос в БД, который сам по себе выполнялся за миллисекунды. Но весь ответ сервера работал либо быстро (до секунды), либо зависал на неопределенный срок до минуты.
Цель обнаружена
Было понятно, что где-то на сервисе конфигурации происходит процесс, который на минуту блокирует сервис, а потом его отпускает, но не было понятно, где его искать.
Через пару десятков запусков было понятно, что время задержки ответа строго варьируется от 0 до 60 сек. Довольно не случайное число, скорее всего где-то стоит таймаут выполнения задачи, которая должна выполняться не более 60 сек и потом она или выполняется успешно, или ее завершают извне. Внезапно помог обычный поиск числа 60 по проекту.
Нашелся таймер, который используется в библиотеке PortChecker для проверки доступности сервисов через отправку healthcheck-запросов: если сервис не отвечает, то пробуем минуту до него достучаться. В интеграционных тестах поднимались только необходимые сервисы для запуска конкретных тестов, но общее поведение не менялось, поэтому данные запросы уходили в пустоту и просто каждый раз возвращали ошибку.
Но это совсем другой роут, другая функция-обработчик отвечающя за проверку доступов сервиса /healthchecks. Эти запросы напрямую никак не участвовали в логине.
Так, код синхронный обнаружен, но ведь я уже выше писал: "FastAPI его оборачивает в асинхронную корутину и выполняет в отдельном потоке". В чем же может быть проблема? Обертка написана криво? Может быть неправильно определено место?
Ошибка закралась в сам роут отвечающий за проверку доступов сервисов: функция была обозначена как асинхронная, но под капотом выполняла синхронные блокирующие запросы. Соответственно для фреймворка "снаружи" это выглядит как неблокирующая функция из-за async, а то, что внутри она была написана именно в синхронной логике, не беспокоит ни сам фреймворк, ни, видимо, программиста, который ее написал.
@route.get("/healthcheck")
async def healthcheck_services(...):
...
PortChecker.check(...) # 60 сек пытаемся получить ответ и на это время блокируем весь сервер
...
Это один из немногих случаев, когда решение бага состоит не в написании нового кода, а только в удалении старого. Если FastAPI роут написан как синхронная функция, то он его вызывает в потоке и во время этого вызова не блокируется работа всего сервера. (ссылка на исходники)
async def run_endpoint_function(
*, dependant: Dependant, values: Dict[str, Any], is_coroutine: bool
) -> Any:
...
if is_coroutine:
return await dependant.call(**values)
else:
return await run_in_threadpool(dependant.call, **values)
Когда я сделал функцию синхронной, тесты стали стабильно работать и проблем больше не было.
Так что далеко не всё становится быстрее, когда бездумно используется асинхронный код.