進一步認識golang中的並發


如果你成天與編程為伍,那么並發這個名詞對你而言一定特別耳熟。需要並發的場景太多了,例如一個聊天程序,如果你想讓這個聊天程序能夠同時接收信息和發送信息,就一定會用到並發,無論那是什么樣的並發。

 

並發的意義就是:讓一個程序同時做多件事情!

理解這一點非常重要,是的,並發的目的只是為了能讓程序同時做另一件事情而已,並發的目的並不是讓程序運行的更快(如果是多核處理器,而且任務可以分成相互獨立的部分,那么並發確實可以讓事情解決的更快)。記得我學C++那時候開始接觸並發,還以為每開一個線程程序就會加速一倍呢。。。。

 

golang從語言級別上對並發提供了支持,而且在啟動並發的方式上直接添加了語言級的關鍵字。我並不會很多語言,而且也沒有很多的項目經驗,可能從我嘴里說出的比較不會非常客觀,但是起碼和C/C++(不考慮C++11)利用系統API來操作線程的方式相比,golang的並發機制運用起來就非常舒適了,不必非要按照固定的格式來定義線程函數,也不必因為啟動線程的時候只能給線程函數傳遞一個參數而煩惱。和Java相比的話,Go的優點就是並發的部分不必非得實現成一個class,而且更加輕量(其實我也不知道到底為什么更輕量^_^)。

 

因為最近自己想寫一個小開源項目,而且其中的關鍵部分會用到很多並發機制,於是開始重溫習Go的並發相關的知識。從我學習Go到現在已經將近1年了,覺得現在再重新看Go的並發時收獲頗多,因為畢竟寫了不少Go的小程序,遇到過許多解釋不通的現象和困惑,借着這次溫故知新的機會,把學習來的新經驗趕緊記錄下來,分享給各位網友尤其是喜歡Go的朋友們。

 

 

並發的啟動

這篇文章關於並發的啟動我就一概而過了,如果要讓一個函數並發運行,只需一個關鍵字"go":

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. func Afuntion(para1, para2, para3, ...) {  
  2.     // Do some process  
  3.     // ...  
  4. }  
  5.   
  6. func main() {  
  7.     go Afuntion(para1, para2, para3, ...) //只需加一個go前綴,Afunction()就會並發運行  
  8. }  

 

go的並發啟動非常簡單,幾乎沒有什么額外的准備工作,要並發的函數和一般的函數沒有什么區別,參數隨意,啟動的時候只需要加一個go關鍵之即可。

 

 

當然,並發的啟動沒什么好講的,並發最精髓的部分在於這些協程(協程類似於線程,但是是更輕量的線程)的調度

我沒法以一個資深的老專家向你全方位的講解調度的各個方面,但是我可以把我遇到過的一些場景和我所用過的調度方法(所以絕對是能用的)分享給你。

 

go提供了sync包channel機制來解決協程間的同步與通信。channel的用法非常靈活,使用的方式多種多樣,而且官網的Effective Go中給出了channel的一種並發以外的方式。我們先來介紹sync包提供的調度支持吧。

 

 

sync.WaitGroup

sync包中的WaitGroup實現了一個類似任務隊列的結構,你可以向隊列中加入任務,任務完成后就把任務從隊列中移除,如果隊列中的任務沒有全部完成,隊列就會觸發阻塞以阻止程序繼續運行,具體用法參考如下代碼:

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. // 代碼粘上就可以跑通  
  2. package main  
  3.   
  4. import (  
  5.     "fmt"  
  6.     "sync"  
  7. )  
  8.   
  9. var waitgroup sync.WaitGroup  
  10.   
  11. func Afunction(shownum int) {  
  12.     fmt.Println(shownum)  
  13.     waitgroup.Done() //任務完成,將任務隊列中的任務數量-1,其實.Done就是.Add(-1)  
  14. }  
  15.   
  16. func main() {  
  17.     for i := 0; i < 10; i++ {  
  18.         waitgroup.Add(1) //每創建一個goroutine,就把任務隊列中任務的數量+1  
  19.         go Afunction(i)  
  20.     }  
  21.     waitgroup.Wait() //.Wait()這里會發生阻塞,直到隊列中所有的任務結束就會解除阻塞  
  22. }  

 

我們可以利用sync.WaitGroup來滿足這樣的情況:

 

        ▲某個地方需要創建多個goroutine,並且一定要等它們都執行完畢后再繼續執行接下來的操作。

是的,WaitGroup最大的優點就是.Wait()可以阻塞到隊列中的任務都完畢后才解除阻塞。

 

channel
channel是一種golang內置的類型,英語的直譯為"通道",其實,它真的就是一根管道,而且是一個先進先出的數據結構

 

我們能對channel進行的操作只有4種:

(1) 創建chennel (通過make()函數)

(2) 放入數據 (通過 channel <- data 操作) 

(3) 取出數據 (通過 <-channel 操作)

(4)  關閉channel (通過close()函數)

 

但是channel有一些非常給力的性質需要你牢記,請一定要記住並理解好它們:

(1) channel是一種阻塞管道,是自動阻塞的。意思就是,如果管道滿了,一個對channel放入數據的操作就會阻塞,直到有某個routine從channel中取出數據,這個放入數據的操作才會執行。相反同理,如果管道是空的,一個從channel取出數據的操作就會阻塞,直到某個routine向這個channel中放入數據,這個取出數據的操作才會執行。這事channel最重要的一個性質,沒有之一。

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. func main() {  
  4.     ch := make(chan int, 3)  
  5.     ch <- 1  
  6.     ch <- 1  
  7.     ch <- 1  
  8.     ch <- //這一行操作就會發生阻塞,因為前三行的放入數據的操作已經把channel填滿了  
  9. }  

 

 

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. func main() {  
  4.     ch := make(chan int, 3)  
  5.     <-ch //這一行會發生阻塞,因為channel才剛創建,是空的,沒有東西可以取出  
  6. }  

 

 

(2)channel分為有緩沖的channel和無緩沖的channel。兩種channel的創建方法如下:

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. ch := make(chan int) //無緩沖的channel,同等於make(chan int, 0)  
  2. ch := make(chan int, 5) //一個緩沖區大小為5的channel  

 

 

操作一個channel時一定要注意其是否帶有緩沖,因為有些操作會觸發channel的阻塞導致死鎖。下面就來解釋這些需要注意的情景。

首先來看一個一個例子,這個例子是兩段只有主函數不同的代碼:

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. import "fmt"  
  4.   
  5. func Afuntion(ch chan int) {  
  6.     fmt.Println("finish")  
  7.     <-ch  
  8. }  
  9.   
  10. func main() {  
  11.     ch := make(chan int) //無緩沖的channel  
  12.     go Afuntion(ch)  
  13.     ch <- 1  
  14.       
  15.     // 輸出結果:  
  16.     // finish  
  17. }  

 

 

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. import "fmt"  
  4.   
  5. func Afuntion(ch chan int) {  
  6.     fmt.Println("finish")  
  7.     <-ch  
  8. }  
  9.   
  10. func main() {  
  11.     ch := make(chan int) //無緩沖的channel  
  12.     //只是把這兩行的代碼順序對調一下  
  13.     ch <- 1  
  14.     go Afuntion(ch)  
  15.   
  16.     // 輸出結果:  
  17.     // 死鎖,無結果  
  18. }  

 

 

前一段代碼最終會輸出"finish"並正常結束,但是后一段代碼會發生死鎖。為什么會出現這種現象呢,咱們把上面兩段代碼的邏輯跑一下。

 

第一段代碼:

        1. 創建了一個無緩沖channel

        2. 啟動了一個goroutine,這個routine中對channel執行取出操作,但是因為這時候channel為空,所以這個取出操作發生阻塞,但是主routine可沒有發生阻塞,它還在繼續運行呢

        3. 主goroutine這時候繼續執行下一行,往channel中放入了一個數據

        4. 這時阻塞的那個routine檢測到了channel中存在數據了,所以接觸阻塞,從channel中取出數據,程序就此完畢

 

第二段代碼:

        1.  創建了一個無緩沖的channel

        2.  主routine要向channel中放入一個數據,但是因為channel沒有緩沖,相當於channel一直都是滿的,所以這里會發生阻塞。可是下面的那個goroutine還沒有創建呢,主routine在這里一阻塞,整個程序就只能這么一直阻塞下去了,然后。。。然后就沒有然后了。。死鎖!

※從這里可以看出,對於無緩沖的channel,放入操作和取出操作不能再同一個routine中,而且應該是先確保有某個routine對它執行取出操作,然后才能在另一個routine中執行放入操作。

 

對於帶緩沖的channel,就沒那么多講究了,因為有緩沖空間,所以只要緩沖區不滿,放入操作就不會阻塞,同樣,只要緩沖區不空,取出操作就不會阻塞。而且,帶有緩沖的channel的放入和取出可以用在同一個routine中。

但是,並不是說有了緩沖就可以隨意使用channel的放入和取出了,我們一定要注意放入和取出的速率問題。下面我們就舉個例子來說明這種問題:

我們經常會用利用channel自動阻塞的性質來控制當前運行的goroutine的總數量,如下:

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. import (  
  4.     "fmt"  
  5. )  
  6.   
  7. func Afunction(ch chan int) {  
  8.     fmt.Println("finish")  
  9.     <-ch //goroutine執行完了就從channel取出一個數據  
  10. }  
  11.   
  12. func main() {  
  13.     ch := make(chan int, 10)  
  14.     for i := 0; i < 1000; i++ {  
  15.         //每當創建goroutine的時候就向channel中放入一個數據,如果里面已經有10個數據了,就會  
  16.         //阻塞,由此我們將同時運行的goroutine的總數控制在<=10個的范圍內  
  17.         ch <- 1  
  18.         go Afunction(ch)  
  19.     }  
  20.     // 這里只是示范個例子,當然,接下來應該有些更加周密的同步操作  
  21. }  

 

 

上面這種channel的使用方式幾乎經常會用到,但是再看一下接下來這段代碼,它和上面這種使用channel的方式幾乎一樣,但是它會造成問題:

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. func Afunction(ch chan int) {  
  4.     ch <- 1  
  5.     ch <- 1  
  6.     ch <- 1  
  7.     ch <- 1  
  8.     ch <- 1  
  9.   
  10.     <-ch  
  11. }  
  12.   
  13. func main() {  
  14.     //主routine的操作同上面那段代碼  
  15.     ch := make(chan int, 10)  
  16.     for i := 0; i < 100; i++ {  
  17.         ch <- 1  
  18.         go Afunction(ch)  
  19.     }  
  20.   
  21.     // 這段代碼運行的結果為死鎖  
  22. }  

 

 

 

上面這段運行和之前那一段基本上原理是一樣的,但是運行后卻會發生死鎖。為什么呢?其實總結起來就一句話,"放得太快,取得太慢了"。

 

按理說,我們應該在我們主routine中創建子goroutine並每次向channel中放入數據,而子goroutine負責從channel中取出數據。但是我們的這段代碼在創建了子goroutine后,每個routine會向channel中放入5個數據。這樣,每向channel中放入6個數據才會執行一次取出操作,這樣一來就可能會有某一時刻,channel已經滿了,但是所有的routine都在執行放入操作(因為它們當前執行放入操作的概率是執行取出操作的6倍),這樣一來,所有的routine都阻塞了,從而導致死鎖。

 

在使用帶緩沖的channel時一定要注意放入與取出的速率問題。

 

(3)關閉后的channel可以取數據,但是不能放數據。而且,channel在執行了close()后並沒有真的關閉,channel中的數據全部取走之后才會真正關閉。

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. func main() {  
  4.     ch := make(chan int, 5)  
  5.     ch <- 1  
  6.     ch <- 1  
  7.     close(ch)  
  8.     ch <- //不能對關閉的channel執行放入操作  
  9.           
  10.         // 會觸發panic  
  11. }  

 

 

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. func main() {  
  4.     ch := make(chan int, 5)  
  5.     ch <- 1  
  6.     ch <- 1  
  7.     close(ch)  
  8.     <-ch //只要channel還有數據,就可能執行取出操作  
  9.   
  10.         //正常結束  
  11. }  

 

 

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. import "fmt"  
  4.   
  5. func main() {  
  6.     ch := make(chan int, 5)  
  7.     ch <- 1  
  8.     ch <- 1  
  9.     ch <- 1  
  10.     ch <- 1  
  11.     close(ch)  //如果執行了close()就立即關閉channel的話,下面的循環就不會有任何輸出了  
  12.     for {  
  13.         data, ok := <-ch  
  14.         if !ok {  
  15.             break  
  16.         }  
  17.         fmt.Println(data)  
  18.     }  
  19.       
  20.     // 輸出:  
  21.     // 1  
  22.     // 1  
  23.     // 1  
  24.     // 1  
  25.     //   
  26.     // 調用了close()后,只有channel為空時,channel才會真的關閉  
  27. }  



 

 

使用channel控制goroutine數量

channel的性質到這里就介紹完了,但是看上去,channel的使用似乎比WaitGroup要注意更多的細節,那么有什么理由一定要用channel來實現同步呢?channel相比WaitGroup有一個很大的優點,就是channel不僅可以實現協程的同步,而且可以控制當前正在運行的goroutine的總數。

下面就介紹幾種利用channel控制goroutine數量的方法:

一.如果任務數量是固定的:

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. func Afunction(ch chan int) {  
  4.     ch <- 1  
  5. }  
  6.   
  7. func main() {  
  8.     var (  
  9.         ch        chan int = make(chan int, 20) //可以同時運行的routine數量為20  
  10.         dutycount int      = 500  
  11.     )  
  12.     for i := 0; i < dutycount; i++ {  
  13.         go Afunction(ch)  
  14.     }  
  15.   
  16.     //知道了任務總量,可以像這樣利用固定循環次數的循環檢測所有的routine是否工作完畢  
  17.     for i := 0; i < dutycount; i++ {  
  18.         <-ch  
  19.     }  
  20. }  

 


二.如果任務的數量不固定

 

 

[java]  view plain  copy
 
 在CODE上查看代碼片派生到我的代碼片
  1. package main  
  2.   
  3. import (  
  4.     "fmt"  
  5. )  
  6.   
  7. func Afunction(routineControl chan int, feedback chan string) {  
  8.     defer func() {  
  9.         <-routineControl  
  10.         feedback <- "finish"  
  11.     }()  
  12.   
  13.     // do some process  
  14.     // ...  
  15. }  
  16.   
  17. func main() {  
  18.     var (  
  19.         routineCtl chan int    = make(chan int, 20)  
  20.         feedback   chan string = make(chan string, 10000)  
  21.   
  22.         msg      string  
  23.         allwork  int  
  24.         finished int  
  25.     )  
  26.     for i := 0; i < 1000; i++ {  
  27.         routineCtl <- 1  
  28.         allwork++  
  29.         go Afunction(routineCtl, feedback)  
  30.     }  
  31.   
  32.     for {  
  33.         msg = <-feedback  
  34.         if msg == "finish" {  
  35.             finished++  
  36.         }  
  37.         if finished == allwork {  
  38.             break  
  39.         }  
  40.     }  
  41. }  



 

 

如果轉載請注明出處:http://blog.csdn.NET/gophers/article/details/24665419


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM