理解channel 工作原理以及源碼


Go 的並發特性  
  • goroutines : 獨立執行每個任務,並 可能並行 執行
  • channels : 用於 goroutines 之間的通訊、同步
  • 一個簡單的事務處理的例子  
    對於下面這樣的非並發的程序:
            
            
            
                    
    func main() {
    tasks := getTasks()
    // 處理每個任務
    for _, task := range tasks {
    process(task)
    }
    }
    將其轉換為 Go 的並發模式很容易,使用典型的 Task Queue 的模式:
            
            
            
                    
    func main() {
    // 創建帶緩沖的 channel
    ch := make ( chan Task, 3 )
    // 運行固定數量的 workers
    for i := 0 ; i < numWorkers; i++ {
    go worker(ch)
    }
    // 發送任務到 workers
    hellaTasks := getTasks()
    for _, task := range hellaTasks {
    ch <- task
    }
    ...
    }
    func worker(ch chan Task) {
    for {
    // 接收任務
    task := <-ch
    process(task)
    }
    }
    channels 的特性  
    • goroutine-safe,多個 goroutine 可以同時訪問一個 channel 而不會出現競爭問題
    • 可以用於在 goroutine 之間存儲傳遞
    • 其語義是先入先出(FIFO)
    • 可以導致 goroutine 的 block 和 unblock
    • 解析  
      構造 channel  
                     
                     
                     
                             
      // 帶緩沖的 channel
      ch := make ( chan Task, 3 )
      // 無緩沖的 channel
      ch := make ( chan Tass)

      回顧前面提到的 channel 的特性,特別是前兩個。如果忽略內置的 channel,讓你設計一個具有 goroutines-safe 並且可以用來存儲、傳遞值的東西,你會怎么做?很多人可能覺得或許可以用一個帶鎖的隊列來做。沒錯,事實上,channel 內部就是一個帶鎖的隊列。

      https://golang.org/src/runtime/chan.go

                     
                     
                     
                             
      type hchan struct {
      ...
      buf unsafe.Pointer // 指向一個環形隊列
      ...
      sendx uint // 發送 index
      recvx uint // 接收 index
      ...
      lock mutex // 互斥量
      }

      buf 的具體實現很簡單,就是一個環形隊列的實現。sendx  recvx 分別用來記錄發送、接收的位置。然后用一個 lock 互斥鎖來確保無競爭冒險。

      對於每一個 ch := make(chan Task, 3) 這類操作,都會在中,分配一個空間,建立並初始化一個 hchan 結構變量,而 ch 則是指向這個 hchan 結構的指針

      因為 ch 本身就是個指針,所以我們才可以在 goroutine 函數調用的時候直接將 ch 傳遞過去,而不用再 &ch 取指針了,所以所有使用同一個 ch 的 goroutine 都指向了同一個實際的內存空間。

      發送、接收  

      為了方便描述,我們用 G1 表示 main() 函數的 goroutine,而 G2 表示 worker 的 goroutine。

                     
                     
                     
                             
      // G1
      func main() {
      ...
      for _, task := range tasks {
      ch <- task
      }
      ...
      }
                     
                     
                     
                             
      // G2
      func worker(ch chan Task) {
      for {
      task :=<-ch
      process(task)
      }
      }
      簡單的發送、接收  

      那么 G1 中的 ch <- task0 具體是怎么做的呢?

      1. 獲取鎖
      2. enqueue(task0)(這里是內存復制 task0)
      3. 釋放鎖

      這一步很簡單,接下來看 G2  t := <- ch 是如何讀取數據的。

      1. 獲取鎖
      2. t = dequeue()(同樣,這里也是內存復制)
      3. 釋放鎖

      這一步也非常簡單。但是我們從這個操作中可以看到,所有 goroutine 中共享的部分只有這個 hchan 的結構體,而所有通訊的數據都是內存復制。這遵循了 Go 並發設計中很核心的一個理念:

      “Do not communicate by sharing memory;instead, share memory by communicating.”

      阻塞和恢復  
      發送方被阻塞  

      假設 G2 需要很長時間的處理,在此期間,G1 不斷的發送任務:

      1. ch <- task1
      2. ch <- task2
      3. ch <- task3

      但是當再一次 ch <- task4 的時候,由於 ch 的緩沖只有 3 個,所以沒有地方放了,於是 G1 被 block 了,當有人從隊列中取走一個 Task 的時候,G1 才會被恢復。這是我們都知道的,不過我們今天關心的不是發生了什么,而是如何做到的?

      goroutine 的運行時調度  

      首先,goroutine 不是操作系統線程,而是用戶空間線程。因此 goroutine 是由 Go runtime 來創建並管理的,而不是 OS,所以要比操作系統線程輕量級。

      當然,goroutine 最終還是要運行於某個線程中的,控制 goroutine 如何運行於線程中的是 Go runtime 中的 scheduler (調度器)。

      Go 的運行時調度器是 M:N 調度模型,既 N 個 goroutine,會運行於 M 個 OS 線程中。換句話說,一個 OS 線程中,可能會運行多個 goroutine。

      Go 的 M:N 調度中使用了3個結構:

      • M: OS 線程
      • G: goroutine
      • P: 調度上下文
        • P 擁有一個運行隊列,里面是所有可以運行的 goroutine 及其上下文

      要想運行一個 goroutine - G,那么一個線程 M,就必須持有一個該 goroutine 的上下文 P

      goroutine 被阻塞的具體過程  

      那么當 ch <- task4 執行的時候,channel 中已經滿了,需要pause G1。這個時候,:

      1. G1 會調用運行時的 gopark
      2. 然后 Go 的運行時調度器就會接管
      3.  G1 的狀態設置為 waiting
      4. 斷開 G1  M 之間的關系(switch out),因此 G1 脫離 M,換句話說,M 空閑了,可以安排別的任務了。
      5.  P 的運行隊列中,取得一個可運行的 goroutine G
      6. 建立新的 G  M 的關系(Switch in),因此 G 就准備好運行了。
      7. 當調度器返回的時候,新的 G 就開始運行了,而 G1 則不會運行,也就是 block 了。

      從上面的流程中可以看到,對於 goroutine 來說,G1 被阻塞了,新的 G 開始運行了;而對於操作系統線程 M 來說,則根本沒有被阻塞。

      我們知道 OS 線程要比 goroutine 要沉重的多,因此這里盡量避免 OS 線程阻塞,可以提高性能。

      goroutine 恢復執行的具體過程  

      前面理解了阻塞,那么接下來理解一下如何恢復運行。不過,在繼續了解如何恢復之前,我們需要先進一步理解 hchan 這個結構。因為,當 channel 不在滿的時候,調度器是如何知道該讓哪個 goroutine 繼續運行呢?而且 goroutine 又是如何知道該從哪取數據呢?

       hchan 中,除了之前提到的內容外,還定義有 sendq  recvq 兩個隊列,分別表示等待發送、接收的 goroutine,及其相關信息。

                            
                            
                            
                                    
      type hchan struct {
      ...
      buf unsafe.Pointer // 指向一個環形隊列
      ...
      sendq waitq // 等待發送的隊列
      recvq waitq // 等待接收的隊列
      ...
      lock mutex // 互斥量
      }

      其中 waitq 是一個鏈表結構的隊列,每個元素是一個 sudog 的結構,其定義大致為:

                            
                            
                            
                                    
      type sudog struct {
      g *g // 正在等候的 goroutine
      elem unsafe.Pointer // 指向需要接收、發送的元素
      ...
      }

      https://golang.org/src/runtime/runtime2.go?h=sudog#L270

      所以在之前的阻塞 G1 的過程中,實際上:

      1. G1 給自己創建一個 sudog 的變量
      2. 然后追加到 sendq 的等候隊列中,方便將來的 receiver 來使用這些信息恢復 G1

      這些都是發生在調用調度器之前

      那么現在開始看一下如何恢復。

       G2 調用 t := <- ch 的時候,channel 的狀態是,緩沖是滿的,而且還有一個 G1 在等候發送隊列里,然后 G2 執行下面的操作:

      1. G2 先執行 dequeue() 從緩沖隊列中取得 task1  t
      2. G2  sendq 中彈出一個等候發送的 sudog
      3. 將彈出的 sudog 中的 elem 的值 enqueue()  buf 
      4. 將彈出的 sudog 中的 goroutine,也就是 G1,狀態從 waiting 改為 runnable
        1. 然后,G2 需要通知調度器 G1 已經可以進行調度了,因此調用 goready(G1)
        2. 調度器將 G1 的狀態改為 runnable
        3. 調度器將 G1 壓入 P 的運行隊列,因此在將來的某個時刻調度的時候,G1 就會開始恢復運行。
        4. 返回到 G2

      注意,這里是由 G2 來負責將 G1  elem 壓入 buf 的,這是一個優化。這樣將來 G1 恢復運行后,就不必再次獲取鎖、enqueue()、釋放鎖了。這樣就避免了多次鎖的開銷。

      如果接收方先阻塞呢?  

      更酷的地方是接收方先阻塞的流程。

      如果 G2 先執行了 t := <- ch,此時 buf 是空的,因此 G2 會被阻塞,他的流程是這樣:

      1. G2 給自己創建一個 sudog 結構變量。其中 g 是自己,也就是 G2,而 elem 則指向 t
      2. 將這個 sudog 變量壓入 recvq 等候接收隊列
      3. G2 需要告訴 goroutine,自己需要 pause 了,於是調用 gopark(G2)
        1. 和之前一樣,調度器將其 G2 的狀態改為 waiting
        2. 斷開 G2  M 的關系
        3.  P 的運行隊列中取出一個 goroutine
        4. 建立新的 goroutine 和 M 的關系
        5. 返回,開始繼續運行新的 goroutine

      這些應該已經不陌生了,那么當 G1 開始發送數據的時候,流程是什么樣子的呢?

      G1 可以將 enqueue(task),然后調用 goready(G2)。不過,我們可以更聰明一些。

      我們根據 hchan 結構的狀態,已經知道 task 進入 buf 后,G2 恢復運行后,會讀取其值,復制到 t 中。那么 G1 可以根本不走 bufG1 可以直接把數據給 G2

      Goroutine 通常都有自己的棧,互相之間不會訪問對方的棧內數據,除了 channel。這里,由於我們已經知道了 t 的地址(通過 elem指針),而且由於 G2 不在運行,所以我們可以很安全的直接賦值。當 G2 恢復運行的時候,既不需要再次獲取鎖,也不需要對 buf 進行操作。從而節約了內存復制、以及鎖操作的開銷。

      總結  
      • goroutine-safe

        • hchan 中的 lock mutex
      • 存儲、傳遞值,FIFO

        • 通過 hchan 中的環形緩沖區來實現
      • 導致 goroutine 的阻塞和恢復

        • hchan 中的 sendqrecvq,也就是 sudog 結構的鏈表隊列
        • 調用運行時調度器 (gopark(), goready())
        • 其它 channel 的操作  
          無緩沖 channel  

          無緩沖的 channel 行為就和前面說的直接發送的例子一樣:

          • 接收方阻塞 → 發送方直接寫入接收方的棧
          • 發送方阻塞 → 接受法直接從發送方的 sudog 中讀取
          • select  

            https://golang.org/src/runtime/select.go

            1. 先把所有需要操作的 channel 上鎖
            2. 給自己創建一個 sudog,然后添加到所有 channel 的 sendqrecvq(取決於是發送還是接收)
            3. 把所有的 channel 解鎖,然后 pause 當前調用 select 的 goroutine(gopark()
            4. 然后當有任意一個 channel 可用時,select 的這個 goroutine 就會被調度執行。
            5. resuming mirrors the pause sequence
            6. 為什么 Go 會這樣設計?  
              Simplicity  

              更傾向於帶鎖的隊列,而不是無鎖的實現。

              “性能提升不是憑空而來的,是隨着復雜度增加而增加的。” - dvyokov

              后者雖然性能可能會更好,但是這個優勢,並不一定能夠戰勝隨之而來的實現代碼的復雜度所帶來的劣勢。

              Performance  
              • 調用 Go 運行時調度器,這樣可以保持 OS 線程不被阻塞

              跨 goroutine 的棧讀、寫。

              • 可以讓 goroutine 醒來后不必獲取鎖
              • 可以避免一些內存復制

              當然,任何優勢都會有其代價。這里的代價是實現的復雜度,所以這里有更復雜的內存管理機制、垃圾回收以及棧收縮機制。

              在這里性能的提高優勢,要比復雜度的提高帶來的劣勢要大。

              所以在 channel 實現的各種代碼中,我們都可以見到這種 simplicity vs performance 的權衡后的結果。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM