Go語言學習之路第10天(Go並發編程)


一.概述

  簡而言之,所謂並發編程是指在一台處理器上"同時"處理多個任務。

  通常程序會被編寫為一個順序執行並完成一個獨立任務的代碼。如果沒有特別的需求,最好總是這樣寫代碼,因為這種類型的程序通常很容易寫,也很容易維護。不過也有一些情況下,並行執行多個任務會有更大的好處。一個例子是,Web服務器需要在各自獨立的套接字(socket)上同時接受多個數據請求。每個套接字的請求都是獨立的,可以完全獨立於其他套接字進行處理。具有並行執行多個請求的能力可以顯著提高這類系統的性能。考慮到這一點,Go語言的語法和運行時直接內置了對並發的支持。

  宏觀的並發是指在一段時間內,有多個程序在同時運行。

  並發在微觀上,是指在同一時刻只能有一條指令執行,但多個程序指令被快速的輪換執行,使得在宏觀上具有多個進程同時執行的效果,但在微觀上並不是同時執行的,只是把時間分成若干段,使多個程序快速交替的執行。

 

1.1 並行和並發

  並行:指在同一時刻,有多條指令在多個處理器上同時執行。 

  並發:指在同一時刻只能有一條指令執行,但多個進程指令被快速的輪換執行,使得在宏觀上具有多個進程同時執行的效果,但在微觀上並不是同時執行的,只是把時間分成若干段,通過cpu時間片輪轉使多個進程快速交替的執行。

  並發(concurrency)不是並行(parallelism)。並行是讓不同的代碼片段同時在不同的物理處理器上執行。並行的關鍵是同時做很多事情,而並發是指同時管理很多事情,這些事情可能只做了一半就被暫停去做別的事情了。在很多情況下,並發的效果比並行好,因為操作系統和硬件的總資源一半很少,但能支持系統同時做很多事情。這種"使用較少資源做更多的事情"的哲學,也是指導Go語言設計的哲學。

 

二.常見的並發編程基礎

2.1 進程並發

  (1)程序與進程

  程序:是指編譯好的二進制文件,在磁盤上,不占用系統資源(內存、打開的文件、設備、鎖....),是一個靜態的實體。

  進程:是指一個程序在運行時所需要和維護的資源的集合,是一個動態的實體。

  進程和程序並不是一一對應的,一個程序執行在不同的數據集上就成為不同的進程,可以用進程控制塊(PCB)來唯一地標識每個進程。而這一點正是程序無法做到的,由於程序沒有和數據產生直接的聯系,既使是執行不同的數據的程序,他們的指令的集合依然是一樣的,所以無法唯一地標識出這些運行於不同數據集上的程序。一般來說,一個進程肯定有一個與之對應的程序,而且只有一個。而一個程序有可能沒有與之對應的進程(因為它沒有執行),也有可能有多個進程與之對應(運行在幾個不同的數據集上)。

 

  (2)進程地址空間

  地址空間就是每個進程所能訪問的內存地址范圍。

  這個地址范圍不是真實的,是虛擬地址的范圍,有時甚至會超過實際物理內存的大小。

  現代的操作系統中進程都是在保護模式下運行的,地址空間其實是操作系統給進程用的一段連續的虛擬內存空間。

  地址空間最終會通過虛擬內訓映射管理單元映射到物理內存上,因為內核操作的是物理內存。

  雖然地址空間的范圍很大,但是進程也不一定有權限訪問全部的地址空間(一般都是只能訪問地址空間中的一些地址區間),

  進程能夠訪問的那些地址區間也稱為 內存區域。

  進程如果訪問了有效內存區域以外的內容就會報 “段錯誤” 信息。

 

  代碼段:程序代碼在內存中的映射,存放函數體的二進制代碼。

  初始化過的數據(Data):在程序運行初已經對變量進行初始化的數據。

  未初始化過的數據(BSS):在程序運行初未對變量進行初始化的數據。

  棧 (Stack):存儲局部、臨時變量,函數調用時,存儲函數的返回指針,用於控制函數的調用和返回。在程序塊開始時自動分配內存,結束時自動釋放內存,其操作方式類似於數據結構中的棧。

  堆 (Heap):存儲動態內存分配,需要程序員手工分配,手工釋放.注意它與數據結構中的堆是兩回事,分配方式類似於鏈表。 

 

  每個進程都有自己的地址空間。對32位進程來說,由於32位指針可以表示從0x00000000到0xFFFFFFFF之間的任一值,地址空間的大小為4GB。對64位進程來說,由於64位指針可以表示從0x00000000'00000000到0xFFFFFFFF'FFFFFFFF之間的任一值, 地址空間大小為16GB。其實這個地址空間是不存在的,也就是我們所說的進程虛擬內存空間。


  操作系統內核為每個被創建的進程都建立一個PCB(進程控制塊或進程描述符)來保存與其相關的信息,PCB存在於進程的高 1 G空間,也就是內核空間中。

 

  (3)進程的狀態

  進程基本的狀態有5種。分別為初始態,就緒態,運行態,掛起態與終止態。其中初始態為進程准備階段,常與就緒態結合來看。

 

 

 

  (4)進程並發

  在使用進程實現並發時會出現什么問題呢?

  1:系統開銷比較大,占用資源比較多,開啟進程數量比較少。

  2:在unix/linux系統下,還會產生"孤兒進程"和"僵屍進程"。

  在操作系統運行過程中,可以產生很多的進程。在unix/linux系統中,正常情況下,子進程是通過父進程fork創建的,子進程再創建新的進程。並且父進程永遠無法預測子進程到底什么時候結束。當一個進程完成它的工作終止之后,它的父進程需要調用系統調用取得子進程的終止狀態。

  孤兒進程:

    父進程先於子進程結束,則子進程成為孤兒進程,子進程的父進程成為init進程,稱為init進程領養孤兒進程。

  僵屍進程:

    子進程終止,父進程尚未回收,子進程殘留資源(PCB)存放於內核中,變成僵屍(Zombie)進程。

  守護進程:

    永久運行在系統中,不占用控制終端。不與前台用戶進行交互。通常采用以d結尾命名方法

 

2.2 線程並發

  LWP:light weight process 輕量級的進程,本質仍是進程(Linux下)。

  進程:獨立地址空間,擁有PCB 

  線程:有獨立的PCB,但沒有獨立的地址空間(共享)

  區別:在於是否共享地址空間。獨居(進程);合租(線程)。

    線程:最小的執行單位

    進程:最小分配資源單位,可看成是只有一個線程的進程。

 

  一個線程是一個執行空間,這個空間會被操作系統調度來運行函數中所寫的代碼。每個進程至少包含一個線程,每個進程的初識線程被稱作主線程。因為執行這個線程的空間是應用程序本身的空間,所以在主線程終止時,應用程序也會終止。操作系統將線程調度到某個處理器上運行,這個處理器並不一定是進程所在的處理器。不同的操作系統使用的線程調度算法一般都不一樣,但這種不同會被操作系統屏蔽,並不會展示給程序員。

 

  (1)線程同步

  同步即協同步調,按預定的先后次序運行。

  線程同步,指一個線程發出某一功能調用時,在沒有得到結果之前,該調用不返回。同時其它線程為保證數據一致性,不能調用該功能。

  舉例1:銀行存款5000。櫃台,折:取3000;提款機,卡:取3000。剩余:2000

  舉例2:內存中100字節,線程T1欲填入全1,線程T2欲填入全0。但如果T1執行了50個字節失去cpu,T2執行,會將T1寫過的內容覆蓋。當T1再次獲得cpu繼續   從失去cpu的位置向后寫入1,當執行結束,內存中的100字節,既不是全1,也不是全0。

  產生的現象叫做"與時間有關的錯誤"(time related)。為了避免這種數據混亂,線程需要同步。

  "同步"的目的,是為了避免數據混亂,解決與時間有關的錯誤。實際上,不僅線程間需要同步,進程間、信號間等等都需要同步機制。

  因此,所有"多個控制流,共同操作一個共享資源"的情況,都需要同步,同步的方式一般是加鎖(這個會在后面介紹道)。

 

2.3 協成並發

  協程:coroutine。也叫輕量級線程。

  與傳統的系統級線程和進程相比,協程最大的優勢在於“輕量級”。可以輕松創建上萬個而不會導致系統資源衰竭。而線程和進程通常很難超過1萬個。這也是協程別稱“輕量級線程”的原因。

  一個線程中可以有任意多個協程,但某一時刻只能有一個協程在運行,多個協程分享該線程分配到的計算機資源

  多數語言在語法層面並不直接支持協程,而是通過庫的方式支持,但用庫的方式支持的功能也並不完整,比如僅僅提供協程的創建、銷毀與切換等能力。如果在這樣的輕量級線程中調用一個同步IO 操作,比如網絡通信、本地文件讀寫,都會阻塞其他的並發執行輕量級線程,從而無法真正達到輕量級線程本身期望達到的目標。

  在協程中,調用一個任務就像調用一個函數一樣,消耗的系統資源最少!但能達到進程、線程並發相同的效果。

  在一次並發任務中,進程、線程、協程均可以實現。從系統資源消耗的角度出發來看,進程相當多,線程次之,協程最少。      

 

三.Go並發

  Go 在語言級別支持協程,叫goroutine。

  goroutine是Go語言並發設計的核心,有人稱之為go程。Goroutine從量級上看很像協程,它比線程更小,十幾個goroutine可能體現在底層就是五六個線程,Go語言內部幫你實現了這些goroutine之間的內存共享。執行goroutine只需極少的棧內存(大概是4~5KB),當然會根據相應的數據伸縮。也正因為如此,可同時運行成千上萬個並發任務。goroutine比thread更易用、更高效、更輕便。

  一般情況下,一個普通計算機跑幾十個線程就有點負載過大了,但是同樣的機器卻可以輕松地讓成百上千個goroutine進行資源競爭。

  Go語言中的並發指的是能讓某個函數獨立於其他函數運行的能力。當一個函數創建為goroutine時,Go會將其視為一個獨立的工作單元。這個單元會被調度到可用的邏輯處理器上執行。Go語言運行時的調度器是一個復雜的軟件,能管理被創建的所有goroutine並為其分配執行時間。這個調度器在操作系統之上,將操作系統線程和語言運行時的邏輯處理器綁定,並在邏輯處理器上運行goroutine。調度器在任何給定的時間,都會全面控制哪個goroutine要在哪個邏輯處理器上運行。

  Go語言的並發同步模型來自一個叫做通信順序進程(Communicating Sequential Process,CSP)的范型(paradigm)。CSP是一種消息傳遞模型,通過在goroutine之間傳遞數據來傳遞消息,而不是對數據進行加鎖來實現同步訪問。用於在goroutine之間同步和傳遞數據的關鍵數據類型叫做通道(channel,這個會在后面講到)。使用通道可以使編寫並發程序更容易,也能夠讓並發程序更少出錯。

  操作系統會在物理處理器上調度線程來運行,而Go語言在運行時會在邏輯處理器上調度goroutine來運行。每個邏輯處理器都會分別綁定到單個操作系統線程。在1.5版本上,Go語言的運行默認會為每個可用的物理處理器分配一個邏輯處理器。在1.5版本之前的版本中,默認給整個應用程序只分配一個邏輯處理器。這些邏輯處理器會用於執行所用被創建的goroutine。即便只有一個邏輯處理器,Go也可以以神奇的效率和性能,並發調度無數個goroutine。

  在下圖中,可以看到操作系統線程,邏輯處理器和本地運行隊列之間的關系。如果創建一個goroutine並准備運行,這個goroutine就會被放到調度器的全局運行隊列中。之后,調度器就會將這些隊列中的goroutine分配給一個邏輯處理器,並放到這個邏輯處理器對應的本地運行隊列中。本地運行隊列中的goroutine會一直等待直到自己被分配到邏輯處理器執行。

            

  有時,正在運行的goroutine需要執行一個阻塞的系統調用,如打開一個文件。當這類調用發生時,線程和goroutine會從邏輯處理器上分離,該線程會繼續阻塞,等待系統調用的返回。與此同時,這個邏輯處理器就會失去了用來運行的線程。所以,調度器會創建一個新線程,並將其綁定到該邏輯處理器上。之后,調度器會從本地運行隊列里選擇另一個goruntine來運行。一旦被阻塞的系統調用執行完並返回,對應的goruntine就會放回到本地運行隊列中,而之前的線程會保存好,以便之后可以繼續使用。

 

  (1)創建goroutine

  只需在函數調⽤語句前添加go 關鍵字,就可創建並發執⾏單元。開發⼈員無需了解任何執⾏細節,調度器會自動將其安排到合適的系統線程上執行。

  在並發編程中,我們通常想將一個過程切分成幾塊,然后讓每個goroutine各自負責一塊工作,當一個程序啟動時,主函數在一個單獨的goroutine中運行,我們叫它main goroutine。新的goroutine會用go語句來創建。而go語言的並發設計,讓我們很輕松就可以達成這一目的。

  示例如下:

import (
	"fmt"
	"time"
)

func singing()  {
	for i := 0;i < 5;i++{
		fmt.Println("----正在唱歌:人猿泰山----")
		time.Sleep(time.Millisecond * 30)
	}
}

func danceing()  {
	for j := 0;j < 5;j++{
		fmt.Println("====正在跳舞:趙四街舞====")
		time.Sleep(time.Millisecond * 30)
	}
}

func main()  {
	go singing()
	go danceing()
}

  但這時我們執行發現並沒有內容輸出,是我們的語法有什么問題嗎,並不是,是因為在主go程啟動兩個子go程后,主go程就結束了,主go程先於子go程結束運行,自動釋放(0-4G)進程地址空間。子go程沒有內存執行指令,被動結束,所以就沒有結果輸出了,這就是goruntine的特性:主goroutine退出后,其它的工作goroutine也會自動退出

  為了防止這種現象,我們需要主go程后於子go程結束,我們暫時先可以在主go程中加上死循環,等后面介紹過通道后,就可以用通道來實現控制主子go程結束的先后循序。

package main

import (
	"fmt"
	"runtime"
	"time"
)

func singing()  {
	for i := 0;i < 5;i++{
		fmt.Println("----正在唱歌:人猿泰山----")
		time.Sleep(time.Millisecond * 30)
	}
}

func danceing()  {
	for j := 0;j < 5;j++{
		fmt.Println("====正在跳舞:趙四街舞====")
		time.Sleep(time.Millisecond * 30)
	}
}

func main()  {
	go singing()
	go danceing()
     //保證主go程不先於子go程結束 for{ runtime.GC() } }

  結果如下:

----正在唱歌:人猿泰山----
====正在跳舞:趙四街舞====
----正在唱歌:人猿泰山----
====正在跳舞:趙四街舞====
====正在跳舞:趙四街舞====
----正在唱歌:人猿泰山----
====正在跳舞:趙四街舞====
----正在唱歌:人猿泰山----
----正在唱歌:人猿泰山----
====正在跳舞:趙四街舞====

  通過發現程序中的子go程是並行執行的。

 

  (2)Goexit()函數

  調用runtime.Goexit() 將立即終止當前goroutine 執⾏,調度器確保所有已注冊defer 延遲調用被執行。

import (
	"fmt"
	"runtime"
	"time"
)

func test()  {
	defer fmt.Println("子go程結束")
	fmt.Println("子go程即將結束")
	runtime.Goexit()
}

func main()  {
	//匿名子go程
	go func() {
		for i := 0;i < 10;i++{
			fmt.Println(i)
			if i == 5{
				test()
			}
			time.Sleep(time.Millisecond * 100)
		}
	}()

	for {
		runtime.GC()
	}
}

  結果如下:

0
1
2
3
4
5
子go程即將結束
子go程結束

 

四.channel

  channel是Go語言中的一個核心類型,可以把它看成管道。並發核心單元通過它就可以發送或者接收數據進行通訊,這在一定程度上又進一步降低了編程的難度。

  channel是一個數據類型,主要用來解決go程的同步問題以及go程之間數據共享(數據傳遞)的問題。

  goroutine運行在相同的地址空間,因此訪問共享內存必須做好同步。goroutine 奉行通過通信來共享內存,而不是共享內存來通信。

  引⽤類型channel可用於多個goroutine 通訊。其內部實現了同步,確保並發安全。

 

 

4.1 定義channel變量

  和map類似,channel也一個對應make創建的底層數據結構的引用

  當我們復制一個channel或用於函數參數傳遞時,我們只是拷貝了一個channel引用,因此調用者和被調用者將引用同一個channel對象。和其它的引用類型一樣,channel的零值也是nil。

  定義一個channel時,也需要定義發送到channel的值的類型。channel可以使用內置的make()函數來創建:

make(chan Type)  //等價於make(chan Type, 0)
make(chan Type, capacity)

  chan是創建channel所需使用的關鍵字。Type 代表指定channel收發數據的類型。

  例子

ch1 := make(chan int)
ch2 := make(chan string,0)

  當參數capacity=0 時,channel是無緩沖阻塞讀寫的;當capacity > 0 時,channel 有緩沖、是非阻塞的,直到寫滿capacity個元素才阻塞寫入。

 

  channel非常像生活中的管道,一邊可以存放東西,另一邊可以取出東西。channel通過操作符<- 來接收和發送數據,發送和接收數據語法:

  讀channel:

    <-ch1 讀到數據,丟棄

    num := <-ch1 讀到數據,存入 num中

  寫channel:

    ch1 <- data data類型嚴格與 定義的語法一致

 

  channel的特性:

 

    通道中的數據只能單向流動。一端讀端、另外必須寫端。

    通道中的數據只能讀取一次,不能重復讀。先進先出。

 

 

    讀端 和 寫端在不同的 goroutine 之間。

 

    讀端讀,寫端不在線,讀端阻塞。寫端寫,讀端不在線,寫端阻塞。

 

  默認情況下,channel接收和發送數據都是阻塞的,除非另一端已經准備好,這樣就使得goroutine同步變的更加的簡單,而不需要顯式的lock。

  示例如下:

import (
	"fmt"
	"time"
)

func main()  {
	ch := make(chan string)

	go func() {
		defer fmt.Println("子go程結束,寫數據給主go程")
		for i := 0;i < 3;i++{
			fmt.Println(i)
			time.Sleep(time.Second * 2)
		}
		ch <- "子go程打印3次數據完畢"
	}()

	str := <- ch
	fmt.Println("主go程接收到數據:",str)
}

  結果如下:

0
1
2
子go程結束,寫數據給主go程
主go程接收到數據: 子go程打印3次數據完畢

  我們發現主go程在子go程輸出完三次數據后才結束,我們並沒有在主go程中添加死循環來讓主go程后於子go程結束,只是通過通道實現了控制兩個go程到執行順序。

 

  通道channel不僅可以實現goruntine之間的同步,還可以實現goruntine之間的數據通信,示例如下:

import "fmt"

func main()  {
	//通道ch1:用於兩個goruntine之間傳遞數據
	ch1 := make(chan int)
	//通道ch2:協調兩個goruntine之間使用stdout
	ch2 := make(chan bool)

	//定義匿名子go程
	go func() {
		for i := 0;i < 3;i++{
			ch1 <- i
			fmt.Println("子go程向主go程傳遞:",i)
			ch2 <- false
		}
	}()

	//因為子go程向主go程傳遞3次數據,所以主go程要循環3次接收
	for j := 0;j < 3;j++{
		num := <- ch1
		<- ch2
		fmt.Println("主go程讀到:",num)
	}
}

  結果如下:

子go程向主go程傳遞: 0
主go程讀到: 0
子go程向主go程傳遞: 1
主go程讀到: 1
子go程向主go程傳遞: 2
主go程讀到: 2

  上面的程序定義通道ch2的目的是為協調主子go程使用標准輸出的順序,子go程先使用標准輸出,因為在這里標准輸出是公共資源,多個go程調用公共資源需要同步,否則就會發生競爭。

 

4.2 無緩沖channel

  無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何數據值的通道。

  這種類型的通道要求發送goroutine和接收goroutine同時准備好,才能完成發送和接收操作。否則,通道會導致先執行發送或接收操作的goroutine 阻塞等待。

  這種對通道進行發送和接收的交互行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。

  阻塞:由於某種原因數據沒有到達,當前go程(線程)持續處於等待狀態,直到條件滿足,才解除阻塞。

  同步:在兩個或多個go程(線程)間,保持數據內容一致性的機制。

  下圖展示兩個goroutine 如何利用無緩沖的通道來共享一個值:

  在第1步:兩個goruntine都到達通道,但哪個都沒有開始執行發送或者接受。

  在第2步:左側的goruntine將它的手伸進了通道,這模擬了向通道發送數據的行為。這時,這個goruntine會在通道中被鎖住,知道交換完成。

  在第3步:右側的goroutine 將它的手放入通道,這模擬了從通道里接收數據。這個goroutine 一樣也會在通道中被鎖住,直到交換完成。

  在第4 步和第5 步,進行交換,並最終,在第6 步,兩個goroutine都將它們的手從通道里拿出來,這模擬了被鎖住的goroutine 得到釋放。兩個goroutine 現在都可以去做其他事情了。

 

  無緩沖的channel創建格式:

make(chan Type)   //等價於make(chan Type, 0)

  如果沒有指定緩沖區容量,那么該通道就是同步的,因此會阻塞到發送者准備好發送和接收者准備好接收。

  示例代碼:

import (
	"fmt"
)

func main()  {
	//定義無緩沖channel
	ch1 := make(chan int) //等價於:ch := make(chan int,0)
	ch2 := make(chan bool)
	fmt.Println("len=",len(ch1),"cap=",cap(ch1))

	go func() {
		for i := 0;i < 5;i++{
			ch1 <- i
			fmt.Println("---- len=",len(ch1),"cap=",cap(ch1),"i=",i)
			ch2 <- false
		}
	}()

	for j := 0;j < 5;j++{
		num := <- ch1
		<- ch2
		fmt.Println("==== len=",len(ch1),"cap=",cap(ch1),"num=",num)
	}
}

  結果如下:

len= 0 cap= 0
---- len= 0 cap= 0 i= 0
==== len= 0 cap= 0 num= 0
---- len= 0 cap= 0 i= 1
==== len= 0 cap= 0 num= 1
---- len= 0 cap= 0 i= 2
==== len= 0 cap= 0 num= 2
---- len= 0 cap= 0 i= 3
==== len= 0 cap= 0 num= 3
---- len= 0 cap= 0 i= 4
==== len= 0 cap= 0 num= 4

 

4.3 有緩沖的channel

  有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個數據值的通道。

  這種類型的通道並不強制要求goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也不同。

  只有通道中沒有要接收的值時,接收動作才會阻塞。

  只有通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。

  這導致有緩沖的通道和無緩沖的通道之間的一個很大的不同:無緩沖的通道保證進行發送和接收的goroutine 會在同一時間進行數據交換;有緩沖的通道沒有這種保證。

  示例圖如下: 

  在第1 步,右側的goroutine 正在從通道接收一個值。

  在第2 步,右側的這個goroutine獨立完成了接收值的動作,而左側的goroutine 正在發送一個新值到通道里。

  在第3 步,左側的goroutine 還在向通道發送新值,而右側的goroutine 正在從通道接收另外一個值。這個步驟里的兩個操作既不是同步的,也不會互相阻塞。

  最后,在第4 步,所有的發送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。

 

  有緩沖的channel創建格式:

 make(chan Type, capacity)

  如果給定了一個緩沖區容量,通道就是異步的。只要緩沖區有未使用空間用於發送數據,或還包含可以接收的數據,那么其通信就會無阻塞地進行。

  借助函數len(ch)求取緩沖區中剩余元素個數,cap(ch) 求取緩沖區元素容量大小。

  示例如下:

import (
"fmt"
"time"
)

func main() {
//定義有緩沖channel,初識容量為3
ch1 := make(chan int,3)
ch2 := make(chan bool)
fmt.Println("len=",len(ch1),"cap=",cap(ch1))

go func() {
for i := 0;i < 7;i++{
ch1 <- i
fmt.Println("---- len=",len(ch1),"cap=",cap(ch1),"i=",i)
}
ch2 <- false
}()

time.Sleep(time.Second * 3)

for j := 0;j < 7;j++{
num := <- ch1
fmt.Println("==== len=",len(ch1),"cap=",cap(ch1),"num=",num)
}
<-ch2
}

  結果如下

len= 0 cap= 3
---- len= 1 cap= 3 i= 0
---- len= 2 cap= 3 i= 1
---- len= 3 cap= 3 i= 2
==== len= 3 cap= 3 num= 0
---- len= 3 cap= 3 i= 3
==== len= 2 cap= 3 num= 1
==== len= 2 cap= 3 num= 2
==== len= 1 cap= 3 num= 3
==== len= 0 cap= 3 num= 4
---- len= 3 cap= 3 i= 4
---- len= 0 cap= 3 i= 5
---- len= 1 cap= 3 i= 6
==== len= 1 cap= 3 num= 5
==== len= 0 cap= 3 num= 6

 

4.4 關閉channel

  如果發送者知道,沒有更多的值需要發送到channel的話,那么讓接收者也能及時知道沒有多余的值可接收將是有用的,因為接收者可以停止不必要的接收等待。這可以通過內置的close函數來關閉channel實現。

  示例如下:

import "fmt"

func main()  {
	ch := make(chan int)

	go func() {
		for i := 0;i < 5;i++{
			ch <- i
		}
		close(ch)
	}()

	for{
		if data,status := <- ch;status{
			fmt.Println(data)
		}else {
			break
		}
	}
	fmt.Println("Finished")
}

  結果如下:

0
1
2
3
4
Finished

  注意:

    channel不像文件一樣需要經常去關閉,只有當你確實沒有任何發送數據了,或者你想顯式的結束range循環之類的,才去關閉channel;

    關閉channel后,無法向channel 再發送數據(引發panic 錯誤后導致接收立即返回零值);

    關閉channel后,可以繼續從channel接收數據;

    對於nil channel,無論收發都會被阻塞。

 

  也可以使用range來迭代不斷操作channel:

import (
	"fmt"
)

func main()  {
	ch := make(chan int)

	go func() {
		for i := 0;i < 5;i++{
			ch <- i
		}
		close(ch)
	}()

	for data := range ch{
		fmt.Println(data)
	}
	fmt.Println("Finished")
}

 

4.5 單項channel

  默認情況下,通道channel是雙向的,也就是,既可以往里面發送數據也可以同里面接收數據。

  但是,我們經常見一個通道作為參數進行傳遞而只希望對方是單向使用的,要么只讓它發送數據,要么只讓它接收數據,這時候我們可以指定通道的方向。

 

 

單向channel變量的聲明非常簡單,如下:

var ch1 chan int       // ch1是一個正常的channel,是雙向的
var ch2 chan<- float64 // ch2是單向channel,只用於寫float64數據
var ch3 <-chan int     // ch3是單向channel,只用於讀int數據

  chan<- 表示數據進入管道,要把數據寫進管道,對於調用者就是輸出。

  <-chan 表示數據從管道出來,對於調用者就是得到管道的數據,當然就是輸入。

 

  可以將channel 隱式轉換為單向隊列,只收或只發,不能將單向channel 轉換為普通channel:

c := make(chan int, 3)
var send chan<- int = c // send-only
var recv <-chan int = c // receive-only
send <- 1
//<-send //invalid operation: <-send (receive from send-only type chan<- int)
<-recv
//recv <- 2 //invalid operation: recv <- 2 (send to receive-only type <-chan int)

 

  單項channel示例如下:

import (
	"fmt"
)

func sendto(out chan <- int)  {
	for i := 0;i < 5;i++{
		out <- i
	}
	close(out)
}

func receivefrom(in <- chan int)  {
	for data := range in{
		fmt.Println("從子go程接收到:",data)
	}
}

func main()  {
	ch := make(chan int)
	go sendto(ch)

	receivefrom(ch)
}

   結果如下:

從子go程接收到: 0
從子go程接收到: 1
從子go程接收到: 2
從子go程接收到: 3
從子go程接收到: 4

 

  (1)生產者和消費者模型

  單向channel最典型的應用是“生產者消費者模型”

  所謂“生產者消費者模型”: 某個模塊(函數等)負責產生數據,這些數據由另一個模塊來負責處理(此處的模塊是廣義的,可以是類、函數、go程、線程、進程等)。產生數據的模塊,就形象地稱為生產者;而處理數據的模塊,就稱為消費者。

  單單抽象出生產者和消費者,還夠不上是生產者/消費者模型。該模式還需要有一個緩沖區處於生產者和消費者之間,作為一個中介。生產者把數據放入緩沖區,而消費者從緩沖區取出數據。大概的結構如下圖:

  舉一個寄信的例子來輔助理解一下,假設你要寄一封平信,大致過程如下:

    1.把信寫好——相當於生產者制造數據

    2.把信放入郵筒——相當於生產者把數據放入緩沖區

    3.郵遞員把信從郵筒取出——相當於消費者把數據取出緩沖區

    4.郵遞員把信拿去郵局做相應的處理——相當於消費者處理數據

  那么,這個緩沖區有什么用呢?為什么不讓生產者直接調用消費者的某個函數,直接把數據傳遞過去,而畫蛇添足般的設置一個緩沖區呢?

  緩沖區的好處大概如下:

  1:解耦

  假設生產者和消費者分別是兩個類。如果讓生產者直接調用消費者的某個方法,那么生產者對於消費者就會產生依賴(也就是耦合)。將來如果消費者的代碼發生變化,可能會直接影響到生產者。而如果兩者都依賴於某個緩沖區,兩者之間不直接依賴,耦合度也就相應降低了。

  接着上述的例子,如果不使用郵筒(緩沖區),須得把信直接交給郵遞員。那你就必須要認識誰是郵遞員。這就產生和你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員換人了,你還要重新認識下一個郵遞員(相當於消費者變化導致修改生產者代碼)。而郵筒相對來說比較固定,你依賴它的成本也比較低(相當於和緩沖區之間的弱耦合)。

  2:處理並發

  生產者直接調用消費者的某個方法,還有另一個弊端。由於函數調用是同步的(或者叫阻塞的),在消費者的方法沒有返回之前,生產者只好一直等在那邊。萬一消費者處理數據很慢,生產者只能無端浪費時間。

  使用了生產者/消費者模式之后,生產者和消費者可以是兩個獨立的並發主體。生產者把制造出來的數據往緩沖區一丟,就可以再去生產下一個數據。基本上不用依賴消費者的處理速度。

  其實最當初這個生產者消費者模式,主要就是用來處理並發問題的。

  從寄信的例子來看。如果沒有郵筒,你得拿着信傻站在路口等郵遞員過來收(相當於生產者阻塞);又或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。

  3:緩存

  如果生產者制造數據的速度時快時慢,緩沖區的好處就體現出來了。當數據制造快的時候,消費者來不及處理,未處理的數據可以暫時存在緩沖區中。等生產者的制造速度慢下來,消費者再慢慢處理掉。

  假設郵遞員一次只能帶走1000封信。萬一某次碰上情人節送賀卡,需要寄出去的信超過1000封,這時候郵筒這個緩沖區就派上用場了。郵遞員把來不及帶走的信暫存在郵筒中,等下次過來時再拿走。

 

  示例如下:

import "fmt"

//定義生產者
func producer(in chan <- int)  {
	for i := 0;i < 10;i++{
		fmt.Println("------生產了:",i)
		in <- i
	}
	close(in)
}

//定義消費者
func consumer(out <- chan int)  {
	for data := range out{
		fmt.Println("======消費了:",data*data)
	}
}

func main()  {
	//定義公共區(緩沖區)
	ch := make(chan int,5)

	//生成生產者
	go producer(ch)

	//生成消費者
	consumer(ch)
}

  結果如下:

------生產了: 0
------生產了: 1
======消費了: 0
======消費了: 1
------生產了: 2
------生產了: 3
------生產了: 4
------生產了: 5
------生產了: 6
======消費了: 4
======消費了: 9
======消費了: 16
======消費了: 25
======消費了: 36
------生產了: 7
------生產了: 8
------生產了: 9
======消費了: 49
======消費了: 64
======消費了: 81

  簡單說明:首先創建一個雙向的channel,然后開啟一個新的goroutine,把雙向通道作為參數傳遞到producer方法中,同時轉成只寫通道。子go程開始執行循環,向只寫通道中添加數據,這就是生產者。主go程,直接調用consumer方法,該方法將雙向通道轉成只讀通道,通過循環每次從通道中讀取數據,這就是消費者。

  注意:channel作為參數傳遞,是引用傳遞

 

4.6 定時器

  (1)time.Timer

  Timer是一個定時器。代表未來的一個單一事件,你可以告訴timer你要等待多長時間。

type Timer struct {
	C <-chan Time
	r runtimeTimer
}

  它提供一個channel,在定時時間到達之前,沒有數據寫入timer.C會一直阻塞。直到定時時間到,系統會自動向timer.C 這個channel中寫入當前時間,阻塞即被解除。

  示例如下:

func main()  {
	//創建定時器,指定定時時長
	timer := time.NewTimer(time.Second * 3)
	fmt.Println(time.Now().Format("2006-01-02 15:04:05"))

	// 從 timer的 C 中讀. 定時時間到達后,系統會自動寫入當前時間到 C 中
	t := <- timer.C
	fmt.Println(t.Format("2006-01-02 15:04:05"))

}

  結果如下:

2019-07-19 20:19:36
2019-07-19 20:19:39

 

  time.After()可以合並上面兩個步驟

func main()  {
	fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
	//把3秒后的時間寫入到t中
	t := <- time.After(time.Second * 3)
	fmt.Println(t.Format("2006-01-02 15:04:05"))
}

  結果如下:

2019-07-19 20:22:31
2019-07-19 20:22:34

 

  time.Stop()可以停止定時器

func main()  {
	timer := time.NewTimer(time.Second * 5)
	fmt.Println(time.Now().Format("2006-01-02 15:04:05"))

	//停止計時器
	timer.Stop()

	fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
}

  結果如下:

2019-07-19 20:27:06
2019-07-19 20:27:06

 

  timer.Reset()可以重置定時器

func main()  {
	timer := time.NewTimer(time.Second * 5)
	fmt.Println(time.Now().Format("2006-01-02 15:04:05"))

	//重制計時器
	timer.Reset(time.Second * 2)

	t := <- timer.C
	fmt.Println(t.Format("2006-01-02 15:04:05"))
}

  結果如下:

2019-07-19 20:31:45
2019-07-19 20:31:47

 

  (2)time.Ticker

  Ticker是一個周期觸發定時的計時器,它會按照一個時間間隔往channel發送系統當前時間,而channel的接收者可以以固定的時間間隔從channel中讀取事件。

func main()  {
	//控制主子go程結束的先后順序
	ch := make(chan bool)
	timer := time.NewTicker(time.Second * 1)
	i := 0
	go func() {
		for{
			<-timer.C
			i++
			fmt.Println("i = ",i)
			if i == 5{
				timer.Stop()
				ch <- false
				runtime.Goexit()
			}
		}
	}()
	<-ch
}

  結果如下:

i =  1
i =  2
i =  3
i =  4
i =  5

  ticker 只有 Stop() 停止定時器。沒有 Reset() 方法。

 

五.select

  Go里面提供了一個關鍵字select,通過select可以監聽channel上的數據流動。

  有時候我們希望能夠借助channel發送或接收數據,並避免因為發送或者接收導致的阻塞,尤其是當channel沒有准備好寫或者讀時。select語句就可以實現這樣的功能。

  select的用法與switch語言非常類似,由select開始一個新的選擇塊,每個選擇條件由case語句來描述。

  與switch語句相比,select有比較多的限制,其中最大的一條限制就是每個case語句里必須是一個IO操作,大致的結構如下:

select {
    case <- chan1:
        // 如果chan1成功讀到數據,則進行該case處理語句
    case chan2 <- 1:
        // 如果成功向chan2寫入數據,則進行該case處理語句
    default:
        // 如果上面都沒有成功,則進入default處理流程
}

  特性:

    1.每一個case分支,都必須一個 IO操作(channel r/w事件)。

    2.通常將 select 置於 for 循環中。

    3.一個case監聽的 channel 不滿足監聽條件。當前case分支阻塞。

    4.當所有case分支都不滿足監聽條件時,select如果包含default分支,走default;如果沒有default,select等待case。

    5.當監聽的多個case分支中,同時有多個case滿足,隨機選擇任一一個執行。

    6.為防止忙輪詢,可以適當選擇省略 default

  示例如下:

import (
	"fmt"
	"runtime"
)

func main() {
	ch1 := make(chan int)
	ch2 := make(chan bool)

	go func() {
		for{
			fmt.Println("===================")
			select {
			case num := <- ch1:
				fmt.Println("num = ",num)
			case ch2 <- false:
				fmt.Println("子go程結束")
				runtime.Goexit()
			}
		}
	}()

	for i := 0;i < 10;i++{
		ch1 <- i
		if i == 5{
			<- ch2
			break
		}
	}
	fmt.Println("finish")
}

  結果如下:

===================
num =  0
===================
num =  1
===================
num =  2
===================
num =  3
===================
num =  4
===================
num =  5
===================
finish
子go程結束

 

  之后用select實現輸出斐波那契數列的前15位,代碼如下:

import (
	"fmt"
	"runtime"
)

func main()  {
	ch1 := make(chan int)

	ch2 := make(chan bool)

	go func() {
		for{
			select {
			case num := <- ch1:
				fmt.Println(num)
			case ch2 <- false:
				runtime.Goexit()
			}
		}
	}()

	x,y := 1,1
	for i := 0;i < 15;i++{
		ch1 <- x
		x,y = y,x+y
	}
	<-ch2
}

  得到結果如下:

1
1
2
3
5
8
13
21
34
55
89
144
233
377
610

 

  有時候會出現goroutine阻塞的情況,那么我們如何避免整個程序進入阻塞的情況呢?我們可以利用select來設置超時,通過如下的方式實現:

    監聽超時定時器:case <-time.After(time.Second * 3)

    當select監聽的其他case分支滿足時,time.After所在的case分支,會被重置成初始定時時長。

    直到在select 監聽其他case時,沒有任何case滿足監聽條件。time.After 才能定時滿。

  示例如下:

import (
	"fmt"
	"time"
)

func main()  {
	ch1 := make(chan int)
	ch2 := make(chan bool)

	go func() {
		for{
			select {
			case num := <- ch1:
				fmt.Println("num = ",num)
			case <- time.After(time.Second * 3):
				fmt.Println("子go程讀到系統時間, 定時滿 3 秒")
				ch2 <- false
			}
		}
	}()

	for i := 0;i < 2;i++{
		ch1 <- i
		time.Sleep(time.Second*2)
	}
	<-ch2
	fmt.Println("finish")
}

 

六.鎖和條件變量

  前面我們為了解決go程同步的問題我們使用了channel,但是GO也提供了傳統的同步工具,就是鎖。

  它們都在GO的標准庫代碼包sync和sync/atomic中。

  我們看一下鎖的應用。

  是鎖呢?就是某個go程(線程)在訪問某個資源時先鎖住,防止其它go程的訪問,等訪問完畢解鎖后其他go程再來加鎖進行訪問。這和我們生活中加鎖使用公共資源相似,例如:公共衛生間。

 

6.1 死鎖

  首先,死鎖不是鎖的一種,是錯誤使用鎖的現象。

  死鎖是指兩個或兩個以上的進程在執行過程中,由於競爭資源或者由於彼此通信而造成的一種阻塞的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖。

  先面列舉幾個造成死鎖的現象:

  (1)單個go程使用同一個channel自己讀、自己寫。

  示例如下:

func main()  {
	ch := make(chan int)
	ch <- 10
	num := <- ch
	fmt.Println(num)
}

 

  (2)多個go程使用 channel通信,go程創建之前,對channel讀、寫造成死鎖。

  示例如下:

func main()  {
	ch := make(chan int)
	num := <- ch
	go func() {
		ch <- 10
	}()
	fmt.Println(num)
}

 

  (3)多個go程使用多個channel 通信,相互依賴造成死鎖。

func main()  {
	ch1 := make(chan int)
	ch2 := make(chan int)

	go func() {
		for i := 0;i < 10;i++{
			num := <- ch2
			fmt.Println(num)
			ch1 <- i
		}
	}()

	for data := range ch1{
		fmt.Println(data)
		ch2 <- 4096
	}

}

 

  (4)多個go程使用 鎖(讀寫鎖、互斥鎖)和 channel 通信。

 

6.2 互斥量(互斥鎖) MUTEX

  每個資源都對應於一個可稱為"互斥鎖" 的標記,這個標記用來保證在任意時刻,只能有一個go程(線程)訪問該資源。其它的go程只能等待。

  互斥鎖是傳統並發編程對共享資源進行訪問控制的主要手段,它由標准庫sync中的Mutex結構體類型表示。sync.Mutex類型只有兩個公開的指針方法,Lock和Unlock。Lock鎖定當前的共享資源,Unlock進行解鎖。

  在使用互斥鎖時,一定要注意:對資源操作完成后,一定要解鎖,否則會出現流程執行異常,死鎖等問題。通常借助defer。鎖定后,立即使用defer語句保證互斥鎖及時解鎖。如下所示:

var mutex sync.Mutex		// 定義互斥鎖變量 mutex

func write(){
   mutex.Lock( )
   defer mutex.Unlock( )
}

  示例如下:

import (
	"fmt"
	"sync"
	"time"
)

//定義互斥鎖
var mutex sync.Mutex

//定義一個channel用開控制主,子go程結束的先后順序
var ch = make(chan bool)

func printer(str string)  {
	mutex.Lock()
	defer mutex.Unlock()

	for _,ch := range str{
		fmt.Printf("%c",ch)
		time.Sleep(time.Millisecond*200)
	}
}

func user1()  {
	printer("hello")
	ch <- false
}
func user2()  {
	printer("world")
	ch<- false
}

func main()  {
	go user1()
	go user2()
	for i := 0;i < 2;i++{
		<-ch
	}
}

 

6.3 讀寫鎖 RWMUTEX

  互斥鎖的本質是當一個goroutine訪問的時候,其他goroutine都不能訪問。這樣在資源同步,避免競爭的同時也降低了程序的並發性能。程序由原來的並行執行變成了串行執行。

  其實,當我們對一個不會變化的數據只做“讀”操作的話,是不存在資源競爭的問題的。因為數據是不變的,不管怎么讀取,多少goroutine同時讀取,都是可以的。

  所以問題不是出在“讀”上,主要是修改,也就是“寫”。修改的數據要同步,這樣其他goroutine才可以感知到。所以真正的互斥應該是讀取和修改、修改和修改之間,讀和讀是沒有互斥操作的必要的。

  因此,衍生出另外一種鎖,叫做讀寫鎖

  讀寫鎖可以讓多個讀操作並發,同時讀取,但是對於寫操作是完全互斥的。也就是說,當一個goroutine進行寫操作的時候,其他goroutine既不能進行讀操作,也不能進行寫操作。

  GO中的讀寫鎖由結構體類型sync.RWMutex表示。此類型的方法集合中包含兩對方法:

  一組是對寫操作的鎖定和解鎖,簡稱“寫鎖定”和“寫解鎖”:

func (*RWMutex)Lock()
func (*RWMutex)Unlock()

  另一組表示對讀操作的鎖定和解鎖,簡稱為“讀鎖定”與“讀解鎖”:

func (*RWMutex)RLock()
func (*RWMutex)RUnlock()

  讀寫鎖基本示例:

import (
	"fmt"
	"math/rand"
	"runtime"
	"sync"
)

var count int
var rwmutex sync.RWMutex


func Read(n int)  {
	rwmutex.RLock()
	defer rwmutex.RUnlock()
	fmt.Printf("讀goruntine %d 正在讀取數據...\n",n)
	num := count
	fmt.Printf("讀goroutine %d 讀取數據結束,讀到 %d\n",n,num)

}

func Write(n int)  {
	rwmutex.Lock()
	defer rwmutex.Unlock()
	fmt.Printf("寫goruntine %d 正在寫數據...\n",n)
	num := rand.Intn(1000)
	count = num
	fmt.Printf("寫goroutine %d 寫數據結束,寫入新值 %d\n",n,num)
}

func main()  {
	for i:=0;i<5;i++{
		go Read(i+1)
	}

	for j:=0;j<5;j++{
		go Write(j+1)
	}

	for{
		runtime.GC()
	}

}

  結果如下:

讀goruntine 2 正在讀取數據...
讀goroutine 2 讀取數據結束,讀到 0
讀goruntine 1 正在讀取數據...
讀goroutine 1 讀取數據結束,讀到 0
寫goruntine 1 正在寫數據...
寫goroutine 1 寫數據結束,寫入新值 81
讀goruntine 3 正在讀取數據...
讀goroutine 3 讀取數據結束,讀到 81
讀goruntine 4 正在讀取數據...
讀goroutine 4 讀取數據結束,讀到 81
讀goruntine 5 正在讀取數據...
讀goroutine 5 讀取數據結束,讀到 81
寫goruntine 3 正在寫數據...
寫goroutine 3 寫數據結束,寫入新值 887
寫goruntine 2 正在寫數據...
寫goroutine 2 寫數據結束,寫入新值 847
寫goruntine 4 正在寫數據...
寫goroutine 4 寫數據結束,寫入新值 59
寫goruntine 5 正在寫數據...
寫goroutine 5 寫數據結束,寫入新值 81

  我們在read里使用讀鎖,也就是RLock和RUnlock,寫鎖的方法名和我們平時使用的一樣,是Lock和Unlock。這樣,我們就使用了讀寫鎖,可以並發地讀,但是同時只能有一個寫,並且寫的時候不能進行讀操作。

  我們從結果可以看出,讀取操作可以並行,例如2,3,1正在讀取,但是同時只能有一個寫,例如1正在寫,只能等待1寫完,這個過程中不允許進行其它的操作。

  處於讀鎖定狀態,那么針對它的寫鎖定操作將永遠不會成功,且相應的Goroutine也會被一直阻塞。因為它們是互斥的。

  總結:讀寫鎖控制下的多個寫操作之間都是互斥的,並且寫操作與讀操作之間也都是互斥的。但是,多個讀操作之間不存在互斥關系。

 

6.4 條件變量

  在講解條件變量之前,先回顧一下前面我們所涉及的“生產者消費者模型”:

import (
	"fmt"
)

//生產者:只寫,不讀
func producer(ch chan <- int)  {
	for i := 1;i <= 10;i++{
		ch <- i*i
	}
	close(ch)
}

//消費者:只讀,不寫
func consumer(ch <- chan int)  {
	for data := range ch{
		fmt.Println("num:",data)
	}
}

func main()  {
	//定義一個雙向channel
	ch := make(chan int)

	//創建生產者
	go producer(ch)

	//創建消費者
	consumer(ch)
}

 

  這個案例中,雖然實現了生產者消費者的功能,但有一個問題。如果有多個消費者來消費數據,並且並不是簡單的從channel中取出來進行打印,而是還要進行一些復雜的運算。在consumer( )方法中的實現是否有問題呢?如下所示:

import (
	"fmt"
	"runtime"
)

var sum int

//生產者:只寫,不讀
func producer(ch chan <- int)  {
	for i := 1;i <= 100;i++{
		ch <- i
	}
	close(ch)
}

//消費者:只讀,不寫
func consumer(ch <- chan int)  {
	for data := range ch{
		sum += data
	}
	fmt.Println("sum:",sum)
}

func main()  {
	//定義一個雙向channel
	ch := make(chan int)

	//創建生產者
	go producer(ch)

	//創建消費者
	go consumer(ch)

	consumer(ch)
	for{
		runtime.GC()
	}
}

 

  在上面的代碼中,加了一個消費者,同時在consumer方法中,將數據取出來后,又進行了一組運算。這時可能會出現一個go程從通道中取出數據,參與加法運算,但是還沒有算完另外一個go程又從通道中取出一個數據賦值給了data變量。所以這樣累加計算,很有可能出現問題。當然,按照前面的知識,解決這個問題的方法很簡單,就是通過加鎖的方式來解決。增加生產者也是一樣的道理。

  另外一個問題,如果消費者比生產者多,倉庫中就會出現沒有數據的情況。我們需要不斷的通過循環來判斷倉庫隊列中是否有數據,這樣會造成cpu的浪費。反之,如果生產者比較多,倉庫很容易滿,滿了就不能繼續添加數據,也需要循環判斷倉庫滿這一事件,同樣也會造成CPU的浪費。

  我們希望當倉庫滿時,生產者停止生產,等待消費者消費;同理,如果倉庫空了,我們希望消費者停下來等待生產者生產。為了達到這個目的,這里引入條件變量。(需要注意:如果倉庫隊列用channel,是不存在以上情況的,因為channel被填滿后就阻塞了,或者channel中沒有數據也會阻塞)。

  條件變量:條件變量的作用並不保證在同一時刻僅有一個go程(線程)訪問某個共享的數據資源,而是在對應的共享數據的狀態發生變化時,通知阻塞在某個條件上的go程(線程)。條件變量不是鎖,在並發中不能達到同步的目的,因此條件變量總是與鎖一塊使用。

  例如,我們上面說的,如果倉庫隊列滿了,我們可以使用條件變量讓生產者對應的goroutine暫停(阻塞),但是當消費者消費了某個產品后,倉庫就不再滿了,應該喚醒(發送通知給)阻塞的生產者goroutine繼續生產產品。

  GO標准庫中的sync.Cond類型代表了條件變量。條件變量要與鎖(互斥鎖,或者讀寫鎖)一起使用。成員變量L代表與條件變量搭配使用的鎖。

type Cond struct {
	noCopy noCopy

	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
	checker copyChecker
}

  對應的有3個常用方法,Wait,Signal,Broadcast。

  (1)func (c *Cond) Wait()

  該函數的作用可歸納為如下三點:

    a) 阻塞等待條件變量滿足      

    b) 釋放已掌握的互斥鎖相當於cond.L.Unlock()。注意:兩步為一個原子操作要求,在調用wait之前,先加鎖。

    c) 當被喚醒,Wait()函數返回時,解除阻塞並重新獲取互斥鎖。相當於cond.L.Lock()

 

  (2)func (c *Cond) Signal()

   單發通知,給一個正等待(阻塞)在該條件變量上的goroutine(線程)發送通知。

 

  (3)func (c *Cond) Broadcast()

  廣播通知,給正在等待(阻塞)在該條件變量上的所有goroutine(線程)發送通知。

 

  下面我們用條件變量來編寫一個“生產者消費者模型”

  示例代碼:

package main
import "fmt"
import "sync"
import "math/rand"
import "time"

var cond sync.Cond             // 創建全局條件變量

// 生產者
func producer(out chan<- int, idx int) {
   for {
      cond.L.Lock()           	// 條件變量對應互斥鎖加鎖
      for len(out) == 3{          	// 產品區滿 等待消費者消費
         cond.Wait()             	// 掛起當前go程, 等待條件變量滿足,被消費者喚醒
      }
      num := rand.Intn(1000) 	// 產生一個隨機數
      out <- num             	// 寫入到 channel 中 (生產)
      fmt.Printf("%dth 生產者,產生數據 %3d, 公共區剩余%d個數據\n", idx, num, len(out))
      cond.Signal()           	// 喚醒 阻塞的 消費者
      cond.L.Unlock()             	// 生產結束,解鎖互斥鎖
      time.Sleep(time.Second)       // 生產完休息一會,給其他go程執行機會
   }
}
//消費者
func consumer(in <-chan int, idx int) {
   for {
      cond.L.Lock()           	// 條件變量對應互斥鎖加鎖(與生產者是同一個)
      for len(in) == 0 {      	// 產品區為空 等待生產者生產
         cond.Wait()             	// 掛起當前go程, 等待條件變量滿足,被生產者喚醒
      }
      num := <-in                	// 將 channel 中的數據讀走 (消費)
      fmt.Printf("---- %dth 消費者, 消費數據 %3d,公共區剩余%d個數據\n", idx, num, len(in))
      cond.Signal()           	// 喚醒 阻塞的 生產者
      cond.L.Unlock()             	// 消費結束,解鎖互斥鎖
      time.Sleep(time.Millisecond * 500)    	//消費完 休息一會,給其他go程執行機會
   }
}
func main() {
   rand.Seed(time.Now().UnixNano())  // 設置隨機數種子

   product := make(chan int, 3)      // 產品區(公共區)使用channel 模擬
   cond.L = new(sync.Mutex)          // 創建互斥鎖和條件變量

   for i := 0; i < 5; i++ {          // 5個消費者
      go producer(product, i+1)
   }
   for i := 0; i < 3; i++ {          // 3個生產者
      go consumer(product, i+1)
   }
   for {                         	// 主go程阻塞 不結束
	runtime.GC()
}
}

  1)     main函數中定義死循環,其作用是讓主go程阻塞。

  2)     定義product作為隊列,生產者產生數據保存至隊列中,最多存儲3個數據,消費者從中取出數據模擬消費

  3)     條件變量要與鎖一起使用,這里定義全局條件變量cond,它有一個屬性:L Locker。是一個互斥鎖。

  4)     開啟5個消費者go程,開啟3個生產者go程。

  5)     producer生產者,在該方法中開啟互斥鎖,保證數據完整性。並且判斷隊列是否滿,如果已滿,調用wait()讓該goroutine阻塞。當消費者取出數后執行cond.Signal(),會喚醒該goroutine,繼續生產數據。

  6)     consumer消費者,同樣開啟互斥鎖,保證數據完整性。判斷隊列是否為空,如果為空,調用wait()使得當前goroutine阻塞。當生產者產生數據並添加到隊列,執行cond.Signal() 喚醒該goroutine。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM