Golang
November 17, 2024

Don't Use Synchronization Primitives In Golang Instead Build Them Yourself

DISCLAIMER

This article is for Educational Purposes Only!

I don't recommend using your own implementation for sync package's structure in production.

The article's goal is to strengthen the understanding of sync package's structures and how you can build them by yourself using chan .

Synchronization Primitives

Synchronization primitives are used to control the behavior of the application in a multithreaded processing state.

In this article we will cover 3 structures from the sync package.

  • Mutex
  • RWMutex
  • WaitGroup

And for each structure, we will have

  • Example: Divided into 2 sections
    • Bug in a code block due to lack of synchronization primitive.
    • Solution using synchronization primitive.
  • Signature: Methods of that synchronization primitive.
  • Implementation: Coding synchronization primitive from scratch using chan

Mutex โ›“๏ธ

Mutex used to control changes on some variable

Example โœ๏ธ

import (
	"net/http"
	"strconv"
)

func main() {
	var counter int

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		counter++
		_, _ = w.Write([]byte(strconv.Itoa(counter)))
	})
	
	_ = http.ListenAndServe(":8080", nil)
}

The code above looks innocent, but it has a severe bug!

Upon handling many requests, we will notice that

Amount of requests processed != Counter variable value

The reason behind that is due to 2 factors.

  • Lack of control over the counter variable's change
  • HTTP handle function handles requests in parallel (i.e. in different goroutine)

So as a solution, we can add Mutex.

import (
	"net/http"
	"strconv"
	"sync"
)

func main() {
	var (
		mutex   sync.Mutex
		counter int
	)

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		mutex.Lock()
		defer mutex.Unlock()

		counter++
		_, _ = w.Write([]byte(strconv.Itoa(counter)))
	})

	_ = http.ListenAndServe(":8080", nil)
}

Signature ๐Ÿ“•

Mutex structure has the following signature

type Mutex struct {}

func NewMutex() *Mutex {}

func (m *Mutex) Lock() {}

func (m *Mutex) Unlock() {}

Implementation โœ…

Implementation for sync.Mutex

type Mutex struct {
   channel chan struct{}
}

func NewMutex() *Mutex {
   return &Mutex{
      // channel with buffer == 1 to allow a single Lock at a time
      channel: make(chan struct{}, 1), 
   }
}

func (m *Mutex) Lock() {
   m.channel <- struct{}{}
}

func (m *Mutex) Unlock() {
	select {
	case <-m.channel: 
	default:
		panic("unlock of unlocked Mutex")
	}
}

RW Mutex โ›“๏ธ

RWMutex used to control changes on some variable while allowing for concurrent reads.

Example โœ๏ธ

type cache struct {
	storage map[string]interface{}
}

func NewCache() *cache {
	return &cache{storage: make(map[string]interface{})}
}

func (c *cache) Get(key string) interface{} {
	return c.storage[key]
}

func (c *cache) Set(key string, value interface{}) {
	c.storage[key] = value
}

The code above has the same issue with the Mutex example, The cache.storage isn't protected from concurrent write access.

But here we can allow concurrent reads from the cache since this doesn't violate data consistency.

So as a solution, we can add RWMutex.

import (
	"sync"
)

type cache struct {
	mutex   sync.RWMutex
	storage map[string]interface{}
}

func NewCache() *cache {
	return &cache{storage: make(map[string]interface{})}
}

func (c *cache) Get(key string) interface{} {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	return c.storage[key]
}

func (c *cache) Set(key string, value interface{}) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.storage[key] = value
}

Signature ๐Ÿ“•

RWMutex structure has the following signature

type RWMutex struct {}

func New() *RWMutex {}

func (rw *RWMutex) RLock() {}

func (rw *RWMutex) RUnlock() {}

func (rw *RWMutex) Lock() {}

func (rw *RWMutex) Unlock() {}

Implementation โœ…

Implementation for sync.RWMutex

type RWMutex struct {
	counter int // amount of readLocks taken

	readChan  chan struct{} // synchronization channel for readers
	writeChan chan struct{} // synchronization channel for writers

	mutex Mutex // protects `counter` variable
}

func New() *RWMutex {
	return &RWMutex{
		readChan:  make(chan struct{}, 1),
		writeChan: make(chan struct{}, 1),
		mutex: NewMutex(),
	}
}

func (rw *RWMutex) RLock() {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	// If that's the first lock of readers
	// then acquire read lock
	if rw.counter == 0 {
		rw.readChan <- struct{}{}
	}
	
	rw.counter++ // Increment reader's count
}

func (rw *RWMutex) RUnlock() {
	rw.mutex.Lock()
	defer rw.mutex.Unlock()

	// If `counter` == 0 that means `RUnlock` called before `RLock`
	if rw.counter == 0 {
		panic("unlock of unlocked RWMutex")
	}
	
	rw.counter-- // Decrement reader's count
	
	// If readers == 0 then release the reader lock
	if rw.counter == 0 {
		<-rw.readChan
	}
}

func (rw *RWMutex) Lock() {
	rw.writeChan <- struct{}{} // Acquire write lock
	rw.readChan <- struct{}{} // Acquire read lock

	// Notice that if `counter` > 0 then `Lock` will block
	// until all readers are unlocked, i.e wait for all `RLock` to `Unlock`
}

func (rw *RWMutex) Unlock() {
	select {
	// Read from `writeChan` should be instant since `Lock` was called
	case <-rw.writeChan: 
	default: // Otherwise that means `Unlock` was called before `Lock`
		panic("unlock of unlocked RWMutex")
	}
	<-rw.readChan
}

Wait Group โ›“๏ธ

WaitGroup used to notify goroutine about execution of other processes.

Example โœ๏ธ

import (
	"math/rand/v2"
	"time"
)

func main() {
	for range 100 {
		go func() {
			// simulate work
			time.Sleep(time.Duration(rand.IntN(10)) * time.Millisecond)
		}()
	}
}

In the code above we spawned 100 goroutines to do some process.

But our application might finish without waiting for all work to be done, which might lead to unexpected behaviour.

So as a solution we can use WaitGroup

import (
	"math/rand/v2"
	"sync"
	"time"
)

func main() {
	wg := sync.WaitGroup{}

	for range 100 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// simulate work
			time.Sleep(time.Duration(rand.IntN(10)) * time.Millisecond)
		}()
	}

	wg.Wait()
}

Signature ๐Ÿ“•

RWMutex structure has the following signature

type WaitGroup struct {}

func New() *WaitGroup {}

func (wg *WaitGroup) Add(delta int) {}

func (wg *WaitGroup) Done() {}

func (wg *WaitGroup) Wait() {}

Implementation โœ…

Implementation for sync.WaitGroup

type WaitGroup struct {
	counter int
	channel chan struct{}
	mutex   Mutex // protects `counter` variable
}

func New() *WaitGroup {
	return &WaitGroup{
		// Channel has buffer == 1 to allow `Wait` instant exectution
		// when `counter` == 0
		channel: make(chan struct{}, 1),
		mutex: Mutex{channel: make(chan struct{}, 1)},
	}
}

func (wg *WaitGroup) Add(delta int) {
	wg.mutex.Lock()
	defer wg.mutex.Unlock()

	// If `counter` will increase and be postive from zero
	// then acquire the lock for change so that `Wait` blocks
	if wg.counter == 0 && delta > 0 {
		wg.channel <- struct{}{}
	}

	wg.counter += delta // Update counter value

	// If `counter` is negative then `Done` was called 
	// more than the delta added by `Add`
	if wg.counter < 0 {
		panic("negative WaitGroup counter")
	}

	// If counter returned back to zero then release acquired
	// lock to unblock all `Wait` hanging if any
	if wg.counter == 0 {
		<-wg.channel
	}
}

func (wg *WaitGroup) Done() {
	wg.Add(-1) // Decrease counter by 1
}

func (wg *WaitGroup) Wait() {
    // If wg.channel is empty we can then make a single write => read
    // instantly, i.e. `counter` == 0
	wg.channel <- struct{}{}
	<-wg.channel
}

If you made it that far, I wish you learned something new ๐Ÿ’ซ

If so then stay tuned for more ๐Ÿš€โค๏ธโ€๐Ÿ”ฅ