多圖詳解Go的互斥鎖Mutex


轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com

本文使用的go的源碼時14.4

Mutex介紹

Mutex 結構體包含兩個字段:

  • 字段state:表示當前互斥鎖的狀態。

  • 字段 sema:是個信號量變量,用來控制等待 goroutine 的阻塞休眠和喚醒。

type Mutex struct {
	state int32
	sema  uint32
}

在Go的1.9版本中,為了解決等待中的 goroutine 可能會一直獲取不到鎖,增加了飢餓模式,讓鎖變得更公平,不公平的等待時間限制在 1 毫秒。

state狀態字段所表示的含義較為復雜,如下圖所示,最低三位分別表示mutexLocked、mutexWoken、mutexStarving,state總共是32位長度,所以剩下的位置,用來表示可以有1<<(32-3)個Goroutine 等待互斥鎖的釋放:

Group 1

代碼表示如下:

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
)

加鎖流程

fast path

func (m *Mutex) Lock() { 
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	} 
	m.lockSlow()
}

加鎖的時候,一開始會通過CAS看一下能不能直接獲取鎖,如果可以的話,那么直接獲取鎖成功。

lockSlow

// 等待時間
var waitStartTime int64
// 飢餓標記
starving := false
// 喚醒標記
awoke := false
// 自旋次數
iter := 0
// 當前的鎖的狀態
old := m.state
for { 
    // 鎖是非飢餓狀態,鎖還沒被釋放,嘗試自旋
	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
			awoke = true
		}
		// 自旋
		runtime_doSpin()
		// 自旋次數加1
		iter++
		// 設置當前鎖的狀態
		old = m.state
		continue
	}
	...
}

進入到lockSlow方法之后首先會判斷以下能否可以自旋,判斷依據就是通過計算:

old&(mutexLocked|mutexStarving) == mutexLocked

可以知道當前鎖的狀態必須是上鎖,並且不能處於飢餓狀態,這個判斷才為true,然后再看看iter是否滿足次數的限制,如果都為true,那么則往下繼續。

內層if包含了四個判斷:

  • 首先判斷了awoke是不是喚醒狀態;

  • old&mutexWoken == 0為真表示沒有其他正在喚醒的節點;

  • old>>mutexWaiterShift != 0表明當前有正在等待的goroutine;

  • CAS將state的mutexWoken狀態位設置為old|mutexWoken,即為1是否成功。

如果都滿足,那么將awoke狀態設置為真,然后將自旋次數加一,並重新設置狀態。

繼續往下看:

new := old
if old&mutexStarving == 0 {
	// 如果當前不是飢餓模式,那么將mutexLocked狀態位設置1,表示加鎖
	new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
	// 如果當前被鎖定或者處於飢餓模式,則waiter加一,表示等待一個等待計數
	new += 1 << mutexWaiterShift
}
// 如果是飢餓狀態,並且已經上鎖了,那么mutexStarving狀態位設置為1,設置為飢餓狀態
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
// awoke為true則表明當前線程在上面自旋的時候,修改mutexWoken狀態成功
if awoke { 
	if new&mutexWoken == 0 {
		throw("sync: inconsistent mutex state")
	}
	// 清除喚醒標志位
	new &^= mutexWoken
}

走到這里有兩種情況:1. 自旋超過了次數;2. 目前鎖沒有被持有。

所以第一個判斷,如果當前加了鎖,但是沒有處於飢餓狀態,也會重復設置new |= mutexLocked,即將mutexLocked狀態設置為1;

如果是old已經是飢餓狀態或者已經被上鎖了,那么需要設置Waiter加一,表示這個goroutine下面不會獲取鎖,會等待;

如果starving為真,表示當前goroutine是飢餓狀態,並且old已經被上鎖了,那么設置new |= mutexStarving,即將mutexStarving狀態位設置為1;

awoke如果在自旋時設置成功,那么在這里要new &^= mutexWoken消除mutexWoken標志位。因為后續流程很有可能當前線程會被掛起,就需要等待其他釋放鎖的goroutine來喚醒,如果unlock的時候發現mutexWoken的位置不是0,則就不會去喚醒,則該線程就無法再醒來加鎖。

繼續往下:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
	// 1.如果原來狀態沒有上鎖,也沒有飢餓,那么直接返回,表示獲取到鎖
	if old&(mutexLocked|mutexStarving) == 0 {
		break // locked the mutex with CAS
	}
	// 2.到這里是沒有獲取到鎖,判斷一下等待時長是否不為0
	// 如果不為0,那么加入到隊列頭部
	queueLifo := waitStartTime != 0
	// 3.如果等待時間為0,那么初始化等待時間
	if waitStartTime == 0 {
		waitStartTime = runtime_nanotime()
	}
	// 4.阻塞等待
	runtime_SemacquireMutex(&m.sema, queueLifo, 1)
	// 5.喚醒之后檢查鎖是否應該處於飢餓狀態
	starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
	old = m.state
	// 6.判斷是否已經處於飢餓狀態
	if old&mutexStarving != 0 { 
		if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
			throw("sync: inconsistent mutex state")
		}
		// 7.加鎖並且將waiter數減1
		delta := int32(mutexLocked - 1<<mutexWaiterShift)
		if !starving || old>>mutexWaiterShift == 1 { 
			// 8.如果當前goroutine不是飢餓狀態,就從飢餓模式切換會正常模式
			delta -= mutexStarving
		}
		// 9.設置狀態
		atomic.AddInt32(&m.state, delta)
		break
	}
	awoke = true
	iter = 0
} else {
	old = m.state
}

到這里,首先會CAS設置新的狀態,如果設置成功則往下走,否則返回之后循環設置狀態。設置成功之后:

  1. 首先會判斷old狀態,如果沒有飢餓,也沒有獲取到鎖,那么直接返回,因為這種情況在進入到這段代碼之前會將new狀態設置為mutexLocked,表示已經獲取到鎖。這里還判斷了一下old狀態不能為飢餓狀態,否則也不能獲取到鎖;
  2. 判斷waitStartTime是否已經初始化過了,如果是新的goroutine來搶占鎖,那么queueLifo會返回false;如果不是新的goroutine來搶占鎖,那么加入到等待隊列頭部,這樣等待最久的 goroutine 優先能夠獲取到鎖;
  3. 如果等待時間為0,那么初始化等待時間;
  4. 阻塞等待,當前goroutine進行休眠;
  5. 喚醒之后檢查鎖是否應該處於飢餓狀態,並設置starving變量值;
  6. 判斷是否已經處於飢餓狀態,如果不處於飢餓狀態,那么這里直接進入到下一個for循環中獲取鎖;
  7. 加鎖並且將waiter數減1,這里我看了一會,沒用懂什么意思,其實需要分兩步來理解,相當於state+mutexLocked,然后state再將waiter部分的數減一;
  8. 如果當前goroutine不是飢餓狀態或者waiter只有一個,就從飢餓模式切換會正常模式;
  9. 設置狀態;

下面用圖例來解釋:

這部分的圖解是休眠前的操作,休眠前會根據old的狀態來判斷能不能直接獲取到鎖,如果old狀態沒有上鎖,也沒有飢餓,那么直接break返回,因為這種情況會在CAS中設置加上鎖;

接着往下判斷,waitStartTime是否等於0,如果不等於,說明不是第一次來了,而是被喚醒后來到這里,那么就不能直接放到隊尾再休眠了,而是要放到隊首,防止長時間搶不到鎖;

Group 5

下面這張圖是處於喚醒后的示意圖,如何被喚醒的可以直接到跳到解鎖部分看完再回來。

被喚醒一開始是需要判斷一下當前的starving狀態以及等待的時間如果超過了1ms,那么會將starving設置為true;

接下來會有一個if判斷, 這里有個細節,因為是被喚醒的,所以判斷前需要重新獲取一下鎖,如果當前不是飢餓模式,那么會直接返回,然后重新進入到for循環中;

如果當前是處於飢餓模式,那么會計算一下delta為加鎖,並且當前的goroutine是可以直接搶占鎖的,所以需要將waiter減一,如果starving不為飢餓,或者等待時間沒有超過1ms,或者waiter只有一個了,那么還需要將delta減去mutexStarving,表示退出飢餓模式;

最后通過AddInt32將state加上delta,這里之所以可以直接加上,因為這時候state的mutexLocked值肯定為0,並且mutexStarving位肯定為1,並且在獲取鎖之前至少還有當前一個goroutine在等待隊列中,所以waiter可以直接減1。

Group 6

解鎖流程

fast path

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}
 	//返回一個state被減后的值	
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 { 
        //如果返回的state值不為0,那么進入到unlockSlow中
		m.unlockSlow(new)
	}
}

這里主要就是AddInt32重新設置state的mutexLocked位為0,然后判斷新的state值是否不為0,不為0則調用unlockSlow方法。

unlockSlow

Group 7

unlockSlow方法里面也分為正常模式和飢餓模式下的解鎖:

func (m *Mutex) unlockSlow(new int32) {
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
    // 正常模式
	if new&mutexStarving == 0 {
		old := new
		for { 
			// 如果沒有 waiter,或者已經有在處理的情況,直接返回
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			} 
			// waiter 數減 1,mutexWoken 標志設置上,通過 CAS 更新 state 的值
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
				// 直接喚醒等待隊列中的 waiter
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
			old = m.state
		}
	} else { // 飢餓模式
		// 直接喚醒等待隊列中的 waiter
		runtime_Semrelease(&m.sema, true, 1)
	}
}

在正常模式下,如果沒有 waiter,或者mutexLocked、mutexStarving、mutexWoken有一個不為零說明已經有其他goroutine在處理了,直接返回;如果互斥鎖存在等待者,那么通過runtime_Semrelease直接喚醒等待隊列中的 waiter;

在飢餓模式,直接調用runtime_Semrelease方法將當前鎖交給下一個正在嘗試獲取鎖的等待者,等待者被喚醒后會得到鎖。

總結

Mutex的設計非常的簡潔的,從代碼可以看出為了設計出這么簡潔的代碼state一個字段可以當4個字段使用。並且為了解決goroutine飢餓問題,在1.9 中 Mutex 增加了飢餓模式讓鎖變得更公平,不公平的等待時間限制在 1 毫秒,但同時,代碼也變得越來越難懂了,所以要理解它上面的思想需要慢慢的廢些時間細細的體會一下了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM