golang channel詳解和協程優雅退出


非緩沖chan,讀寫對稱

非緩沖channel,要求一端讀取,一端寫入。channel大小為零,所以讀寫操作一定要匹配。

func main() {
	nochan := make(chan int)
	go func(ch chan int) {
		data := <-ch
		fmt.Println("receive data ", data)
	}(nochan)
	nochan <- 5
	fmt.Println("send data ", 5)
}

我們啟動了一個協程從channel中讀取數據,在主協程中寫入,程序的運行流程是主協程優先啟動,運行到nochan<-5寫入是阻塞,然后啟動協程讀取,從而完成協程間通信。 

程序輸出

receive data  5
send data  5

如果將啟動協程的代碼放在nochan<-5下邊,這樣會造成主協程阻塞,無法啟動協程,一直掛起。 

func main() {
	nochan := make(chan int)
	nochan <- 5
	fmt.Println("send data ", 5)
	go func(ch chan int) {
			data := <-ch
			fmt.Println("receive data ", data)
		}(nochan)	
}

上述代碼在運行時golang會直接panic,日志輸出dead lock警告。 

我們可以通過go run -race 選項檢測並運行,是可以看到主協程一直阻塞,子協程無法啟動的。

WaitGroup 待時而動

func main() {
	nochan := make(chan int)
	waiter := &sync.WaitGroup{}
	waiter.Add(2)
	go func(ch chan int, wt *sync.WaitGroup) {
		data := <-ch
		fmt.Println("receive data ", data)
		wt.Done()
	}(nochan, waiter)

	go func(ch chan int, wt *sync.WaitGroup) {
		ch <- 5
		fmt.Println("send data ", 5)
		wt.Done()
	}(nochan, waiter)
	waiter.Wait()
}

 通過waitgroup管理兩個協程,主協程等待兩個子協程退出。

receive data  5
send data  5

range 自動讀取

使用range可以自動的從channel中讀取,當channel被關閉時,for循環退出,否則一直掛起

func main() {
	catchan := make(chan int, 2)
	go func(ch chan int) {
		for i := 0; i < 2; i++ {
			ch <- i
			fmt.Println("send data is ", i)
		}
		//不關閉close,主協程將無法range退出
		close(ch)
		fmt.Println("goroutine1 exited")
	}(catchan)
    
    for data := range catchan {
		fmt.Println("receive data is ", data)
	}

	fmt.Println("main exited")
}

輸出如下 

receive data is  0
send data is  0
send data is  1
goroutine1 exited
receive data is  1
main exited

如果不寫close(ch),主協程將一直掛起,編譯會出現死鎖panic。 

可以通過go run -race 選項檢查看到主協程一直掛起。

緩沖channel, 先進先出

非緩沖channel內部其實是一個加鎖的隊列,先進先出。先寫入的數據優先讀出來。

func main() {
	catchan := make(chan int, 2)
	go func(ch chan int) {
		for i := 0; i < 2; i++ {
			ch <- i
			fmt.Println("send data is ", i)
		}
	}(catchan)
	for i := 0; i < 2; i++ {
		data := <-catchan
		fmt.Println("receive data is ", data)
	}
}

輸出如下 

send data is  0
send data is  1
receive data is  0
receive data is  1

主協程從catchan中讀取數據,子協程先catchan中寫數據。主協程運行到讀取位置先阻塞,子協程啟動后向catchan中寫數據后,主協程繼續讀取。 

如果將主協程的for循環卸載go啟動子協程之前,會造成編譯警告死鎖,當然可以通過go run -race 查看到主協程一直掛起。

讀取關閉的channel

從關閉的channel中讀取數據,優先讀出其中沒有取出的數據,然后讀出存儲類型的空置。循環讀取關閉的channel不會阻塞,會一直讀取空值。可以通過讀取結果的bool值判斷該channel是否關閉。

func main() {
	nochan := make(chan int)
	go func(ch chan int) {
		ch <- 100
		fmt.Println("send data", 100)
		close(ch)
		fmt.Println("goroutine exit")
	}(nochan)
	data := <-nochan
	fmt.Println("receive data is ", data)
	//從關閉的
	data, ok := <-nochan
	if !ok {
		fmt.Println("receive close chan")
		fmt.Println("receive data is ", data)
	}
	fmt.Println("main exited")
}

輸出如下 

receive data is  100
send data 100
goroutine exit
receive close chan
receive data is  0
main exited

主協程運行到data := <- nochan阻塞,子協程啟動后向ch中寫入數據,並關閉ch,此時主協程繼續執行,取出一個數據后,再次取出為空值,並且ok為false表示ch已經被關閉。 

切忌重復關閉channel

重復關閉channel會導致panic

func main() {
	nochan := make(chan int)
	go func(ch chan int) {
		close(ch)
		fmt.Println("goroutine exit")
	}(nochan)

	data, ok := <-nochan
	if !ok {
		fmt.Println("receive close chan")
		fmt.Println("receive data is ", data)
	}
	//二次關閉
	close(nochan)
	fmt.Println("main exited")
}

輸出如下 

goroutine exit
receive close chan
receive data is  0
panic: close of closed channel

子協程退出后,主協程讀取到退出信息,主協程再次關閉chan導致主協程崩潰。 

切忌向關閉的channel寫數據

向關閉的channel寫數據會導致panic

func main() {
	nochan := make(chan int)
	go func(ch chan int) {
		close(ch)
		fmt.Println("goroutine1 exit")
	}(nochan)

	data, ok := <-nochan
	if !ok {
		fmt.Println("receive close chan")
		fmt.Println("receive data is ", data)
	}

	go func(ch chan int) {
		<-ch
		fmt.Println("goroutine2 exit")
	}(nochan)

	//向關閉的channel中寫數據
	nochan <- 200
	fmt.Println("main exited")
}

主線程運行到nochan讀取數據阻塞,此時子協程1關閉,主協程繼續執行獲知nochan被關閉,然后啟動子協程2,繼續運行nochan<-200,此時nochan已被關閉,導致panic,效果如下 

receive close chan
receive data is  0
goroutine1 exit
goroutine2 exit
panic: send on closed channel

切忌關閉nil的channel 

關閉nil值的channel會導致panic

func main() {
	var nochan chan int = nil
	go func(ch chan int) {
		//關閉nil channel會panic
		close(ch)
		fmt.Println("goroutine exit")
	}(nochan)

	//從nil channel中讀取會阻塞
	data, ok := <-nochan
	if !ok {
		fmt.Println("receive close chan")
		fmt.Println("receive data is ", data)
	}
	fmt.Println("main exited")
}

主協程定義了一個nil值的nochan,並未開辟空間。運行至data, ok := <-nochan 阻塞,此時啟動子協程,關閉nochan,導致panic 

效果如下

panic: close of nil channel

讀或寫nil的channel都會阻塞 

向nil的channel寫數據,或者讀取nil的channel也會導致阻塞。

func main() {
	var nochan chan int = nil
	go func(ch chan int) {
		fmt.Println("goroutine begin receive data")
		data, ok := <-nochan
		if !ok {
			fmt.Println("receive close chan")
		}
		fmt.Println("receive data is ", data)
		fmt.Println("goroutine exit")
	}(nochan)
	fmt.Println("main begin send data")
	//向nil channel中寫數據會阻塞
	nochan <- 100
	fmt.Println("main exited")
}

如果直接編譯系統會判斷死鎖panic,我們用go run -race main.go死鎖檢測,並運行,看到主協程一直掛起,子協程也一直掛起。 

結果如下

goroutine begin receive data
main begin send data

主協程和子協程都阻塞了,一直掛起。 

select 多路復用,大巧不工

select 內部可以寫多個協程讀寫,通過case完成多路復用,其結構如下

select {
	case ch <- 100:
		...
	case <- ch2:
		...
	dafault:
		...
}

如果有多個case滿足條件,則select隨機選擇一個執行。否則進入dafault執行。 

我們可以利用上面的九種原理配合select創造出各種並發場景。

總結

1 當我們不使用一個channel時將其置為nil,這樣select就不會檢測它了。
2 當多個子協程想獲取主協程退出通知時,可以從同一個chan中讀取,如果主協程退出則關閉這個chan,那么所有從chan讀取的子協程就會獲得退出消息。從而實現廣播。
3 為保證協程優雅退出,關閉channel的操作盡量放在對channel執行寫操作的協程中。

並發實戰

假設有這樣的需求:
1 主協程啟動兩個協程,協程1負責發送數據給協程2,協程2負責接收並累加獲得的數據。
2 主協程等待兩個子協程退出,當主協程意外退出時通知兩個子協程退出。
3 當發送協程崩潰和主動退出時通知接收協程也要退出,然后主協程退出
4 當接收協程崩潰或主動退出時通知發送協程退出,然后主協程退出。
5 無論三個協程主動退出還是panic,都要保證所有資源手動回收。
下面我們用上面總結的十招完成這個需求

datachan := make(chan int)
groutineclose := make(chan struct{})
mainclose := make(chan struct{})
var onceclose sync.Once
var readclose sync.Once
var sendclose sync.Once
var waitgroup sync.WaitGroup
waitgroup.Add(2)

datachan: 用來裝載發送協程給接收協程的數據 

groutineclose: 用於發送協程和接收協程之間關閉通知
onceclose: 保證datachan一次關閉。
readclose: 保證接收協程資源一次回收。
sendclose: 保證發送協程資源一次回收。
waitgroup: 主協程管理兩個子協程。
接下來我們實現發送協程

go func(datachan chan int, gclose chan struct{}, mclose chan struct{}, group *sync.WaitGroup) {
		defer func() {
			onceclose.Do(func() {
				close(gclose)
			})
			sendclose.Do(func() {
				close(datachan)
				fmt.Println("send goroutine closed !")
				group.Done()
			})
		}()

		for i := 0; i < 100; i++ {
			select {
			case <-gclose:
				fmt.Println("other goroutine exited")
				return
			case <-mclose:
				fmt.Println("main goroutine exited")
				return
				/*
					default:
						datachan <- i
				*/
			case datachan <- i:
			}
		}
	}(datachan, groutineclose, mainclose, &waitgroup)

發送協程在defer函數中回收了和接收協程公用的chan,也主動關閉了數據chan,這么做保證關閉不會panic。此外還對group做了釋放。 

其實將datachan <- i 放在default分支也是可以的。但是為了保證接收協程退出后該發送協程也要及時退出,就放在case邏輯中,這樣不會死鎖。
發送協程累計發送100次數據給接收協程,然后退出。
接下來我們實現接收協程

go func(datachan chan int, gclose chan struct{}, mclose chan struct{}, group *sync.WaitGroup) {
		sum := 0
		defer func() {
			onceclose.Do(func() {
				close(gclose)
			})
			readclose.Do(func() {
				fmt.Println("sum is ", sum)
				fmt.Println("receive goroutine closed !")
				group.Done()
			})
		}()

		for i := 0; ; i++ {
			select {
			case <-gclose:
				fmt.Println("other goroutine exited")
				return
			case <-mclose:
				fmt.Println("main goroutine exited")
				return
			case data, ok := <-datachan:
				if !ok {
					fmt.Println("receive close chan data")
					return
				}
				sum += data
			}
		}
	}(datachan, groutineclose, mainclose, &waitgroup)

和發送協程一樣,接收協程也通過once操作保證公用的通知chan只回收一次。然后回收了自己的資源。接收協程一直循環獲取數據,如果收到主協程退出或者發送協程退出的通知,就退出。 

接下來我們繼續編寫主協程的等待和回收操作

defer func() {
close(mainclose)
time.Sleep(time.Second * 5)
}()

waitgroup.Wait()
fmt.Println("main exited")

這些邏輯我們都寫在main函數里即可。主協程通過waitgroup等待兩個協程,並通過defer通知兩個協程退出。 

運行代碼效果如下

send goroutine closed !
receive close chan data
sum is  4950
receive goroutine closed !
main exited

可以看出發送協程退出接收協程也退出了,接收協程正好計算100次累加,數值為4950。主協程也退出了。 

測試接收協程異常退出

接下來我們測試接收協程異常退出后,發送協程和主協程退出是否回收資源。
我們將接收協程的case邏輯改為i>=20時該接收協程主動panic

case data, ok := <-datachan:
	if !ok {
				fmt.Println("receive close chan data")
				return
			}
		sum += data
	if i >= 20 {
				panic("receive goroutine test panic !!")
			}

運行代碼看下效果 

recover !
close gclose channel
sum is  210
receive goroutine closed !
other goroutine exited
send goroutine closed !
main exited
defer main close

我們在接收協程的defer里增加了recover邏輯,可以看到三個協程都正常退出並回收了各自的資源。 

測試主協程主動退出

我們將主協程的等待代碼去掉,並且在defer中增加延時退出,方便看到兩個協程退出情況

defer func() {
	fmt.Println("defer main close")
	close(mainclose)
	time.Sleep(time.Second * 10)
}()

time.Sleep(time.Second * 10)
fmt.Println("main exited")

運行看效果 

main exited
defer main close
main goroutine exited
sum is  88074498378441
receive goroutine closed !
main goroutine exited
send goroutine closed !

看到三個協程正常退出,並回收了資源。 

源碼下載

https://github.com/secondtonone1/golang-/tree/master/channelpractice

我的公眾號,謝謝關注

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM