Golang Sync.WaitGroup 使用及原理


Golang Sync.WaitGroup 使用及原理

使用

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("Hello WaitGroup!")
		}()
	}
	wg.Wait()
}

實現

首先看 waitgroup 到底是什么數據結構

type WaitGroup struct {
	noCopy noCopy
	state1 [3]uint32
}

nocopy 避免這個結構體被復制的一個技巧,可以告訴go vet工具違反了復制使用的規則
state1 [3]uint32 字段中包含了 waitgroup 的所有狀態信息, 根據標准庫上自帶的注釋簡單翻譯是:state1 由 12 個字節組成,其中將8字節看作64位值,其中高32位存放的是 counter 計數器, 代表目前還未完成的 goroutine個數,低32位存放的是 waiter 計數器, 可以理解成下面這個結構體

type WaitGroup struct {
	// 代表目前尚未完成的個數
	// WaitGroup.Add(n) 將會導致 counter += n
	// WaitGroup.Done() 將導致 counter--
	counter uint32

	// 目前已調用 WaitGroup.Wait 的 goroutine 的個數
	waiter  uint32

	// 對應於 golang 中 runtime 內部的信號量的實現
	// runtime_Semacquire 表示增加一個信號量,並掛起當前 goroutine
	// runtime_Semrelease 表示減少一個信號量,並喚醒 sema 上其中一個正在等待的 goroutine
	sema    uint32
}

整個使用流程為:

  1. 當調用 WaitGroup.Add(n) 時,counter 將會自增: counter += n
  2. 當調用 WaitGroup.Wait() 時,會將 waiter++。同時調用 runtime_Semacquire(semap), 增加信號量,並掛起當前 goroutine。
  3. 當調用 WaitGroup.Done() 時,將會 counter--。如果自減后的 counter 等於 0,說明 WaitGroup 的等待過程已經結束,則需要調用 runtime_Semrelease 釋放信號量,喚醒正在 WaitGroup.Wait 的 goroutine。

源碼中是如何拆分 state 字段的

func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {  
   if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {  
      // 如果地址是64bit對齊的,數組前兩個元素做state,后一個元素做信號量
	  return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]  
   } else {
      // 如果地址是32bit對齊的,數組后兩個元素用來做state
      // 它可以用來做64bit的原子操作,第一個元素32bit用來做信號量
      return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]  
   }  
}

由於我們能使用到的就是 waitgroup.Add(), waitgroup.Done(), waitgroup.Wait() 這三個方法,就按這三個方法分析

Add(), Done()

Add 方法主要操作的是 state 的計數部分。你可以為計數值增加一個 delta 值,內部通過原子操作把這個值加到計數值上。需要注意的是,這個 delta 也可以是個負數,相當於為計數值減去一個值,Done 方法內部其實就是通過 Add(-1) 實現的。

func (wg *WaitGroup) Add(delta int) {  
   // 獲取拆開后的 state 字段 
   statep, semap := wg.state()  
   ...
   ...
   ...
   // 在剛剛說的 int64 的高32位上加傷傳進來的 delta 的值, 這一步是原子操作
   state := atomic.AddUint64(statep, uint64(delta)<<32)  
   // 加好后,獲取 counter 也就是 v, 和 waiter 也就是 w 的值
   // 此時 int64 變為兩個 int32
   v := int32(state >> 32)  
   w := uint32(state)  
   // 如果 v 變為負數了,程序異常
   if v < 0 {  
      panic("sync: negative WaitGroup counter")  
   }  
   // 在 wait 沒結束之前, 不允許調用 Add 方法
   if w != 0 && delta > 0 && v == int32(delta) {  
      panic("sync: WaitGroup misuse: Add called concurrently with Wait")  
   }

   // 調用 add() 之后, 還有正在執行的 goroutine 或者 waiter 等於 0, 正常返回
   if v > 0 || w == 0 {  
      return  
   }  
   // 下面就是非正常返回, 理解到的就是 v 已經等於 0 了,執行釋放操作
   // 首先就是將 counter 和 waiter 全部重置為 0
 *statep = 0  
   // 然后循環調用還在等待的 waiter, 釋放信號量
 for ; w != 0; w-- {  
      runtime_Semrelease(semap, false, 0)  
   }  
}

wait()

Wait 方法的實現邏輯是:不斷檢查 state 的值。如果其中的計數值變為了 0,那么說明所有的任務已完成,調用者不必再等待,直接返回。如果計數值大於 0,說明此時還有任務沒完成,那么調用者就變成了等待者,需要加入 waiter 隊列,並且阻塞住自己。

func (wg *WaitGroup) Wait() {
   // 獲取信號量和兩個計數值
   statep, semap := wg.state()  

   // 不停的循環檢查 counter 和 waiter
   for {
      // 先原子性的取出 counter 和 waiter
      state := atomic.LoadUint64(statep)  
      v := int32(state >> 32)  
      w := uint32(state)
      if v == 0 {  
         // counter 已經沒有了,函數可以返回
         return  
 }  
      // 將 waiter 數 + 1
 if atomic.CompareAndSwapUint64(statep, state, state+1) {  
         // 放到信號量隊列, 並且阻塞住自己
         runtime_Semacquire(semap)
         // 如果被喚醒,檢查 兩個計數是否已經為0 了, 如果不為0 ,則觸發恐慌
         if *statep != 0 {  
            panic("sync: WaitGroup is reused before previous Wait has returned")  
         }
          // 函數返回
         return  
 }  
   }  
}

總結

  1. 保證計數器不能為負值
  2. 保證 Add() 方法全部調用完成之后再調用 Wait()
  3. waitgroup 可以重復使用
  4. atomic 原子操作代替鎖, 提高並發性
  5. 合並兩個 int32 為一個 int64 提高讀取存入數據性能
  6. 對於不希望被復制的結構體, 可以使用 noCopy 字段

reference

https://www.cyhone.com/articles/golang-waitgroup/
https://time.geekbang.org/column/intro/100061801?tab=catalog


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM