RWMutex —— 細粒度的讀寫鎖
我們之前有講過 Mutex 互斥鎖。這是在任何時刻下只允許一個 goroutine 執行的串行化的鎖。而現在這個 RWMutex 就是在 Mutex 的基礎上進行了拓展能支持多個 goroutine 持有讀鎖,而在嘗試持有寫鎖時就會如 Mutex 一樣就會陷入等待鎖的釋放。它是一種細粒度的鎖。雖然可以允許多次持有讀鎖,但是 Go 團隊還特意囑咐,為了確保鎖的可用性,不能用於遞歸讀鎖。一個阻塞的鎖要排除正在持有鎖的新讀。
那么上面說到的這些功能,RWMutex 是如何實現的呢?首先我們來看它的內部結構:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
只有 5 個對象,其中最重要的就是 Mutex 鎖的字段 w,它就是實現寫鎖的關鍵。
- writerSem 是寫等待讀完成的信號量
- readerSem 是讀等待寫完成的信號量
- readerCount 正處於讀鎖的個數
- readerWait 嘗試獲取寫鎖時讀等待的個數(這個怎么理解?)
其中還有一個全局的常數變量 rwmutexMaxReaders,表示最多的讀操作。
我們先來看寫鎖
Lock/UnLock 寫鎖/解鎖
func (rw *RWMutex) Lock() {
...
rw.w.Lock()
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
這里直接用到了 Mutex 互斥鎖來保證只有一個 goroutine 能進來。接下來就會判斷在獲取寫鎖的時候如果還存在其他的讀鎖沒有釋放,那么這個時候就會陷入睡眠進入等待者隊列中等待所有的讀鎖被釋放之后喚醒。
可能有些人對這個限制有些不懂,其實這就是為了保證鎖的區間的讀的值順序性的正確性。因為在獲取寫的時候,目的就是進行寫操作,所謂我就必須要在此時還存在其他可能會讀這個變量的讀鎖全部釋放才行。
而釋放寫鎖就是 UnLock 操作了。如果調用此操作時,本就沒有上鎖那么就會直接拋異常。
func (rw *RWMutex) Unlock() {
...
// Announce to readers there is no active writer.
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
...
}
如果還存在讀鎖時,那么就會進入 runtime.Semrelease 對那些阻塞的讀鎖解鎖(找到對應的信號量等待者隊列然后彈出喚醒)。最后釋放 w 鎖。
RLock/RUnlock 讀鎖/解鎖
func (rw *RWMutex) RLock() {
...
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
...
}
讀鎖就非常簡單了,僅僅只是對 readerCount 字段自增。這里的判斷要注意,這個判斷成立說明有協程調用了 rw.Lock 獲取了寫鎖。所以就要等待其它協程的釋放。
知道讀鎖的機制,那么就能想到釋放讀鎖其實就是撤銷讀鎖,將 readerCount 字段減1即可。
func (rw *RWMutex) RUnlock() {
...
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
...
}
同樣在釋放讀鎖時會判斷 r 是否為負數,如果為負數就說明有其它協程獲取了寫鎖,就會進入 rUnlockSlow 方法。
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// A writer is pending.
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
如果鎖狀態已經是解鎖狀態則拋異常。
如果是只剩下一個讀等待,則釋放寫信號量通知其他正在嘗試持有寫鎖的協程上鎖。
關於信號量的細節
我們上面分析了讀寫鎖的上鎖與解鎖的過程,其實有一個點不知道大家有沒有注意。就是關於信號量的操作對象的細節。
- 調用 Lock 獲取寫鎖,會持有 writerSem 信號
- 調用 Unlock 釋放寫鎖時,會釋放 readerSem 信號
- 調用 RLock 獲取讀鎖時,會持有 readerSem 信號
- 調用 RUnlock 釋放讀鎖時,會釋放 writerSem 信號
大家有沒有發現其中的規律,這么做的目的是什么呢?
也就是說:我們在獲取寫鎖之前,會先等待讀鎖的釋放操作。而在獲取讀鎖時,會先等待寫鎖的釋放操作。
我們用反證法來假設這個場景:我這里有一個連續的寫操作;那么也就是說我要連續反復的調用 Lock + Unlock 操作。如果沒有上面的信號量的互相牽制,那么就很容易出現讀操作沒法執行的問題,也就是說會”餓死“。
所以 RWMutex 加入讀寫信號量的機制是為了更好達到 RW 的目的,而不是一直 W。
總結
- 在調用 Lock 獲取寫鎖時,會先等待 RUnlock 將其 readerCount 置為 0,然后成功獲取寫鎖。
- 還有一個操作是將 readerCount - rwmutexMaxReaders,其目的是為了阻塞后續的 RLock 操作。即在讀取寫鎖其他任何讀寫操作都不允許了。
- 在調用 Unlock 釋放寫鎖時,會通知所有讀操作,解鎖那些阻塞的讀鎖,然后成功釋放寫鎖。