go學習筆記 sync/mutex源碼


Mutex 是一個互斥鎖,可以創建為其他結構體的字段;零值為解鎖狀態。Mutex 類型的鎖和線程無關,可以由不同的線程加鎖和解鎖。

在一個goroutine獲得 Mutex 后,其他goroutine只能等到這個goroutine釋放該Mutex使用Lock() 加鎖后,不能再繼續對其加鎖,直到利用 Unlock() 解鎖后才能再加鎖
在 Lock() 之前使用 Unlock() 會導致 panic 異常
已經鎖定的 Mutex 並不與特定的 goroutine 相關聯,這樣可以利用一個 goroutine 對其加鎖,再利用其他 goroutine 對其解鎖
在同一個 goroutine 中的 Mutex 解鎖之前再次進行加鎖,會導致死鎖
適用於讀寫不確定,並且只有一個讀或者寫的場景

package sync
 
import (
    "internal/race"
    "sync/atomic"
    "unsafe"
)
// 被定義在 runtime 包中,src/runtime/panic.go 的 sync_throw 方法
func throw(string) // provided by runtime 
 
// A Mutex is a mutual exclusion lock. mutex 是一個互斥鎖
// The zero value for a Mutex is an unlocked mutex. 零值是沒有被上鎖的互斥鎖。
// A Mutex must not be copied after first use. 首次使用后,不得復制互斥鎖。
type Mutex struct {
    // 將一個32位整數拆分為
    // 當前阻塞的goroutine數目(29位)|飢餓狀態(1位)|喚醒狀態(1位)|鎖狀態(1位) 的形式,來簡化字段設計
    state int32
    sema  uint32  // 信號量
}
 
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
    Lock()
    Unlock()
}
 
const (
    //用最后一位表示當前對象鎖的狀態,0-未鎖住 1-已鎖住
    mutexLocked = 1 << iota // mutex is locked  1 表示是否被鎖定  0001 
 
    //用倒數第二位表示當前對象是否被喚醒 0- 未喚醒 1-喚醒  
    //【注意: 未被喚醒並不是指休眠,而是指為了讓所能被設置被喚醒的一個初始值】
    mutexWoken  //2 表示是否被喚醒  0010 
 
    //用倒數第三位表示當前對象是否為飢餓模式,0為正常模式,1為飢餓模式。
    mutexStarving  //4 表示是否飢餓   0100
 
    //3 表示 從倒數第四位往前的bit位表示在排隊等待的goroutine數目(共對於 32位中占用 29 位)
    mutexWaiterShift = iota
 
    // Mutex fairness.
    //
    // Mutex can be in 2 modes of operations: normal and starvation.
    // In normal mode waiters are queued in FIFO order, but a woken up waiter
    // does not own the mutex and competes with new arriving goroutines over
    // the ownership. New arriving goroutines have an advantage -- they are
    // already running on CPU and there can be lots of them, so a woken up
    // waiter has good chances of losing. In such case it is queued at front
    // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
    // it switches mutex to the starvation mode.
    //
    // In starvation mode ownership of the mutex is directly handed off from
    // the unlocking goroutine to the waiter at the front of the queue.
    // New arriving goroutines don't try to acquire the mutex even if it appears
    // to be unlocked, and don't try to spin. Instead they queue themselves at
    // the tail of the wait queue.
    //
    // If a waiter receives ownership of the mutex and sees that either
    // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
    // it switches mutex back to normal operation mode.
    //
    // Normal mode has considerably better performance as a goroutine can acquire
    // a mutex several times in a row even if there are blocked waiters.
    // Starvation mode is important to prevent pathological cases of tail latency.
    /** 互斥量可分為兩種操作模式:正常和飢餓。
    【正常模式】,等待的goroutines按照FIFO(先進先出)順序排隊,但是goroutine被喚醒之后並不能
      立即得到mutex鎖,它需要與新到達的goroutine爭奪mutex鎖。因為新到達的goroutine已經在CPU 
     上運行了,所以被喚醒的goroutine很大概率是爭奪mutex鎖是失敗 的。出現這樣的情況時候,被喚 
     醒goroutine需要排隊在隊列的前面。如果被喚醒的goroutine有超過1ms沒有獲取到mutex鎖,那么 
     它就會變為飢餓模式。在飢餓模式中,mutex鎖直接從解鎖的goroutine交給隊列前面的goroutine。 
     新 達到的goroutine也不會去爭奪mutex鎖(即使沒有鎖,也不能去自旋),而是到等待隊列尾部排隊。
    【飢餓模式】,鎖的所有權將從unlock的gorutine直接交給交給等待隊列中的第一個。新來的 
      goroutine將不會嘗試去獲得鎖,即使鎖看起來是unlock狀態, 也不會去嘗試自旋操作,而是放在等 
      待隊列的尾部。如果有一個等待的goroutine獲取到mutex鎖了,如果它滿足下條件中的任意一個, 
     mutex將會切換回去正常模式:
    1. 是等待隊列中的最后一個goroutine
    2. 它的等待時間不超過1ms。
    正常模式:有更好的性能,因為goroutine可以連續多次獲得mutex鎖;
    飢餓模式:能阻止尾部延遲的現象,對於預防隊列尾部goroutine一致無法獲取mutex鎖的問題。
    */
    starvationThresholdNs = 1e6  // 1ms
)
 
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
/*
如果鎖已經在使用中,則調用goroutine 直到互斥鎖可用為止。
在此之前我們必須先說下 四個重要的方法;
【runtime_canSpin】:在 src/runtime/proc.go 中被實現 sync_runtime_canSpin;表示 比較保守的自旋,
                    golang中自旋鎖並不會一直自旋下去,在runtime包中runtime_canSpin方法做了一些限制,
                    傳遞過來的iter大等於4或者cpu核數小等於1,最大邏輯處理器大於1,至少有個本地的P隊列,
                    並且本地的P隊列可運行G隊列為空。
【runtime_doSpin】:在src/runtime/proc.go 中被實現 sync_runtime_doSpin;表示會調用procyield函數,
                    該函數也是匯編語言實現。函數內部循環調用PAUSE指令。PAUSE指令什么都不做,
                    但是會消耗CPU時間,在執行PAUSE指令時,CPU不會對它做不必要的優化。
【runtime_SemacquireMutex】:在 src/runtime/sema.go中被實現sync_runtime_SemacquireMutex;
                            表示通過信號量 阻塞當前協程
【runtime_Semrelease】: 在src/runtime/sema.go 中被實現 sync_runtime_Semrelease
*/
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    // 如果m.state為0,說明當前的對象還沒有被鎖住,進行原子性賦值操作設置為mutexLocked狀態,CompareAnSwapInt32返回true
    // 否則說明對象已被其他goroutine鎖住,不會進行原子賦值操作設置,CopareAndSwapInt32返回false
    /**
    如果mutext的state沒有被鎖,也沒有等待/喚醒的goroutine, 鎖處於正常狀態,那么獲得鎖,返回.
    比如鎖第一次被goroutine請求時,就是這種狀態。或者鎖處於空閑的時候,也是這種狀態
     */
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }
    /** 在鎖定沒有成功的時候,才會往下面走 */
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}
 
