直接上代碼:
package main import ( "fmt" "runtime" "strconv" "sync" ) func say(str string) { for i := 0; i < 5; i++ { runtime.Gosched() fmt.Println(str) } } func sayStat(str string, ch chan int64) { for i := 0; i < 5000; i++ { runtime.Gosched() fmt.Println(str) ch <- int64(i) } close(ch) } func sayStat_2_Worker(str string, ch chan int64) { sum := 0 for i := 0; i < 5000; i++ { runtime.Gosched() fmt.Println(str) sum += i } ch <- int64(sum) // close(ch) } func gen(done <-chan struct{}, nums ...int) <-chan int { out := make(chan int) go func() { defer close(out) for _, i := range nums { select { case out <- i: case <-done: return } } }() return out } func square(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for c := range in { select { case out <- c * c: case <-done: return } } }() return out } func merge(done <-chan struct{}, ins ...<-chan int) <-chan int { var wg sync.WaitGroup wg.Add(len(ins)) out := make(chan int) // ERROR: http://studygolang.com/articles/7994 // REF: "for"聲明中的迭代變量和閉包 // for _, in := range ins { // go func() { // for c := range in { // out <- c // } // wg.Done() // }() // } // Solution1: New func Outline // ff := func(in <-chan int) { // for c := range in { // out <- c // } // wg.Done() // } // for _, in := range ins { // go ff(in) // } // Solution2: Inline func with parameter // for _, in := range ins { // go func(in <-chan int) { // for c := range in { // out <- c // } // wg.Done() // }(in) // } // Solution3: Inline func with parameter copy bak for _, in := range ins { in_copy := in go func() { defer wg.Done() for c := range in_copy { select { case out <- c: case <-done: return } } }() } go func() { wg.Wait() close(out) }() return out } func genNew(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out } func main() { // DEFAULT VALUE: NUMBER OF CPU CORE fmt.Println(runtime.GOMAXPROCS(-1)) runtime.Gosched() fmt.Println(runtime.GOMAXPROCS(-1)) fmt.Println(runtime.NumCPU()) // go say("hello") // say("world") ch := make(chan int64) go sayStat("hello", ch) // go sayStat("hello", ch) // sayStat("world", ch) var stat int64 = 0 for c := range ch { fmt.Println(c) stat += c } fmt.Println(stat) // 12497500 // // DEAD LOCK ! // cc := make(chan int) // // NO GOROUTINE RECEIVE THE UNBUFFERED CHANNEL DATA ! // cc <- 888 // fmt.Println(<-cc) stat = 0 cc := make(chan int64) worker_num := 2 for i := 0; i < worker_num; i++ { go sayStat_2_Worker("TEST-"+strconv.Itoa(i), cc) } for i := 0; i < worker_num; i++ { stat += <-cc } close(cc) fmt.Println(stat) // 12497500 * 2 = 24995000 // out := square(gen(1, 2, 3, 4, 5)) // for c := range out { // fmt.Println(c) // } done := make(chan struct{}) // defer close(done) out_new := gen(done, 1, 2, 3, 4, 5) c1 := square(done, out_new) c2 := square(done, out_new) // for r1 := range c1 { // fmt.Println(r1) // } // for r2 := range c2 { // fmt.Println(r2) // } // for r := range merge(c1, c2) { // fmt.Println(r) // } mg := merge(done, c1, c2) fmt.Println(<-mg) fmt.Println(<-mg) fmt.Println(<-mg) close(done) // fmt.Println(<-mg) // fmt.Println(<-mg) // fmt.Println(<-mg) // fmt.Println(<-mg) for { if msg, closed := <-mg; !closed { fmt.Println("<-mg has closed!") return } else { fmt.Println(msg) } } // gen_new := genNew(1, 2, 3, 4, 5) // // close(gen_new) // for gn := range gen_new { // fmt.Println(gn) // } }
簡介
Go語言的並發原語允許開發者以類似於 Unix Pipe 的方式構建數據流水線 (data pipelines),數據流水線能夠高效地利用 I/O和多核 CPU 的優勢。
本文要講的就是一些使用流水線的一些例子,流水線的錯誤處理也是本文的重點。
閱讀建議
數據流水線充分利用了多核特性,代碼層面是基於 channel 類型 和 go 關鍵字。
channel 和 go 貫穿本文的始終。如果你對這兩個概念不太了解,建議先閱讀之前公眾號發布的兩篇文章:Go 語言內存模型(上/下)。
如果你對操作系統中"生產者"和"消費者"模型比較了解的話,也將有助於對本文中流水線的理解。
本文中絕大多數講解都是基於代碼進行的。換句話說,如果你看不太懂某些代碼片段,建議補全以后,在機器或play.golang.org 上運行一下。對於某些不明白的細節,可以手動添加一些語句以助於理解。
由於 Go語言並發模型 的英文原文 Go Concurrency Patterns: Pipelines and cancellation 篇幅比較長,本文只包含 理論推導和簡單的例子。
下一篇文章我們會對 "並行MD5" 這個現實生活的例子進行詳細地講解。
什么是 "流水線" (pipeline)?
對於"流水線"這個概念,Go語言中並沒有正式的定義,它只是很多種並發方式的一種。這里我給出一個非官方的定義:一條流水線是 是由多個階段組成的,相鄰的兩個階段由 channel 進行連接;每個階段是由一組在同一個函數中啟動的 goroutine 組成。在每個階段,這些 goroutine 會執行下面三個操作:
-
通過 inbound channels 從上游接收數據
-
對接收到的數據執行一些操作,通常會生成新的數據
-
將新生成的數據通過 outbound channels 發送給下游
除了第一個和最后一個階段,每個階段都可以有任意個 inbound 和 outbound channel。
顯然,第一個階段只有 outbound channel,而最后一個階段只有 inbound channel。
我們通常稱第一個階段為"生產者"
或"源頭"
,稱最后一個階段為"消費者"
或"接收者"
。
首先,我們通過一個簡單的例子來演示這個概念和其中的技巧。后面我們會更出一個真實世界的例子。
流水線入門:求平方數
假設我們有一個流水線,它由三個階段組成。
第一階段是 gen 函數,它能夠將一組整數轉換為channel,channel 可以將數字發送出去。
gen 函數首先啟動一個 goroutine,該goroutine 發送數字到 channel,當數字發送完時關閉channel。
代碼如下:
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out }
第二階段是 sq 函數,它從 channel 接收一個整數,然后返回 一個channel,返回的channel可以發送 接收到整數的平方。當它的 inbound channel 關閉,並且把所有數字均發送到下游時,會關閉 outbound channel。代碼如下:
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out }
main 函數 用於設置流水線並運行最后一個階段。最后一個階段會從第二階段接收數字,並逐個打印出來,直到來自於上游的 inbound channel關閉。代碼如下:
func main() { // 設置流水線 c := gen(2, 3) out := sq(c) // 消費輸出結果 fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 }
由於 sq 函數的 inbound channel 和 outbound channel 類型一樣,所以組合任意個 sq 函數。比如像下面這樣使用:
func main() {
// 設置流水線並消費輸出結果 for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } }
如果我們稍微修改一下 gen 函數,便可以模擬 haskell的惰性求值。有興趣的讀者可以自己折騰一下。
流水線進階:扇入和扇出
扇出:同一個 channel 可以被多個函數讀取數據,直到channel關閉。
這種機制允許將工作負載分發到一組worker,以便更好地並行使用 CPU 和 I/O。
扇入:多個 channel 的數據可以被同一個函數讀取和處理,然后合並到一個 channel,直到所有 channel都關閉。
下面這張圖對 扇入 有一個直觀的描述:
我們修改一下上個例子中的流水線,這里我們運行兩個 sq 實例,它們從同一個 channel 讀取數據。這里我們引入一個新函數 merge 對結果進行"扇入"操作:
func main() {
in := gen(2, 3) // 啟動兩個 sq 實例,即兩個goroutines處理 channel "in" 的數據 c1 := sq(in) c2 := sq(in) // merge 函數將 channel c1 和 c2 合並到一起,這段代碼會消費 merge 的結果 for n := range merge(c1, c2) { fmt.Println(n) // 打印 4 9, 或 9 4 } }
merge 函數 將多個 channel 轉換為一個 channel,它為每一個 inbound channel 啟動一個 goroutine,用於將數據
拷貝到 outbound channel。
merge 函數的實現見下面代碼 (注意 wg 變量):
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 為每一個輸入channel cs 創建一個 goroutine output // output 將數據從 c 拷貝到 out,直到 c 關閉,然后 調用 wg.Done output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // 啟動一個 goroutine,用於所有 output goroutine結束時,關閉 out // 該goroutine 必須在 wg.Add 之后啟動 go func() { wg.Wait() close(out) }() return out }
在上面的代碼中,每個 inbound channel 對應一個 output
函數。所有 output
goroutine 被創建以后,merge 啟動一個額外的 goroutine,
這個goroutine會等待所有 inbound channel 上的發送操作結束以后,關閉 outbound channel。
對已經關閉的channel 執行發送操作(ch<-)會導致異常,所以我們必須保證所有的發送操作都在關閉channel之前結束。
sync.WaitGroup 提供了一種組織同步的方式。
它保證 merge 中所有 inbound channel (cs ...<-chan int) 均被正常關閉, output goroutine 正常結束后,關閉 out channel。
停下來思考一下
在使用流水線函數時,有一個固定的模式:
-
在一個階段,當所有發送操作 (ch<-) 結束以后,關閉 outbound channel
-
在一個階段,goroutine 會持續從 inbount channel 接收數據,直到所有 inbound channel 全部關閉
在這種模式下,每一個接收階段都可以寫成 range
循環的方式,
從而保證所有數據都被成功發送到下游后,goroutine能夠立即退出。
在現實中,階段並不總是接收所有的 inbound 數據。有時候是設計如此:接收者可能只需要數據的一個子集就可以繼續執行。
更常見的情況是:由於前一個階段返回一個錯誤,導致該階段提前退出。
這兩種情況下,接收者都不應該繼續等待后面的值被傳送過來。
我們期望的結果是:當后一個階段不需要數據時,上游階段能夠停止生產。
在我們的例子中,如果一個階段不能消費所有的 inbound 數據,試圖發送這些數據的 goroutine 會永久阻塞。看下面這段代碼片段:
// 只消費 out 的第一個數據 out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // 由於我們不再接收 out 的第二個數據 // 其中一個 goroutine output 將會在發送時被阻塞 }
顯然這里存在資源泄漏。一方面goroutine 消耗內存和運行時資源,另一方面goroutine 棧中的堆引用會阻止 gc 執行回收操作。 既然goroutine 不能被回收,那么他們必須自己退出。
我們重新整理一下流水線中的不同階段,保證在下游階段接收數據失敗時,上游階段也能夠正常退出。
一個方式是使用帶有緩沖的管道作為 outbound channel。緩存可以存儲固定個數的數據。
如果緩存沒有用完,那么發送操作會立即返回。看下面這段代碼示例:
c := make(chan int, 2) // 緩沖大小為 2 c <- 1 // 立即返回 c <- 2 // 立即返回 c <- 3 // 該操作會被阻塞,直到有一個 goroutine 執行 <-c,並接收到數字 1
如果在創建 channel 時就知道要發送的值的個數,使用buffer就能夠簡化代碼。
仍然使用求平方數的例子,我們對 gen 函數進行重寫。我們將這組整型數拷貝到一個
緩沖 channel中,從而避免創建一個新的 goroutine:
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out }
回到 流水線中被阻塞的 goroutine,我們考慮讓 merge 函數返回一個緩沖管道:
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int, 1) // 在本例中存儲未讀的數據足夠了 // ... 其他部分代碼不變 ...
盡管這種方法解決了這個程序中阻塞 goroutine的問題,但是從長遠來看,它並不是好辦法。緩存大小選擇為1 是建立在兩個前提之上:
-
我們已經知道 merge 函數有兩個 inbound channel
-
我們已經知道下游階段會消耗多少個值
這段代碼很脆弱。如果我們在傳入一個值給 gen 函數,或者下游階段讀取的值變少,goroutine會再次被阻塞。
為了從根本上解決這個問題,我們需要提供一種機制,讓下游階段能夠告知上游發送者停止接收的消息。下面我們看下這種機制。
顯式取消 (Explicit cancellation)
當 main 函數決定退出,並停止接收 out 發送的任何數據時,它必須告訴上游階段的 goroutine 讓它們放棄
正在發送的數據。 main 函數通過發送數據到一個名為 done 的channel實現這樣的機制。 由於有兩個潛在的
發送者被阻塞,它發送兩個值。如下代碼所示:
func main() { in := gen(2, 3) // 啟動兩個運行 sq 的goroutine // 兩個goroutine的數據均來自於 in c1 := sq(in) c2 := sq(in) // 消耗 output 生產的第一個值 done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // 告訴其他發送者,我們將要離開 // 不再接收它們的數據 done <- struct{}{} done <- struct{}{} }
發送數據的 goroutine 使用一個 select 表達式代替原來的操作,select 表達式只有在接收到 out 或 done
發送的數據后,才會繼續進行下去。 done 的值類型為 struct{} ,因為它發送什么值不重要,重要的是它發送沒發送:
接收事件發生意味着 channel out 的發送操作被丟棄。 goroutine output 基於 inbound channel c 繼續執行
循環,所以上游階段不會被阻塞。(后面我們會討論如何讓循環提前退出)。 使用 done channel 方式實現的merge 函數如下:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 為 cs 的的每一個 輸入channel // 創建一個goroutine。output函數將 // 數據從 c 拷貝到 out,直到c關閉, // 或者接收到 done 信號; // 然后調用 wg.Done() output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ...
這種方法有一個問題:每一個下游的接收者需要知道潛在被阻上游發送者的個數,然后向這些發送者發送信號讓它們提前退出。時刻追蹤這些數目是一項繁瑣且易出錯的工作。
我們需要一種方式能夠讓未知數目、且個數不受限制的goroutine 停止向下游發送數據。在Go語言中,我們可以通過關閉一個
channel 實現,因為在一個已關閉 channel 上執行接收操作(<-ch)總是能夠立即返回,返回值是對應類型的零值
。關於這點的細節,點擊這里查看。
換句話說,我們只要關閉 done channel,就能夠讓解開對所有發送者的阻塞。對一個管道的關閉操作事實上是對所有接收者的廣播信號。
我們把 done channel 作為一個參數傳遞給每一個 流水線上的函數,通過 defer 表達式聲明對 done channel的關閉操作。因此,所有從 main 函數作為源頭被調用的函數均能夠收到 done 的信號,每個階段都能夠正常退出。 使用 done 對main函數重構以后,代碼如下:
func main() { // 設置一個 全局共享的 done channel, // 當流水線退出時,關閉 done channel // 所有 goroutine接收到 done 的信號后, // 都會正常退出。 done := make(chan struct{}) defer close(done) in := gen(done, 2, 3) // 將 sq 的工作分發給兩個goroutine // 這兩個 goroutine 均從 in 讀取數據 c1 := sq(done, in) c2 := sq(done, in) // 消費 outtput 生產的第一個值 out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // defer 調用時,done channel 會被關閉。 }
現在,流水線中的每個階段都能夠在 done channel 被關閉時返回。merge 函數中的 output 代碼也能夠順利返回,因為它知道 done channel關閉時,上游發送者 sq 會停止發送數據。 在 defer 表達式執行結束時,所有調用鏈上的 output 都能保證 wg.Done() 被調用:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // 為 cs 的每一個 channel 創建一個 goroutine // 這個 goroutine 運行 output,它將數據從 c // 拷貝到 out,直到 c 關閉,或者 接收到 done // 的關閉信號。人啊后調用 wg.Done() output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ...
同樣的原理, done channel 被關閉時,sq 也能夠立即返回。在defer表達式執行結束時,所有調用鏈上的 sq 都能保證 out channel 被關閉。代碼如下:
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out }
這里,我們給出幾條構建流水線的指導:
-
當所有發送操作結束時,每個階段都關閉自己的 outbound channels
-
每個階段都會一直從 inbound channels 接收數據,直到這些 channels 被關閉,或發送者解除阻塞狀態。
流水線通過兩種方式解除發送者的阻塞:
-
提供足夠大的緩沖保存發送者發送的數據
-
接收者放棄 channel 時,顯式地通知發送者。
結論
本文介紹了Go 語言中構建數據流水線的一些技巧。流水線的錯誤處理比較復雜,流水線的每個階段都可能阻塞向下游發送數據,
下游階段也可能不再關注上游發送的數據。上面我們介紹了通過關閉一個channel,向流水線中的所有 goroutine 發送一個 "done" 信號;也定義了
構建流水線的正確方法。
下一篇文章,我們將通過一個 並行 md5 的例子來說明本文所講的一些理念和技巧。
原作者 Sameer Ajmani,翻譯 Oscar
下期預告:Go語言並發模型:以並行md5計算為例。英文原文鏈接
相關鏈接