Locks in golang are built on top of CAS atomic operations. The Mutex struct looks like this:
type Mutex struct {
	state int32
	sema  uint32
}
// state holds the current state of the lock; every bit has a meaning, and the zero value means unlocked
// sema is used as a semaphore: through P/V operations it blocks and wakes goroutines on a wait queue. A goroutine waiting for the lock is hung on the wait queue and put to sleep, no longer scheduled, until unlock wakes it. The details are in the Lock implementation in sync/mutex.go.
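Before digging into the internals, here is a minimal usage sketch (just the standard sync API, nothing assumed beyond it) showing the Lock/Unlock pair that these two fields implement:

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu      sync.Mutex // the zero value is an unlocked mutex (state == 0)
		counter int
		wg      sync.WaitGroup
	)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()         // fast path: CAS on state; slow path: park on sema
			defer mu.Unlock() // wakes a waiter if one is parked on sema
			counter++
		}()
	}
	wg.Wait()
	fmt.Println(counter) // always 10
}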
A quick digression on sema
Although inside Mutex it is just an integer field, it is a crucial piece: this field is what the semaphore machinery uses to manage the sleeping and waking of goroutines.
I haven't read the sema implementation in detail yet; what follows is a rough functional analysis, so beware of inaccuracies!!
First, sema provides one mechanism for goroutine "scheduling": it can block and wake goroutines.
Acquiring the semaphore happens in semacquire1 in runtime/sema.go;
releasing it happens in semrelease1.
sema is built around a semaRoot struct and a global semtable variable. One semaRoot handles the P/V operations for one semaphore (my guess is this relates to the GMP scheduling model: one processor carries many goroutines, and the goroutines managed by one semaphore need a lightweight lock while their state is being changed, which is the lock field below; note that this lock is not the Mutex being discussed here, it is sema's own runtime-internal lock). The allocation and lookup of the many semaRoots is managed through the global semtable variable.
type semaRoot struct {
	lock  mutex
	treap *sudog // root of balanced tree of unique waiters.
	nwait uint32 // Number of waiters. Read w/o the lock.
}

var semtable [semTabSize]struct {
	root semaRoot
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
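For what it's worth, picking a bucket out of semtable is just a hash of the semaphore's address (rather than anything tied to a particular P). A self-contained sketch of that idea, with the shift and table size borrowed from runtime/sema.go (details may differ between Go versions):

package main

import (
	"fmt"
	"unsafe"
)

const semTabSize = 251 // size of the global semtable

// bucketFor mimics how a semaphore address might be mapped to a semtable slot:
// shift the address and reduce it modulo the table size.
func bucketFor(addr *uint32) uintptr {
	return (uintptr(unsafe.Pointer(addr)) >> 3) % semTabSize
}

func main() {
	var sema1, sema2 uint32
	fmt.Println(bucketFor(&sema1), bucketFor(&sema2)) // two (usually different) buckets
}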
1. Putting the current goroutine to sleep is done through goparkunlock, which semacquire1 calls like this:
1) root := semroot(addr)
semroot finds the semaRoot struct from the semaphore's address.
2) Skipping a stretch of code..... straight to where the current goroutine is put to sleep.
First, lock(&root.lock) takes the semaRoot's own lock.
Then root.queue() puts the current goroutine on the wait queue (remember that one semaphore manages many goroutines; before a goroutine sleeps, its details have to be saved somewhere, i.e. put on the queue, which means being hung on the treap of the semaRoot struct -- judging from the comment, the queue is implemented as a balanced tree?).
3) Call goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4).
This ends up calling gopark, which makes the system run another round of scheduling. Before rescheduling, the current goroutine (the G object) is marked as sleeping so it will not be scheduled again until it is woken, and then the lock is released. This function hands execution over to the runtime scheduler, which goes off and schedules other goroutines.
2. Since goroutines block, there has to be a wakeup mechanism.
Wakeup also goes through the semtable structure.
sema.go is not designed specifically for the mutex lock. In the Mutex case, when some other goroutine releases the Mutex it calls semrelease1, which wakes a goroutine from the queue so it can run. I haven't looked at the details.
Still, from the analysis so far: Mutex is a mutual-exclusion lock, so the semaphore inside it should be a binary semaphore holding only 0 and 1. When Lock gets as far as semacquire1 and the semaphore is found to be 0, the current goroutine is put to sleep:
func cansemacquire(addr *uint32) bool {
	for {
		v := atomic.Load(addr)
		if v == 0 {
			return false
		}
		if atomic.Cas(addr, v, v-1) {
			return true
		}
	}
}
If goroutines keep trying to acquire the Mutex, they keep seeing the semaphore at 0, and they keep going to sleep one after another. Only unlock adds 1 to the semaphore, and of course unlock must not be called repeatedly, so this semaphore should only ever be 0 or 1.
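As an aside, sync.Mutex actively enforces the "no repeated unlock" rule: calling Unlock on an unlocked mutex raises a fatal error. A tiny (deliberately broken) sketch:

package main

import "sync"

func main() {
	var mu sync.Mutex
	mu.Lock()
	mu.Unlock()
	mu.Unlock() // fatal error: sync: unlock of unlocked mutex
}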
That was a rough look at sema; back to Mutex.
The role of the sema field was covered above. The state field is the more central one in Mutex: it describes the current state of the lock.
state |31|30|....| 3 | 2 | 1 | 0 |
bit 0: whether the mutex is currently locked; 0 = unlocked, 1 = locked
bit 1: whether a goroutine has been woken (or is awake and spinning); 0 = no, 1 = yes
bit 2: which mode the Mutex is in; 0 = Normal, 1 = Starving
bits 3..31: the number of goroutines waiting to Lock this mutex
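These bit positions match the constants defined at the top of sync/mutex.go (mutexLocked, mutexWoken, mutexStarving, mutexWaiterShift). A minimal sketch that redeclares the same values and decodes a state word:

package main

import "fmt"

// The same constants sync/mutex.go uses for the state word.
const (
	mutexLocked      = 1 << iota // bit 0: mutex is locked
	mutexWoken                   // bit 1: a goroutine has been woken / is spinning
	mutexStarving                // bit 2: mutex is in starvation mode
	mutexWaiterShift = iota      // the waiter count lives in bits 3..31
)

func decode(state int32) {
	fmt.Printf("locked=%v woken=%v starving=%v waiters=%d\n",
		state&mutexLocked != 0,
		state&mutexWoken != 0,
		state&mutexStarving != 0,
		state>>mutexWaiterShift)
}

func main() {
	decode(0b1001) // locked, one waiter
	decode(0b101)  // locked, starving, no waiters
}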
Let me first explain Mutex's two modes, normal and starving. The Go source carries a long comment about mutex fairness at the top of sync/mutex.go; the two modes exist to make the lock fair. Below is a translation of that comment, taken from:
http://blog.51cto.com/qiangmzsx/2134786
A mutex has two modes of operation: normal and starvation.
In normal mode, waiting goroutines are queued in FIFO order, but a goroutine that is woken up does not get the mutex immediately; it has to compete with newly arriving goroutines for it.
Because the new arrivals are already running on a CPU, the woken goroutine has a good chance of losing that race. When that happens, the woken goroutine is queued at the front of the wait queue.
If a waiter fails to acquire the mutex for more than 1 ms, the mutex switches to starvation mode.
In starvation mode, the mutex is handed directly from the unlocking goroutine to the goroutine at the front of the queue. Newly arriving goroutines do not compete for the mutex (even if it looks unlocked, they do not spin for it); they queue at the tail of the wait queue.
In starvation mode, once a goroutine has acquired the mutex, the mutex switches back to normal mode if either of the following holds:
1. it is the last goroutine in the wait queue, or
2. it waited for less than 1 ms.
Normal mode has better performance because a goroutine can acquire the mutex several times in a row;
starvation mode exists to prevent goroutines at the tail of the queue from never being able to acquire the mutex.
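To make the fairness discussion concrete, here is a small contention demo (names and timings are purely illustrative): several goroutines hammer one mutex and hold it for more than 1ms at a time, which is exactly the workload where a waiter can cross the starvation threshold and starvation mode kicks in:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for j := 0; j < 3; j++ {
				mu.Lock()
				// Hold the lock past the 1ms starvation threshold so that
				// queued waiters may push the mutex into starvation mode.
				time.Sleep(2 * time.Millisecond)
				fmt.Printf("goroutine %d acquired the lock\n", id)
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
}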
The actual implementation:
In the Lock function:
// Fast path: grab unlocked mutex.
// 1. Use an atomic op to flip the state to locked.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
	return
}
Many goroutines may try to grab the Mutex at any moment, and m.state keeps changing, so there are quite a few scenarios; I'll walk through the typical ones.
1) Suppose the mutex is in its initial state, m.state = 0. The current goroutine grabs the lock right here and m.state becomes locked:
m.state = 00...001 -- locked, not woken, normal mode.
Lucky: it gets the lock the moment it arrives. Just as the comment above describes, it is already on a CPU and the lock happens to be free -- born with a halo, heh.
Lock returns here.
If that goroutine does not release the lock, the next goroutine to arrive cannot lock it and we move on to step two.
2) Next comes a for loop. Roughly: try to grab the lock; if you can't, sleep for a while and wait to be woken; when woken, check whether you've waited so long that you're starving; if so, the mutex enters starving mode and you get served first; if not, you just keep waiting.
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state // we already set m.state = 001 above, so old is also 001
for {
	// Don't spin in starvation mode, ownership is handed off to waiters
	// so we won't be able to acquire the mutex anyway.
	// old = 001: still locked.
	// runtime_canSpin then decides whether spinning is worthwhile, based on iter,
	// which is incremented on every pass through the loop.
	// Spinning requires a multicore machine, GOMAXPROCS > 1, at least one other
	// running P, and an empty local run queue -- presumably to avoid spinning on a
	// single core and stalling the whole program. Spinning is also capped at 4
	// attempts: once iter reaches 4 the if is not entered again.
	// We assume a multicore machine here, so the if is entered.
	// old is re-read on every pass; the point of the spin is to wait for the lock
	// to be released so the goroutine currently occupying the CPU can grab it --
	// Go always prefers to hand the lock to whoever is already on a CPU.
	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
		// Active spinning makes sense.
		// Try to set mutexWoken flag to inform Unlock
		// to not wake other blocked goroutines.
		// awoke is false at this point. If the woken bit is not set yet and there
		// are goroutines waiting, try to set mutexWoken so Unlock won't bother
		// waking anyone from the queue (we, the spinner, intend to take the lock).
		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
			awoke = true
		}
		runtime_doSpin() // spin: execute ~30 PAUSE instructions that do nothing useful
		iter++
		old = m.state // re-read state; if another goroutine released the lock, the next pass won't enter this if
		continue      // done spinning, loop again
	}
	// After the if there are two possibilities:
2.1) Another goroutine called Unlock: the locked check in the if above fails and we drop out of it. Now m.state = 000, old = 000, awoke = false, and no goroutine is waiting -- the simplest case.
	new := old
	// new = 000, old = 000, m.state = 000, awoke = false. new is initialised here;
	// the desired lock state is built up in new and m.state is set to it later.
	// Don't try to acquire starving mutex, new arriving goroutines must queue.
	if old&mutexStarving == 0 {
		// new = 000: the lock is not in starving mode, so the running goroutine is
		// allowed to take it. If it were in starving mode, the current goroutine
		// would have to queue and leave the lock to the brothers in the queue who
		// are about to starve.
		new |= mutexLocked
		// new = 001: we intend to lock.
	}
	if old&(mutexLocked|mutexStarving) != 0 {
		// old = 000, so this branch is not taken: the running goroutine takes the
		// lock without joining the queue, and the waiter count is not incremented.
		// new stays 001.
		new += 1 << mutexWaiterShift
	}
	// The current goroutine switches mutex to starvation mode.
	// But if the mutex is currently unlocked, don't do the switch.
	// Unlock expects that starving mutex has waiters, which will not
	// be true in this case.
	if starving && old&mutexLocked != 0 {
		// starving = false here. starving only becomes true after this goroutine has
		// been woken by an unlock and finds it has waited too long; the other queued
		// goroutines have then been waiting a while too, so when the lock frees up
		// the queued goroutines get priority. That logic comes later; this if is not
		// entered now and new stays 001.
		new |= mutexStarving
	}
	if awoke {
		// awoke is false, so the wait queue is left alone and new stays 001.
		// The goroutine has been woken from sleep,
		// so we need to reset the flag in either case.
		if new&mutexWoken == 0 {
			throw("sync: inconsistent mutex state")
		}
		new &^= mutexWoken
	}
At this point new is fully built, new = 001, and it is time to change the Mutex state and really take exclusive ownership of the lock.
	// To be safe -- another goroutine might have changed the lock state while new was
	// being computed -- atomically set the state to new = 001 only if it still equals
	// old. This is the actual locking step.
	if atomic.CompareAndSwapInt32(&m.state, old, new) {
		if old&(mutexLocked|mutexStarving) == 0 {
			// old = 000: the CAS above set m.state to the locked value, so we are
			// done and none of the logic below runs.
			break // locked the mutex with CAS
			// Looking back at 2.1: what if we got here because we ran out of spins
			// instead? That is the logic of case 2.2 below.
		}
		// If we were already waiting before, queue at the front of the queue.
		queueLifo := waitStartTime != 0
		if waitStartTime == 0 {
			waitStartTime = runtime_nanotime()
		}
		runtime_SemacquireMutex(&m.sema, queueLifo)
		starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
		old = m.state
		if old&mutexStarving != 0 {
			// If this goroutine was woken and mutex is in starvation mode,
			// ownership was handed off to us but mutex is in somewhat
			// inconsistent state: mutexLocked is not set and we are still
			// accounted as waiter. Fix that.
			if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
				throw("sync: inconsistent mutex state")
			}
			delta := int32(mutexLocked - 1<<mutexWaiterShift)
			if !starving || old>>mutexWaiterShift == 1 {
				// Exit starvation mode.
				// Critical to do it here and consider wait time.
				// Starvation mode is so inefficient, that two goroutines
				// can go lock-step infinitely once they switch mutex
				// to starvation mode.
				delta -= mutexStarving
			}
			atomic.AddInt32(&m.state, delta)
			break
		}
		awoke = true
		iter = 0
	} else {
		old = m.state
	}
2.2) new := old, so now new = 001, old = 001, m.state = 001, awoke = false (I'll skip the cases where awoke was set to true inside the spin if -- too many to cover....).
	// Don't try to acquire starving mutex, new arriving goroutines must queue.
	if old&mutexStarving == 0 {
		new |= mutexLocked
		// new = 001
	}
	if old&(mutexLocked|mutexStarving) != 0 {
		// old = 001: the currently running goroutine will have to join the queue.
		// Bits 3..31 of new hold the number of queued goroutines, so add one here.
		new += 1 << mutexWaiterShift
		// new = 1001
	}
	// The current goroutine switches mutex to starvation mode.
	// But if the mutex is currently unlocked, don't do the switch.
	// Unlock expects that starving mutex has waiters, which will not
	// be true in this case.
	if starving && old&mutexLocked != 0 {
		// starving = false: no need to enter starving mode.
		new |= mutexStarving
	}
	if awoke {
		// awoke = false
		// The goroutine has been woken from sleep,
		// so we need to reset the flag in either case.
		if new&mutexWoken == 0 {
			throw("sync: inconsistent mutex state")
		}
		new &^= mutexWoken
	}
new has been initialised to 1001, old = 001.
	if atomic.CompareAndSwapInt32(&m.state, old, new) {
		if old&(mutexLocked|mutexStarving) == 0 {
			// old = 001, so we do NOT break here: the current goroutine cannot get
			// the lock and will have to block and sleep.
			break // locked the mutex with CAS
		}
		// If we were already waiting before, queue at the front of the queue.
		// queueLifo decides whether the goroutine joins the back of the queue or is
		// pushed to the front when it is enqueued.
		queueLifo := waitStartTime != 0
		if waitStartTime == 0 {
			// waitStartTime == 0 means this is the first time this goroutine has
			// reached this point in the for loop.
			waitStartTime = runtime_nanotime()
		}
		runtime_SemacquireMutex(&m.sema, queueLifo)
		// The current goroutine joins the wait queue -- see footnote 1 for more.
		// It blocks right here, and is only woken, if it sits at the head of the
		// queue, when the lock is released.
		starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
		// On wakeup, check whether the wait exceeded 1ms; if it did, set starving
		// to true -- see footnote 2 for more.
		old = m.state
		if old&mutexStarving != 0 {
			// If this goroutine was woken and mutex is in starvation mode,
			// ownership was handed off to us but mutex is in somewhat
			// inconsistent state: mutexLocked is not set and we are still
			// accounted as waiter. Fix that.
			if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
				throw("sync: inconsistent mutex state")
			}
			delta := int32(mutexLocked - 1<<mutexWaiterShift)
			if !starving || old>>mutexWaiterShift == 1 {
				// Exit starvation mode.
				// Critical to do it here and consider wait time.
				// Starvation mode is so inefficient, that two goroutines
				// can go lock-step infinitely once they switch mutex
				// to starvation mode.
				delta -= mutexStarving
			}
			atomic.AddInt32(&m.state, delta)
			break
		}
		awoke = true
		iter = 0
	} else {
		old = m.state
	}
Footnote 1: runtime_SemacquireMutex here is a thin wrapper over semacquire1 from sema.go described above. It eventually calls gopark, which hands execution back to the runtime and marks the current goroutine as sleeping, so it no longer takes part in scheduling (as far as the program can tell, it simply blocks there).
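As a user-level analogy only (this is not how the runtime implements parking), the block/wake behavior has the same shape as a goroutine blocking on a channel receive until another goroutine sends to wake it:

package main

import (
	"fmt"
	"time"
)

func main() {
	wake := make(chan struct{})
	go func() {
		fmt.Println("waiter: going to sleep")
		<-wake // blocks: this goroutine is parked and not scheduled until woken
		fmt.Println("waiter: woken up")
	}()
	time.Sleep(10 * time.Millisecond)
	wake <- struct{}{} // analogous to semrelease waking a queued waiter
	time.Sleep(10 * time.Millisecond)
}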
Footnote 2: 1) There are two cases. If the wait did not exceed 1ms, starving = false:
	old = m.state // someone must have unlocked for us to be woken, so old is at least 000; assume 000 here
	if old&mutexStarving != 0 // old is not in starving mode, so this if is not entered
	awoke = true // reset awoke and iter and go around the loop again
	iter = 0
///////////////////////////
On the next pass through the loop, new ends up as 001: the freshly woken goroutine takes the lock, not in starving mode. (Strictly speaking, old will also carry the woken bit that Unlock set when it woke us, and the awoke branch clears it again, but the end result is the same: new = 001.)
It finally breaks out of Lock here:
	if atomic.CompareAndSwapInt32(&m.state, old, new) {
		if old&(mutexLocked|mutexStarving) == 0 {
			break // locked the mutex with CAS
		}
2) If the wait did exceed 1ms, starving = true:
	old = m.state // someone must have unlocked for us to be woken, so old is at least 000; assume 000 here
	if old&mutexStarving != 0 // old is not in starving mode, so this if is not entered
	awoke = true // reset awoke and iter and go around the loop again
	iter = 0
///////////////////////////
On the next pass, with old = 000 as assumed, the starving && old&mutexLocked != 0 check still fails (as the comment says, the switch is skipped while the mutex is unlocked), so new is again 001 and the woken goroutine simply locks the mutex in normal mode. new only gains the starving bit (the 100 bit) when some other goroutine has re-taken the lock in the meantime; in that case this goroutine bumps the waiter count, flips the mutex into starving mode, and queues again.
II. What difference does starving mode actually make? It shows up mainly in the Unlock function.
// Fast path: drop lock bit.
// First clear the locked bit. Assume the simplest case where every other bit is 0,
// so m.state = 000 and new = 000.
new := atomic.AddInt32(&m.state, -mutexLocked)
if (new+mutexLocked)&mutexLocked == 0 {
	throw("sync: unlock of unlocked mutex")
}
// This is where starving mode matters: if the mutex is in starving mode we go
// straight to the else branch and wake the goroutine at the head of the queue.
if new&mutexStarving == 0 {
	old := new
	// old = 000
	for {
		// If there are no waiters or a goroutine has already
		// been woken or grabbed the lock, no need to wake anyone.
		// In starvation mode ownership is directly handed off from unlocking
		// goroutine to the next waiter. We are not part of this chain,
		// since we did not observe mutexStarving when we unlocked the mutex above.
		// So get off the way.
		// In other words: if no goroutine is waiting in the queue, or some goroutine
		// has already been woken or has grabbed the lock (for example a goroutine
		// that was spinning in Lock on a CPU and snatched the lock right after this
		// unlock), there is no need to wake anyone from the wait queue.
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
			return
		}
		// If there are waiters and no on-CPU goroutine is spinning for the lock,
		// seize the chance to wake one goroutine from the wait queue.
		// Grab the right to wake someone.
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			runtime_Semrelease(&m.sema, false)
			return
		}
		old = m.state
	}
} else {
	// Starving mode: handoff mutex ownership to the next waiter.
	// Note: mutexLocked is not set, the waiter will set it after wakeup.
	// But mutex is still considered locked if mutexStarving is set,
	// so new coming goroutines won't acquire it.
	// In starving mode, wake the goroutine at the head of the queue directly.
	// The Lock analysis above did not cover what happens when a goroutine blocked
	// in runtime_SemacquireMutex(&m.sema, queueLifo) is woken while the lock is in
	// starving mode -- see footnote 3.
	runtime_Semrelease(&m.sema, true)
}
Footnote 3: Even though the locked bit is cleared at the very start of Unlock, a goroutine running on a CPU still cannot grab the lock: it sees the starving bit set in m.state, knows that only queued waiters may take the lock, and so joins the wait queue itself. Then the runtime_Semrelease(&m.sema, true) in Unlock wakes one sleeping goroutine from the queue.
Back in the Lock function, m.state should now be 100:
	runtime_SemacquireMutex(&m.sema, queueLifo)
	// woken up here
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
	old = m.state
	// old = 100
	if old&mutexStarving != 0 {
		// the lock is in starving mode
		// If this goroutine was woken and mutex is in starvation mode,
		// ownership was handed off to us but mutex is in somewhat
		// inconsistent state: mutexLocked is not set and we are still
		// accounted as waiter. Fix that.
		if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
			throw("sync: inconsistent mutex state")
		}
		delta := int32(mutexLocked - 1<<mutexWaiterShift)
		// delta sets the locked bit and removes this goroutine from the waiter count
		if !starving || old>>mutexWaiterShift == 1 {
			// If this goroutine no longer counts as starving, or it was the last
			// waiter in the queue, also clear the starving bit.
			// Exit starvation mode.
			// Critical to do it here and consider wait time.
			// Starvation mode is so inefficient, that two goroutines
			// can go lock-step infinitely once they switch mutex
			// to starvation mode.
			delta -= mutexStarving
		}
		atomic.AddInt32(&m.state, delta)
		// the lock is acquired; break out
		break
	}
Summary: this only covered the mutex briefly; there is also a read-write lock, which I won't go into here. The mutex is built on top of atomic operations (atomic); I'll write about atomic operations in more detail later.
To finish, a few interesting questions. The answers are not necessarily correct -- corrections from the experts are welcome.
1. A global int variable on a multicore machine, with one goroutine reading it and one writing it and nothing else: do we need atomic operations?
Probably not at the hardware level: Intel P6 and later processors guarantee that an aligned 32-bit load or store is atomic, and the compiler will not turn a single read or write of such a variable into multiple instructions, so you will never observe a half-written value. Keep in mind, though, that the Go memory model still treats an unsynchronized concurrent read and write as a data race (the race detector will report it, and there is no guarantee about when the reader observes the new value), so atomic.Load/Store or a channel is the safer choice.
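A minimal sketch of the explicit version using sync/atomic (the variable name is just for illustration):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var ready int32 // shared: one goroutine writes it, another reads it

func main() {
	go func() {
		time.Sleep(time.Millisecond)
		atomic.StoreInt32(&ready, 1) // the writing side
	}()
	for atomic.LoadInt32(&ready) == 0 { // the reading side is guaranteed to see the store
		time.Sleep(100 * time.Microsecond)
	}
	fmt.Println("saw the write")
}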
2. A global int variable i on a multicore machine, with two goroutines both executing i++: do we need atomic operations?
Yes. i++ is a classic read-modify-write; operations like that need a CAS or atomic add to stay correct.
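For example (a racy counter next to an atomic one; run it with -race and only the first is flagged):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var racy, safe int64
	var wg sync.WaitGroup
	for g := 0; g < 2; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100000; i++ {
				racy++                    // read-modify-write, not atomic: updates can be lost
				atomic.AddInt64(&safe, 1) // atomic read-modify-write
			}
		}()
	}
	wg.Wait()
	fmt.Println(racy, safe) // racy is often < 200000, safe is always 200000
}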
3. For a map, if writes are synchronized, do reads need synchronization too?
If it were just reads or writes of integer values, you would never see a half-written or half-read value, but you can still get stale reads: goroutine1 writes 1 into the map, goroutine2 reads 1, and while it is processing, goroutine1 overwrites the value again. More fundamentally, reads do need protection: a Go map is not safe for concurrent read and write at all (the runtime detects it and aborts with "fatal error: concurrent map read and map write"), and sync/atomic cannot be applied to individual map operations, so concurrent access should go through a sync.Mutex / sync.RWMutex or sync.Map, as sketched below.
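A common safe pattern, sketched with sync.RWMutex (the type and method names here are purely illustrative):

package main

import (
	"fmt"
	"sync"
)

// counters is a map guarded by a RWMutex: writers take the write lock and readers
// take the read lock, so the map is never touched by a reader and a writer at once.
type counters struct {
	mu sync.RWMutex
	m  map[string]int
}

func (c *counters) set(k string, v int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.m[k] = v
}

func (c *counters) get(k string) int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.m[k]
}

func main() {
	c := &counters{m: make(map[string]int)}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); c.set("x", 1) }()
	go func() { defer wg.Done(); _ = c.get("x") }()
	wg.Wait()
	fmt.Println(c.get("x"))
}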