Golang的sync.WaitGroup 實現邏輯和源碼解析


方便的並發,是Golang的一大特色優勢,而使用並發,對sync包的WaitGroup不會陌生。WaitGroup主要用來做Golang並發實例即Goroutine的等待,當使用go啟動多個並發程序,通過waitgroup可以等待所有go程序結束后再執行后面的代碼邏輯,比如:

 

func Main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(10 * time.Second)
        }()

    }
    wg.Wait() // 等待在此,等所有go func里都執行了Done()才會退出
}

WaitGroup對外提供三個方法,Add(int),Done()和Wait(), 其中Done()是調用了Add(-1),一般使用方法是,先統一Add,在goroutine里並發的Done,然后Wait。

WaitGroup主要維護了2個計數器,一個是請求計數器 v,一個是等待計數器 w,二者組成一個64bit的值,請求計數器占高32bit,等待計數器占低32bit。

每次Add執行,請求計數器v加1,Done方法執行,請求計數器減1,v為0時通過信號量喚醒Wait()。

那么等待計數器拿來干嘛?是因為同一個實例的Wait()方法支持多處調用,每一次Wait()方法執行,等待計數器 w 就會加1,而當請求計數器v為0觸發Wait()時,要根據w的數量發送w份的信號量,正確的觸發所有的Wait(),這雖然不是常用的一個特性,但是在一些特殊場合是有用處的(比如多個並發都依賴於WaitGroup的實例的結束信號來進行下一個action),演示代碼如下:

func main() {
  wg := sync.WaitGroup{}
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
​    }()
  }
  time.Sleep(2 * time.Second)
  for j := 0; j < 3; j++ {
    go func(i int) {
      // 3個地方調用Wait(),通過等待j計時器,每個Wati都會被hu喚醒
      wg.Wait()
      fmt.Println("wait done now ", i)
    }(j)
  }
  time.Sleep(10 * time.Second)
  return
}
/*
輸出如下,數字出現的順序隨機
wait done now  1
wait done now  0
wait done now  2
*/

同時,WaitGroup里還對使用邏輯進行了嚴格的檢查,比如Wait()一旦開始不能Add().

下面是帶注釋的代碼,去掉了不影響代碼邏輯的trace部分:

func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()
    // 更新statep,statep將在wait和add中通過原子操作一起使用
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
        if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        // wait不等於0說明已經執行了Wait,此時不容許Add
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 正常情況,Add會讓v增加,Done會讓v減少,如果沒有全部Done掉,此處v總是會大於0的,直到v為0才往下走
    // 而w代表是有多少個goruntine在等待done的信號,wait中通過compareAndSwap對這個w進行加1
     if v > 0 || w == 0 {
        return
    }
    // This goroutine has set counter to 0 when waiters > 0.
    // Now there can't be concurrent mutations of state:
    // - Adds must not happen concurrently with Wait,
    // - Wait does not increment waiters if it sees counter == 0.
    // Still do a cheap sanity check to detect WaitGroup misuse.
    // 當v為0(Done掉了所有)或者w不為0(已經開始等待)才會到這里,但是在這個過程中又有一次Add,導致statep變化,panic
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // Reset waiters count to 0.
    // 將statep清0,在Wait中通過這個值來保護信號量發出后還對這個Waitgroup進行操作
    *statep = 0
    // 將信號量發出,觸發wait結束
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false)
    }
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
    statep := wg.state()
        for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        if v == 0 {
            // Counter is 0, no need to wait.
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // Increment waiters count.
        // 如果statep和state相等,則增加等待計數,同時進入if等待信號量
        // 此處做CAS,主要是防止多個goroutine里進行Wait()操作,每有一個goroutine進行了wait,等待計數就加1
        // 如果這里不相等,說明statep,在 從讀出來 到 CAS比較 的這個時間區間內,被別的goroutine改寫了,那么不進入if,回去再讀一次,這樣寫避免用鎖,更高效些
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            if race.Enabled && w == 0 {
                // Wait must be synchronized with the first Add.
                // Need to model this is as a write to race with the read in Add.
                // As a consequence, can do the write only for the first waiter,
                // otherwise concurrent Waits will race with each other.
                race.Write(unsafe.Pointer(&wg.sema))
            }
            // 等待信號量
            runtime_Semacquire(&wg.sema)
            // 信號量來了,代表所有Add都已經Done
            if *statep != 0 {
                // 走到這里,說明在所有Add都已經Done后,觸發信號量后,又被執行了Add
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM