Golang使用Groutine和channels實現了CSP(Communicating Sequential Processes)模型,channles在goroutine的通信和同步中承擔着重要的角色。在GopherCon 2017中,Golang專家Kavya深入介紹了 Go Channels 的內部機制,以及運行時調度器和內存管理系統是如何支持Channel的
以一個簡單的channel應用開始,使用goroutine和channel實現一個任務隊列,並行處理多個任務。
func main(){
    //帶緩沖的channel
    ch := make(chan Task, 3)
    //啟動固定數量的worker
    for i := 0; i< numWorkers; i++ {
        go worker(ch)
    }
    //發送任務給worker
    hellaTasks := getTaks()
    for _, task := range hellaTasks {
        ch <- task
    }
    ...
}
func worker(ch chan Task){
    for {
       //接受任務
       task := <- ch
       process(task)
    }
}
 
        從上面的代碼可以看出,使用golang的goroutine和channel可以很容易的實現一個生產者-消費者模式的任務隊列,相比java, c++簡潔了很多。channel可以天然的實現了下面四個特性: 
- goroutine安全 
- 在不同的goroutine之間存儲和傳輸值 
- 提供FIFO語義(buffered channel提供) 
- 可以讓goroutine block/unblock
那么channel是怎么實現這些特性的呢?下面我們看看當我們調用make來生成一個channel的時候都做了些什么。
make chan
上述任務隊列的例子第三行,使用make創建了一個長度為3的帶緩沖的channel,channel在底層是一個hchan結構體,位於src/runtime/chan.go里。其定義如下:
type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}
 
        
make函數在創建channel的時候會在該進程的heap區申請一塊內存,創建一個hchan結構體,返回執行該內存的指針,所以獲取的的ch變量本身就是一個指針,在函數之間傳遞的時候是同一個channel。
hchan結構體使用一個環形隊列來保存groutine之間傳遞的數據(如果是緩存channel的話),使用兩個list保存像該chan發送和從改chan接收數據的goroutine,還有一個mutex來保證操作這些結構的安全。
向channel發送和從channel接收數據主要涉及hchan里的四個成員變量,借用Kavya ppt里的圖示,來分析發送和接收的過程。
send

發送到channel上面去,還有buf則放入。沒有則放入到對應的sendq等待隊列中。
recv

從buf中取數據。沒有則放入到對應的recvq等待隊列中。
關於阻塞隊列
關於放入阻塞隊列到底是如何實現的。需要了解到GO的調度。Go語言調度有3個比較重要的概念。對於OS來說,一般是進程調度,線程調度。里面有調度器,調度算法等來實現,現在比較經典的是公平調度算法,詳細見 https://blog.csdn.net/u012279631/article/details/77677266。go語言相當於把這個權利自己拿過來了。在用戶態自己調用。在線程之下掛載了協程 稱之為G(goroutine),而線程稱之為M(machine 即底層),具體調度者稱之為P(context)。這是因為進程切換上下文耗費的系統資源大,創建線程的時候耗費8K內存。當在高並發的情況下去處理對應的事務,如果用線程去處理仍然對資源非常浪費。從這個角度來講,golang就是為了處理高並發而特定的一種語言。
線程和協程映射關系
P從runable隊列中取到相應的G,放到M中取執行。

阻塞情形
channel隊列滿 無法放入阻塞
當channel已經滿了,仍然有數據發送到channel中時。

原本狀態,G1在running中。
 
發現無法放入channel中,導致阻塞。
 
這時候會從runable隊列中取下一個goroutine,放到M中去執行。
 
再回頭看channel相應的改變。因為buffer已經滿了。所以會把它放在sendq中。
 
結構如下:

當有goroutine去取對應buffer時,清空一個buffer。就會把sendx里面的elem內容copy到對應的channel buffer中。然后把goroutine狀態設置為run,並且放回到runable隊列中。

channel隊列空 無法讀取阻塞
這里用了非常聰明的方式減少了一次內存的拷貝。

當G1發送內容到channel的時候,首先查看recvq隊列是否有阻塞的goroutine。如果有則直接從G1copy到G2。優化了從G1 -> channel ->G2這個步驟。
參考鏈接:
https://speakerdeck.com/kavya719/understanding-channels
https://tiancaiamao.gitbooks.io/go-internals/content/zh/07.1.html
--------------------- 
還是以前面的任務隊列為例:
//G1
func main(){
    ...
    for _, task := range hellaTasks {
        ch <- task    //sender
    }
    ...
}
//G2
func worker(ch chan Task){
    for {
       //接受任務
       task := <- ch  //recevier
       process(task)
    }
}
 
        
其中G1是發送者,G2是接收,因為ch是長度為3的帶緩沖channel,初始的時候hchan結構體的buf為空,sendx和recvx都為0,當G1向ch里發送數據的時候,會首先對buf加鎖,然后將要發送的數據copy到buf里,並增加sendx的值,最后釋放buf的鎖。然后G2消費的時候首先對buf加鎖,然后將buf里的數據copy到task變量對應的內存里,增加recvx,最后釋放鎖。整個過程,G1和G2沒有共享的內存,底層通過hchan結構體的buf,使用copy內存的方式進行通信,最后達到了共享內存的目的,這完全符合CSP的設計理念
一般情況下,G2的消費速度應該是慢於G1的,所以buf的數據會越來越多,這個時候G1再向ch里發送數據,這個時候G1就會阻塞,那么阻塞到底是發生了什么呢?
Goroutine Pause/Resume
goroutine是Golang實現的用戶空間的輕量級的線程,有runtime調度器調度,與操作系統的thread有多對一的關系.如前面圖所示,
M是操作系統的線程,G是用戶啟動的goroutine,P是與調度相關的context,每個M都擁有一個P,P維護了一個能夠運行的goutine隊列,用於該線程執行。
當G1向buf已經滿了的ch發送數據的時候,當runtine檢測到對應的hchan的buf已經滿了,會通知調度器,調度器會將G1的狀態設置為waiting, 移除與線程M的聯系,然后從P的runqueue中選擇一個goroutine在線程M中執行,此時G1就是阻塞狀態,但是不是操作系統的線程阻塞,所以這個時候只用消耗少量的資源。
那么G1設置為waiting狀態后去哪了?怎們去resume呢?我們再回到hchan結構體,注意到hchan有個sendq的成員,其類型是waitq,查看源碼如下:
Go 
type hchan struct { 
... 
recvq waitq // list of recv waiters 
sendq waitq // list of send waiters 
... 
} 
// 
type waitq struct { 
first *sudog 
last *sudog 
} 
 
        
實際上,當G1變為waiting狀態后,會創建一個代表自己的sudog的結構,然后放到sendq這個list中,sudog結構中保存了channel相關的變量的指針(如果該Goroutine是sender,那么保存的是待發送數據的變量的地址,如果是receiver則為接收數據的變量的地址,之所以是地址,前面我們提到在傳輸數據的時候使用的是copy的方式
當G2從ch中接收一個數據時,會通知調度器,設置G1的狀態為runnable,然后將加入P的runqueue里,等待線程執行. 
前面我們是假設G1先運行,如果G2先運行會怎么樣呢?如果G2先運行,那么G2會從一個empty的channel里取數據,這個時候G2就會阻塞,和前面介紹的G1阻塞一樣,G2也會創建一個sudog結構體,保存接收數據的變量的地址,但是該sudog結構體是放到了recvq列表里,當G1向ch發送數據的時候,runtime並沒有對hchan結構體題的buf進行加鎖,而是直接將G1里的發送到ch的數據copy到了G2 sudog里對應的elem指向的內存地址!
chan的分類
分為帶緩存和不帶緩存這2類,尤其需要關注帶緩存的用法,防止掉坑里。
- 不帶緩存:make(chan 數據類型)
 - 帶緩存: make(chan 數據類型,長度)
 
例如定義一個帶緩存的chan:
ch := make(chan int,2)
這里我們定義個緩存長度為2的chan,當我們已經往chan中寫入了2個數據,當再次寫入第三個數據的時候就會發送阻塞,直到其他人從該chan中讀取了數據,那么才可以再次寫入數據,帶緩存的chan類似於一個隊列,當隊列滿的時候是無法寫入數據的。
chan的關閉 
chan可以通過close關閉,關閉后的chan是無法寫入數據的,但是可以讀取數據。
Go語言如何判斷一個chan被關閉
當一個chanel被關閉后,再取出不會阻塞,而是返回零值
package main
 
import "fmt"
 
func main() {
    c := make(chan int, 5)
    c <- 123
    close(c)
     
    fmt.Println(<-c)
    fmt.Println(<-c)
 
        
輸出
123 0
判斷的方法是否關閉方法就是接收第二個參數,如下
package main
 
import "fmt"
 
func main() {
    c := make(chan int, 10)
    c <- 123
    close(c)
 
    var res int
    var ok bool
 
    res, ok = <-c
    fmt.Println(res, ok)
 
    res, ok = <-c
    fmt.Println(res, ok) //此時ok為false
}
 
        輸出:
123 true 0 false
問題: 無緩存chan,在一個goroutine向chan塞了數據之后,然后當前goroutine就是堵塞,必須由另外一個goutine來取走chan數據就會接着走下面的流程。為什么必須要另外一個goroutine才能取走數據呢,為啥不能自己塞自己取,比如在main goroutine 向chan 塞數據后,然后立馬再去拿出來,就會報錯產生死鎖。
是因為當chan堵塞的時候,會把當前導致堵塞的goroutine 置為一個waiting的狀態,也就說當前的這個goroutine自己已經不可能再接受chan的數據了,必須是由另外的一個goroutine 接收之后,才能繼續走下面的流程代碼。
