go學習筆記 sync/RWMutex源碼


RWMutex是一個讀寫鎖,該鎖可以加多個讀鎖或者一個寫鎖,其經常用於讀次數遠遠多於寫次數的場景.
func (rw *RWMutex) Lock() 寫鎖,如果在添加寫鎖之前已經有其他的讀鎖和寫鎖,則lock就會阻塞直到該鎖可用,為確保該鎖最終可用,已阻塞的 Lock 調用會從獲得的鎖中排除新的讀取器,即寫鎖權限高於讀鎖,有寫鎖時優先進行寫鎖定
func (rw *RWMutex) Unlock() 寫鎖解鎖,如果沒有進行寫鎖定,則就會引起一個運行時錯誤.
func (rw *RWMutex) RLock() 讀鎖,當有寫鎖時,無法加載讀鎖,當只有讀鎖或者沒有鎖時,可以加載讀鎖,讀鎖可以加載多個,所以適用於"讀多寫少"的場景
func (rw *RWMutex) RUnlock() 讀鎖解鎖,RUnlock 撤銷單次 RLock 調用,它對於其它同時存在的讀取器則沒有效果。若 rw 並沒有為讀取而鎖定,調用 RUnlock 就會引發一個運行時錯誤

 
package sync
 
import (
    "internal/race"
    "sync/atomic"
    "unsafe"
)
 
// There is a modified copy of this file in runtime/rwmutex.go.
// If you make any changes here, see if you should make them there.
 
// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
    w           Mutex  // held if there are pending writers // 互斥鎖
    writerSem   uint32 // semaphore for writers to wait for completing readers 寫鎖信號量
    readerSem   uint32 // semaphore for readers to wait for completing writers 讀鎖信號量
    readerCount int32  // number of pending readers 讀鎖計數器
    readerWait  int32  // number of departing readers 獲取寫鎖時需要等待的讀鎖釋放數量
}
 
const rwmutexMaxReaders = 1 << 30 // 支持最多2^30個讀鎖
 
// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type.
// 它不應該用於遞歸讀鎖定;
func (rw *RWMutex) RLock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // 每次goroutine獲取讀鎖時,readerCount+1
    // 如果寫鎖已經被獲取,那么readerCount在 - rwmutexMaxReaders與 0 之間,這時掛起獲取讀鎖的goroutine,
    // 如果寫鎖沒有被獲取,那么readerCount>=0,獲取讀鎖,不阻塞
    // 通過readerCount的正負判斷讀鎖與寫鎖互斥,如果有寫鎖存在就掛起讀鎖的goroutine,多個讀鎖可以並行
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // A writer is pending, wait for it.
        // 將goroutine排到G隊列的后面,掛起goroutine, 監聽readerSem信號量
        runtime_SemacquireMutex(&rw.readerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
    }
}
 
// RUnlock undoes a single RLock call;
// it does not affect other simultaneous readers.
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
// 讀鎖不會影響其他讀操作
// 如果在進入RUnlock時沒有鎖沒有被施加讀鎖的話,則會出現運行時錯誤。
func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        _ = rw.w.state
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    // 讀鎖計數器 -1
    // 有四種情況,其中后面三種都會進這個 if
    // 【一】有讀鎖,但沒有寫鎖被掛起
    // 【二】有讀鎖,且也有寫鎖被掛起
    // 【三】沒有讀鎖且沒有寫鎖被掛起的時候, r+1 == 0
    // 【四】沒有讀鎖但是有寫鎖被掛起,則 r+1 == -(1 << 30)
    if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
    if race.Enabled {
        race.Enable()
    }
}
 
func (rw *RWMutex) rUnlockSlow(r int32) {
        // 讀鎖早就被沒有了,那么在此 -1 是需要拋異常的
        // 這里只有當讀鎖沒有的時候才會出現的兩種極端情況
        // 【一】沒有讀鎖且沒有寫鎖被掛起的時候, r+1 == 0
        // 【二】沒有讀鎖但是有寫鎖被掛起,則 r+1 == -(1 << 30)
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        throw("sync: RUnlock of unlocked RWMutex")
    }
    // A writer is pending.
        // 如果獲取寫鎖時的goroutine被阻塞,這時需要獲取讀鎖的goroutine全部都釋放,才會被喚醒
        // 更新需要釋放的 寫鎖的等待讀鎖釋放數目
        // 最后一個讀鎖解除時,寫鎖的阻塞才會被解除.
    if atomic.AddInt32(&rw.readerWait, -1) == 0 {
        // The last reader unblocks the writer.
        // 更新信號量,通知被掛起的寫鎖去獲取鎖
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}
 
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
// 對一個已經lock的rw上鎖會被阻塞
// 如果鎖已經鎖定以進行讀取或寫入,則鎖定將被阻塞,直到鎖定可用。
func (rw *RWMutex) Lock() {
    if race.Enabled {
        _ = rw.w.state
        race.Disable()
    }
    // First, resolve competition with other writers.
    // 首先,獲取互斥鎖,與其他來獲取寫鎖的goroutine 互斥
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    // 告訴其他來獲取讀鎖操作的goroutine,現在有人獲取了寫鎖
    // 減去最大的讀鎖數量,用0 -負數 來表示寫鎖已經被獲取
    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    // 設置需要等待釋放的讀鎖數量,如果有,則掛起獲取 競爭寫鎖 goroutine
    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        // 掛起,監控寫鎖信號量
        runtime_SemacquireMutex(&rw.writerSem, false, 0)
    }
    if race.Enabled {
        race.Enable()
        race.Acquire(unsafe.Pointer(&rw.readerSem))
        race.Acquire(unsafe.Pointer(&rw.writerSem))
    }
}
 
// Unlock unlocks rw for writing. It is a run-time error if rw is
// not locked for writing on entry to Unlock.
//
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) a RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
// 如果在寫鎖時,rw沒有被解鎖,則會出現運行時錯誤。
// 與互斥鎖一樣,鎖定的RWMutex與特定的goroutine無關。
// 一個goroutine可以RLock(鎖定)RWMutex然后安排另一個goroutine到RUnlock(解鎖)它。
func (rw *RWMutex) Unlock() {
    if race.Enabled {
        _ = rw.w.state
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }
 
    // Announce to readers there is no active writer.
    // 向讀鎖的goroutine發出通知,現在已經沒有寫鎖了
    // 還原加鎖時減去的那一部分readerCount
    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        // 讀鎖數目超過了 最大允許數
        race.Enable()
        throw("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    // 喚醒獲取讀鎖期間所有被阻塞的goroutine
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock() // 釋放互斥鎖資源
    if race.Enabled {
        race.Enable()
    }
}
 
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
// RLocker返回一個Locker接口的實現
// 通過調用rw.RLock和rw.RUnlock來鎖定和解鎖方法。
func (rw *RWMutex) RLocker() Locker {
    return (*rlocker)(rw)
}
 
type rlocker RWMutex
 
func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

從上面的代碼中我們可以看到,讀寫鎖首先是內置了一個互斥鎖,然后再加上維護各種計數器來實現的讀寫鎖,緊接着提供了四個函數支撐着讀寫鎖操作,由 Lock 和Unlock 分別支持寫鎖的鎖定和釋放,由RLock  和RUnlock 來支持讀鎖的的鎖定和釋放。其中,讀鎖不涉及 內置mutex的使用,寫鎖用了mutex來排斥其他寫鎖。

讀寫互斥鎖的實現比較有技巧性一些,需要幾點

1. 讀鎖不能阻塞讀鎖,引入readerCount實現

2. 讀鎖需要阻塞寫鎖,直到所以讀鎖都釋放,引入readerSem實現

3. 寫鎖需要阻塞讀鎖,直到所以寫鎖都釋放,引入wirterSem實現

4. 寫鎖需要阻塞寫鎖,引入Metux實現

【讀鎖的】Rlock:

【讀鎖的】RUnlock:

【寫鎖的】Lock:

【寫鎖的】Unlock:

參考 https://blog.csdn.net/qq_25870633/article/details/83448234


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM