Backend
October 12, 2023

Обработка событий с использованием Spring Cloud Stream

События очень важны в программировании. Когда происходит какое-то действие в программе - это событие. Например, пользователь нажал на кнопку, пришло новое сообщение, загрузился новый файл и т.д.

Чтобы программа правильно работала, она должна обрабатывать разные события и выполнять нужные действия. Например, когда пользователь нажимает "Отправить", программа должна взять введенный текст и действительно отправить его.

События помогают спроектировать архитектуру программы. Можно разделить программу на части, которые будут независимо обрабатывать свои события. Это удобно и позволяет программе легко масштабироваться.

Что такое Spring Cloud Stream?

Spring Cloud Stream - это инструмент в популярном фреймворке Spring, который помогает работать с событиями.

Он предоставляет специальные компоненты для публикации и получения событий через так называемые message broker'и - посредники сообщений. Популярные message broker'и - это RabbitMQ и Kafka.

Почему Spring Cloud Stream удобен:

  • Не надо писать сложный код для работы с message broker'ами. Достаточно просто использовать компоненты из Spring Cloud Stream.
  • Можно легко масштабировать обработку событий, запустив больше экземпляров программы.
  • Можно быстро переключиться с одного message broker'а на другой, например с RabbitMQ на Kafka.

Установка и настройка Spring Cloud Stream

Чтобы использовать Spring Cloud Stream, нужно подключить соответствующую библиотеку в проект.

Для Maven:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>

Для Gradle:

implementation 'org.springframework.cloud:spring-cloud-stream'

Также нужно выбрать и настроить message broker. Например, для RabbitMQ можно использовать готовый Docker контейнер:

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

В application.properties нужно указать доступ к RabbitMQ:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Создание простого приложения

Давай создадим простое приложение на Spring Cloud Stream, которое будет публиковать событие "Message" с текстом сообщения.

Сначала добавим зависимости в pom.xml:

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
  </dependency>  
</dependencies>

Теперь создадим простой bean, который будет публиковать события:

@EnableBinding(Source.class)
public class MessagePublisher {

  @Autowired
  private Source source;
  
  public void publish(String message) {
    source.output().send(MessageBuilder.withPayload(message).build());
  }
  
}

Вся магия в интерфейсе Source, который предоставляет Spring Cloud Stream. У него есть метод output() для публикации событий в message broker.

Мы можем использовать этот bean в любом месте программы, чтобы опубликовать событие:

@SpringBootApplication
public class DemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(DemoApplication.class, args);
    
    MessagePublisher publisher = context.getBean(MessagePublisher.class);
    publisher.publish("Hello world!");
  }

}

Обработка событий

А как обработать опубликованные события? Для этого нужно создать другой bean и использовать интерфейс Sink:

@EnableBinding(Sink.class)
public class MessageHandler {

    @StreamListener("input")
    public void handle(Message<String> message) {
        System.out.println("Got message: " + message.getPayload());
    }

}

Метод handle будет вызываться автоматически при получении нового события "Message".

Мы можем обрабатывать событие как нам нужно, например сохранять в БД или выполнять какую-то бизнес-логику.

Продвинутые возможности

В Spring Cloud Stream есть много полезных возможностей:

  • Поддержка партиций и группировки событий, например по id пользователя. Это позволяет обрабатывать связанные события вместе.
  • Автоматическое масштабирование для балансировки нагрузки. Можно запустить много экземпляров обработчика событий.
  • Ретрай политики для повторной обработки событий при ошибках.
  • Мониторинг и метрики работы приложения. Можно посмотреть сколько событий обработано, скорость, ошибки и т.д.
  • И еще много всего, можно почитать в документации!

Например, можно настроить политику повторов событий при ошибке:

spring.cloud.stream.bindings.input.consumer.max-attempts=3

Заключение

В общем, Spring Cloud Stream - очень удобный инструмент для работы с событиями в Spring приложениях. Он избавляет от сложностей ручной настройки message broker'ов и позволяет сосредоточиться на бизнес-логике.

Масштабируемость и гибкость Spring Cloud Stream отлично подходит для современных приложений с растущей нагрузкой.