Триггеры в Alembic + SQLAlchemy
Предыстория и суть проблемы
При разработке платформы с условным внутренним балансом (все балансы пользователей учитывались обычными записями в бд без фактических транзакций и хранения средств) мне понадобилась система списания и начисления средств при исполнении внутренних сделок, к тому же все изменения балансов должны записываться в отдельные таблицы. Принципиально система выглядела так: источник сделки (source, сурс) создавал сделку, которую работник (worker, воркер) исполнял, за что получал соответствующую сумму, которая списывалась с баланса сурса. Изначально для этого была написана следующая функция:
На самом деле эта функция была изменена в попытке избавиться от проблемы, которую я опишу далее
Однако с повышением количества входящих на сервер запросов и добавлением некоторого дополнительного функционала, который потреблял большое количество ресурсов и временами блокировал потоки на беке (предположение, на самом деле из-за чего это происходило на самом деле, я так и не понял наверняка) некоторые списания и начисления стали дублироваться или вноситься некорректно. И если с дублями было довольно просто разобраться, добавив в соответствующий столбец параметр unique, не допускающий эти самые дубли, то с некорректными записями бороться было сложнее. Для лучшего понимания происходящего представлю модель таблицы для записи изменений балансов:
Суть некорректности некоторых записей заключалась в том, что в некоторых последовательных строках таблицы могли быть одинаковые балансы до. Соответственно при записи двух сделок на сумму 10 и 20 тысяч у.е. я получал две условные строки:
Хотя по логике системы должно было быть:
Соответственно балансы пользователей также изменялись некорректно. Множественные попытки решить проблему различными костылями и не очень привели к тому, что умный человек по имени Артем предложил мне не насиловать себе мозг использовать триггеры. Из определения на википедии:
Три́ггер (англ. trigger) — хранимая процедура особого типа, которую пользователь не вызывает непосредственно, а исполнение которой обусловлено действием по модификации данных: добавлениемINSERT, удалениемDELETEстроки в заданной таблице, или изменениемUPDATEданных в определённом столбце заданной таблицы реляционной базы данных.
Проще говоря уже не надо ничего менять ручками при изменении каких-то зависимых данных, а можно просто написать определенную функцию, которая будет делать это за нас.
Узнав о такой чудесной волшебной палке, я полез искать информацию, что же это, собсна, такое, и с чем эти ваши триггеры едят (спойлер - удобоваримого ответа на вопрос, как же решить мою задачу, я не нашел). Проведя несколько часов в попытках разобраться, как же победить эту хрень, я отчаялся и обратился к великому ChatGPT, который мне, хоть и с некоторыми оговорками, все же помог.
Решение проблемы
Итак, что же по задумке должно происходить (но средствами питоновского кода происходило довольно хреново):
- В таблице сделок (deals) статус сделки изменяется на success
- Воркеру начисляется рассчитанная сумма за сделку (добавляется строка в таблицу worker_balance_changes)
- При успешном добавлении записи баланс воркера изменяется
- У сурса списывается сумма за сделку (так же добавляется строка в таблицу source_balance_changes)
- При успешном добавлении записи баланс сурса изменяется
То есть при изменении статуса сделки с определенным ID в таблице deals пользователю, который обрабатывал сделку, должна начислиться сумма этой сделки, запись о чем попадает в таблицу worker_balance_changes, а измененный баланс вписывается соответствующему воркеру в таблицу workers. Обратный процесс происходит у сурса. Короче сомнительных связей и условий взаимодействия больше, чем у Габсбургов.
В конечном счете все это безобразие вылилось в две следующие команды:
CREATE OR REPLACE FUNCTION handle_deals_update()
RETURNS TRIGGER AS $
DECLARE
s_before NUMERIC;
s_delta NUMERIC;
BEGIN
IF TG_OP = 'UPDATE' AND NEW.status = 'success' THEN
s_before := (SELECT balance FROM workers WHERE id = OLD.worker_id);
s_delta := (SELECT worker_amount FROM deals
WHERE foreign_id = OLD.foreign_id);
INSERT INTO worker_balance_changes(user_id,
deal_id,
before,
delta,
after,
registered)
VALUES (OLD.worker_id,
OLD.foreign_id,
s_before,
s_delta,
s_before + s_delta,
NEW.closed_at);
UPDATE workers SET balance = s_before + s_delta
WHERE id = OLD.worker_id;
s_before := (SELECT balance FROM sources WHERE id = OLD.source_id);
s_delta := (SELECT source_amount FROM deals
WHERE foreign_id = OLD.foreign_id);
INSERT INTO source_balance_changes(source_id,
deal_id,
before,
delta,
after,
registered)
VALUES (OLD.source_id, OLD.foreign_id,
s_before,
s_delta,
s_before - s_delta,
NEW.closed_at);
UPDATE sources SET balance = s_before - s_delta
WHERE id = OLD.source_id;
END IF;
RETURN NEW;
END;
$ LANGUAGE plpgsql;
И:CREATE TRIGGER deals_status_update
AFTER UPDATE ON deals
FOR EACH ROW
WHEN (NEW.status = 'success')
EXECUTE FUNCTION handle_deals_update()
Осталось дело за малым: накатить за успех миграцию и прогнать ее в базе. Так как разбираться в том, как алхимия взаимодействует с алембиком при создании триггеров, было уже выше моих сил, я попросту создал пустую миграцию командой:
alembic revision --autogenerate # опционально можно добавить флаг -m [пометка к ревизии], например -m 'added triggers'
И в появившийся файл миграции попросту вставил в сгенерированную функцию upgrade выполнение написанных мной ранее SQL команд через:
op.execute("""ТЕКСТ КОМАНДЫ""")
После чего прогнал миграцию командой:
И успешно (хаха, я еще полчаса исправлял синтаксис этих самых команд) протестировал триггеры, поражаясь сему чудному инструменту.
И нет, я точно не буду переписывать к хренам половину функционала бека просто потому что мне, несмотря на все страдания в процессе осознания, понравились триггеры.
Итог
Статья получилась не самой информативной для опытных разработчиков, но может быть она когда-то поможет такому же чайнику, как и я, сэкономить пару часов на решении подобной проблемы.
Пы.сы. Промт для ЧатГПТ выглядел так, можно адаптировать под свои нужды:
"Я использую alembic и sqlalchemy для написания запросов к базе данных. Мне нужно написать триггер при помощи sqlalchemy для добавления строк в двух таблицах workers_changes и source_changes при условии изменения значения в таблице deals. То есть когда в таблице deals status=pending строки с id = X меняется на значение status=paid, в мои таблицы workers_changes и source_changes должны добавиться строки, зависящие от значения balance в таблицах workers и sources соответственно, а также от значения amount таблицы deals. Скажи, как я могу выполнить эту задачу"