golang
April 19, 2023

🌱 DP: Publisher-Subscriber

In distributed systems, it is often necessary to establish an indirect and permanent connection between a set of microservices. For example, we may want to read messages from an SQS queue and publish processed messages to several SNS topics. Thus, one microservice can act as an orchestrator over the other processes.


To implement such a scheme, we can use the Publisher-Subscriber pattern, which can be seen as a logical extension of the Observer pattern. Many examples build Publisher-Subscriber on top of Observer. However, developers often just add a message-polling method to the classic Observer pattern and call the result Publisher-Subscriber, which is not entirely correct.

Instead, in Go, a control goroutine can manage the subprocesses, which makes the interaction between them more flexible and safer. Implementing channel-based interaction takes some effort, but it is worth it. In this article, I implement the pattern and highlight a few points that are useful when applying it in further development.


Participants

Publisher

The Publisher should store information about the Subscribers to whom incoming events are planned to be distributed. However, unlike the Observer pattern, each Publisher is represented by an independent Goroutine. Thus, we must consider some possible situations and strategies for their processing:

  • Removing a Subscriber should be accompanied by closing the channel in the Publisher's Goroutine;
  • The termination of the Publisher's work should be accompanied by closing the channels of all Subscribers;
  • The Publisher's Goroutine cannot access the Subscriber after the channel is closed to prevent a race condition.
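The three rules above can be sketched with a reduced publisher that owns one channel per subscriber. This is only an illustration under stated assumptions: the names hub, attachReq, collect and demo are hypothetical and do not come from the repository, and events are plain strings. The point is that only the control goroutine closes a subscriber channel, and it forgets the channel in the same step, so it can never send on a closed channel.

```go
package main

import "fmt"

// attachReq is a hypothetical request that registers a subscriber inbox.
type attachReq struct {
	name  string
	inbox chan string
}

// hub is a hypothetical, reduced publisher. All channels are unbuffered.
type hub struct {
	attach  chan attachReq
	detach  chan string
	publish chan string
	stop    chan struct{}
}

func newHub() *hub {
	return &hub{
		attach:  make(chan attachReq),
		detach:  make(chan string),
		publish: make(chan string),
		stop:    make(chan struct{}),
	}
}

// run is the control goroutine. The inboxes map is local to it,
// so no other goroutine can read or modify it.
func (h *hub) run() {
	inboxes := make(map[string]chan string)
	for {
		select {
		case req := <-h.attach:
			inboxes[req.name] = req.inbox
		case name := <-h.detach:
			if inbox, ok := inboxes[name]; ok {
				close(inbox)          // rule 1: detaching closes the channel
				delete(inboxes, name) // rule 3: never touch it again
			}
		case msg := <-h.publish:
			for _, inbox := range inboxes {
				inbox <- msg
			}
		case <-h.stop:
			for name, inbox := range inboxes {
				close(inbox) // rule 2: stopping closes every remaining channel
				delete(inboxes, name)
			}
			return
		}
	}
}

// collect plays the subscriber: it reads until its inbox is closed.
func collect(inbox <-chan string, done chan<- []string) {
	var got []string
	for msg := range inbox {
		got = append(got, msg)
	}
	done <- got
}

// demo attaches two subscribers, detaches one between two events, then stops.
func demo() (a, b []string) {
	h := newHub()
	go h.run()
	inA, inB := make(chan string), make(chan string)
	doneA, doneB := make(chan []string, 1), make(chan []string, 1)
	go collect(inA, doneA)
	go collect(inB, doneB)
	h.attach <- attachReq{name: "a", inbox: inA}
	h.attach <- attachReq{name: "b", inbox: inB}
	h.publish <- "first"
	h.detach <- "a"
	h.publish <- "second"
	h.stop <- struct{}{}
	return <-doneA, <-doneB
}

func main() {
	a, b := demo()
	fmt.Println(a, b) // prints: [first] [first second]
}
```

Because every request passes through unbuffered channels into a single loop, the requests are handled strictly in the order they were sent, which is why subscriber "a" never sees the second event.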

Note that the Publisher interface is very similar to the Subject interface, except that we have added two methods here.

type Publisher[Event any, Subscriber any] interface {
	Attach(subscriber *Subscriber) error
	Detach(subscriber *Subscriber) error
	Publish(event *Event) error
	Start()
	Stop()
}

As you might guess, the Start method starts the work of the Publisher, and the Stop method should terminate the work of the Publisher. The other methods should perform the same functional operations as in the case of the Observer pattern. However, their internal implementation is significantly different, as we will explain below.

Subscriber

The Subscriber defines a strategy for updating and processing events. The Subscriber interface repeats the Observer interface.

type Subscriber[Event any] interface {
	Update(event *Event) error
	GetState()
}

As with the Observer pattern, the Subscriber can use the ChangeManager functionality to convert an incoming event into an object state. We will not focus on this further, as the ChangeManager is identical and repeats the functionality described in the article on the Observer pattern.
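To make the interface concrete, here is a minimal sketch of a type that satisfies it. The Event struct and the logSubscriber type are hypothetical stand-ins, not the repository's types, and the ChangeManager is replaced by a plain field assignment for brevity.

```go
package main

import (
	"errors"
	"fmt"
)

// Event is a hypothetical stand-in for the article's event.Event type.
type Event struct {
	ID      int
	Message string
}

// Subscriber mirrors the interface shown in the article.
type Subscriber[E any] interface {
	Update(event *E) error
	GetState()
}

// logSubscriber is a hypothetical Subscriber that keeps the last message as its state.
type logSubscriber struct {
	name  string
	state string
}

// Update converts the incoming event into the subscriber's state.
func (s *logSubscriber) Update(event *Event) error {
	if event == nil {
		return errors.New("nil event")
	}
	s.state = event.Message
	return nil
}

// GetState prints the current state.
func (s *logSubscriber) GetState() {
	fmt.Printf("%s: %s\n", s.name, s.state)
}

// Compile-time check that *logSubscriber satisfies Subscriber[Event].
var _ Subscriber[Event] = (*logSubscriber)(nil)

func main() {
	s := &logSubscriber{name: "Subscriber"}
	if err := s.Update(&Event{ID: 1, Message: "Update 1"}); err != nil {
		fmt.Println("update failed:", err)
		return
	}
	s.GetState()
}
```

The blank-identifier assignment is a common Go idiom for catching a missing method at compile time rather than at the first Attach call.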

Implementation

Let's consider the implementation of the Publisher.

type publisher struct {
	chanAttacher  chan reposubscriber.Subscriber[event.Event]
	chanDetacher  chan reposubscriber.Subscriber[event.Event]
	chanPublisher chan event.Event
	chanStop      chan bool
	subscribers   []reposubscriber.Subscriber[event.Event]
}

As you can see, we use a whole set of channels in the Publisher implementation, intended for concurrent attaching, detaching of Subscribers, and event publication. Also, one channel is intended to process the signal to stop the Publisher's work.

  • Attaching a Subscriber, carried out using the Attach method, involves the chanAttacher channel;
func (publisher *publisher) Attach(newSubscriber *reposubscriber.Subscriber[event.Event]) error {
	...
	publisher.chanAttacher <- *newSubscriber
	...
}
  • Detaching a Subscriber from the Publisher, carried out using the Detach method, involves the chanDetacher channel;
func (publisher *publisher) Detach(newSubscriber *reposubscriber.Subscriber[event.Event]) error {
	...
	publisher.chanDetacher <- *newSubscriber
	...
}
  • Publishing a new event, carried out using the Publish method, involves the chanPublisher channel;
func (publisher *publisher) Publish(newEvent *event.Event) error {
	...
	publisher.chanPublisher <- *newEvent
	...
}
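The snippets above elide the method bodies, and the constructor and Stop are not shown at all. The following sketch shows one plausible shape for them; it is an assumption, not the repository's code. In particular, the unbuffered channels, the nil check, the non-generic Subscriber interface and the reduced run loop are all hypothetical simplifications.

```go
package main

import (
	"errors"
	"fmt"
)

// Event and Subscriber are simplified stand-ins for the article's
// event.Event and reposubscriber.Subscriber types.
type Event struct{ Message string }

type Subscriber interface {
	Update(event *Event) error
}

type publisher struct {
	chanAttacher chan Subscriber
	chanStop     chan bool
}

// New allocates the channels. They are unbuffered here (an assumption),
// so every send blocks until the control goroutine receives it.
func New() *publisher {
	return &publisher{
		chanAttacher: make(chan Subscriber),
		chanStop:     make(chan bool),
	}
}

// Attach rejects nil input before handing the subscriber to the control goroutine.
func (p *publisher) Attach(s *Subscriber) error {
	if s == nil || *s == nil {
		return errors.New("subscriber is nil")
	}
	p.chanAttacher <- *s
	return nil
}

// Stop signals the control goroutine to exit.
func (p *publisher) Stop() {
	p.chanStop <- true
}

// run is a reduced control loop: it only counts attachments and,
// on stop, reports the count on the attached channel.
func (p *publisher) run(attached chan<- int) {
	count := 0
	for {
		select {
		case <-p.chanAttacher:
			count++
		case <-p.chanStop:
			attached <- count
			return
		}
	}
}

// printer is a hypothetical Subscriber that prints each message.
type printer struct{}

func (printer) Update(e *Event) error {
	fmt.Println(e.Message)
	return nil
}

func main() {
	p := New()
	attached := make(chan int, 1)
	go p.run(attached)

	var s Subscriber = printer{}
	fmt.Println(p.Attach(&s))  // <nil>
	fmt.Println(p.Attach(nil)) // subscriber is nil
	p.Stop()
	fmt.Println(<-attached) // 1
}
```

With unbuffered channels, Attach and Stop block until the control goroutine is running, so the caller must launch it first. Buffered channels would relax that ordering at the cost of delayed processing.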

The Start method, which is launched as a separate control Goroutine, processes the signals arriving on these channels.

func (publisher *publisher) Start() {
	for {
		select {
		case event := <-publisher.chanPublisher:
			// Deliver the event to every attached Subscriber.
			for _, subscriber := range publisher.subscribers {
				subscriber.Update(&event)
				subscriber.GetState()
			}
		case subscriber := <-publisher.chanAttacher:
			publisher.subscribers = append(publisher.subscribers, subscriber)
		case newSubscriber := <-publisher.chanDetacher:
			for i, subscriber := range publisher.subscribers {
				if subscriber == newSubscriber {
					publisher.subscribers = append(publisher.subscribers[:i], publisher.subscribers[i+1:]...)
					// Stop iterating: the slice was just modified in place.
					break
				}
			}
		case <-publisher.chanStop:
			// Close the input channels and leave the control loop.
			close(publisher.chanPublisher)
			close(publisher.chanAttacher)
			return
		}
	}
}

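Note that only the single control Goroutine reads and modifies the subscribers slice. All other Goroutines reach it by sending on channels, so the state is shared by communicating rather than by guarding shared memory with locks.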
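The effect of a single control goroutine can be checked with a small experiment: many goroutines attach at the same time, no mutex is used, and the final count is still exact. The registry type and the count request channel below are hypothetical and exist only for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical, reduced publisher that only tracks subscriber names.
type registry struct {
	attach chan string
	count  chan chan int // carries a reply channel for reading the current size
	stop   chan struct{}
}

func newRegistry() *registry {
	return &registry{
		attach: make(chan string),
		count:  make(chan chan int),
		stop:   make(chan struct{}),
	}
}

// start is the control goroutine. The names slice is a local variable,
// so no other goroutine can reach it directly.
func (r *registry) start() {
	var names []string
	for {
		select {
		case name := <-r.attach:
			names = append(names, name)
		case reply := <-r.count:
			reply <- len(names)
		case <-r.stop:
			return
		}
	}
}

// attachConcurrently attaches n names from n goroutines and returns the final count.
func attachConcurrently(n int) int {
	r := newRegistry()
	go r.start()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			r.attach <- fmt.Sprintf("subscriber-%d", id)
		}(i)
	}
	wg.Wait() // every send has been received by the control goroutine

	reply := make(chan int)
	r.count <- reply
	total := <-reply
	r.stop <- struct{}{}
	return total
}

func main() {
	fmt.Println(attachConcurrently(100)) // prints: 100
}
```

Running this with the race detector (go run -race) should report no data races, since the slice never leaves the control goroutine.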

Example of usage

The creation of ChangeManager and Subscriber objects is done in a similar way to the implementation of the Observer pattern.

changemanager, err := changemanager.New()
...
subscriber, err := subscriber.New("Subscriber", &changemanager)

After that, we need to create a Publisher object and start the controlling Goroutine.

publisher, err := publisher.New()
...
go publisher.Start()

Attaching and detaching Subscribers, publishing messages, and stopping the Publisher are performed afterwards.

publisher.Attach(&subscriber)
...
publisher.Publish(&event.Event{
	Id:      uuid.New(),
	Message: fmt.Sprintf("Update %d", i),
})
...
publisher.Detach(&subscriber_2)
...
publisher.Stop()

Repository

Below is a repository containing code with an example of using the Publisher-Subscriber pattern that you can study.

🍃 GO-TO-REPO! 🍃

If you have downloaded the repository and want to see how the pattern works, run the following command in the root directory of the repository.

make example-run
An example of how the Publisher-Subscriber pattern works

References

  1. Erich Gamma, Richard Helm, Ralph Johnson, John Vlissides. Design Patterns. Elements of Reusable Object-Oriented Software. Addison-Wesley. 2022.
  2. Sam Newman. Building Microservices. O'Reilly. 2016.
  3. Mario Castro Contreras. Go Design Patterns. Packt. 2017.