轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com
本文使用的go的源碼時14.4
Mutex介紹
Mutex 結構體包含兩個字段:
-
字段state:表示當前互斥鎖的狀態。
-
字段 sema:是個信號量變量,用來控制等待 goroutine 的阻塞休眠和喚醒。
type Mutex struct {
state int32
sema uint32
}
在Go的1.9版本中,為了解決等待中的 goroutine 可能會一直獲取不到鎖,增加了飢餓模式,讓鎖變得更公平,不公平的等待時間限制在 1 毫秒。
state狀態字段所表示的含義較為復雜,如下圖所示,最低三位分別表示mutexLocked、mutexWoken、mutexStarving,state總共是32位長度,所以剩下的位置,用來表示可以有1<<(32-3)個Goroutine 等待互斥鎖的釋放:

代碼表示如下:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)
加鎖流程
fast path
func (m *Mutex) Lock() {
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
m.lockSlow()
}
加鎖的時候,一開始會通過CAS看一下能不能直接獲取鎖,如果可以的話,那么直接獲取鎖成功。
lockSlow
// 等待時間
var waitStartTime int64
// 飢餓標記
starving := false
// 喚醒標記
awoke := false
// 自旋次數
iter := 0
// 當前的鎖的狀態
old := m.state
for {
// 鎖是非飢餓狀態,鎖還沒被釋放,嘗試自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
// 自旋次數加1
iter++
// 設置當前鎖的狀態
old = m.state
continue
}
...
}
進入到lockSlow方法之后首先會判斷以下能否可以自旋,判斷依據就是通過計算:
old&(mutexLocked|mutexStarving) == mutexLocked
可以知道當前鎖的狀態必須是上鎖,並且不能處於飢餓狀態,這個判斷才為true,然后再看看iter是否滿足次數的限制,如果都為true,那么則往下繼續。
內層if包含了四個判斷:
-
首先判斷了awoke是不是喚醒狀態;
-
old&mutexWoken == 0為真表示沒有其他正在喚醒的節點; -
old>>mutexWaiterShift != 0表明當前有正在等待的goroutine; -
CAS將state的mutexWoken狀態位設置為
old|mutexWoken,即為1是否成功。
如果都滿足,那么將awoke狀態設置為真,然后將自旋次數加一,並重新設置狀態。
繼續往下看:
new := old
if old&mutexStarving == 0 {
// 如果當前不是飢餓模式,那么將mutexLocked狀態位設置1,表示加鎖
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
// 如果當前被鎖定或者處於飢餓模式,則waiter加一,表示等待一個等待計數
new += 1 << mutexWaiterShift
}
// 如果是飢餓狀態,並且已經上鎖了,那么mutexStarving狀態位設置為1,設置為飢餓狀態
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// awoke為true則表明當前線程在上面自旋的時候,修改mutexWoken狀態成功
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除喚醒標志位
new &^= mutexWoken
}
走到這里有兩種情況:1. 自旋超過了次數;2. 目前鎖沒有被持有。
所以第一個判斷,如果當前加了鎖,但是沒有處於飢餓狀態,也會重復設置new |= mutexLocked,即將mutexLocked狀態設置為1;
如果是old已經是飢餓狀態或者已經被上鎖了,那么需要設置Waiter加一,表示這個goroutine下面不會獲取鎖,會等待;
如果starving為真,表示當前goroutine是飢餓狀態,並且old已經被上鎖了,那么設置new |= mutexStarving,即將mutexStarving狀態位設置為1;
awoke如果在自旋時設置成功,那么在這里要new &^= mutexWoken消除mutexWoken標志位。因為后續流程很有可能當前線程會被掛起,就需要等待其他釋放鎖的goroutine來喚醒,如果unlock的時候發現mutexWoken的位置不是0,則就不會去喚醒,則該線程就無法再醒來加鎖。
繼續往下:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原來狀態沒有上鎖,也沒有飢餓,那么直接返回,表示獲取到鎖
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// 2.到這里是沒有獲取到鎖,判斷一下等待時長是否不為0
// 如果不為0,那么加入到隊列頭部
queueLifo := waitStartTime != 0
// 3.如果等待時間為0,那么初始化等待時間
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 4.阻塞等待
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 5.喚醒之后檢查鎖是否應該處於飢餓狀態
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 6.判斷是否已經處於飢餓狀態
if old&mutexStarving != 0 {
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 7.加鎖並且將waiter數減1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// 8.如果當前goroutine不是飢餓狀態,就從飢餓模式切換會正常模式
delta -= mutexStarving
}
// 9.設置狀態
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
到這里,首先會CAS設置新的狀態,如果設置成功則往下走,否則返回之后循環設置狀態。設置成功之后:
- 首先會判斷old狀態,如果沒有飢餓,也沒有獲取到鎖,那么直接返回,因為這種情況在進入到這段代碼之前會將new狀態設置為mutexLocked,表示已經獲取到鎖。這里還判斷了一下old狀態不能為飢餓狀態,否則也不能獲取到鎖;
- 判斷waitStartTime是否已經初始化過了,如果是新的goroutine來搶占鎖,那么queueLifo會返回false;如果不是新的goroutine來搶占鎖,那么加入到等待隊列頭部,這樣等待最久的 goroutine 優先能夠獲取到鎖;
- 如果等待時間為0,那么初始化等待時間;
- 阻塞等待,當前goroutine進行休眠;
- 喚醒之后檢查鎖是否應該處於飢餓狀態,並設置starving變量值;
- 判斷是否已經處於飢餓狀態,如果不處於飢餓狀態,那么這里直接進入到下一個for循環中獲取鎖;
- 加鎖並且將waiter數減1,這里我看了一會,沒用懂什么意思,其實需要分兩步來理解,相當於state+mutexLocked,然后state再將waiter部分的數減一;
- 如果當前goroutine不是飢餓狀態或者waiter只有一個,就從飢餓模式切換會正常模式;
- 設置狀態;
下面用圖例來解釋:
這部分的圖解是休眠前的操作,休眠前會根據old的狀態來判斷能不能直接獲取到鎖,如果old狀態沒有上鎖,也沒有飢餓,那么直接break返回,因為這種情況會在CAS中設置加上鎖;
接着往下判斷,waitStartTime是否等於0,如果不等於,說明不是第一次來了,而是被喚醒后來到這里,那么就不能直接放到隊尾再休眠了,而是要放到隊首,防止長時間搶不到鎖;

下面這張圖是處於喚醒后的示意圖,如何被喚醒的可以直接到跳到解鎖部分看完再回來。
被喚醒一開始是需要判斷一下當前的starving狀態以及等待的時間如果超過了1ms,那么會將starving設置為true;
接下來會有一個if判斷, 這里有個細節,因為是被喚醒的,所以判斷前需要重新獲取一下鎖,如果當前不是飢餓模式,那么會直接返回,然后重新進入到for循環中;
如果當前是處於飢餓模式,那么會計算一下delta為加鎖,並且當前的goroutine是可以直接搶占鎖的,所以需要將waiter減一,如果starving不為飢餓,或者等待時間沒有超過1ms,或者waiter只有一個了,那么還需要將delta減去mutexStarving,表示退出飢餓模式;
最后通過AddInt32將state加上delta,這里之所以可以直接加上,因為這時候state的mutexLocked值肯定為0,並且mutexStarving位肯定為1,並且在獲取鎖之前至少還有當前一個goroutine在等待隊列中,所以waiter可以直接減1。

解鎖流程
fast path
func (m *Mutex) Unlock() {
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
//返回一個state被減后的值
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
//如果返回的state值不為0,那么進入到unlockSlow中
m.unlockSlow(new)
}
}
這里主要就是AddInt32重新設置state的mutexLocked位為0,然后判斷新的state值是否不為0,不為0則調用unlockSlow方法。
unlockSlow

unlockSlow方法里面也分為正常模式和飢餓模式下的解鎖:
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
// 正常模式
if new&mutexStarving == 0 {
old := new
for {
// 如果沒有 waiter,或者已經有在處理的情況,直接返回
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// waiter 數減 1,mutexWoken 標志設置上,通過 CAS 更新 state 的值
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 直接喚醒等待隊列中的 waiter
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else { // 飢餓模式
// 直接喚醒等待隊列中的 waiter
runtime_Semrelease(&m.sema, true, 1)
}
}
在正常模式下,如果沒有 waiter,或者mutexLocked、mutexStarving、mutexWoken有一個不為零說明已經有其他goroutine在處理了,直接返回;如果互斥鎖存在等待者,那么通過runtime_Semrelease直接喚醒等待隊列中的 waiter;
在飢餓模式,直接調用runtime_Semrelease方法將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖。
總結
Mutex的設計非常的簡潔的,從代碼可以看出為了設計出這么簡潔的代碼state一個字段可以當4個字段使用。並且為了解決goroutine飢餓問題,在1.9 中 Mutex 增加了飢餓模式讓鎖變得更公平,不公平的等待時間限制在 1 毫秒,但同時,代碼也變得越來越難懂了,所以要理解它上面的思想需要慢慢的廢些時間細細的體會一下了。
