Java
February 26, 2023

Prostore — простой Event sourcing + CQRS фреймворк

На данный момент на рынке мало Event sourcing + CQRS фреймворков. А те, что есть, непопулярны и имеют слабую поддержку, поэтому многие создают свои in-house решения. В этой статье расскажу о выполненном командой «Programming Store» проекте Prostore, который может послужить примером при создании вашего собственного решения.

Event Sourcing

Event Sourcing — это архитектурный шаблон, подход для хранения данных в виде событий. В традиционном подходе мы храним конечное состояние. Event Sourcing — это кардинально иной подход. Вместо хранения конечного состояния, мы храним все промежуточные состояния в виде событий. Конечное состояние получаем последовательным применением всех промежуточных состояний. Хранилище данных реализуется как неизменяемый журнал только для добавления, и как правило, именуется отдельно как Event Store.

CQRS

CQRS расшифровывается как Command and Query Responsibility Segregation. Это шаблон, который разделяет операции чтения (Query) и обновления (Command) для хранилища данных. Разделяя операции на Command и Query мы можем максимизировать производительность, масштабируемость и безопасность наших сервисов. Также это увеличивает гибкость всей системы. Для Command и Query сервисов мы можем выбирать разные фреймворки, языки программирования, базы данных. Например, мы могли бы хранить/обновлять данные в реляционной БД, а делать запросы в графовой БД.

Event Sourcing + CQRS

Как было описано выше, Event Sourcing в качестве хранилища данных использует структуру типа «журнал». Мы можем только добавлять события в конец, а изменять и удалять события не можем. Тут важно подчеркнуть, мы можем хранить в Event Store только события, другие структуры шаблоном не предусмотрены. События Event Sourcing — это события предметной области, которые уже произошли в какой-то момент в прошлом. Например:

class OrderItemAddedEvent {
    String orderId;
    String userId;
    String itemId;
    long timestamp;
}

Таких событий в сложной предметной области может быть много. Это накладывает ограничения на структуру хранения и производительность. Для решения данной проблемы Event Sourcing применяется вместе с CQRS. Конечное решение может выглядеть так:

Тут:

  • Client — клиент наших сервисов (пользовательский интерфейс, внешний/внутренний сервис).
  • Commands — команды в определении CQRS, операции, результат которых приводит к созданию событий в Event Store.
  • Command Service A — сервис или группа сервисов, которые обслуживают команды определенных типов. Например, сервис заказов.
  • Command Service B — другой сервис команд.
  • Event Store — хранилище событий. Например, MongoDB, PostgreSQL, Apache Cassandra, Redis. Главная характеристика при выборе Event Store — это производительность добавления записи.
  • Events — события.
  • Event Bus — шина событий. Например, Apache Kafka, Redis, RabbitMQ.
  • Query — команда - запрос на получение агрегированных данных.
  • Query Service A — сервис или группа сервисов, которые обслуживают запросы определенных типов. Например, история заказов.
  • Query Service B — другой сервис запросов.
  • Query Storage — хранилище агрегированных данных, оптимальных для запросов. Тут для двух сервисов использован один Query Storage, но вы можете для каждого сервиса запросов использовать свой storage. Например, для одного Mongodb базу, для другого PostgreSQL.

Это только один из вариантов реализации, и на мой взгляд, наиболее близкий к «каноническому». Для простоты схемы я не стал добавлять сюда Service Discovery, хотя он на практике обязателен (клиент должен знать какие command/query сервисы ему доступны).

Преимущества и применения

Простая структура типа «журнал» дает нам некоторые преимущества:

  • Низкие требования к транзакционной логике. Операции, проводимые с базой, атомарные и неизменяемые (добавление в конец).
  • Высокая вертикальная производительность.
  • Простое масштабирование (шардирование).
  • Большой выбор баз данных (key-value, NoSQL, SQL).
  • Возможность восстановить состояние из любой точки в истории.
  • Быстрые изменения логики, связанной с базой (нет схемы и не нужно делать миграции).

Так как мы используем события, мы также получаем преимущества событийно-ориентированной архитектуры:

  • Слабая связность.
  • Простота в проектировании доменной области.

Возможные применения:

  • Аудит, журналирование.
  • Логистика.
  • Денежные и иные транзакции с высоким требованием к согласованности данных.
  • Способ хранения для «state machine».
  • Системы, проектируемые с ориентиром на слабую связность.

Для данных случаев подход Event Sourcing отлично показывает себя на практике (Netflix, Walmart), в особенности, когда мы имеем дело с большими данными.

Недостатки

Так как нам нужно хранить все события доменной области, Event Sourcing требует большего объема дискового пространства в сравнении с традиционными подходами.

Как было сказано выше, другие недостатки Event Sourcing решаются применением CQRS. CQRS архитектурно решает проблему масштабируемости и производительности сложных запросов. К сожалению, требования к объему хранимых данных нельзя решить только лишь архитектурным подходом. Это сужает применимость данного шаблона. При выборе Event Sourcing для вашего проекта, следует изучить этот недостаток, сделать расчеты. Так же я бы не советовал применять данный подход как прямую замену традиционного подхода, поскольку он им не является. Я видел проект, где данный подход был применен для всего проекта, для CRUD логики, и это было ужасно. Так как Event Sourcing требует хранения всех событий доменной области, были события вроде UserEmailChanged. И так практически для каждого поля.

К самым главным недостаткам я бы отнес нишевость, и как следствие непопулярность. На рынке не так много готовых решений. Платные решения могут стоить дорого, а решения с открытым исходным кодом имеют небольшое сообщество. И как следствие популярны in-house решения. Вам придется сделать выбор: платить дорого, выбрав платное решение, или заплатить временем, выбрав решение с открытым исходным, либо все же сделать свое решение. Конечно, не зная вашего случая, мне трудно давать какие-то практические советы. Далее я хотел бы показать пример того, как можно сделать собственное решение.

Решение

Ссылка на код решения https://github.com/ProgrammingStore/prostore. Проект реализован на Spring Boot 2.7.5. Архитектурно решение выглядит как на ранее предложенной схеме.

Требования:

  • Java >= 17
  • Apache maven >= 3.8
  • Mongodb >= 6.0.1
  • Kafka >= 2.13-3.3.1

Состав проекта:

  • prostore-core — ядро, основные абстракции;
  • prostore-eureka — подключаемый модуль, имплементация CommandBus/QueryBus на основе Spring Cloud Eureka;
  • prostore-mongo — подключаемый модуль, имплементация EventStore на Mongodb;
  • prostore-kafka — подключаемый модуль, имплементация EventBus на основе Apache Kafka;
  • prostore-test-common — общая библиотека для тестовых проектов;
  • prostore-test-service — тестовый проект «сервис». Это комбинированный сервис для команд и запросов;
  • prostore-test-client — тестовый проект «клиент»;
  • spring-boot-prostore-starter — основной стартер;
  • spring-boot-prostore-eureka-starter — стартер для eureka;
  • spring-boot-prostore-mongo-starter — стартер для mongo;
  • spring-boot-prostore-kafka-starter — стартер для kafka;
  • prostore-eureka-server — eureka server.

При создании собственного решения Event sourcing важно обратить внимание на гибкость. Не стоит внедрять жесткие зависимости. Возможно, для каких-то случаев вам понадобится заменить базу, например на SQL.

В данном проекте был выбран Mongodb, как наиболее оптимальный для хранения event-ов. Другие возможные варианты:

  • Apache Cassandra;
  • Redis;
  • PostgreSQL/MySQL.

Главная характеристика при выборе Event Store — это производительность добавления записи, а также простота шардинга.

Apache Kafka был выбран в качестве Event Bus. Требования к Event Bus зависят от выбранной архитектуры и реализации. Также в некоторых реализациях Event Store может служить как Event Bus. В текущей реализации задача Event Bus — это гарантированная доставка событий.

Spring Cloud Eureka используется как имплементация CommandBus/QueryBus.

Определения основных абстракций:

  • Aggregate — агрегат состояний. Это то самое конечное состояние, которое получаем, применяя последовательно все состояния. Имеет уникальный id. Например:
@Data
class Order implements Aggregate {
    String id;
    String userId;
    List<OrderItem> orderItems;
}
  • AggregateEvent — событие, произошедшее в контексте агрегата. Например, событие OrderItemAddedEvent происходит в контексте агрегата Order.
  • Cache — для кэширования агрегатов. Постоянно строить агрегаты, применяя все события — дорого, поэтому мы используем кэш.
  • Command — команда, маркерный интерфейс. Пример:
public class CreateOrderCommand implements Command {
    private String userId;
    private String storeId;
}
  • CommandBus — шина для отправки команд.
  • CommandHandler — обработчик команд.
  • Event — событие, маркерный интерфейс.
  • EventBus — шина для отправки событий.
  • EventHandler — обработчик событий.
  • EventStore — хранилище событий.
  • Query — запрос, маркерный интерфейс. Пример:
public class GetOrderByIdQuery implements Query {
    private String aggregateId;
}
  • QueryBus — шина для отправки запросов.
  • QueryHandler — обработчик запросов.

Важно понимать разницу между CommandHandler и EventHandler. CommandHandler — это обработчик команд. CommandBus отправляет команду только одному инстансу. При неудачной попытке отправки клиент должен повторно отправить команду. В обработчике, получив команду, мы должны сгенерировать соответствующее команде событие или события, сохранить их в EventStore и отправить в EventBus. Пример:

@Component
class CreateShipmentCommandHandler implements CommandHandler<CreateShipmentCommand> {
    private final Logger logger = LoggerFactory.getLogger(CreateShipmentCommandHandler.class);

    private final EventStore eventStore;
    private final EventBus eventBus;

    private CreateShipmentCommandHandler(EventStore eventStore, EventBus eventBus) {
        this.eventStore = eventStore;
        this.eventBus = eventBus;
    }

    @Override
    public String handle(CreateShipmentCommand command) {
        logger.debug("command = {}", command);
        ShipmentCreatedEvent event = new ShipmentCreatedEvent(
            UUID.randomUUID().toString(), command.getDestination(), command.getLocation()
        );
        eventStore.save(event);
        eventBus.publish(event);
        return String.format("created: %s", event.getAggregateId());
    }
}

EventHandler — это обработчик событий. EventBus отправляет события всем инстансам. Переотправку событий после неудачной попытки должен взять на себя EventBus. В обработчике EventHandler мы можем обновить состояние агрегата (если это AggregateEvent), отправить новую команду. Пример:

@Component
class ShipmentMovedEventHandler implements EventHandler<ShipmentMovedEvent> {
    private final Logger logger = LoggerFactory.getLogger(ShipmentMovedEventHandler.class);

    private final EventStore eventStore;

    private ShipmentMovedEventHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @Override
    public void handle(ShipmentMovedEvent event) {
        logger.debug("Got event: {}", event);
        Shipment shipment = eventStore.get(event.getAggregateId());
        shipment.setLocation(event.getLocation());
    }
}

Запуск тестового проекта:

  • Запустите локально mongodb;
  • Запустите локально kafka;
  • mvn clean install;
  • mvn spring-boot:run -f prostore-eureka-server;
  • PORT=9000 mvn spring-boot:run -f prostore-test-client;
  • PORT=9001 mvn spring-boot:run -f prostore-test-service;
  • PORT=9002 mvn spring-boot:run -f prostore-test-service.

Тут мы запустили сервер Eureka, тестовый клиент на порту 9000, два инстанса тестовых сервисов на портах 9001 и 9002.

Теперь мы можем приступить к тестированию. Тестируем через отправку запросов на тестовый сервис:

  • Создание shipment (в ответ получите aggregateId):
curl -v -H "Content-Type: application/json" \
     -d '{"destination":"Moscow", "location": "Almaty"}' \
     http://localhost:9000/shipment
  • Получение shipment (вместо AGGREGATE_ID вставьте aggregateId полученный командой создания):
curl -v -X GET -H "Content-Type: application/json" \
     -d '{"aggregateId": "AGGREGATE_ID"}' \
     http://localhost:9000/shipment

Изменение shipment (вместо AGGREGATE_ID вставьте aggregateId полученный командой создания):

curl -v -H "Content-Type: application/json" \
     -d '{"aggregateId":"AGGREGATE_ID", "location": "Sydney"}' \
     http://localhost:9000/shipment/move
  • Если запущены несколько инстансов `prostore-test-service`, запросы будут балансироваться по round-robin.
  • Попробуйте перезапустить сервисы. При старте сервиса будет запущен replay event-ов для восстановления агрегатов.

Все события будут добавляться в коллекцию MongoEvent (база test по умолчанию):

{
    "_id" : ObjectId("63a3f2d70ac51a617297cda5"),
    "aggregateId" : "7a97ca5a-3375-4be1-bbe2-83ae7b40614d",
    "eventType" : "ru.programstore.prostore.test.common.event.ShipmentCreatedEvent",
    "eventJson" : "{\"aggregateId\":\"7a97ca5a-3375-4be1-bbe2-83ae7b40614d\",\"destination\":\"Moscow\",\"location\":\"Almaty\"}",
    "timestamp" : NumberLong(2075100266365871),
    "_class" : "ru.programstore.prostore.mongo.MongoEvent"
}

MongoEvent представлен модулем mongodb и не требует каких-то действий со стороны разработчика.

Тут если сравнивать с традиционным подходом, нет отдельных коллекций/таблиц для сущностей доменной области. Структурно это можно представить как поток данных, которые делятся по aggregateId. При традиционном подходе мы бы сохранили сущность Shipment в таблице/коллекции и затем меняли бы состояние Shipment по id. В Event Sourcing мы сохраняем событие ShipmentCreatedEvent с каким-то начальным состоянием, задаем/получаем aggregateId. И далее можем генерировать любые события по этому aggregateId. Например, ShipmentMovedEvent. При добавлении/изменении полей нам не нужно менять схему. Мы меняем поля событий, либо добавляем новые типы событий.

Источник