Don't Use Synchronization Primitives in Go: Build Them Yourself
This article is for Educational Purposes Only!
I don't recommend using your own implementations of the sync package's structures in production.
The article's goal is to strengthen your understanding of the sync package's structures and to show how you can build them yourself using chan .
Synchronization Primitives
Synchronization primitives are used to coordinate the behavior of an application across concurrently executing goroutines.
In this article we will cover three structures from the sync package.
For each structure, we will have:
- Example: Divided into two sections:
- A bug in a code block caused by the lack of a synchronization primitive.
- A solution using the synchronization primitive.
- Signature: The methods of that synchronization primitive.
- Implementation: Building the synchronization primitive from scratch using chan .
Mutex
A Mutex is used to serialize changes to a shared variable.
Example
import (
"net/http"
"strconv"
)
func main() {
var counter int
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
counter++
_, _ = w.Write([]byte(strconv.Itoa(counter)))
})
_ = http.ListenAndServe(":8080", nil)
}
The code above looks innocent, but it has a severe bug!
Upon handling many concurrent requests, we will notice that
Number of requests processed != counter variable value
The reason behind that comes down to two factors:
- There is no control over concurrent changes to the counter variable.
- The HTTP handler serves requests in parallel (i.e. each request runs in its own goroutine).
So as a solution, we can add Mutex.
import (
"net/http"
"strconv"
"sync"
)
func main() {
var (
mutex sync.Mutex
counter int
)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
mutex.Lock()
defer mutex.Unlock()
counter++
_, _ = w.Write([]byte(strconv.Itoa(counter)))
})
_ = http.ListenAndServe(":8080", nil)
}
Signature
The Mutex structure has the following signature:
type Mutex struct {}
func NewMutex() *Mutex {}
func (m *Mutex) Lock() {}
func (m *Mutex) Unlock() {}
Implementation
type Mutex struct {
channel chan struct{}
}
func NewMutex() *Mutex {
return &Mutex{
// channel with buffer == 1 to allow a single Lock at a time
channel: make(chan struct{}, 1),
}
}
func (m *Mutex) Lock() {
m.channel <- struct{}{}
}
func (m *Mutex) Unlock() {
select {
case <-m.channel:
default:
panic("unlock of unlocked Mutex")
}
}
RW Mutex
An RWMutex is used to serialize changes to a shared variable while still allowing concurrent reads.
Example
type cache struct {
storage map[string]interface{}
}
func NewCache() *cache {
return &cache{storage: make(map[string]interface{})}
}
func (c *cache) Get(key string) interface{} {
return c.storage[key]
}
func (c *cache) Set(key string, value interface{}) {
c.storage[key] = value
}
The code above has the same issue as the Mutex example: cache.storage isn't protected from concurrent write access.
But here we can allow concurrent reads from the cache, since reads don't violate data consistency.
So as a solution, we can add RWMutex.
import (
"sync"
)
type cache struct {
mutex sync.RWMutex
storage map[string]interface{}
}
func NewCache() *cache {
return &cache{storage: make(map[string]interface{})}
}
func (c *cache) Get(key string) interface{} {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.storage[key]
}
func (c *cache) Set(key string, value interface{}) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.storage[key] = value
}
Signature
The RWMutex structure has the following signature:
type RWMutex struct {}
func New() *RWMutex {}
func (rw *RWMutex) RLock() {}
func (rw *RWMutex) RUnlock() {}
func (rw *RWMutex) Lock() {}
func (rw *RWMutex) Unlock() {}
Implementation
A channel-based implementation of sync.RWMutex:
type RWMutex struct {
counter int // amount of readLocks taken
readChan chan struct{} // synchronization channel for readers
writeChan chan struct{} // synchronization channel for writers
mutex Mutex // protects `counter` variable
}
func New() *RWMutex {
return &RWMutex{
readChan: make(chan struct{}, 1),
writeChan: make(chan struct{}, 1),
mutex: *NewMutex(), // NewMutex returns *Mutex; dereference to fill the value field
}
}
func (rw *RWMutex) RLock() {
rw.mutex.Lock()
defer rw.mutex.Unlock()
// If that's the first lock of readers
// then acquire read lock
if rw.counter == 0 {
rw.readChan <- struct{}{}
}
rw.counter++ // Increment reader's count
}
func (rw *RWMutex) RUnlock() {
rw.mutex.Lock()
defer rw.mutex.Unlock()
// If `counter` == 0 that means `RUnlock` called before `RLock`
if rw.counter == 0 {
panic("unlock of unlocked RWMutex")
}
rw.counter-- // Decrement reader's count
// If readers == 0 then release the reader lock
if rw.counter == 0 {
<-rw.readChan
}
}
func (rw *RWMutex) Lock() {
rw.writeChan <- struct{}{} // Acquire write lock
rw.readChan <- struct{}{} // Acquire read lock
// Notice that if `counter` > 0 then `Lock` will block
// until all readers are unlocked, i.e wait for all `RLock` to `Unlock`
}
func (rw *RWMutex) Unlock() {
select {
// Read from `writeChan` should be instant since `Lock` was called
case <-rw.writeChan:
default: // Otherwise that means `Unlock` was called before `Lock`
panic("unlock of unlocked RWMutex")
}
<-rw.readChan
}
Wait Group
A WaitGroup is used to block a goroutine until a collection of other goroutines finish their work.
Example
import (
"math/rand/v2"
"time"
)
func main() {
for range 100 {
go func() {
// simulate work
time.Sleep(time.Duration(rand.IntN(10)) * time.Millisecond)
}()
}
}
In the code above we spawn 100 goroutines to do some work.
But our application might exit without waiting for all of that work to finish, which can lead to unexpected behaviour.
So as a solution we can use a WaitGroup.
import (
"math/rand/v2"
"sync"
"time"
)
func main() {
wg := sync.WaitGroup{}
for range 100 {
wg.Add(1)
go func() {
defer wg.Done()
// simulate work
time.Sleep(time.Duration(rand.IntN(10)) * time.Millisecond)
}()
}
wg.Wait()
}
Signature
The WaitGroup structure has the following signature:
type WaitGroup struct {}
func New() *WaitGroup {}
func (wg *WaitGroup) Add(delta int) {}
func (wg *WaitGroup) Done() {}
func (wg *WaitGroup) Wait() {}
Implementation
A channel-based implementation of sync.WaitGroup:
type WaitGroup struct {
counter int
channel chan struct{}
mutex Mutex // protects `counter` variable
}
func New() *WaitGroup {
return &WaitGroup{
// Channel has buffer == 1 so that `Wait` returns
// immediately when `counter` == 0
channel: make(chan struct{}, 1),
mutex: Mutex{channel: make(chan struct{}, 1)},
}
}
func (wg *WaitGroup) Add(delta int) {
wg.mutex.Lock()
defer wg.mutex.Unlock()
// If `counter` goes from zero to positive,
// acquire the lock so that `Wait` blocks
if wg.counter == 0 && delta > 0 {
wg.channel <- struct{}{}
}
wg.counter += delta // Update counter value
// If `counter` is negative then `Done` was called
// more times than the total delta added via `Add`
if wg.counter < 0 {
panic("negative WaitGroup counter")
}
// If counter returned back to zero then release acquired
// lock to unblock all `Wait` hanging if any
if wg.counter == 0 {
<-wg.channel
}
}
func (wg *WaitGroup) Done() {
wg.Add(-1) // Decrease counter by 1
}
func (wg *WaitGroup) Wait() {
// If `counter` == 0, wg.channel is empty, so the write below
// succeeds immediately and is drained right after; otherwise
// the write blocks until `Add` releases the token
wg.channel <- struct{}{}
<-wg.channel
}
If you made it this far, I hope you learned something new!