一.概述
簡而言之,所謂並發編程是指在一台處理器上"同時"處理多個任務。
通常程序會被編寫為一個順序執行並完成一個獨立任務的代碼。如果沒有特別的需求,最好總是這樣寫代碼,因為這種類型的程序通常很容易寫,也很容易維護。不過也有一些情況下,並行執行多個任務會有更大的好處。一個例子是,Web服務器需要在各自獨立的套接字(socket)上同時接受多個數據請求。每個套接字的請求都是獨立的,可以完全獨立於其他套接字進行處理。具有並行執行多個請求的能力可以顯著提高這類系統的性能。考慮到這一點,Go語言的語法和運行時直接內置了對並發的支持。
宏觀的並發是指在一段時間內,有多個程序在同時運行。
並發在微觀上,是指在同一時刻只能有一條指令執行,但多個程序指令被快速的輪換執行,使得在宏觀上具有多個進程同時執行的效果,但在微觀上並不是同時執行的,只是把時間分成若干段,使多個程序快速交替的執行。
1.1 並行和並發
並行:指在同一時刻,有多條指令在多個處理器上同時執行。
並發:指在同一時刻只能有一條指令執行,但多個進程指令被快速的輪換執行,使得在宏觀上具有多個進程同時執行的效果,但在微觀上並不是同時執行的,只是把時間分成若干段,通過cpu時間片輪轉使多個進程快速交替的執行。
並發(concurrency)不是並行(parallelism)。並行是讓不同的代碼片段同時在不同的物理處理器上執行。並行的關鍵是同時做很多事情,而並發是指同時管理很多事情,這些事情可能只做了一半就被暫停去做別的事情了。在很多情況下,並發的效果比並行好,因為操作系統和硬件的總資源一半很少,但能支持系統同時做很多事情。這種"使用較少資源做更多的事情"的哲學,也是指導Go語言設計的哲學。
二.常見的並發編程基礎
2.1 進程並發
(1)程序與進程
程序:是指編譯好的二進制文件,在磁盤上,不占用系統資源(內存、打開的文件、設備、鎖....),是一個靜態的實體。
進程:是指一個程序在運行時所需要和維護的資源的集合,是一個動態的實體。
進程和程序並不是一一對應的,一個程序執行在不同的數據集上就成為不同的進程,可以用進程控制塊(PCB)來唯一地標識每個進程。而這一點正是程序無法做到的,由於程序沒有和數據產生直接的聯系,既使是執行不同的數據的程序,他們的指令的集合依然是一樣的,所以無法唯一地標識出這些運行於不同數據集上的程序。一般來說,一個進程肯定有一個與之對應的程序,而且只有一個。而一個程序有可能沒有與之對應的進程(因為它沒有執行),也有可能有多個進程與之對應(運行在幾個不同的數據集上)。
(2)進程地址空間
地址空間就是每個進程所能訪問的內存地址范圍。
這個地址范圍不是真實的,是虛擬地址的范圍,有時甚至會超過實際物理內存的大小。
現代的操作系統中進程都是在保護模式下運行的,地址空間其實是操作系統給進程用的一段連續的虛擬內存空間。
地址空間最終會通過虛擬內訓映射管理單元映射到物理內存上,因為內核操作的是物理內存。
雖然地址空間的范圍很大,但是進程也不一定有權限訪問全部的地址空間(一般都是只能訪問地址空間中的一些地址區間),
進程能夠訪問的那些地址區間也稱為 內存區域。
進程如果訪問了有效內存區域以外的內容就會報 “段錯誤” 信息。
代碼段:程序代碼在內存中的映射,存放函數體的二進制代碼。
初始化過的數據(Data):在程序運行初已經對變量進行初始化的數據。
未初始化過的數據(BSS):在程序運行初未對變量進行初始化的數據。
棧 (Stack):存儲局部、臨時變量,函數調用時,存儲函數的返回指針,用於控制函數的調用和返回。在程序塊開始時自動分配內存,結束時自動釋放內存,其操作方式類似於數據結構中的棧。
堆 (Heap):存儲動態內存分配,需要程序員手工分配,手工釋放.注意它與數據結構中的堆是兩回事,分配方式類似於鏈表。
每個進程都有自己的地址空間。對32位進程來說,由於32位指針可以表示從0x00000000到0xFFFFFFFF之間的任一值,地址空間的大小為4GB。對64位進程來說,由於64位指針可以表示從0x00000000'00000000到0xFFFFFFFF'FFFFFFFF之間的任一值, 地址空間大小為16GB。其實這個地址空間是不存在的,也就是我們所說的進程虛擬內存空間。
操作系統內核為每個被創建的進程都建立一個PCB(進程控制塊或進程描述符)來保存與其相關的信息,PCB存在於進程的高 1 G空間,也就是內核空間中。
(3)進程的狀態
進程基本的狀態有5種。分別為初始態,就緒態,運行態,掛起態與終止態。其中初始態為進程准備階段,常與就緒態結合來看。
(4)進程並發
在使用進程實現並發時會出現什么問題呢?
1:系統開銷比較大,占用資源比較多,開啟進程數量比較少。
2:在unix/linux系統下,還會產生"孤兒進程"和"僵屍進程"。
在操作系統運行過程中,可以產生很多的進程。在unix/linux系統中,正常情況下,子進程是通過父進程fork創建的,子進程再創建新的進程。並且父進程永遠無法預測子進程到底什么時候結束。當一個進程完成它的工作終止之后,它的父進程需要調用系統調用取得子進程的終止狀態。
孤兒進程:
父進程先於子進程結束,則子進程成為孤兒進程,子進程的父進程成為init進程,稱為init進程領養孤兒進程。
僵屍進程:
子進程終止,父進程尚未回收,子進程殘留資源(PCB)存放於內核中,變成僵屍(Zombie)進程。
守護進程:
永久運行在系統中,不占用控制終端。不與前台用戶進行交互。通常采用以d結尾命名方法
2.2 線程並發
LWP:light weight process 輕量級的進程,本質仍是進程(Linux下)。
進程:獨立地址空間,擁有PCB
線程:有獨立的PCB,但沒有獨立的地址空間(共享)
區別:在於是否共享地址空間。獨居(進程);合租(線程)。
線程:最小的執行單位
進程:最小分配資源單位,可看成是只有一個線程的進程。
一個線程是一個執行空間,這個空間會被操作系統調度來運行函數中所寫的代碼。每個進程至少包含一個線程,每個進程的初識線程被稱作主線程。因為執行這個線程的空間是應用程序本身的空間,所以在主線程終止時,應用程序也會終止。操作系統將線程調度到某個處理器上運行,這個處理器並不一定是進程所在的處理器。不同的操作系統使用的線程調度算法一般都不一樣,但這種不同會被操作系統屏蔽,並不會展示給程序員。
(1)線程同步
同步即協同步調,按預定的先后次序運行。
線程同步,指一個線程發出某一功能調用時,在沒有得到結果之前,該調用不返回。同時其它線程為保證數據一致性,不能調用該功能。
舉例1:銀行存款5000。櫃台,折:取3000;提款機,卡:取3000。剩余:2000
舉例2:內存中100字節,線程T1欲填入全1,線程T2欲填入全0。但如果T1執行了50個字節失去cpu,T2執行,會將T1寫過的內容覆蓋。當T1再次獲得cpu繼續 從失去cpu的位置向后寫入1,當執行結束,內存中的100字節,既不是全1,也不是全0。
產生的現象叫做"與時間有關的錯誤"(time related)。為了避免這種數據混亂,線程需要同步。
"同步"的目的,是為了避免數據混亂,解決與時間有關的錯誤。實際上,不僅線程間需要同步,進程間、信號間等等都需要同步機制。
因此,所有"多個控制流,共同操作一個共享資源"的情況,都需要同步,同步的方式一般是加鎖(這個會在后面介紹道)。
2.3 協成並發
協程:coroutine。也叫輕量級線程。
與傳統的系統級線程和進程相比,協程最大的優勢在於“輕量級”。可以輕松創建上萬個而不會導致系統資源衰竭。而線程和進程通常很難超過1萬個。這也是協程別稱“輕量級線程”的原因。
一個線程中可以有任意多個協程,但某一時刻只能有一個協程在運行,多個協程分享該線程分配到的計算機資源。
多數語言在語法層面並不直接支持協程,而是通過庫的方式支持,但用庫的方式支持的功能也並不完整,比如僅僅提供協程的創建、銷毀與切換等能力。如果在這樣的輕量級線程中調用一個同步IO 操作,比如網絡通信、本地文件讀寫,都會阻塞其他的並發執行輕量級線程,從而無法真正達到輕量級線程本身期望達到的目標。
在協程中,調用一個任務就像調用一個函數一樣,消耗的系統資源最少!但能達到進程、線程並發相同的效果。
在一次並發任務中,進程、線程、協程均可以實現。從系統資源消耗的角度出發來看,進程相當多,線程次之,協程最少。
三.Go並發
Go 在語言級別支持協程,叫goroutine。
goroutine是Go語言並發設計的核心,有人稱之為go程。Goroutine從量級上看很像協程,它比線程更小,十幾個goroutine可能體現在底層就是五六個線程,Go語言內部幫你實現了這些goroutine之間的內存共享。執行goroutine只需極少的棧內存(大概是4~5KB),當然會根據相應的數據伸縮。也正因為如此,可同時運行成千上萬個並發任務。goroutine比thread更易用、更高效、更輕便。
一般情況下,一個普通計算機跑幾十個線程就有點負載過大了,但是同樣的機器卻可以輕松地讓成百上千個goroutine進行資源競爭。
Go語言中的並發指的是能讓某個函數獨立於其他函數運行的能力。當一個函數創建為goroutine時,Go會將其視為一個獨立的工作單元。這個單元會被調度到可用的邏輯處理器上執行。Go語言運行時的調度器是一個復雜的軟件,能管理被創建的所有goroutine並為其分配執行時間。這個調度器在操作系統之上,將操作系統線程和語言運行時的邏輯處理器綁定,並在邏輯處理器上運行goroutine。調度器在任何給定的時間,都會全面控制哪個goroutine要在哪個邏輯處理器上運行。
Go語言的並發同步模型來自一個叫做通信順序進程(Communicating Sequential Process,CSP)的范型(paradigm)。CSP是一種消息傳遞模型,通過在goroutine之間傳遞數據來傳遞消息,而不是對數據進行加鎖來實現同步訪問。用於在goroutine之間同步和傳遞數據的關鍵數據類型叫做通道(channel,這個會在后面講到)。使用通道可以使編寫並發程序更容易,也能夠讓並發程序更少出錯。
操作系統會在物理處理器上調度線程來運行,而Go語言在運行時會在邏輯處理器上調度goroutine來運行。每個邏輯處理器都會分別綁定到單個操作系統線程。在1.5版本上,Go語言的運行默認會為每個可用的物理處理器分配一個邏輯處理器。在1.5版本之前的版本中,默認給整個應用程序只分配一個邏輯處理器。這些邏輯處理器會用於執行所用被創建的goroutine。即便只有一個邏輯處理器,Go也可以以神奇的效率和性能,並發調度無數個goroutine。
在下圖中,可以看到操作系統線程,邏輯處理器和本地運行隊列之間的關系。如果創建一個goroutine並准備運行,這個goroutine就會被放到調度器的全局運行隊列中。之后,調度器就會將這些隊列中的goroutine分配給一個邏輯處理器,並放到這個邏輯處理器對應的本地運行隊列中。本地運行隊列中的goroutine會一直等待直到自己被分配到邏輯處理器執行。
有時,正在運行的goroutine需要執行一個阻塞的系統調用,如打開一個文件。當這類調用發生時,線程和goroutine會從邏輯處理器上分離,該線程會繼續阻塞,等待系統調用的返回。與此同時,這個邏輯處理器就會失去了用來運行的線程。所以,調度器會創建一個新線程,並將其綁定到該邏輯處理器上。之后,調度器會從本地運行隊列里選擇另一個goruntine來運行。一旦被阻塞的系統調用執行完並返回,對應的goruntine就會放回到本地運行隊列中,而之前的線程會保存好,以便之后可以繼續使用。
(1)創建goroutine
只需在函數調⽤語句前添加go 關鍵字,就可創建並發執⾏單元。開發⼈員無需了解任何執⾏細節,調度器會自動將其安排到合適的系統線程上執行。
在並發編程中,我們通常想將一個過程切分成幾塊,然后讓每個goroutine各自負責一塊工作,當一個程序啟動時,主函數在一個單獨的goroutine中運行,我們叫它main goroutine。新的goroutine會用go語句來創建。而go語言的並發設計,讓我們很輕松就可以達成這一目的。
示例如下:
import ( "fmt" "time" ) func singing() { for i := 0;i < 5;i++{ fmt.Println("----正在唱歌:人猿泰山----") time.Sleep(time.Millisecond * 30) } } func danceing() { for j := 0;j < 5;j++{ fmt.Println("====正在跳舞:趙四街舞====") time.Sleep(time.Millisecond * 30) } } func main() { go singing() go danceing() }
但這時我們執行發現並沒有內容輸出,是我們的語法有什么問題嗎,並不是,是因為在主go程啟動兩個子go程后,主go程就結束了,goroutine退出后,其它的工作goroutine也會自動退出 ,所以就沒有結果輸出了,這就是goruntine的特性:主
為了防止這種現象,我們需要主go程后於子go程結束,我們暫時先可以在主go程中加上死循環,等后面介紹過通道后,就可以用通道來實現控制主子go程結束的先后循序。
package main import ( "fmt" "runtime" "time" ) func singing() { for i := 0;i < 5;i++{ fmt.Println("----正在唱歌:人猿泰山----") time.Sleep(time.Millisecond * 30) } } func danceing() { for j := 0;j < 5;j++{ fmt.Println("====正在跳舞:趙四街舞====") time.Sleep(time.Millisecond * 30) } } func main() { go singing() go danceing()
//保證主go程不先於子go程結束 for{ runtime.GC() } }
結果如下:
----正在唱歌:人猿泰山---- ====正在跳舞:趙四街舞==== ----正在唱歌:人猿泰山---- ====正在跳舞:趙四街舞==== ====正在跳舞:趙四街舞==== ----正在唱歌:人猿泰山---- ====正在跳舞:趙四街舞==== ----正在唱歌:人猿泰山---- ----正在唱歌:人猿泰山---- ====正在跳舞:趙四街舞====
通過發現程序中的子go程是並行執行的。
(2)Goexit()函數
調用runtime.Goexit() 將立即終止當前goroutine 執⾏,調度器確保所有已注冊defer 延遲調用被執行。
import ( "fmt" "runtime" "time" ) func test() { defer fmt.Println("子go程結束") fmt.Println("子go程即將結束") runtime.Goexit() } func main() { //匿名子go程 go func() { for i := 0;i < 10;i++{ fmt.Println(i) if i == 5{ test() } time.Sleep(time.Millisecond * 100) } }() for { runtime.GC() } }
結果如下:
0 1 2 3 4 5 子go程即將結束 子go程結束
四.channel
channel是Go語言中的一個核心類型,可以把它看成管道。並發核心單元通過它就可以發送或者接收數據進行通訊,這在一定程度上又進一步降低了編程的難度。
channel是一個數據類型,主要用來解決go程的同步問題以及go程之間數據共享(數據傳遞)的問題。
goroutine運行在相同的地址空間,因此訪問共享內存必須做好同步。goroutine 奉行通過通信來共享內存,而不是共享內存來通信。
引⽤類型channel可用於多個goroutine 通訊。其內部實現了同步,確保並發安全。
4.1 定義channel變量
和map類似,channel也一個對應make創建的底層數據結構的引用。
當我們復制一個channel或用於函數參數傳遞時,我們只是拷貝了一個channel引用,因此調用者和被調用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。
定義一個channel時,也需要定義發送到channel的值的類型。channel可以使用內置的make()函數來創建:
make(chan Type) //等價於make(chan Type, 0) make(chan Type, capacity)
chan是創建channel所需使用的關鍵字。Type 代表指定channel收發數據的類型。
例子:
ch1 := make(chan int) ch2 := make(chan string,0)
當參數capacity=0 時,channel是無緩沖阻塞讀寫的;當capacity > 0 時,channel 有緩沖、是非阻塞的,直到寫滿capacity個元素才阻塞寫入。
channel非常像生活中的管道,一邊可以存放東西,另一邊可以取出東西。channel通過操作符<- 來接收和發送數據,發送和接收數據語法:
讀channel:
寫channel:
channel的特性:
通道中的數據只能讀取一次,不能重復讀。先進先出。
讀端 和 寫端在不同的 goroutine 之間。
讀端讀,寫端不在線,讀端阻塞。寫端寫,讀端不在線,寫端阻塞。
默認情況下,channel接收和發送數據都是阻塞的,除非另一端已經准備好,這樣就使得goroutine同步變的更加的簡單,而不需要顯式的lock。
示例如下:
import ( "fmt" "time" ) func main() { ch := make(chan string) go func() { defer fmt.Println("子go程結束,寫數據給主go程") for i := 0;i < 3;i++{ fmt.Println(i) time.Sleep(time.Second * 2) } ch <- "子go程打印3次數據完畢" }() str := <- ch fmt.Println("主go程接收到數據:",str) }
結果如下:
0 1 2 子go程結束,寫數據給主go程 主go程接收到數據: 子go程打印3次數據完畢
我們發現主go程在子go程輸出完三次數據后才結束,我們並沒有在主go程中添加死循環來讓主go程后於子go程結束,只是通過通道實現了控制兩個go程到執行順序。
通道channel不僅可以實現goruntine之間的同步,還可以實現goruntine之間的數據通信,示例如下:
import "fmt" func main() { //通道ch1:用於兩個goruntine之間傳遞數據 ch1 := make(chan int) //通道ch2:協調兩個goruntine之間使用stdout ch2 := make(chan bool) //定義匿名子go程 go func() { for i := 0;i < 3;i++{ ch1 <- i fmt.Println("子go程向主go程傳遞:",i) ch2 <- false } }() //因為子go程向主go程傳遞3次數據,所以主go程要循環3次接收 for j := 0;j < 3;j++{ num := <- ch1 <- ch2 fmt.Println("主go程讀到:",num) } }
結果如下:
子go程向主go程傳遞: 0 主go程讀到: 0 子go程向主go程傳遞: 1 主go程讀到: 1 子go程向主go程傳遞: 2 主go程讀到: 2
上面的程序定義通道ch2的目的是為協調主子go程使用標准輸出的順序,子go程先使用標准輸出,因為在這里標准輸出是公共資源,多個go程調用公共資源需要同步,否則就會發生競爭。
4.2 無緩沖channel
無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何數據值的通道。
這種類型的通道要求發送goroutine和接收goroutine同時准備好,才能完成發送和接收操作。否則,通道會導致先執行發送或接收操作的goroutine 阻塞等待。
這種對通道進行發送和接收的交互行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。
阻塞:由於某種原因數據沒有到達,當前go程(線程)持續處於等待狀態,直到條件滿足,才解除阻塞。
同步:在兩個或多個go程(線程)間,保持數據內容一致性的機制。
下圖展示兩個goroutine 如何利用無緩沖的通道來共享一個值:
在第1步:兩個goruntine都到達通道,但哪個都沒有開始執行發送或者接受。
在第2步:左側的goruntine將它的手伸進了通道,這模擬了向通道發送數據的行為。這時,這個goruntine會在通道中被鎖住,知道交換完成。
在第3步:右側的goroutine 將它的手放入通道,這模擬了從通道里接收數據。這個goroutine 一樣也會在通道中被鎖住,直到交換完成。
在第4 步和第5 步,進行交換,並最終,在第6 步,兩個goroutine都將它們的手從通道里拿出來,這模擬了被鎖住的goroutine 得到釋放。兩個goroutine 現在都可以去做其他事情了。
無緩沖的channel創建格式:
make(chan Type) //等價於make(chan Type, 0)
如果沒有指定緩沖區容量,那么該通道就是同步的,因此會阻塞到發送者准備好發送和接收者准備好接收。
示例代碼:
import ( "fmt" ) func main() { //定義無緩沖channel ch1 := make(chan int) //等價於:ch := make(chan int,0) ch2 := make(chan bool) fmt.Println("len=",len(ch1),"cap=",cap(ch1)) go func() { for i := 0;i < 5;i++{ ch1 <- i fmt.Println("---- len=",len(ch1),"cap=",cap(ch1),"i=",i) ch2 <- false } }() for j := 0;j < 5;j++{ num := <- ch1 <- ch2 fmt.Println("==== len=",len(ch1),"cap=",cap(ch1),"num=",num) } }
結果如下:
len= 0 cap= 0 ---- len= 0 cap= 0 i= 0 ==== len= 0 cap= 0 num= 0 ---- len= 0 cap= 0 i= 1 ==== len= 0 cap= 0 num= 1 ---- len= 0 cap= 0 i= 2 ==== len= 0 cap= 0 num= 2 ---- len= 0 cap= 0 i= 3 ==== len= 0 cap= 0 num= 3 ---- len= 0 cap= 0 i= 4 ==== len= 0 cap= 0 num= 4
4.3 有緩沖的channel
有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個數據值的通道。
這種類型的通道並不強制要求goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也不同。
只有通道中沒有要接收的值時,接收動作才會阻塞。
只有通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。
這導致有緩沖的通道和無緩沖的通道之間的一個很大的不同:無緩沖的通道保證進行發送和接收的goroutine 會在同一時間進行數據交換;有緩沖的通道沒有這種保證。
示例圖如下:
在第1 步,右側的goroutine 正在從通道接收一個值。
在第2 步,右側的這個goroutine獨立完成了接收值的動作,而左側的goroutine 正在發送一個新值到通道里。
在第3 步,左側的goroutine 還在向通道發送新值,而右側的goroutine 正在從通道接收另外一個值。這個步驟里的兩個操作既不是同步的,也不會互相阻塞。
最后,在第4 步,所有的發送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。
有緩沖的channel創建格式:
make(chan Type, capacity)
如果給定了一個緩沖區容量,通道就是異步的。只要緩沖區有未使用空間用於發送數據,或還包含可以接收的數據,那么其通信就會無阻塞地進行。
借助函數len(ch)求取緩沖區中剩余元素個數,cap(ch) 求取緩沖區元素容量大小。
示例如下:
import (
"fmt"
"time"
)
func main() {
//定義有緩沖channel,初識容量為3
ch1 := make(chan int,3)
ch2 := make(chan bool)
fmt.Println("len=",len(ch1),"cap=",cap(ch1))
go func() {
for i := 0;i < 7;i++{
ch1 <- i
fmt.Println("---- len=",len(ch1),"cap=",cap(ch1),"i=",i)
}
ch2 <- false
}()
time.Sleep(time.Second * 3)
for j := 0;j < 7;j++{
num := <- ch1
fmt.Println("==== len=",len(ch1),"cap=",cap(ch1),"num=",num)
}
<-ch2
}
結果如下:
len= 0 cap= 3 ---- len= 1 cap= 3 i= 0 ---- len= 2 cap= 3 i= 1 ---- len= 3 cap= 3 i= 2 ==== len= 3 cap= 3 num= 0 ---- len= 3 cap= 3 i= 3 ==== len= 2 cap= 3 num= 1 ==== len= 2 cap= 3 num= 2 ==== len= 1 cap= 3 num= 3 ==== len= 0 cap= 3 num= 4 ---- len= 3 cap= 3 i= 4 ---- len= 0 cap= 3 i= 5 ---- len= 1 cap= 3 i= 6 ==== len= 1 cap= 3 num= 5 ==== len= 0 cap= 3 num= 6
4.4 關閉channel
如果發送者知道,沒有更多的值需要發送到channel的話,那么讓接收者也能及時知道沒有多余的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內置的close函數來關閉channel實現。
示例如下:
import "fmt" func main() { ch := make(chan int) go func() { for i := 0;i < 5;i++{ ch <- i } close(ch) }() for{ if data,status := <- ch;status{ fmt.Println(data) }else { break } } fmt.Println("Finished") }
結果如下:
0 1 2 3 4 Finished
注意:
channel不像文件一樣需要經常去關閉,只有當你確實沒有任何發送數據了,或者你想顯式的結束range循環之類的,才去關閉channel;
關閉channel后,無法向channel 再發送數據(引發panic 錯誤后導致接收立即返回零值);
關閉channel后,可以繼續從channel接收數據;
對於nil channel,無論收發都會被阻塞。
也可以使用range來迭代不斷操作channel:
import ( "fmt" ) func main() { ch := make(chan int) go func() { for i := 0;i < 5;i++{ ch <- i } close(ch) }() for data := range ch{ fmt.Println(data) } fmt.Println("Finished") }
4.5 單項channel
默認情況下,通道channel是雙向的,也就是,既可以往里面發送數據也可以同里面接收數據。
但是,我們經常見一個通道作為參數進行傳遞而只希望對方是單向使用的,要么只讓它發送數據,要么只讓它接收數據,這時候我們可以指定通道的方向。
單向channel變量的聲明非常簡單,如下:
var ch1 chan int // ch1是一個正常的channel,是雙向的 var ch2 chan<- float64 // ch2是單向channel,只用於寫float64數據 var ch3 <-chan int // ch3是單向channel,只用於讀int數據
chan<- 表示數據進入管道,要把數據寫進管道,對於調用者就是輸出。
<-chan 表示數據從管道出來,對於調用者就是得到管道的數據,當然就是輸入。
可以將channel 隱式轉換為單向隊列,只收或只發,不能將單向channel 轉換為普通channel:
c := make(chan int, 3) var send chan<- int = c // send-only var recv <-chan int = c // receive-only send <- 1 //<-send //invalid operation: <-send (receive from send-only type chan<- int) <-recv //recv <- 2 //invalid operation: recv <- 2 (send to receive-only type <-chan int)
單項channel示例如下:
import ( "fmt" ) func sendto(out chan <- int) { for i := 0;i < 5;i++{ out <- i } close(out) } func receivefrom(in <- chan int) { for data := range in{ fmt.Println("從子go程接收到:",data) } } func main() { ch := make(chan int) go sendto(ch) receivefrom(ch) }
結果如下:
從子go程接收到: 0 從子go程接收到: 1 從子go程接收到: 2 從子go程接收到: 3 從子go程接收到: 4
(1)生產者和消費者模型
單向channel最典型的應用是“生產者消費者模型”
所謂“生產者消費者模型”: 某個模塊(函數等)負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、go程、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。
單單抽象出生產者和消費者,還夠不上是生產者/消費者模型。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。大概的結構如下圖:
舉一個寄信的例子來輔助理解一下,假設你要寄一封平信,大致過程如下:
1.把信寫好——相當於生產者制造數據
2.把信放入郵筒——相當於生產者把數據放入緩沖區
3.郵遞員把信從郵筒取出——相當於消費者把數據取出緩沖區
4.郵遞員把信拿去郵局做相應的處理——相當於消費者處理數據
那么,這個緩沖區有什么用呢?為什么不讓生產者直接調用消費者的某個函數,直接把數據傳遞過去,而畫蛇添足般的設置一個緩沖區呢?
緩沖區的好處大概如下:
1:解耦
假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會直接影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合度也就相應降低了。
接着上述的例子,如果不使用郵筒(緩沖區),須得把信直接交給郵遞員。那你就必須要認識誰是郵遞員。這就產生和你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識下一個郵遞員(相當於消費者變化導致修改生產者代碼)。而郵筒相對來說比較固定,你依賴它的成本也比較低(相當於和緩沖區之間的弱耦合)。
2:處理並發
生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者只能無端浪費時間。
使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的並發主體。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。
其實最當初這個生產者消費者模式,主要就是用來處理並發問題的。
從寄信的例子來看。如果沒有郵筒,你得拿着信傻站在路口等郵遞員過來收(相當於生產者阻塞);又或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。
3:緩存
如果生產者制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。
假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節送賀卡,需要寄出去的信超過1000封,這時候郵筒這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。
示例如下:
import "fmt" //定義生產者 func producer(in chan <- int) { for i := 0;i < 10;i++{ fmt.Println("------生產了:",i) in <- i } close(in) } //定義消費者 func consumer(out <- chan int) { for data := range out{ fmt.Println("======消費了:",data*data) } } func main() { //定義公共區(緩沖區) ch := make(chan int,5) //生成生產者 go producer(ch) //生成消費者 consumer(ch) }
結果如下:
------生產了: 0 ------生產了: 1 ======消費了: 0 ======消費了: 1 ------生產了: 2 ------生產了: 3 ------生產了: 4 ------生產了: 5 ------生產了: 6 ======消費了: 4 ======消費了: 9 ======消費了: 16 ======消費了: 25 ======消費了: 36 ------生產了: 7 ------生產了: 8 ------生產了: 9 ======消費了: 49 ======消費了: 64 ======消費了: 81
簡單說明:首先創建一個雙向的channel,然后開啟一個新的goroutine,把雙向通道作為參數傳遞到producer方法中,同時轉成只寫通道。子go程開始執行循環,向只寫通道中添加數據,這就是生產者。主go程,直接調用consumer方法,該方法將雙向通道轉成只讀通道,通過循環每次從通道中讀取數據,這就是消費者。
注意:channel作為參數傳遞,是引用傳遞。
4.6 定時器
(1)time.Timer
Timer是一個定時器。代表未來的一個單一事件,你可以告訴timer你要等待多長時間。
type Timer struct { C <-chan Time r runtimeTimer }
它提供一個channel,在定時時間到達之前,沒有數據寫入timer.C會一直阻塞。直到定時時間到,系統會自動向timer.C 這個channel中寫入當前時間,阻塞即被解除。
示例如下:
func main() { //創建定時器,指定定時時長 timer := time.NewTimer(time.Second * 3) fmt.Println(time.Now().Format("2006-01-02 15:04:05")) // 從 timer的 C 中讀. 定時時間到達后,系統會自動寫入當前時間到 C 中 t := <- timer.C fmt.Println(t.Format("2006-01-02 15:04:05")) }
結果如下:
2019-07-19 20:19:36 2019-07-19 20:19:39
time.After()可以合並上面兩個步驟
func main() { fmt.Println(time.Now().Format("2006-01-02 15:04:05")) //把3秒后的時間寫入到t中 t := <- time.After(time.Second * 3) fmt.Println(t.Format("2006-01-02 15:04:05")) }
結果如下:
2019-07-19 20:22:31 2019-07-19 20:22:34
time.Stop()可以停止定時器
func main() { timer := time.NewTimer(time.Second * 5) fmt.Println(time.Now().Format("2006-01-02 15:04:05")) //停止計時器 timer.Stop() fmt.Println(time.Now().Format("2006-01-02 15:04:05")) }
結果如下:
2019-07-19 20:27:06 2019-07-19 20:27:06
timer.Reset()可以重置定時器
func main() { timer := time.NewTimer(time.Second * 5) fmt.Println(time.Now().Format("2006-01-02 15:04:05")) //重制計時器 timer.Reset(time.Second * 2) t := <- timer.C fmt.Println(t.Format("2006-01-02 15:04:05")) }
結果如下:
2019-07-19 20:31:45 2019-07-19 20:31:47
(2)time.Ticker
Ticker是一個周期觸發定時的計時器,它會按照一個時間間隔往channel發送系統當前時間,而channel的接收者可以以固定的時間間隔從channel中讀取事件。
func main() { //控制主子go程結束的先后順序 ch := make(chan bool) timer := time.NewTicker(time.Second * 1) i := 0 go func() { for{ <-timer.C i++ fmt.Println("i = ",i) if i == 5{ timer.Stop() ch <- false runtime.Goexit() } } }() <-ch }
結果如下:
i = 1 i = 2 i = 3 i = 4 i = 5
五.select
Go里面提供了一個關鍵字select,通過select可以監聽channel上的數據流動。
有時候我們希望能夠借助channel發送或接收數據,並避免因為發送或者接收導致的阻塞,尤其是當channel沒有准備好寫或者讀時。select語句就可以實現這樣的功能。
select的用法與switch語言非常類似,由select開始一個新的選擇塊,每個選擇條件由case語句來描述。
與switch語句相比,select有比較多的限制,其中最大的一條限制就是每個case語句里必須是一個IO操作,大致的結構如下:
select { case <- chan1: // 如果chan1成功讀到數據,則進行該case處理語句 case chan2 <- 1: // 如果成功向chan2寫入數據,則進行該case處理語句 default: // 如果上面都沒有成功,則進入default處理流程 }
1.每一個case分支,都必須一個 IO操作(channel r/w事件)。
2.通常將 select 置於 for 循環中。
3.一個case監聽的 channel 不滿足監聽條件。當前case分支阻塞。
4.當所有case分支都不滿足監聽條件時,select如果包含default分支,走default;如果沒有default,select等待case。
5.當監聽的多個case分支中,同時有多個case滿足,隨機選擇任一一個執行。
6.為防止忙輪詢,可以適當選擇省略 default
示例如下:
import ( "fmt" "runtime" ) func main() { ch1 := make(chan int) ch2 := make(chan bool) go func() { for{ fmt.Println("===================") select { case num := <- ch1: fmt.Println("num = ",num) case ch2 <- false: fmt.Println("子go程結束") runtime.Goexit() } } }() for i := 0;i < 10;i++{ ch1 <- i if i == 5{ <- ch2 break } } fmt.Println("finish") }
結果如下:
=================== num = 0 =================== num = 1 =================== num = 2 =================== num = 3 =================== num = 4 =================== num = 5 =================== finish 子go程結束
之后用select實現輸出斐波那契數列的前15位,代碼如下:
import ( "fmt" "runtime" ) func main() { ch1 := make(chan int) ch2 := make(chan bool) go func() { for{ select { case num := <- ch1: fmt.Println(num) case ch2 <- false: runtime.Goexit() } } }() x,y := 1,1 for i := 0;i < 15;i++{ ch1 <- x x,y = y,x+y } <-ch2 }
得到結果如下:
1 1 2 3 5 8 13 21 34 55 89 144 233 377 610
有時候會出現goroutine阻塞的情況,那么我們如何避免整個程序進入阻塞的情況呢?我們可以利用select來設置超時,通過如下的方式實現:
監聽超時定時器:case <-time.After(time.Second * 3)
當select監聽的其他case分支滿足時,time.After所在的case分支,會被重置成初始定時時長。
示例如下:
import ( "fmt" "time" ) func main() { ch1 := make(chan int) ch2 := make(chan bool) go func() { for{ select { case num := <- ch1: fmt.Println("num = ",num) case <- time.After(time.Second * 3): fmt.Println("子go程讀到系統時間, 定時滿 3 秒") ch2 <- false } } }() for i := 0;i < 2;i++{ ch1 <- i time.Sleep(time.Second*2) } <-ch2 fmt.Println("finish") }
六.鎖和條件變量
前面我們為了解決go程同步的問題我們使用了channel,但是GO也提供了傳統的同步工具,就是鎖。
它們都在GO的標准庫代碼包sync和sync/atomic中。
我們看一下鎖的應用。
是鎖呢?就是某個go程(線程)在訪問某個資源時先鎖住,防止其它go程的訪問,等訪問完畢解鎖后其他go程再來加鎖進行訪問。這和我們生活中加鎖使用公共資源相似,例如:公共衛生間。
6.1 死鎖
首先,死鎖不是鎖的一種,是錯誤使用鎖的現象。
死鎖是指兩個或兩個以上的進程在執行過程中,由於競爭資源或者由於彼此通信而造成的一種阻塞的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖。
先面列舉幾個造成死鎖的現象:
(1)
示例如下:
func main() { ch := make(chan int) ch <- 10 num := <- ch fmt.Println(num) }
(2)
示例如下:
func main() { ch := make(chan int) num := <- ch go func() { ch <- 10 }() fmt.Println(num) }
(3)
func main() { ch1 := make(chan int) ch2 := make(chan int) go func() { for i := 0;i < 10;i++{ num := <- ch2 fmt.Println(num) ch1 <- i } }() for data := range ch1{ fmt.Println(data) ch2 <- 4096 } }
(4)
每個資源都對應於一個可稱為"互斥鎖" 的標記,這個標記用來保證在任意時刻,只能有一個go程(線程)訪問該資源。其它的go程只能等待。
互斥鎖是傳統並發編程對共享資源進行訪問控制的主要手段,它由標准庫sync中的Mutex結構體類型表示。sync.Mutex類型只有兩個公開的指針方法,Lock和Unlock。Lock鎖定當前的共享資源,Unlock進行解鎖。
在使用互斥鎖時,一定要注意:對資源操作完成后,一定要解鎖,否則會出現流程執行異常,死鎖等問題。通常借助defer。鎖定后,立即使用defer語句保證互斥鎖及時解鎖。如下所示:
var mutex sync.Mutex // 定義互斥鎖變量 mutex func write(){ mutex.Lock( ) defer mutex.Unlock( ) }
示例如下:
import ( "fmt" "sync" "time" ) //定義互斥鎖 var mutex sync.Mutex //定義一個channel用開控制主,子go程結束的先后順序 var ch = make(chan bool) func printer(str string) { mutex.Lock() defer mutex.Unlock() for _,ch := range str{ fmt.Printf("%c",ch) time.Sleep(time.Millisecond*200) } } func user1() { printer("hello") ch <- false } func user2() { printer("world") ch<- false } func main() { go user1() go user2() for i := 0;i < 2;i++{ <-ch } }
6.3 讀寫鎖 RWMUTEX
互斥鎖的本質是當一個goroutine訪問的時候,其他goroutine都不能訪問。這樣在資源同步,避免競爭的同時也降低了程序的並發性能。程序由原來的並行執行變成了串行執行。
其實,當我們對一個不會變化的數據只做“讀”操作的話,是不存在資源競爭的問題的。因為數據是不變的,不管怎么讀取,多少goroutine同時讀取,都是可以的。
所以問題不是出在“讀”上,主要是修改,也就是“寫”。修改的數據要同步,這樣其他goroutine才可以感知到。所以真正的互斥應該是讀取和修改、修改和修改之間,讀和讀是沒有互斥操作的必要的。
因此,衍生出另外一種鎖,叫做讀寫鎖。
讀寫鎖可以讓多個讀操作並發,同時讀取,但是對於寫操作是完全互斥的。也就是說,當一個goroutine進行寫操作的時候,其他goroutine既不能進行讀操作,也不能進行寫操作。
GO中的讀寫鎖由結構體類型sync.RWMutex表示。此類型的方法集合中包含兩對方法:
一組是對寫操作的鎖定和解鎖,簡稱“寫鎖定”和“寫解鎖”:
func (*RWMutex)Lock() func (*RWMutex)Unlock()
另一組表示對讀操作的鎖定和解鎖,簡稱為“讀鎖定”與“讀解鎖”:
func (*RWMutex)RLock() func (*RWMutex)RUnlock()
讀寫鎖基本示例:
import ( "fmt" "math/rand" "runtime" "sync" ) var count int var rwmutex sync.RWMutex func Read(n int) { rwmutex.RLock() defer rwmutex.RUnlock() fmt.Printf("讀goruntine %d 正在讀取數據...\n",n) num := count fmt.Printf("讀goroutine %d 讀取數據結束,讀到 %d\n",n,num) } func Write(n int) { rwmutex.Lock() defer rwmutex.Unlock() fmt.Printf("寫goruntine %d 正在寫數據...\n",n) num := rand.Intn(1000) count = num fmt.Printf("寫goroutine %d 寫數據結束,寫入新值 %d\n",n,num) } func main() { for i:=0;i<5;i++{ go Read(i+1) } for j:=0;j<5;j++{ go Write(j+1) } for{ runtime.GC() } }
結果如下:
讀goruntine 2 正在讀取數據... 讀goroutine 2 讀取數據結束,讀到 0 讀goruntine 1 正在讀取數據... 讀goroutine 1 讀取數據結束,讀到 0 寫goruntine 1 正在寫數據... 寫goroutine 1 寫數據結束,寫入新值 81 讀goruntine 3 正在讀取數據... 讀goroutine 3 讀取數據結束,讀到 81 讀goruntine 4 正在讀取數據... 讀goroutine 4 讀取數據結束,讀到 81 讀goruntine 5 正在讀取數據... 讀goroutine 5 讀取數據結束,讀到 81 寫goruntine 3 正在寫數據... 寫goroutine 3 寫數據結束,寫入新值 887 寫goruntine 2 正在寫數據... 寫goroutine 2 寫數據結束,寫入新值 847 寫goruntine 4 正在寫數據... 寫goroutine 4 寫數據結束,寫入新值 59 寫goruntine 5 正在寫數據... 寫goroutine 5 寫數據結束,寫入新值 81
我們在read里使用讀鎖,也就是RLock和RUnlock,寫鎖的方法名和我們平時使用的一樣,是Lock和Unlock。這樣,我們就使用了讀寫鎖,可以並發地讀,但是同時只能有一個寫,並且寫的時候不能進行讀操作。
我們從結果可以看出,讀取操作可以並行,例如2,3,1正在讀取,但是同時只能有一個寫,例如1正在寫,只能等待1寫完,這個過程中不允許進行其它的操作。
處於讀鎖定狀態,那么針對它的寫鎖定操作將永遠不會成功,且相應的Goroutine也會被一直阻塞。因為它們是互斥的。
總結:讀寫鎖控制下的多個寫操作之間都是互斥的,並且寫操作與讀操作之間也都是互斥的。但是,多個讀操作之間不存在互斥關系。
6.4 條件變量
在講解條件變量之前,先回顧一下前面我們所涉及的“生產者消費者模型”:
import ( "fmt" ) //生產者:只寫,不讀 func producer(ch chan <- int) { for i := 1;i <= 10;i++{ ch <- i*i } close(ch) } //消費者:只讀,不寫 func consumer(ch <- chan int) { for data := range ch{ fmt.Println("num:",data) } } func main() { //定義一個雙向channel ch := make(chan int) //創建生產者 go producer(ch) //創建消費者 consumer(ch) }
這個案例中,雖然實現了生產者消費者的功能,但有一個問題。如果有多個消費者來消費數據,並且並不是簡單的從channel中取出來進行打印,而是還要進行一些復雜的運算。在consumer( )方法中的實現是否有問題呢?如下所示:
import ( "fmt" "runtime" ) var sum int //生產者:只寫,不讀 func producer(ch chan <- int) { for i := 1;i <= 100;i++{ ch <- i } close(ch) } //消費者:只讀,不寫 func consumer(ch <- chan int) { for data := range ch{ sum += data } fmt.Println("sum:",sum) } func main() { //定義一個雙向channel ch := make(chan int) //創建生產者 go producer(ch) //創建消費者 go consumer(ch) consumer(ch) for{ runtime.GC() } }
在上面的代碼中,加了一個消費者,同時在consumer方法中,將數據取出來后,又進行了一組運算。這時可能會出現一個go程從通道中取出數據,參與加法運算,但是還沒有算完另外一個go程又從通道中取出一個數據賦值給了data變量。所以這樣累加計算,很有可能出現問題。當然,按照前面的知識,解決這個問題的方法很簡單,就是通過加鎖的方式來解決。增加生產者也是一樣的道理。
另外一個問題,如果消費者比生產者多,倉庫中就會出現沒有數據的情況。我們需要不斷的通過循環來判斷倉庫隊列中是否有數據,這樣會造成cpu的浪費。反之,如果生產者比較多,倉庫很容易滿,滿了就不能繼續添加數據,也需要循環判斷倉庫滿這一事件,同樣也會造成CPU的浪費。
我們希望當倉庫滿時,生產者停止生產,等待消費者消費;同理,如果倉庫空了,我們希望消費者停下來等待生產者生產。為了達到這個目的,這里引入條件變量。(需要注意:如果倉庫隊列用channel,是不存在以上情況的,因為channel被填滿后就阻塞了,或者channel中沒有數據也會阻塞)。
條件變量:條件變量的作用並不保證在同一時刻僅有一個go程(線程)訪問某個共享的數據資源,而是在對應的共享數據的狀態發生變化時,通知阻塞在某個條件上的go程(線程)。條件變量不是鎖,在並發中不能達到同步的目的,因此條件變量總是與鎖一塊使用。
例如,我們上面說的,如果倉庫隊列滿了,我們可以使用條件變量讓生產者對應的goroutine暫停(阻塞),但是當消費者消費了某個產品后,倉庫就不再滿了,應該喚醒(發送通知給)阻塞的生產者goroutine繼續生產產品。
GO標准庫中的sync.Cond類型代表了條件變量。條件變量要與鎖(互斥鎖,或者讀寫鎖)一起使用。成員變量L代表與條件變量搭配使用的鎖。
type Cond struct { noCopy noCopy // L is held while observing or changing the condition L Locker notify notifyList checker copyChecker }
對應的有3個常用方法,Wait,Signal,Broadcast。
(1)func (c *Cond) Wait()
該函數的作用可歸納為如下三點:
a) 阻塞等待條件變量滿足
b) 釋放已掌握的互斥鎖相當於cond.L.Unlock()。注意:兩步為一個原子操作。要求,在調用wait之前,先加鎖。
c) 當被喚醒,Wait()函數返回時,解除阻塞並重新獲取互斥鎖。相當於cond.L.Lock()
(2)func (c *Cond) Signal()
單發通知,給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知。
(3)func (c *Cond) Broadcast()
廣播通知,給正在等待(阻塞)在該條件變量上的所有goroutine(線程)發送通知。
下面我們用條件變量來編寫一個“生產者消費者模型”
示例代碼:
package main import "fmt" import "sync" import "math/rand" import "time" var cond sync.Cond // 創建全局條件變量 // 生產者 func producer(out chan<- int, idx int) { for { cond.L.Lock() // 條件變量對應互斥鎖加鎖 for len(out) == 3{ // 產品區滿 等待消費者消費 cond.Wait() // 掛起當前go程, 等待條件變量滿足,被消費者喚醒 } num := rand.Intn(1000) // 產生一個隨機數 out <- num // 寫入到 channel 中 (生產) fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩余%d個數據\n", idx, num, len(out)) cond.Signal() // 喚醒 阻塞的 消費者 cond.L.Unlock() // 生產結束,解鎖互斥鎖 time.Sleep(time.Second) // 生產完休息一會,給其他go程執行機會 } } //消費者 func consumer(in <-chan int, idx int) { for { cond.L.Lock() // 條件變量對應互斥鎖加鎖(與生產者是同一個) for len(in) == 0 { // 產品區為空 等待生產者生產 cond.Wait() // 掛起當前go程, 等待條件變量滿足,被生產者喚醒 } num := <-in // 將 channel 中的數據讀走 (消費) fmt.Printf("---- %dth 消費者, 消費數據 %3d,公共區剩余%d個數據\n", idx, num, len(in)) cond.Signal() // 喚醒 阻塞的 生產者 cond.L.Unlock() // 消費結束,解鎖互斥鎖 time.Sleep(time.Millisecond * 500) //消費完 休息一會,給其他go程執行機會 } } func main() { rand.Seed(time.Now().UnixNano()) // 設置隨機數種子 product := make(chan int, 3) // 產品區(公共區)使用channel 模擬 cond.L = new(sync.Mutex) // 創建互斥鎖和條件變量 for i := 0; i < 5; i++ { // 5個消費者 go producer(product, i+1) } for i := 0; i < 3; i++ { // 3個生產者 go consumer(product, i+1) } for { // 主go程阻塞 不結束 runtime.GC() } }
1) main函數中定義死循環,其作用是讓主go程阻塞。
2) 定義product作為隊列,生產者產生數據保存至隊列中,最多存儲3個數據,消費者從中取出數據模擬消費
3) 條件變量要與鎖一起使用,這里定義全局條件變量cond,它有一個屬性:L Locker。是一個互斥鎖。
4) 開啟5個消費者go程,開啟3個生產者go程。
5) producer生產者,在該方法中開啟互斥鎖,保證數據完整性。並且判斷隊列是否滿,如果已滿,調用wait()讓該goroutine阻塞。當消費者取出數后執行cond.Signal(),會喚醒該goroutine,繼續生產數據。
6) consumer消費者,同樣開啟互斥鎖,保證數據完整性。判斷隊列是否為空,如果為空,調用wait()使得當前goroutine阻塞。當生產者產生數據並添加到隊列,執行cond.Signal() 喚醒該goroutine。