Golang 並發簡介


並發概要

隨着多核CPU的普及, 為了更快的處理任務, 出現了各種並發編程的模型, 主要有以下幾種:

模型名稱 優點 缺點
多進程 簡單, 隔離性好, 進程間幾乎無影響 開銷最大
多線程 目前使用最多的方式, 開銷比多進程小 高並發模式下, 效率會有影響
異步 相比多線程而言, 可以減少線程的數量 編碼要求高, 需要對流程分割合理
協程 用戶態線程, 不需要操作系統來調度, 所以輕量, 開銷極小 需要語言支持

協程介紹

協程是個抽象的概念, 可以映射到到操作系統層面的進程, 線程等概念.
由於協程是用戶態的線程, 不用操作系統來調度, 所以不受操作系統的限制, 可以輕松的創建百萬個, 因此也被稱為 "輕量級線程".

在 golang 中, 協程不是由庫實現的, 而是受語言級別支持的, 因此, 在 golang 中, 使用協程非常方便.
下面通過例子演示在 golang 中, 如何使用協程來完成並發操作.

golang 並發

實現方式

golang 中, 通過 go 關鍵字可以非常簡單的啟動一個協程, 幾乎沒有什么學習成本.
當然並發編程中固有的業務上的困難依然存在(比如並發時的同步, 超時等), 但是 golang 在語言級別給我們提供了優雅簡潔的解決這些問題的途徑.

理解了 golang 中協程的使用, 會給我們寫並發程序時帶來極大的便利.
首先以一個簡單的例子開始 golang 的並發編程.

package main

import (
     "fmt"
     "time"
)

func main() {
     for i := 0; i < 10; i++ {
             go sum(i, i+10)
     }

     time.Sleep(time.Second * 5)
}

func sum(start, end int) int {
     var sum int = 0
     for i := start; i < end; i++ {
             sum += i
     }

     fmt.Printf("Sum from %d to %d is %d\n", start, end, sum)
     return sum
}

執行結果如下: (同時啟動10個協程做累加運算, 10個協程的執行順序可能會不一樣)

$ go run main.go
Sum from 0 to 10 is 45
Sum from 6 to 16 is 105
Sum from 7 to 17 is 115
Sum from 2 to 12 is 65
Sum from 8 to 18 is 125
Sum from 1 to 11 is 55
Sum from 9 to 19 is 135
Sum from 3 to 13 is 75
Sum from 4 to 14 is 85
Sum from 5 to 15 is 95

通過 go 關鍵字啟動協程之后, 主進程並不會等待協程的執行, 而是繼續執行直至結束.
本例中, 如果沒有 time.Sleep(time.Second * 5) 等待5秒的話, 那么主進程不會等待那10個協程的運行結果, 直接就結束了.
主進程結束也會導致那10個協程的執行中斷, 所以, 如果去掉 time.Sleep 這行代碼, 可能屏幕上什么顯示也沒有.

簡單示例

實際使用協程時, 我們一般會等待所有協程執行完成(或者超時)后, 才會結束主進程, 但是不會用 time.Sleep 這種方式,
因為主進程並不知道協程什么時候會結束, 沒法設置等待時間.

這時, 就看出 golang 中的 channel 機制所帶來的好處了. 下面用 channel 來改造上面的 time.Sleep

package main

import "fmt"

func main() {
     var ch = make(chan string)

     for i := 0; i < 10; i++ {
             go sum(i, i+10, ch)
     }

     for i := 0; i < 10; i++ {
             fmt.Print(<-ch)
     }
}

func sum(start, end int, ch chan string) {

     var sum int = 0
     for i := start; i < end; i++ {
             sum += i
     }

     ch <- fmt.Sprintf("Sum from %d to %d is %d\n", start, end, sum)
}

程序執行結果和上面一樣, 因為是並發的緣故, 可能輸出的 sum 順序可能會不一樣.

$ go run main.go
Sum from 9 to 19 is 135
Sum from 0 to 10 is 45
Sum from 5 to 15 is 95
Sum from 6 to 16 is 105
Sum from 7 to 17 is 115
Sum from 2 to 12 is 65
Sum from 8 to 18 is 125
Sum from 3 to 13 is 75
Sum from 1 to 11 is 55
Sum from 4 to 14 is 85

golang 的 chan 可以是任意類型的, 上面的例子中定義的是 string 型.
從上面的程序可以看出, 往 chan 中寫入數據之后, 協程會阻塞在那里, 直到在某個地方將 chan 中的值讀取出來, 協程才會繼續運行下去.

上面的例子中, 我們啟動了10個協程, 每個協程都往 chan 中寫入了一個字符串, 然后在 main 函數中, 依次讀取 chan 中的字符串, 並在屏幕上打印出來.
通過 golang 中的 chan, 不僅實現了主進程 和 協程之間的通信, 而且不用像 time.Sleep 那樣不可控(因為你不知道要 Sleep 多長時間).

並發時的緩沖

上面的例子中, 所有協程使用的是同一個 chan, chan 的容量默認只有 1, 當某個協程向 chan 中寫入數據時, 其他協程再次向 chan 中寫入數據時, 其實是阻塞的.
等到 chan 中的數據被讀出之后, 才會再次讓某個其他協程寫入, 因為每個協程都執行的非常快, 所以看不出來.

改造下上面的例子, 加入些 Sleep 代碼, 延長每個協程的執行時間, 我們就可以看出問題, 代碼如下:

package main

import (
     "fmt"
     "time"
)

func main() {
     var ch = make(chan string)

     for i := 0; i < 5; i++ {
             go sum(i, i+10, ch)
     }

     for i := 0; i < 10; i++ {
             time.Sleep(time.Second * 1)
             fmt.Print(<-ch)
     }
}

func sum(start, end int, ch chan string) int {
     ch <- fmt.Sprintf("Sum from %d to %d is starting at %s\n", start, end, time.Now().String())
     var sum int = 0
     for i := start; i < end; i++ {
             sum += i
     }
     time.Sleep(time.Second * 10)
     ch <- fmt.Sprintf("Sum from %d to %d is %d at %s\n", start, end, sum, time.Now().String())
     return sum
}

執行結果如下:

$ go run main.go
Sum from 4 to 14 is starting at 2015-10-13 13:59:56.025633342 +0800 CST
Sum from 3 to 13 is starting at 2015-10-13 13:59:56.025608644 +0800 CST
Sum from 0 to 10 is starting at 2015-10-13 13:59:56.025508327 +0800 CST
Sum from 2 to 12 is starting at 2015-10-13 13:59:56.025574486 +0800 CST
Sum from 1 to 11 is starting at 2015-10-13 13:59:56.025593711 +0800 CST
Sum from 4 to 14 is 85 at 2015-10-13 14:00:07.030611465 +0800 CST
Sum from 3 to 13 is 75 at 2015-10-13 14:00:08.031926629 +0800 CST
Sum from 0 to 10 is 45 at 2015-10-13 14:00:09.036724803 +0800 CST
Sum from 2 to 12 is 65 at 2015-10-13 14:00:10.038125044 +0800 CST
Sum from 1 to 11 is 55 at 2015-10-13 14:00:11.040366206 +0800 CST

為了演示 chan 的阻塞情況, 上面的代碼中特意加了一些 time.Sleep 函數.

  • 每個執行 Sum 函數的協程都會運行 10 秒
  • main函數中每隔 1 秒讀一次 chan 中的數據

從打印結果我們可以看出, 所有協程幾乎是同一時間開始的, 說明了協程確實是並發的.
其中, 最快的協程(Sum from 4 to 14…)執行了 11 秒左右, 為什么是 11 秒左右呢?
說明它阻塞在了 Sum 函數中的第一行上, 等了 1 秒之后, main 函數開始讀出 chan 中數據后才繼續運行.
它自身運行需要 10 秒, 加上等待的 1 秒, 正好 11 秒左右.

最慢的協程執行了 15 秒左右, 這個也很好理解, 總共啟動了 5 個協程, main 函數每隔 1 秒 讀出一次 chan, 最慢的協程等待了 5 秒,
再加上自身執行了 10 秒, 所以一共 15 秒左右.

到這里, 我們很自然會想到能否增加 chan 的容量, 從而使得每個協程盡快執行, 完成自己的操作, 而不用等待, 消除由於 main 函數的處理所帶來的瓶頸呢?
答案是當然可以, 而且在 golang 中實現還很簡單, 只要在創建 chan 時, 指定 chan 的容量就行.

package main

import (
     "fmt"
     "time"
)

func main() {
     var ch = make(chan string, 10)

     for i := 0; i < 5; i++ {
             go sum(i, i+10, ch)
     }

     for i := 0; i < 10; i++ {
             time.Sleep(time.Second * 1)
             fmt.Print(<-ch)
     }
}

func sum(start, end int, ch chan string) int {
     ch <- fmt.Sprintf("Sum from %d to %d is starting at %s\n", start, end, time.Now().String())
     var sum int = 0
     for i := start; i < end; i++ {
             sum += i
     }
     time.Sleep(time.Second * 10)
     ch <- fmt.Sprintf("Sum from %d to %d is %d at %s\n", start, end, sum, time.Now().String())
     return sum
}

執行結果如下:

$ go run main.go
Sum from 0 to 10 is starting at 2015-10-13 14:22:14.64534265 +0800 CST
Sum from 2 to 12 is starting at 2015-10-13 14:22:14.645382961 +0800 CST
Sum from 3 to 13 is starting at 2015-10-13 14:22:14.645408947 +0800 CST
Sum from 4 to 14 is starting at 2015-10-13 14:22:14.645417257 +0800 CST
Sum from 1 to 11 is starting at 2015-10-13 14:22:14.645427028 +0800 CST
Sum from 1 to 11 is 55 at 2015-10-13 14:22:24.6461138 +0800 CST
Sum from 3 to 13 is 75 at 2015-10-13 14:22:24.646330223 +0800 CST
Sum from 2 to 12 is 65 at 2015-10-13 14:22:24.646325521 +0800 CST
Sum from 4 to 14 is 85 at 2015-10-13 14:22:24.646343061 +0800 CST
Sum from 0 to 10 is 45 at 2015-10-13 14:22:24.64634674 +0800 CST

從執行結果可以看出, 所有協程幾乎都是 10秒完成的. 所以在使用協程時, 記住可以通過使用緩存來進一步提高並發性.

並發時的超時

並發編程, 由於不能確保每個協程都能及時響應, 有時候協程長時間沒有響應, 主進程不可能一直等待, 這時候就需要超時機制.
在 golang 中, 實現超時機制也很簡單.

package main

import (
     "fmt"
     "time"
)

func main() {
     var ch = make(chan string, 1)
     var timeout = make(chan bool, 1)

     go sum(1, 10, ch)
     go func() {
             time.Sleep(time.Second * 5) // 5 秒超時
             timeout <- true
     }()

     select {
     case sum := <-ch:
             fmt.Print(sum)
     case <-timeout:
             fmt.Println("Sorry, TIMEOUT!")
     }
}

func sum(start, end int, ch chan string) int {
     var sum int = 0
     for i := start; i < end; i++ {
             sum += i
     }
     time.Sleep(time.Second * 10)
     ch <- fmt.Sprintf("Sum from %d to %d is %d\n", start, end, sum)
     return sum
}

通過一個匿名函數來控制超時, 然后同時啟動 計算 sum 的協程和timeout協程, 在 select 中看誰先結束,
如果 timeout 結束后, 計算 sum 的協程還沒有結束的話, 就會進入超時處理.

上例中, timeout 只有5秒, sum協程會執行10秒, 所以執行結果如下:

$ go run main.go
Sorry, TIMEOUT!

修改 time.Sleep(time.Second * 5) 為 time.Sleep(time.Second * 15) 的話, 就會看到 sum 協程的執行結果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM