轉載:https://zhuanlan.zhihu.com/p/27917262
以一個簡單的channel應用開始,使用goroutine和channel實現一個任務隊列,並行處理多個任務。
func main(){
//帶緩沖的channel
ch := make( chan Task, 3)
//啟動固定數量的worker
for i := 0; i< numWorkers; i++ {
go worker(ch)
}
//發送任務給worker
hellaTasks := getTaks()
for _, task := range hellaTasks {
ch <- task
}
...
}
func worker(ch chan Task){
for {
//接受任務
task := <- ch
process(task)
}
}
從上面的代碼可以看出,使用golang的goroutine和channel可以很容易的實現一個生產者-消費者模式的任務隊列,相比Java, c++簡潔了很多。channel可以天然的實現了下面四個特性:
- goroutine安全
- 在不同的goroutine之間存儲和傳輸值 - 提供FIFO語義(buffered channel提供)
- 可以讓goroutine block/unblock
那么channel是怎么實現這些特性的呢?下面我們看看當我們調用make來生成一個channel的時候都做了些什么。
1. make chan
上述任務隊列的例子第三行,使用make創建了一個長度為3的帶緩沖的channel,channel在底層是一個hchan結構體,位於src/runtime/chan.go里。其定義如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
make函數在創建channel的時候會在該進程的heap區申請一塊內存,創建一個hchan結構體,返回執行該內存的指針,所以獲取的的ch變量本身就是一個指針,在函數之間傳遞的時候是同一個channel。
hchan結構體使用一個環形隊列來保存groutine之間傳遞的數據(如果是緩存channel的話),使用兩個list保存像該chan發送和從該chan接收數據的goroutine,還有一個mutex來保證操作這些結構的安全。
2. 發送和接收
向channel發送和從channel接收數據主要涉及hchan里的四個成員變量,借用Kavya ppt里的圖示,來分析發送和接收的過程。
還是以前面的任務隊列為例:
//G1
func main(){
...
for _, task := range hellaTasks {
ch <- task //sender
}
...
}
//G2
func worker(ch chan Task){
for {
//接受任務
task := <- ch //recevier
process(task)
}
}
其中G1是發送者,G2是接收,因為ch是長度為3的帶緩沖channel,初始的時候hchan結構體的buf為空,sendx和recvx都為0,當G1向ch里發送數據的時候,會首先對buf加鎖,然后將要發送的數據copy到buf里,並增加sendx的值,最后釋放buf的鎖。然后G2消費的時候首先對buf加鎖,然后將buf里的數據copy到task變量對應的內存里,增加recvx,最后釋放鎖。整個過程,G1和G2沒有共享的內存,底層通過hchan結構體的buf,使用copy內存的方式進行通信,最后達到了共享內存的目的,這完全符合CSP的設計理念
一般情況下,G2的消費速度應該是慢於G1的,所以buf的數據會越來越多,這個時候G1再向ch里發送數據,這個時候G1就會阻塞,那么阻塞到底是發生了什么呢?
3. Goroutine Pause/Resume
goroutine是Golang實現的用戶空間的輕量級的線程,有runtime調度器調度,與操作系統的thread有多對一的關系,相關的數據結構如下圖:

其中M是操作系統的線程,G是用戶啟動的goroutine,P是與調度相關的context,每個M都擁有一個P,P維護了一個能夠運行的goutine隊列,用於該線程執行。
當G1向buf已經滿了的ch發送數據的時候,當runtine檢測到對應的hchan的buf已經滿了,會通知調度器,調度器會將G1的狀態設置為waiting, 移除與線程M的聯系,然后從P的runqueue中選擇一個goroutine在線程M中執行,此時G1就是阻塞狀態,但是不是操作系統的線程阻塞,所以這個時候只用消耗少量的資源。
那么G1設置為waiting狀態后去哪了?怎們去resume呢?我們再回到hchan結構體,注意到hchan有個sendq的成員,其類型是waitq,查看源碼如下:
type hchan struct {
...
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
...
}
//
type waitq struct {
first *sudog
last *sudog
}
實際上,當G1變為waiting狀態后,會創建一個代表自己的sudog的結構,然后放到sendq這個list中,sudog結構中保存了channel相關的變量的指針(如果該Goroutine是sender,那么保存的是待發送數據的變量的地址,如果是receiver則為接收數據的變量的地址,之所以是地址,前面我們提到在傳輸數據的時候使用的是copy的方式)

當G2從ch中接收一個數據時,會通知調度器,設置G1的狀態為runnable,然后將加入P的runqueue里,等待線程執行.

4. wait empty channel
前面我們是假設G1先運行,如果G2先運行會怎么樣呢?如果G2先運行,那么G2會從一個empty的channel里取數據,這個時候G2就會阻塞,和前面介紹的G1阻塞一樣,G2也會創建一個sudog結構體,保存接收數據的變量的地址,但是該sudog結構體是放到了recvq列表里,當G1向ch發送數據的時候,runtime並沒有對hchan結構體題的buf進行加鎖,而是直接將G1里的發送到ch的數據copy到了G2 sudog里對應的elem指向的內存地址!

5. 總結
Golang的一大特色就是其簡單高效的天然並發機制,使用goroutine和channel實現了CSP模型。理解channel的底層運行機制對靈活運用golang開發並發程序有很大的幫助,看了Kavya的分享,然后結合golang runtime相關的源碼(源碼開源並且也是golang實現簡直良心!),對channel的認識更加的深刻,當然還有一些地方存在一些疑問,比如goroutine的調度實現相關的,還是要潛心膜拜大神們的源碼!
