31 | sync.WaitGroup和sync.Once
我們在前幾次講的互斥鎖、條件變量和原子操作都是最基本重要的同步工具。在 Go 語言中,除了通道之外,它們也算是最為常用的並發安全工具了。
說到通道,不知道你想過沒有,之前在一些場合下里,我們使用通道的方式看起來都似乎有些蹩腳。
比如:聲明一個通道,使它的容量與我們手動啟用的 goroutine 的數量相同,之后再利用這個通道,讓主 goroutine 等待其他 goroutine 的運行結束。
這一步更具體地說就是:讓其他的 goroutine 在運行結束之前,都向這個通道發送一個元素值,並且,讓主 goroutine 在最后從這個通道中接收元素值,接收的次數需要與其他的 goroutine 的數量相同。
這就是下面的coordinateWithChan函數展示的多 goroutine 協作流程。
func coordinateWithChan() {
sign := make(chan struct{}, 2)
num := int32(0)
fmt.Printf("The number: %d [with chan struct{}]\n", num)
max := int32(10)
go addNum(&num, 1, max, func() {
sign <- struct{}{}
})
go addNum(&num, 2, max, func() {
sign <- struct{}{}
})
<-sign
<-sign
}
其中的addNum函數的聲明在 demo65.go 文件中。addNum函數會把它接受的最后一個參數值作為其中的defer函數。
我手動啟用的兩個 goroutine 都會調用addNum函數,而它們傳給該函數的最后一個參數值(也就是那個既無參數聲明,也無結果聲明的函數)都只會做一件事情,那就是向通道sign發送一個元素值。
看到coordinateWithChan函數中最后的那兩行代碼了嗎?重復的兩個接收表達式<-sign,是不是看起來很丑陋?
前導內容:sync包的WaitGroup類型
其實,在這種應用場景下,我們可以選用另外一個同步工具,即:sync包的WaitGroup類型。它比通道更加適合實現這種一對多的 goroutine 協作流程。
sync.WaitGroup類型(以下簡稱WaitGroup類型)是開箱即用的,也是並發安全的。同時,與我們前面討論的幾個同步工具一樣,它一旦被真正使用就不能被復制了。
WaitGroup類型擁有三個指針方法:Add、Done和Wait。你可以想象該類型中有一個計數器,它的默認值是0。我們可以通過調用該類型值的Add方法來增加,或者減少這個計數器的值。
一般情況下,我會用這個方法來記錄需要等待的 goroutine 的數量。相對應的,這個類型的Done方法,用於對其所屬值中計數器的值進行減一操作。我們可以在需要等待的 goroutine 中,通過defer語句調用它。
而此類型的Wait方法的功能是,阻塞當前的 goroutine,直到其所屬值中的計數器歸零。如果在該方法被調用的時候,那個計數器的值就是0,那么它將不會做任何事情。
你可能已經看出來了,WaitGroup類型的值(以下簡稱WaitGroup值)完全可以被用來替換coordinateWithChan函數中的通道sign。下面的coordinateWithWaitGroup函數就是它的改造版本。
func coordinateWithWaitGroup() {
var wg sync.WaitGroup
wg.Add(2)
num := int32(0)
fmt.Printf("The number: %d [with sync.WaitGroup]\n", num)
max := int32(10)
go addNum(&num, 3, max, wg.Done)
go addNum(&num, 4, max, wg.Done)
wg.Wait()
}
很明顯,整體代碼少了好幾行,而且看起來也更加簡潔了。這里我先聲明了一個WaitGroup類型的變量wg。然后,我調用了它的Add方法並傳入了2,因為我會在后面啟用兩個需要等待的 goroutine。
由於wg變量的Done方法本身就是一個既無參數聲明,也無結果聲明的函數,所以我在go語句中調用addNum函數的時候,可以直接把該方法作為最后一個參數值傳進去。
在coordinateWithWaitGroup函數的最后,我調用了wg的Wait方法。如此一來,該函數就可以等到那兩個 goroutine 都運行結束之后,再結束執行了。
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
func main() {
coordinateWithChan()
fmt.Println()
coordinateWithWaitGroup()
}
func coordinateWithChan() {
sign := make(chan struct{}, 2)
num := int32(0)
fmt.Printf("The number: %d [with chan struct{}]\n", num)
max := int32(10)
go addNum(&num, 1, max, func() {
sign <- struct{}{}
})
go addNum(&num, 2, max, func() {
sign <- struct{}{}
})
<-sign
<-sign
}
func coordinateWithWaitGroup() {
var wg sync.WaitGroup
wg.Add(2)
num := int32(0)
fmt.Printf("The number: %d [with sync.WaitGroup]\n", num)
max := int32(10)
go addNum(&num, 3, max, wg.Done)
go addNum(&num, 4, max, wg.Done)
wg.Wait()
}
// addNum 用於原子地增加numP所指的變量的值。
func addNum(numP *int32, id, max int32, deferFunc func()) {
defer func() {
deferFunc()
}()
for i := 0; ; i++ {
currNum := atomic.LoadInt32(numP)
if currNum >= max {
break
}
newNum := currNum + 2
time.Sleep(time.Millisecond * 200)
if atomic.CompareAndSwapInt32(numP, currNum, newNum) {
fmt.Printf("The number: %d [%d-%d]\n", newNum, id, i)
} else {
fmt.Printf("The CAS operation failed. [%d-%d]\n", id, i)
}
}
}
以上就是WaitGroup類型最典型的應用場景了。不過不能止步於此,對於這個類型,我們還是有必要再深入了解一下的。我們一起看下面的問題。
問題:sync.WaitGroup類型值中計數器的值可以小於0嗎?
這里的典型回答是:不可以。
問題解析
為什么不可以呢,我們解析一下。之所以說WaitGroup值中計數器的值不能小於0,是因為這樣會引發一個 panic。 不適當地調用這類值的Done方法和Add方法都會如此。別忘了,我們在調用Add方法的時候是可以傳入一個負數的。
實際上,導致WaitGroup值的方法拋出 panic 的原因不只這一種。
你需要知道,在我們聲明了這樣一個變量之后,應該首先根據需要等待的 goroutine,或者其他事件的數量,調用它的Add方法,以使計數器的值大於0。這是確保我們能在后面正常地使用這類值的前提。
如果我們對它的Add方法的首次調用,與對它的Wait方法的調用是同時發起的,比如,在同時啟用的兩個 goroutine 中,分別調用這兩個方法,那么就有可能會讓這里的Add方法拋出一個 panic。
這種情況不太容易復現,也正因為如此,我們更應該予以重視。所以,雖然WaitGroup值本身並不需要初始化,但是盡早地增加其計數器的值,還是非常有必要的。
另外,你可能已經知道,WaitGroup值是可以被復用的,但需要保證其計數周期的完整性。這里的計數周期指的是這樣一個過程:該值中的計數器值由0變為了某個正整數,而后又經過一系列的變化,最終由某個正整數又變回了0。
也就是說,只要計數器的值始於0又歸為0,就可以被視為一個計數周期。在一個此類值的生命周期中,它可以經歷任意多個計數周期。但是,只有在它走完當前的計數周期之后,才能夠開始下一個計數周期。
(sync.WaitGroup 的計數周期)
因此,也可以說,如果一個此類值的Wait方法在它的某個計數周期中被調用,那么就會立即阻塞當前的 goroutine,直至這個計數周期完成。在這種情況下,該值的下一個計數周期,必須要等到這個Wait方法執行結束之后,才能夠開始。
如果在一個此類值的Wait方法被執行期間,跨越了兩個計數周期,那么就會引發一個 panic。
例如,在當前的 goroutine 因調用此類值的Wait方法,而被阻塞的時候,另一個 goroutine 調用了該值的Done方法,並使其計數器的值變為了0。
這會喚醒當前的 goroutine,並使它試圖繼續執行Wait方法中其余的代碼。但在這時,又有一個 goroutine 調用了它的Add方法,並讓其計數器的值又從0變為了某個正整數。此時,這里的Wait方法就會立即拋出一個 panic。
縱觀上述會引發 panic 的后兩種情況,我們可以總結出這樣一條關於WaitGroup值的使用禁忌,即:不要把增加其計數器值的操作和調用其Wait方法的代碼,放在不同的 goroutine 中執行。換句話說,要杜絕對同一個WaitGroup值的兩種操作的並發執行。
除了第一種情況外,我們通常需要反復地實驗,才能夠讓WaitGroup值的方法拋出 panic。再次強調,雖然這不是每次都發生,但是在長期運行的程序中,這種情況發生的概率還是不小的,我們必須要重視它們。
如果你對復現這些異常情況感興趣,那么可以參看sync代碼包中的 waitgroup_test.go 文件。其中的名稱以TestWaitGroupMisuse為前綴的測試函數,很好地展示了這些異常情況的發生條件。你可以模仿這些測試函數自己寫一些測試代碼,執行一下試試看。
知識擴展
問題:sync.Once類型值的Do方法是怎么保證只執行參數函數一次的?
與sync.WaitGroup類型一樣,sync.Once類型(以下簡稱Once類型)也屬於結構體類型,同樣也是開箱即用和並發安全的。由於這個類型中包含了一個sync.Mutex類型的字段,所以,復制該類型的值也會導致功能的失效。
Once類型的Do方法只接受一個參數,這個參數的類型必須是func(),即:無參數聲明和結果聲明的函數。
該方法的功能並不是對每一種參數函數都只執行一次,而是只執行“首次被調用時傳入的”那個函數,並且之后不會再執行任何參數函數。
所以,如果你有多個只需要執行一次的函數,那么就應該為它們中的每一個都分配一個sync.Once類型的值(以下簡稱Once值)。
Once類型中還有一個名叫done的uint32類型的字段。它的作用是記錄其所屬值的Do方法被調用的次數。不過,該字段的值只可能是0或者1。一旦Do方法的首次調用完成,它的值就會從0變為1。
你可能會問,既然done字段的值不是0就是1,那為什么還要使用需要四個字節的uint32類型呢?
原因很簡單,因為對它的操作必須是“原子”的。Do方法在一開始就會通過調用atomic.LoadUint32函數來獲取該字段的值,並且一旦發現該值為1,就會直接返回。這也初步保證了“Do方法,只會執行首次被調用時傳入的函數”。
不過,單憑這樣一個判斷的保證是不夠的。因為,如果有兩個 goroutine 都調用了同一個新的Once值的Do方法,並且幾乎同時執行到了其中的這個條件判斷代碼,那么它們就都會因判斷結果為false,而繼續執行Do方法中剩余的代碼。
在這個條件判斷之后,Do方法會立即鎖定其所屬值中的那個sync.Mutex類型的字段m。然后,它會在臨界區中再次檢查done字段的值,並且僅在條件滿足時,才會去調用參數函數,以及用原子操作把done的值變為1。
如果你熟悉 GoF 設計模式中的單例模式的話,那么肯定能看出來,這個Do方法的實現方式,與那個單例模式有很多相似之處。它們都會先在臨界區之外,判斷一次關鍵條件,若條件不滿足則立即返回。這通常被稱為 “快路徑”,或者叫做“快速失敗路徑”。
如果條件滿足,那么到了臨界區中還要再對關鍵條件進行一次判斷,這主要是為了更加嚴謹。這兩次條件判斷常被統稱為(跨臨界區的)“雙重檢查”。
由於進入臨界區之前,肯定要鎖定保護它的互斥鎖m,顯然會降低代碼的執行速度,所以其中的第二次條件判斷,以及后續的操作就被稱為“慢路徑”或者“常規路徑”。
別看Do方法中的代碼不多,但它卻應用了一個很經典的編程范式。我們在 Go 語言及其標准庫中,還能看到不少這個經典范式及它衍生版本的應用案例。
下面我再來說說這個Do方法在功能方面的兩個特點。
第一個特點,由於Do方法只會在參數函數執行結束之后把done字段的值變為1,因此,如果參數函數的執行需要很長時間或者根本就不會結束(比如執行一些守護任務),那么就有可能會導致相關 goroutine 的同時阻塞。
例如,有多個 goroutine 並發地調用了同一個Once值的Do方法,並且傳入的函數都會一直執行而不結束。那么,這些 goroutine 就都會因調用了這個Do方法而阻塞。因為,除了那個搶先執行了參數函數的 goroutine 之外,其他的 goroutine 都會被阻塞在鎖定該Once值的互斥鎖m的那行代碼上。
第二個特點,Do方法在參數函數執行結束后,對done字段的賦值用的是原子操作,並且,這一操作是被掛在defer語句中的。因此,不論參數函數的執行會以怎樣的方式結束,done字段的值都會變為1。
也就是說,即使這個參數函數沒有執行成功(比如引發了一個 panic),我們也無法使用同一個Once值重新執行它了。所以,如果你需要為參數函數的執行設定重試機制,那么就要考慮Once值的適時替換問題。
在很多時候,我們需要依據Do方法的這兩個特點來設計與之相關的流程,以避免不必要的程序阻塞和功能缺失。
package main
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
)
func main() {
// 示例1。
var counter uint32
var once sync.Once
once.Do(func() {
atomic.AddUint32(&counter, 1)
})
fmt.Printf("The counter: %d\n", counter)
once.Do(func() {
atomic.AddUint32(&counter, 2)
})
fmt.Printf("The counter: %d\n", counter)
fmt.Println()
// 示例2。
once = sync.Once{}
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
once.Do(func() {
for i := 0; i < 3; i++ {
fmt.Printf("Do task. [1-%d]\n", i)
time.Sleep(time.Second)
}
})
fmt.Println("Done. [1]")
}()
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 500)
once.Do(func() {
fmt.Println("Do task. [2]")
})
fmt.Println("Done. [2]")
}()
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 500)
once.Do(func() {
fmt.Println("Do task. [3]")
})
fmt.Println("Done. [3]")
}()
wg.Wait()
fmt.Println()
// 示例3。
once = sync.Once{}
wg.Add(2)
go func() {
defer wg.Done()
defer func() {
if p := recover(); p != nil {
fmt.Printf("fatal error: %v\n", p)
}
}()
once.Do(func() {
fmt.Println("Do task. [4]")
panic(errors.New("something wrong"))
//fmt.Println("Done. [4]")
})
}()
go func() {
defer wg.Done()
time.Sleep(time.Millisecond * 500)
once.Do(func() {
fmt.Println("Do task. [5]")
})
fmt.Println("Done. [5]")
}()
wg.Wait()
}
總結
sync代碼包的WaitGroup類型和Once類型都是非常易用的同步工具。它們都是開箱即用和並發安全的。
利用WaitGroup值,我們可以很方便地實現一對多的 goroutine 協作流程,即:一個分發子任務的 goroutine,和多個執行子任務的 goroutine,共同來完成一個較大的任務。
在使用WaitGroup值的時候,我們一定要注意,千萬不要讓其中的計數器的值小於0,否則就會引發 panic。
另外,我們最好用“先統一Add,再並發Done,最后Wait”這種標准方式,來使用WaitGroup值。 尤其不要在調用Wait方法的同時,並發地通過調用Add方法去增加其計數器的值,因為這也有可能引發 panic。
Once值的使用方式比WaitGroup值更加簡單,它只有一個Do方法。同一個Once值的Do方法,永遠只會執行第一次被調用時傳入的參數函數,不論這個函數的執行會以怎樣的方式結束。
只要傳入某個Do方法的參數函數沒有結束執行,任何之后調用該方法的 goroutine 就都會被阻塞。只有在這個參數函數執行結束以后,那些 goroutine 才會逐一被喚醒。
Once類型使用互斥鎖和原子操作實現了功能,而WaitGroup類型中只用到了原子操作。 所以可以說,它們都是更高層次的同步工具。它們都基於基本的通用工具,實現了某一種特定的功能。sync包中的其他高級同步工具,其實也都是這樣的。
思考題
今天的思考題是:在使用WaitGroup值實現一對多的 goroutine 協作流程時,怎樣才能讓分發子任務的 goroutine 獲得各個子任務的具體執行結果?
筆記源碼
https://github.com/MingsonZheng/go-core-demo
本作品采用知識共享署名-非商業性使用-相同方式共享 4.0 國際許可協議進行許可。
歡迎轉載、使用、重新發布,但務必保留文章署名 鄭子銘 (包含鏈接: http://www.cnblogs.com/MingsonZheng/ ),不得用於商業目的,基於本文修改后的作品務必以相同的許可發布。