func (m *Mutex) lockSlow() {
        /**
        首先判斷是否已經加鎖並處於正常模式,
        將原先鎖的state & (1 和 4 | 的結果,目的就是為了檢驗 state 是處於 1 還是 4 狀態, 還                是兩者都是.
        如果與1相等,則說明此時處於 正常模式並且已經加鎖,而后判斷當前協程是否可以自旋。
        如果可以自旋,則通過右移三位判斷是否還有協程正在等待這個鎖,
        如果有,並通過 低2位 判斷是否該所處於被喚醒狀態,
        如果並沒有,則將其狀態量設為被喚醒的狀態,之后進行自旋,直到該協程自旋數量達到上限,
        或者當前鎖被解鎖,
        或者當前鎖已經處於 飢餓模式
 */
    // 標記本goroutine的等待時間   開始等待時間戳
    var waitStartTime int64 
    // 本goroutine是否已經處於飢餓狀態 ,飢餓模式標識 true: 飢餓  false: 未飢餓
    starving := false 
    // 本goroutine是否已喚醒 被喚醒標識  true: 被喚醒   flase: 未被喚醒
    awoke := false
    iter := 0 // 自旋次數
    old := m.state / 保存當前對象鎖狀態,做對比用
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // 不要在飢餓模式下自旋,將鎖的控制權交給阻塞任務,否則無論如何當前goroutine都無法獲得互斥鎖。
        /**
         相當於xxxx...x0xx & 0101 = 01,當前對象鎖被使用
         old & (是否鎖定|是否飢餓) == 是否鎖定
         runtime_canSpin() 表示 是否可以自旋。runtime_canSpin返回true,可以自旋。
        即: 判斷當前goroutine是否可以進入自旋鎖
        第一個條件:是state已被鎖,但是不是飢餓狀態。如果時飢餓狀態,自旋時沒有用的,鎖的擁有權直接交給了等待隊列的第一個。
        第二個條件:是還可以自旋,多核、壓力不大並且在一定次數內可以自旋, 具體的條件可以參考`sync_runtime_canSpin`的實現。
        如果滿足這兩個條件,不斷自旋來等待鎖被釋放、或者進入飢餓狀態、或者不能再自旋。
         */
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            // 主動旋轉是有意義的。試着設置 mutexWoken (鎖喚醒)標志,告知解鎖,不喚醒其他阻塞的goroutines。
            // old&mutexWoken == 0 再次確定是否被喚醒: xxxx...xx0x & 0010 = 0
            // old>>mutexWaiterShift != 0 查看是否有goroution在排隊
            // tomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 將對象鎖在老狀態上追加喚醒狀態:xxxx...xx0x | 0010 = xxxx...xx1x
 
            // 如果當前標識位 awoke為 未被喚醒 && (old 也為 未被喚醒) && 有正在等待的 goroutine && 則修改 old 為 被喚醒
            // 且修改標識位 awoke 為 true 被喚醒
            /**
            自旋的過程中如果發現state還沒有設置woken標識,則設置它的woken標識, 並標記自己為被喚醒。
             */
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true // 更改標識位為 喚醒true
            }
            // 進入自旋鎖后當前goroutine並不掛起,仍然在占用cpu資源,所以重試一定次數后,不會再進入自旋鎖邏輯
            runtime_doSpin()
            iter++ // 累加自旋次數
            old = m.state // 保存mutex對象即將被設置成的狀態
            continue
        }
        // 以下代碼是不使用**自旋**的情況
        /**
         到了這一步, state的狀態可能是:
         1. 鎖還沒有被釋放,鎖處於正常狀態
         2. 鎖還沒有被釋放, 鎖處於飢餓狀態
         3. 鎖已經被釋放, 鎖處於正常狀態
         4. 鎖已經被釋放, 鎖處於飢餓狀態
         並且本gorutine的 awoke可能是true, 也可能是false (其它goutine已經設置了state的woken標識)
         new 復制 state的當前狀態, 用來設置新的狀態
         old 是鎖當前的狀態
         */
 
        new := old
 
        /** 下面的幾個 if 分別是並列語句,來判斷如何設置state的new狀態 */
        /**
        如果old state狀態不是飢餓狀態, new state設置鎖,嘗試通過CAS獲取鎖,
        如果old state狀態是飢餓狀態, 則不設置new state的鎖,因為飢餓狀態下鎖直接轉給等待隊列的第一個.
         */
        // 不要試圖獲得飢餓goroutine的互斥鎖,新來的goroutines必須排隊。
        // 對象鎖飢餓位被改變 為 1 ,說明處於飢餓模式
        // xxxx...x0xx & 0100 = 0xxxx...x0xx
        // Don't try to acquire starving mutex, new arriving goroutines must queue.
        /**【一】如果是正常狀態 (如果是正常,則可以競爭到鎖) */
        if old&mutexStarving == 0 {
            // xxxx...x0xx | 0001 = xxxx...x0x1,將標識對象鎖被鎖住
            new |= mutexLocked
        }
        /** 【二】處於飢餓且鎖被占用 狀態下  */
        // xxxx...x1x1 & (0001 | 0100) => xxxx...x1x1 & 0101 != 0;當前mutex處於飢餓模式並且鎖已被占用,新加入進來的goroutine放到隊列后面,所以 等待者數目 +1
        if old&(mutexLocked|mutexStarving) != 0 {
            // 更新阻塞goroutine的數量,表示mutex的等待goroutine數目加1
            // 首先,如果此時還是由於別的協程的占用無法獲得鎖或者處於飢餓模式,都在其state加8表示有新的協程正在處於等待狀態
            new += 1 << mutexWaiterShift
        }
        // The current goroutine switches mutex to starvation mode.
        // But if the mutex is currently unlocked, don't do the switch.
        // Unlock expects that starving mutex has waiters, which will not
        // be true in this case.
        /**
        如果之前由於自旋而將該鎖喚醒,那么此時將其低二位的狀態量重置為0 (即 未被喚醒)。
        之后判斷starving是否為true,如果為true說明在上一次的循環中,
        鎖需要被定義為 飢餓模式,那么在這里就將相應的狀態量低3位設置為1表示進入飢餓模式
        */
        /***
        【三】
        如果當前goroutine已經處於飢餓狀態 (表示當前 goroutine 的飢餓標識位 starving), 並且old state的已被加鎖,
        將new state的狀態標記為飢餓狀態, 將鎖轉變為飢餓狀態.
         */
        // 當前的goroutine將互斥鎖轉換為飢餓模式。但是,如果互斥鎖當前沒有解鎖,就不要打開開關,設置mutex狀態為飢餓模式。Unlock預期有飢餓的goroutine
        // old&mutexLocked != 0  xxxx...xxx1 & 0001 != 0;鎖已經被占用
        // 如果 飢餓且已被鎖定
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving // 【追加】飢餓狀態
        }
        /**
        【四】
        如果本goroutine已經設置為喚醒狀態, 需要清除new state的喚醒標記, 因為本goroutine要么獲得了鎖,要么進入休眠,總之state的新狀態不再是woken狀態.
         */
        // 如果 goroutine已經被喚醒,因此需要在兩種情況下重設標志
        if awoke {
            // The goroutine has been woken from sleep,
            // so we need to reset the flag in either case.
            // xxxx...xx0x & 0010 == 0,如果喚醒標志為與awoke的值不相協調就panic
            // 即 state 為 未被喚醒
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
        // new & (^mutexWoken) => xxxx...xxxx & (^0010) => xxxx...xxxx & 1101 = xxxx...xx0x
            // 設置喚醒狀態位0,被未喚醒【只是為了,下次被可被設置為i被喚醒的初識化標識,而不是指休眠】
            new &^= mutexWoken
        }
        /**
        之后嘗試通過cas將new的state狀態量賦值給state,
        如果失敗,則重新獲得其state在下一步循環重新重復上述的操作。
        如果成功,首先判斷已經阻塞時間(通過標記本goroutine的等待時間waitStartTime ),如果為零,則從現在開始記錄
        */
        // 將新的狀態賦值給state,注意new的鎖標記不一定是true, 也可能只是標記一下鎖的state是飢餓狀態
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            /**
             如果old state的狀態是未被鎖狀態,並且鎖不處於飢餓狀態,
             那么當前goroutine已經獲取了鎖的擁有權,返回
             */
            // xxxx...x0x0 & 0101 = 0,表示可以獲取對象鎖 (即還是判斷之前的狀態,鎖不是飢餓 也不是被被鎖定,鎖已經可用了)
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            // 以下的操作都是為了判斷是否從【飢餓模式】中恢復為【正常模式】
            // 判斷處於FIFO還是LIFO模式
            // 如果等待時間不為0 那么就是 LIFO
            // 在正常模式下,等待的goroutines按照FIFO(先進先出)順序排隊
            //  設置/計算本goroutine的等待時間
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime() // 更新等待時間
            }
            // 通過runtime_SemacquireMutex()通過信號量將當前協程阻塞
            // 函數 runtime_SemacquireMutex 定義在 sema.go
            /**
            既然未能獲取到鎖, 那么就使用 [sleep原語] 阻塞本goroutine
            如果是新來的goroutine,queueLifo=false, 加入到等待隊列的尾部,耐心等待
            如果是喚醒的goroutine, queueLifo=true, 加入到等待隊列的頭部
             */
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)
 
            // 當之前調用 runtime_SemacquireMutex 方法將當前新進來爭奪鎖的協程掛起后,如果協程被喚醒,那么就會繼續下面的流程
            // 如果當前 飢餓狀態標識為飢餓||當前時間-開始等待時間>1ms 則 都切換為飢餓狀態標識
            //使用 [sleep原語] 之后,此goroutine被喚醒,計算當前goroutine是否已經處於飢餓狀態.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            // 刷新下 中轉變量
            /** 得到當前的鎖狀態 */
            old = m.state
 
            /**
            如果當前的state已經是飢餓狀態
            那么鎖應該處於Unlock狀態,那么應該是鎖被直接交給了本goroutine
             */
            if old&mutexStarving != 0 {
                // If this goroutine was woken and mutex is in starvation mode,
                // ownership was handed off to us but mutex is in somewhat
                // inconsistent state: mutexLocked is not set and we are still
                // accounted as waiter. Fix that.
                 /**
                 如果當前的state已被鎖,或者已標記為喚醒, 或者等待的隊列中為空,那么state是一個非法狀態
                  */
                // xxxx...xx11 & 0011 != 0 又可能是被鎖定,又可能是被喚醒 或者 沒有等待的goroutine
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // delta 表示當前狀態下的等待數
                // 否則下一次的循環中將該鎖設置為 飢餓模式。
                // 如果已經是這個模式,那么就會將 狀態量的等待數 減1
                /**
                當前goroutine用來設置鎖,並將等待的goroutine數減1.
                lock狀態 -一個gorotine數,表示狀態 delta == (lock + (減去一個等待goroutine數))
                 */
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
 
                // 並判斷當前如果已經沒有等待的協程,就沒有必要繼續維持飢餓模式,同時也沒必要繼續執行該循環(當前只有一個協程在占用鎖)
                /**
                如果本goroutine並不處於飢餓狀態,或者它是最后一個等待者,
                那么我們需要把鎖的state狀態設置為正常模式.
                 */
                if !starving || old>>mutexWaiterShift == 1 {
                    // Exit starvation mode.
                    // Critical to do it here and consider wait time.
                    // Starvation mode is so inefficient, that two goroutines
                    // can go lock-step infinitely once they switch mutex
                    // to starvation mode.
                    // 退出飢餓模式。
                    // 在這里做到並考慮等待時間至關重要。
                    // 飢餓模式是如此低效,一旦將互斥鎖切換到飢餓模式,兩個goroutine就可以無限鎖定。
                    delta -= mutexStarving
                }
                // 設置新state, 因為已經獲得了鎖,退出、返回
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 修改為本goroutine 是否被喚醒標識位
            /**
            如果當前的鎖是正常模式,本goroutine被喚醒,自旋次數清零,從for循環開始處重新開始
             */
            awoke = true
            iter = 0
        } else {
            // 如果CAS不成功,重新獲取鎖的state, 從for循環開始處重新開始 繼續上述動作
            old = m.state
        }
    }
 
    if race.Enabled {
        race.Acquire(unsafe.Pointer(m))
    }
}
 
// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
// 解鎖一個未被鎖定的互斥鎖時,是會報錯
// 鎖定的互斥鎖與特定的goroutine無關。
// 允許一個goroutine鎖定Mutex然后
// 安排另一個goroutine解鎖它。
func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }
 
    // Fast path: drop lock bit.
    /** 如果state不是處於鎖的狀態, 那么就是Unlock根本沒有加鎖的mutex, panic  */
    // state -1 標識解鎖 (移除鎖定標記)
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        // Outlined slow path to allow inlining the fast path.
        // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
        m.unlockSlow(new)
    }
}
 
func (m *Mutex) unlockSlow(new int32) {
    /**
    釋放了鎖,還得需要通知其它等待者
    被通知的 goroutine 會去做下面的事情
    鎖如果處於飢餓狀態,直接交給等待隊列的第一個, 喚醒它,讓它去獲取鎖
    鎖如果處於正常狀態,則需要喚醒對頭的goroutine 讓它和新來的goroutine去競爭鎖,當然極大幾率為失敗,
        這時候 被喚醒的goroutine需要排隊在隊列的前面 (然后自旋)。如果被喚醒的goroutine有超過1ms沒有獲取到mutex鎖,那么它就會變為飢餓模式
     */
    // 再次校驗下 標識,new state如果是正常狀態, 驗證鎖狀態是否符合
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    // xxxx...x0xx & 0100 = 0 ;判斷是否處於正常模式
    if new&mutexStarving == 0 {
        old := new // 記錄緩存值
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
 
            // 如果沒有等待的goroutine或goroutine不處於空閑,則無需喚醒任何人
            // 在飢餓模式下,鎖的所有權直接從解鎖goroutine交給下一個 正在等待的goroutine (等待隊列中的第一個)。
            // 注意: old&(mutexLocked|mutexWoken|mutexStarving) 中,因為在最上面已經 -mutexLocked 並且進入了 if new&mutexStarving == 0
            // 說明目前 只有在還有goroutine 或者 被喚醒的情況下才會 old&(mutexLocked|mutexWoken|mutexStarving) != 0
            // 即:當休眠隊列內的等待計數為 0  或者 是正常但是 處於被喚醒或者被鎖定狀態,退出
            // old&(mutexLocked|mutexWoken|mutexStarving) != 0     xxxx...x0xx & (0001 | 0010 | 0100) => xxxx...x0xx & 0111 != 0
            /**
             如果沒有等待的goroutine, 或者鎖不處於空閑的狀態,直接返回.
             */
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // Grab the right to wake someone.
            // 減少等待goroutine個數,並添加 喚醒標識
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            /** 設置新的state, 這里通過信號量去喚醒一個阻塞的goroutine去獲取鎖. */
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 釋放鎖,發送釋放信號 (解除 阻塞信號量)
                runtime_Semrelease(&m.sema, false, 1)
                return
            }
            // 賦值給中轉變量,然后啟動下一輪
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter, and yield
        // our time slice so that the next waiter can start to run immediately.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        /**
        飢餓模式下:
        直接將鎖的擁有權傳給等待隊列中的第一個.
        注意:
        此時state的mutexLocked還沒有加鎖,喚醒的goroutine會設置它。
        在此期間,如果有新的goroutine來請求鎖, 因為mutex處於飢餓狀態, mutex還是被認為處於鎖狀態,
        新來的goroutine不會把鎖搶過去.
         */
        runtime_Semrelease(&m.sema, true, 1)
    }
}

Mutex鎖分為正常模式和飢餓模式。一開始默認處於正常模式。在正常模式中,每個新加入競爭鎖行列的協程都會直接參與到鎖的競爭當中來,而處於飢餓模式時,所有新進入的協程都會直接被放入等待隊列中掛起,直到其所在隊列之前的協程全部執行完畢。

在正常模式中協程的掛起等待時間如果大於某個值,就會進入飢餓模式。
其中,state用來保存mutex的狀態量,低一位表示是否上鎖,低二位表示當前鎖對象是否被喚醒,低三位表示該鎖是否處於飢餓狀態,而其余位表示當前正被該鎖阻塞的協程數。而sema則是作為信號量來作為阻塞的依據。

上述 32位的整數映射到state 字段上的情景為:

state: |32|31|...| |3|2|1|
 
         \__________/ | | |
              | | | |
              | | | mutex的占用狀態(1被占用,0可用)
 
              | | |
              | | mutex的當前goroutine是否被喚醒
              | |
              | 飢餓位,0正常,1飢餓
 
              |
               等待喚醒以嘗試鎖定的goroutine的計數,0表示沒有等待者

上述很的運算都是位運算,原因是:鎖在同一時刻可能具備多個狀態,還有一個原因就是state字段 只有 低位的三位是用來控制狀態的,而其他的位都是用來做計數的,所以不能直接賦值操作,而是用了位運算賦值。

【正常模式】,等待的goroutines按照 FIFO(先進先出)順序排隊,但是goroutine被喚醒之后並不能立即得到mutex鎖,它需要與新到達的goroutine爭奪mutex鎖。 因為新到達的goroutine已經在CPU上運行了,所以被喚醒的goroutine很大概率是爭奪mutex鎖是失敗的。出現這樣的情況時候,被喚醒的goroutine需要排隊在隊列的前面。 如果被喚醒的goroutine有超過1ms沒有獲取到mutex鎖,那么它就會變為 飢餓模式。 在飢餓模式中,mutex鎖直接從解鎖的goroutine交給隊列前面的goroutine。新達到的goroutine也不會去爭奪mutex鎖(即使沒有鎖,也不能去自旋),而是到等待隊列尾部排隊。

【飢餓模式】,有一個goroutine獲取到mutex鎖了,如果它滿足下條件中的任意一個,mutex將會切換回去正常模式: 1. 是等待隊列中的最后一個goroutine 。2. 它的等待時間不超過1ms。 正常模式有更好的性能,因為goroutine可以連續多次獲得mutex鎖; 飢餓模式需要預防隊列尾部goroutine一直無法獲取mutex鎖的問題。

嘗試獲取mutex的goroutine也有狀態,有可能它是新來的goroutine,也有可能是被喚醒的goroutine, 可能是處於正常狀態的goroutine, 也有可能是處於飢餓狀態的goroutine

Lock ()

Unlock

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM