Apache Superset ClickHouse
February 10, 2024

Как мы создавали внутреннее хранилище данных в ClickHouse

Перевод статьи: https://clickhouse.com/blog/building-a-data-warehouse-with-clickhouse


Подписывайтесь на телеграм канал Apache Superset: https://t.me/apache_superset_bi


В ClickHouse мы видим свою миссию в предоставлении нашим клиентам и пользователям сверхбыстрой облачной аналитической базы данных, которую можно использовать для внутренней аналитики и аналитики, ориентированной на клиентов. ClickHouse Cloud позволяет нашим клиентам хранить и обрабатывать практически неограниченные объемы данных, что помогает им принимать решения на основе данных. Принятие решений, основанных на фактах, а не на предположениях, сегодня имеет решающее значение для большинства успешных предприятий.

Конечно, внутри нашей команды мы придерживаемся такого же подхода. Разработка и эксплуатация нашей облачной базы данных генерирует огромный объем данных, которые можно использовать для планирования мощности, ценообразования, лучшего понимания потребностей наших клиентов и финансовой отчетности. Десятки источников данных, сотни терабайт и около сотни BI-пользователей и специальных пользователей… И угадайте, что для этого мы используем ClickHouse Cloud :)

В этом посте я расскажу, как устроено наше внутреннее хранилище данных (DWH), какой стек мы используем и как наше хранилище данных будет развиваться в ближайшие несколько месяцев.

Требования и источники данных

Мы запустили ClickHouse Cloud в режиме Private Preview в мае 2022 года и в то же время поняли, что хотим лучше понимать наших клиентов: как они используют наш сервис, с какими трудностями они сталкиваются, как мы можем им помочь и как мы можем сделать наши цены доступны и разумны для них. Для этого нам нужно было собирать и обрабатывать данные из нескольких внутренних источников данных: плоскости данных, которая отвечает за работу модулей базы данных клиента, плоскости управления, которая отвечает за пользовательский интерфейс и операции с базой данных, а также AWS Billing, которая дает нам точные затраты на выполнение рабочих нагрузок клиентов.

Был короткий период времени, когда наш вице-президент по продуктам Таня Брагин ежедневно вручную анализировала рабочие нагрузки наших клиентов в Excel, используя поиск. Мне, как бывшему архитектору СХД, было обидно, что ей пришлось так бороться, и в результате родилась первая концепция внутренней СХД.

При разработке системы перед нами стоял ряд важных задач, которые мы стремились поддержать наших внутренних заинтересованных сторон, некоторые из которых перечислены ниже.

Команда продукта

Отслеживание показателей конверсии и удержания, использования функций, размера сервисов, их использования и выявления наиболее распространенных проблем. Проведение глубокого специального анализа.

Операционная группа

Отслеживание приблизительного дохода и предоставление доступа к некоторым данным Salesforce в режиме только для чтения для большинства сотрудников компании.

Отдел продаж

Просмотр настроек и использования конкретных клиентов: сколько услуг, сколько данных, типичные проблемы и т. д.

Инженерная команда

Настройка нашего автомасштабирования, отслеживание частоты ошибок запросов и использования функций БД.

Группа поддержки

Просмотр конкретной настройки клиента: услуги, использование, объем данных и т. д.

Маркетинговая команда

Отслеживание коэффициентов конверсии на вершине воронки, стоимости привлечения клиентов и других маркетинговых показателей.

Команда экономии затрат

Анализ затрат CSP и активная оптимизация наших обязательств CSP.

команда CI-CD

Отслеживание затрат CI-CD

Примечание. В нашем внутреннем хранилище данных мы не собираем, не храним и не обрабатываем какую-либо часть данных наших клиентов (большая часть которых зашифрована), например данные таблиц, текст запроса, сетевые данные и т. д. Например, для анализа запросов. , мы лишь собираем список используемых функций, время выполнения запроса, используемую память и некоторую другую метаинформацию. Мы никогда не собираем данные запроса или текст запроса.

Для этого мы разработали план получения данных из десятков источников, включая следующие.

Учитывая наши основные цели, мы сделали несколько предположений:

  • На нынешнем этапе достаточно детализации наших данных в один час. Это означает, что мы можем собирать и хранить агрегаты за каждый час.
  • На данный момент нам не нужно использовать подход CDC (или «сбор измененных данных»), поскольку он делает инфраструктуру СХД намного дороже. Традиционная прямая загрузка/ETL должна удовлетворить наши потребности. Если эти источники данных подлежат обновлению, мы можем выполнить полную перезагрузку данных.
  • Поскольку у нас есть отличная масштабируемая и быстрая база данных, нам не нужно выполнять преобразования ETL за пределами базы данных. Вместо этого мы используем ClickHouse напрямую для выполнения преобразований с помощью SQL. Это прекрасно работает.
  • В ClickHouse мы по своей природе являемся открытым исходным кодом, поэтому мы хотим, чтобы весь наш стек содержал только компоненты с открытым исходным кодом. Мы также любим вносить свой вклад.
  • Поскольку у нас очень разные типы источников данных, нам понадобится несколько инструментов и подходов для извлечения данных из этих источников. В то же время нам необходимо стандартизированное промежуточное хранилище.

Однако одно из других наших первоначальных предположений оказалось неверным. Мы предполагали, что, поскольку наша структура данных не такая сложная, нам будет достаточно иметь в СХД всего два логических слоя — необработанный слой и слой «киоска данных». Это была ошибка. На самом деле нам нужен был третий промежуточный уровень, хранящий внутренние бизнес-объекты. Мы объясним это ниже.

Архитектура

В результате мы получили следующую архитектуру:

  1. На высоком уровне наш стек можно описать так:
    • ClickHouse Cloud как основная база данных
    • Airflow как планировщик (инструмент планирования с открытым исходным кодом)
    • AWS S3 как промежуточное хранилище для данных RAW
    • Superset как внутренний инструмент BI и AD-HOC
  2. Мы используем различные инструменты и подходы для сбора данных из источников данных в несколько корзин S3:
    • Для плоскости управления, плоскости данных, сегмента и AWS CUR мы используем встроенные функции источника данных для экспорта данных.
    • Для выставления счетов GCP мы используем экспортные запросы BigQuery для экспорта данных в GCS, откуда они могут быть получены табличной функцией ClickHouse S3.
    • Для Salesforce мы используем AWS AppFlow.
    • Для захвата данных из M3ter мы написали собственное приложение. Изначально он был написан на Kotlin, позже мы перенесли его на Python.
    • Для Galaxy (который представлен кластером ClickHouse Cloud) мы используем табличную функцию ClickHouse S3 для экспорта данных в S3.
    • Для Marketo мы используем Fivetran.
    • Наконец, поскольку цены AWS и GCP меняются очень редко, мы решили не автоматизировать их загрузку, а создали несколько скриптов, которые помогут нам вручную обновлять цены CSP при необходимости.
  3. Для больших таблиц фактов мы собираем почасовые приращения. Для словарей и таблиц, которые могут получать не только новые строки, но и обновления, мы используем подход «замены» (т.е. каждый час загружаем всю таблицу).
  4. Как только почасовые данные собираются в корзине S3, мы используем табличную функцию ClickHouse s3 для импорта данных в базу данных ClickHouse. Табличная функция S3 масштабируется по репликам и отлично работает с большими объемами данных.
  5. Из корзины S3 данные вставляются в слой RAW в базе данных. Этот слой имеет ту же структуру таблицы, что и источники.
  6. После серии преобразований, выполняемых Airflow (включая соединения), данные из необработанных таблиц вставляются в таблицы MART — эти таблицы представляют бизнес-объекты и удовлетворяют потребности наших внутренних заинтересованных сторон.При выполнении преобразований используется множество временных таблиц. Фактически, большинство преобразованных результатов сначала записываются в промежуточную таблицу, а только потом вставляются в целевую таблицу. Хотя такой подход вносит некоторую сложность, он также дает нам необходимую гибкость повторного использования данных приращения. Это позволяет использовать одну часть приращения несколько раз без ее пересчета или повторного сканирования целевой таблицы. Промежуточные таблицы имеют уникальные имена для каждого запуска Airflow DAG (направленные ациклические графы).
  7. Наконец, инструмент Superset BI позволяет нашим внутренним пользователям запрашивать таблицы MART, а также строить диаграммы и информационные панели:
Пример панели управления Superset. Примечание: в целях иллюстрации представлены примерные данные с поддельными номерами.

Идемпотентность

Большинство таблиц, которые мы используем в ClickHouse, используют движки ReplicationReplacingMergeTree . Этот движок позволяет нам не заботиться о дубликатах в таблицах — записи с одинаковым ключом будут удалены, и сохранится только последняя запись. Это также означает, что мы можем вставлять данные за один конкретный час столько раз, сколько потребуется — сохранится только одна последняя версия каждой строки. Мы также используем функцию ClickHouse « FINAL », когда таблица используется в дальнейших преобразованиях для достижения согласованности, поэтому, например, функция sum()не вычисляет строку дважды.

В сочетании с заданиями/DAG Airflow, которые допускают многократное выполнение в течение одного и того же периода, наш конвейер полностью идемпотентен и может безопасно выполняться повторно, не приводя к дублированию. Более подробная информация о конструкции внутреннего воздушного потока будет представлена ​​ниже.

Последовательность

По умолчанию ClickHouse обеспечивает итоговую согласованность. Это означает, что если вы успешно запустите запрос на вставку, это не гарантирует, что новые данные будут во всех репликах ClickHouse. Этого достаточно для аналитики в реальном времени, но неприемлемо для сценария СХД. Представьте, например, что вы вставляете данные в промежуточную таблицу. Вставка успешно завершается, и ваш процесс ELT начинает выполнять следующий запрос, который читает из промежуточной таблицы… и вы получаете только частичные данные.

Однако ClickHouse предлагает другой режим для случаев использования, когда согласованность важнее, чем мгновенная доступность вставленных данных на первом узле. Чтобы гарантировать, что запрос на вставку не вернет «успех», пока все реплики не получат данные, мы запускаем все запросы на вставку с настройкой insert_quorum=3(у нас в кластере три узла). Мы не используем настройку «авто», потому что при выходе из строя одного узла (например, при выполнении обновления ClickHouse) два оставшихся узла все равно смогут принимать вставки. Как только перезапущенный узел станет доступен, вставленные данные в этом узле могут отсутствовать в течение некоторого времени. Поэтому для нас лучше получить ошибку ( Number of alive replicas (2) is less than requested quorum (3/3).. (TOO_FEW_LIVE_REPLICAS) при вставке данных менее чем в три реплики. Поскольку перезапуски из-за обновлений происходят довольно быстро, запросы обычно завершаются успешно, если Airflow повторяет попытку после ошибки.

Конечно, такой подход не гарантирует, что незафиксированные части из предыдущих неудачных вставок не будут видны запросу, но это не проблема, поскольку мы поддерживаем идемпотентность, как описано в предыдущей части. Другим решением было бы запустить все процессы ELT, используя только одну реплику, но это может ограничить производительность.

Проектирование внутренней инфраструктуры

Учитывая наш масштаб, нам нужно, чтобы наша инфраструктура СХД была простой, удобной в эксплуатации и легко масштабируемой. После запуска внутреннего PoC непосредственно на AWS EC2 мы перенесли все компоненты нашей инфраструктуры в Docker.

  • У нас есть отдельные машины для веб-сервера Airflow, рабочего процесса Airflow и Superset. Все компоненты упакованы в Docker-контейнеры.
  • На машинах Airflow мы дополнительно каждые 5 секунд запускаем контейнер, который синхронизирует репозиторий, содержащий код наших DAG, запросы ELT и некоторые файлы конфигурации, с папкой, расположенной на машинах.
  • Мы используем панели мониторинга и функции оповещений Superset, поэтому у нас есть планировщик и рабочие контейнеры для Superset.
  • Все компоненты Airflow и Superset синхронизируются через экземпляр Redis, который работает на отдельной машине. Redis хранит состояние выполнения заданий и рабочий код для Airflow, кэшированные результаты запросов для Superset и некоторую другую служебную информацию.
  • Мы используем AWS RDS для PostgreSQL в качестве внутренней базы данных для Airflow и Superset.
  • У нас есть две среды, работающие независимо друг от друга с собственными экземплярами ClickHouse Cloud, Airflow и Superset, установленными в разных регионах.
  • Хотя одна среда называется Preprod, а другая — Prod, мы сохраняем целостность Preprod, чтобы иметь возможность переключиться, если Prod недоступен.

Такая настройка позволяет нам безопасно и легко делать релизы:

  1. Разработчик создает ветку из ветки разработки или производства.
  2. Разработчик вносит изменения
  3. Разработчик создает PR в ветку Preprod
  4. После рассмотрения и утверждения запроса на запрос изменения передаются в экземпляр Preprod Airflow, где они тестируются.
  5. Как только изменения будут готовы к выпуску в рабочую версию, выполняется PR из Preprod в ветку Prod.

Внутренняя конструкция воздушного потока

Изначально мы думали о создании сложной системы DAG со множеством зависимостей. К сожалению, ни один из существующих вариантов механики зависимостей DAG не может работать с нужной архитектурой (что является довольно распространенной проблемой в Airflow):

  • Airflow не позволяет именам наборов данных изменяться при выполнении. Поэтому недавно представленные наборы данных не могут использовать временные имена. Если мы используем статическое имя набора данных, нисходящий DAG будет запущен только один раз для последнего приращения.
  • Триггеры могут работать на нас, но их использование слишком усложнит нашу настройку. Наличие 10–20 групп DAG с триггерами выглядит как кошмар зависимостей с операционной точки зрения.

Таким образом, мы получили следующую структуру:

  • Отдельные DAG для загрузки данных из источника данных в S3 (например, M3ter -> S3)
  • Одна огромная основная группа обеспечения доступности баз данных, которая выполняет все преобразования при доставке данных в S3.

Основное преимущество такого подхода заключается в том, что он сочетает в себе наличие всех необходимых зависимостей, четко перечисленных в основных задачах группы обеспечения доступности баз данных, и возможность создавать сущности, не связанные с отказавшим набором данных.

Безопасность

Поскольку наша внутренняя система хранения данных хранит конфиденциальные данные, включая личные данные и финансовую информацию, безопасность должна быть основой нашей архитектуры. Для этого мы реализовали некоторые основные правила и набор схем работы с СХД.

Основные правила

  • Разные данные должны быть доступны разным пользователям в соответствии с ролевой моделью компании, и это должно происходить автоматически.
  • Разделение разрешений должно выполняться на уровне базы данных (не на стороне BI!)
  • Ограничения доступа к сети должны быть представлены на всех уровнях (от использования Okta для инструмента BI до IP-фильтрации).

Выполнение

Мы используем группы Google для контроля разрешений внутренних пользователей. Это позволяет нам использовать существующие внутренние группы компании, а также позволяет владельцам групп (которыми может быть нетехнический человек, не интересующийся SQL) контролировать доступ к различным данным. Группы могут быть вложенными. Например:

Для сопоставления групп Google с точными разрешениями мы используем системную таблицу, которая соединяет:

  • Название группы Google
  • Имя базы данных
  • Имя таблицы
  • Массив столбцов
  • Фильтр (например, «где организация='clickhouse'»)
  • Тип доступа (ВЫБРАТЬ, ВСТАВИТЬ)

У нас также есть скрипт, который делает следующее:

  1. Получает рекурсивный список групп и пользователей.
  2. Создает (фактически заменяет) этих пользователей в базе данных с уникальным паролем.
  3. Создает роли, соответствующие группам Google.
  4. Назначает роли пользователям
  5. Предоставляет разрешения ролям в соответствии с таблицей разрешений с предложением «WITH REPLACE OPTION» — это удалит все остальные разрешения, которые по какой-то причине можно было сделать вручную.

На стороне Superset мы используем функцию DB_CONNECTION_MUTATOR для замены имени пользователя базы данных на пользователя Superset при отправке запроса в БД. У нас также включен Google Oauth в Superset. Это означает, что в DB_CONNECTION_MUTATOR у нас есть все необходимое для подключения Superset с нужным именем пользователя и паролем:

def DB_CONNECTION_MUTATOR(uri, params, username, security_manager, source):
    # Only enable mutator on clickhouse cloud endpoints
    if not uri.host.lower().endswith("clickhouse.cloud"):
        return uri, params
    user = security_manager.find_user(username=username)
    
    generated_username = str(user.email).split('@')[0] + '--' + str(user.username)
    uri.username = generated_username
    # Password generation logic - hidden in this example
    uri.password = ...
    return uri, params

Вышеупомянутое означает, что Superset использует уникальное имя пользователя базы данных для каждого пользователя с уникальным набором разрешений, которые контролируются группами Google.

Соответствие GDPR

Пользователи ClickHouse Cloud могут попросить нас удалить все их личные данные, включая имя, адрес электронной почты и другую информацию. Разумеется, в этом случае мы также удаляем эту информацию из СХД. Самое крутое здесь то, что нам не нужно запускать какие-либо обновления или удаления в таблицах ClickHouse. Поскольку наш движок оставляет только одну последнюю запись для каждого значения ключа, все, что нам нужно сделать, это вставить новую версию строки с данными удаленных пользователей. Для исчезновения старых строк потребуется несколько часов, но стандарты GDPR дают вам от 3 до 30 дней на удаление данных в зависимости от сценария. Итак, полный алгоритм:

  1. Найдите в одной из исходных систем специальный флаг, согласно которому этот идентификатор следует замаскировать/удалить.
  2. Выбрать все записи из таблицы с этим идентификатором
  3. Маскировать обязательные поля
  4. Вставьте данные обратно в таблицу
  5. Запустите команду «оптимизировать таблицу… окончательная», чтобы убедиться, что старые записи удалены с диска.
  6. Когда приходит новое почасовое приращение, мы выполняем объединение со списком удаленных идентификаторов. Это означает, что если по какой-либо причине информация PII пользователя еще не была полностью удалена, мы автоматически замаскируем эти данные.

Улучшения и планы на будущее

Хотя в целом мы удовлетворены нашей СХД, есть некоторые вещи, которые мы планируем изменить в ближайшие месяцы:

Третий логический уровень

Идея иметь только два логических слоя, к сожалению, не работает. Мы обнаружили, что для расчета действительно сложных метрик, которые можно заполнять обратно и которым нужны данные из более чем 5 источников данных, нам приходится создавать зависимости между различными витринами. Иногда это даже включает в себя рекурсивные зависимости. Чтобы решить эту проблему, нам нужно ввести промежуточный уровень, называемый хранилищем подробных данных или DDS. Он будет хранить некоторые внутренние бизнес-объекты, такие как учетная запись, организация, служба и т. д. Этот уровень не будет доступен для конечных пользователей, но он поможет нам удалить зависимости между витринами.

DBT

Airflow — хороший планировщик, но нам нужен инструмент, который позаботится о многих других вещах: при необходимости полная перезагрузка витрин данных, контроль качества, описание и документирование данных и другие. Для этого мы планируем интегрировать Airflow с DBT. Поскольку мы запускаем всю нашу инфраструктуру данных в контейнерах Docker, довольно легко создать отдельный контейнер DBT для наших нужд, который будет запускаться DAG Airflow.

Соглашения об именах

Хотя с самого начала мы знали, что должны следовать некоторым правилам в именах таблиц, полей и диаграмм, мы не вложили в это слишком много ресурсов. В результате у нас теперь довольно запутанные имена, которые не позволяют пользователям понять назначение конкретной таблицы или поля. Нам нужно внести ясность.

Ресурсы

ClickHouse — относительно молодая компания, поэтому наша команда DWH относительно небольшая и насчитывает всего 3 человека:

  • Data Engineer — строит и поддерживает инфраструктуру.
  • Аналитик продукта — помогает пользователям получать ценную информацию, строить диаграммы и понимать данные.
  • Руководитель группы — тратит только ~30 % времени на задачи СХД.

Что касается инфраструктуры, мы используем две среды с отдельными сервисами ClickHouse Cloud. У каждого сервиса есть 3 узла (они же реплики, но все реплики принимают запросы). Использование памяти для наших сервисов ClickHouse составляет ~200 Гб. Хотя мы не платим за эти услуги, поскольку являемся частью команды ClickHouse Cloud, мы изучили цены и производительность конкурентов и полагаем, что другая облачная аналитическая база данных в нашем случае будет намного дороже.

Кроме того, наша инфраструктура включает в себя 8 машин EC2 и корзину S3 с необработанными данными. В общей сложности эти услуги стоят около ~$1500 в месяц.

Общие результаты

Наша СХД работает уже не один год. У нас более 70 активных пользователей в месяц, сотни информационных панелей и тысячи диаграмм. Всего пользователи выполняют около 40 000 запросов в день. На этой диаграмме показано количество запросов в день с разбивкой по пользователям. Исключены пользователи системы и ELT:

Да, наши пользователи работают и по выходным.

Мы храним ~115 ТБ несжатых данных в ~150 таблицах, но из-за эффективного сжатия ClickHouse реальный размер хранимых данных составляет всего ~13 ТБ.

Неделя за неделей растет объем данных в нашем СХД. Февральский всплеск представляет собой внутренний эксперимент, требующий дублирования всех данных.

Краткое содержание

За год мы развернули СХД на основе технологии с открытым исходным кодом, которая обеспечивает удобство, которое нравится нашим пользователям. Хотя наш СХД упрощает работу с данными, мы также видим множество улучшений и изменений, которые нам необходимо внести, чтобы двигаться вперед. Мы считаем, что использование ClickHouse Cloud доказывает, что его можно использовать для создания надежного хранилища данных.


Подписывайтесь на телеграм канал Apache Superset: https://t.me/apache_superset_bi