Инкрементальное обновление данных с SCD2 и как его готовить
В одном из прошлых постов в телеграмме я сравнил подходы загрузки снэпшотом и инкрементом, про SCD2 тоже было. Теперь ближе к тому, как это реализовать. Код и практическое задание для тренировки лежат по ссылке.
Допустим, что на источнике дата-время создания записи регистрируется в created_at поле и при обновлении не меняется, при загрузке в хранилище — ingested_at, а историчность SCD2 будет в eff_from_dttm и eff_to_dttm. Типы данных везде timestamp (datetime). Загрузку сделаем в два этапа — предварительно отфильтруем данные и загрузим в staging, а потом — дополним данные в oda слое по SCD2 логике и получим историчность, как на картинке.
stg_extract
Вначале можно определить в core (или oda) слое max (created_at), вычесть из этого окно возможного обновления данных, и потом выполнить extract данных с источника для всех строк, у которых created_at > этого значения, то есть они появились позднее. Назовём эти отобранные данные датасетом. Целевой таблицей в хранилище будет таргет.
stg_transform и stg_load
Затем можно обогатить эти данные технической информацией загрузки (dag_run_id, source_id и пр.) и загрузить в стейджинг слой, таблица в котором предварительно очищается.
oda_extract и oda_transform_i
Теперь нужно понять, какие данные нужно вставить, а какие — обновить. Для этого подумаем логически:
1. если данные появились на источнике позже, чем последняя уже загруженная запись, значит их не нужно сравнивать детально — просто грузим
2. если строки созданы на источнике раньше, чем последняя загруженная в таргет запись, то это могут или обновлённые, или уже загруженные
oda_transform_u
При апдейте нужно сравнить значения всех столбцов неключевых бизнес-столбцов. Можно сделать это попарно, можно воспользоваться хэш-функцией вроде md5 или sha1, которые на основе строки на входе любой длины генерируют значение фиксированной длины.
Тогда объединим эти значения через конкатенацию, для безопасности разделим каким-нибудь знаком вроде '|' и подадим на вход хэш-функции. Получим набор данных с первичным ключом и хэшом от бизнес-столбцов. Такое преобразование нужно сделать как для 2-й части датасета, так и для экстракта из таргета за аналогичный период.
Здесь задаём последнее оставшееся техническое поле ingested_at = now (). После этого сравниваем хэши «слева» и «справа» и оставляем в датасете только те, где они различаются.
oda_load_i
Просто вставляем первую часть датасета.
oda_load_u
Обновляем eff_to_dttm = ingested_at - 1 second у загруженных в таргет строк. Это удобно делать через конструкцию UPDATE TABLE1 SET… FROM TABLE2 в самой базе, предварительно загрузив 2ю отфильтрованную часть датасета во временную таблицу или формируя CTE с константами на стороне ETL.
Затем строки вставляем с eff_from_dttm = ingested_at, а eff_to_dttm = максимальное время в системе (напр. 9999-12-31 23:59:59). Получим результат на картинке.
oda_transform_d
Осталось только поговорить про удаление.
Здесь нужно договориться с источником (и закрепить договорённость хотя бы в почте), что если они данные удаляют у себя, в хранилище они тоже будут помечаться на удаление. Если там данные хранятся ограниченное время, например год, то для записей старше года нужно вводить отдельные дополнительные проверки, чтобы в хранилище они действовали.
Если в инкременте не пришли строки для уже загруженных PK, значит их нужно пометить на удаление — «закрыть» период действия, приравняв eff_to_dttm = ingested_at и не вставляя новые строки. Тогда при работе с SCD2 таблицей данные уже не попадут на срез, задаваемый исторической меткой.
Например, строка действует до 31 января 2023, а мы забираем данные из таблицы oda на момент 9 часов 14 февраля 2023. Логика примерно такая:select pk, col1, col2 from oda.table where '2023-02-14 09:00:00': timestamp between eff_from_dttm and eff_to_dttm.
Ключевые заметки
- Работу с историчностью в ETL можно логически представить в виде FULL OUTER JOIN выборки из источника и таргета за один временной промежуток: вставляем то, чего нет в таргете; удаляем то, что есть в таргете, но пропало на источнике, обновляем остальные, если нужно.
- Одна из колонок историчности добавляется в первичный ключ каждой историчной таблицы.
- Удаляй 1 день или 1 секунду (в зависимости от гранулярности) из обновляемой строки, чтобы аналитики могли писать простые between, а не where loading_dt ≥ eff_from_dttm and loading_dt < eff_to_dttm.
- Мы не можем использовать для историчности фактический момент, в который обновились данные, потому что на источнике он не регистрируется — created_at не обновляется. Поэтому пользуемся временем обработки — загрузки в хранилище.
- Важно, чтобы набор столбцов на этапе расчёта хэшей в выборке из датасета и из таргета совпадал. Это нужно, чтобы при неизменных значениях на входе получилась одинаковая строка на выходе хэш-функции.
- Из-за логики работы с created_at на источнике, «долёты» или пропущенные при прошлой загрузке данные невозможны. Если у вас не так, лучше не бить на части _i и _u загрузку, а приходить сразу к FULL OUTER JOIN и последующему MERGE c ON CONFLICT.
- Если на источнике есть поле updated_at или обновляется created_at, окно забора добавлять не нужно — отбираем все записи, младше последней загруженной.
- Часто добавляют технические поля-флаги "is_current" или "is_active", "is_deleted", чтобы не воспроизводить их логику через eff_from_dttm и eff_to_dttm и для облегчения запросов.