Go channel系列:
channel基礎
channel用於goroutines之間的通信,讓它們之間可以進行數據交換。像管道一樣,一個goroutine_A向channel_A中放數據,另一個goroutine_B從channel_A取數據。
channel是指針類型的數據類型,通過make來分配內存。例如:
ch := make(chan int)
這表示創建一個channel,這個channel中只能保存int類型的數據。也就是說一端只能向此channel中放進int類型的值,另一端只能從此channel中讀出int類型的值。
需要注意,chan TYPE
才表示channel的類型。所以其作為參數或返回值時,需指定為xxx chan int
類似的格式。
向ch這個channel放數據的操作形式為:
ch <- VALUE
從ch這個channel讀數據的操作形式為:
<-ch // 從ch中讀取一個值
val = <-ch
val := <-ch // 從ch中讀取一個值並保存到val變量中
val,ok = <-ch // 從ch讀取一個值,判斷是否讀取成功,如果成功則保存到val變量中
其實很簡單,當ch出現在<-
的左邊表示send,當ch出現在<-
的右邊表示recv。
例如:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan string)
go sender(ch) // sender goroutine
go recver(ch) // recver goroutine
time.Sleep(1e9)
}
func sender(ch chan string) {
ch <- "malongshuai"
ch <- "gaoxiaofang"
ch <- "wugui"
ch <- "tuner"
}
func recver(ch chan string) {
var recv string
for {
recv = <-ch
fmt.Println(recv)
}
}
輸出結果:
malongshuai
gaoxiaofang
wugui
tuner
上面激活了一個goroutine用於執行sender()函數,該函數每次向channel ch中發送一個字符串。同時還激活了另一個goroutine用於執行recver()函數,該函數每次從channel ch中讀取一個字符串。
注意上面的recv = <-ch
,當channel中沒有數據可讀時,recver goroutine將會阻塞在此行。由於recver中讀取channel的操作放在了無限for循環中,表示recver goroutine將一直阻塞,直到從channel ch中讀取到數據,讀取到數據后進入下一輪循環由被阻塞在recv = <-ch
上。直到main中的time.Sleep()指定的時間到了,main程序終止,所有的goroutine將全部被強制終止。
因為receiver要不斷從channel中讀取可能存在的數據,所以receiver一般都使用一個無限循環來讀取channel,避免sender發送的數據被丟棄。
channel的屬性和分類
channel的3種操作
每個channel都有3種操作:send、receive和close
- send:表示sender端的goroutine向channel中投放數據
- receive:表示receiver端的goroutine從channel中讀取數據
- close:表示關閉channel
- 關閉channel后,send操作將導致painc
- 關閉channel后,recv操作將返回對應類型的0值以及一個狀態碼false
- close並非強制需要使用close(ch)來關閉channel,在某些時候可以自動被關閉
- 如果使用close(),建議條件允許的情況下加上defer
- 只在sender端上顯式使用close()關閉channel。因為關閉通道意味着沒有數據再需要發送
例如,判斷channel是否被關閉:
val, ok := <-counter
if ok {
fmt.Println(val)
}
因為關閉通道也會讓recv成功讀取(只不過讀取到的值為類型的空值),使得原本阻塞在recv操作上的goroutine變得不阻塞,借此技巧可以實現goroutine的執行先后順序。具體示例見后文:指定goroutine的執行順序。
channel的兩種分類
channel分為兩種:unbuffered channel和buffered channel
- unbuffered channel:阻塞、同步模式
- sender端向channel中send一個數據,然后阻塞,直到receiver端將此數據receive
- receiver端一直阻塞,直到sender端向channel發送了一個數據
- buffered channel:非阻塞、異步模式
- sender端可以向channel中send多個數據(只要channel容量未滿),容量滿之前不會阻塞
- receiver端按照隊列的方式(FIFO,先進先出)從buffered channel中按序receive其中數據
可以認為阻塞和不阻塞是由channel控制的,無論是send還是recv操作,都是在向channel發送請求:
- 對於unbuffered channel,sender發送一個數據,channel暫時不會向sender的請求返回ok消息,而是等到receiver准備接收channel數據了,channel才會向sender和receiver雙方發送ok消息。在sender和receiver接收到ok消息之前,兩者一直處於阻塞。
- 對於buffered channel,sender每發送一個數據,只要channel容量未滿,channel都會向sender的請求直接返回一個ok消息,使得sender不會阻塞,直到channel容量已滿,channel不會向sender返回ok,於是sender被阻塞。對於receiver也一樣,只要channel非空,receiver每次請求channel時,channel都會向其返回ok消息,直到channel為空,channel不會返回ok消息,receiver被阻塞。
buffered channel的兩個屬性
buffered channel有兩個屬性:容量和長度:和slice的capacity和length的概念是一樣的
- capacity:表示bufffered channel最多可以緩沖多少個數據
- length:表示buffered channel當前已緩沖多少個數據
- 創建buffered channel的方式為
make(chan TYPE,CAP)
unbuffered channel可以認為是容量為0的buffered channel,所以每發送一個數據就被阻塞。注意,不是容量為1的buffered channel,因為容量為1的channel,是在channel中已有一個數據,並發送第二個數據的時候才被阻塞。
換句話說,send被阻塞的時候,其實是沒有發送成功的,只有被另一端讀走一個數據之后才算是send成功。對於unbuffered channel來說,這是send/recv的同步模式。而buffered channel則是在每次發送數據到通道的時候,(通道)都向發送者返回一個消息,容量未滿的時候返回成功的消息,發送者因此而不會阻塞,容量已滿的時候因為已滿而遲遲不返回消息,使得發送者被阻塞。
實際上,當向一個channel進行send的時候,先關閉了channel,再讀取channel時會發現錯誤在send,而不是recv。它會提示向已經關閉了的channel發送數據。
func main() {
counter := make(chan int)
go func() {
counter <- 32
}()
close(counter)
fmt.Println(<-counter)
}
輸出報錯:
panic: send on closed channel
所以,在Go的內部行為中,send和recv是一個整體行為,數據未讀就表示未send成功。
兩種特殊的channel
有兩種特殊的channel:nil channel和channal類型的channel。
當未為channel分配內存時,channel就是nil channel,例如var ch1 chan int
。nil channel會永遠阻塞對該channel的讀、寫操作。
nil channel在某些時候有些妙用,例如在select(關於select,見后文)的某個case分支A將其它某case分支B所操作的channel突然設置為nil,這將會禁用case分支B。
當channel的類型為一個channel時,就是channel的channel,也就是雙層通道。例如:
var chch1 chan chan int
channel的channel是指通道里的數據是通道,可以認為通道里面嵌套了一個或多個通道:只能將整個通道發送到外層通道,讀取外層通道時獲取到的是內層通道,然后可以操作內層通道。
// 發送通道給外層通道
chch1 <-ch1
chch1 <-ch2
// 從外層通道取出內層通道
c <-chch1
// 操作取出的內層通道
c <-123
val := <-c
channel of channel的妙用之一是將外層通道作為通道的加工廠:在某個goroutine中不斷生成通道,在其它goroutine可以不斷取出通道來操作。
死鎖(deadlock)
當channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作時,也就是說兩端都因為對方而使得自己當前處於阻塞狀態,這時將會出現死鎖問題。
更通俗地說,只要所有goroutine都被阻塞,就會出現死鎖。
比如,在main函數中,它有一個默認的goroutine,如果在此goroutine中創建一個unbuffered channel,並在main goroutine中向此channel中發送數據並直接receive數據,將會出現死鎖:
package main
import (
"fmt"
)
func main (){
goo(32)
}
func goo(s int) {
counter := make(chan int)
counter <- s
fmt.Println(<-counter)
}
在上面的示例中,向unbuffered channel中send數據的操作counter <- s
是在main goroutine中進行的,從此channel中recv的操作<-counter
也是在main goroutine中進行的。send的時候會直接阻塞main goroutine,使得recv操作無法被執行,go將探測到此問題,並報錯:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
要修復此問題,只需將send操作放在另一個goroutine中執行即可:
package main
import (
"fmt"
)
func main() {
goo(32)
}
func goo(s int) {
counter := make(chan int)
go func() {
counter <- s
}()
fmt.Println(<-counter)
}
或者,將counter設置為一個容量為1的buffered channel:
counter := make(chan int,1)
這樣放完一個數據后send不會阻塞(被recv之前放第二個數據才會阻塞),可以執行到recv操作。
unbuffered channel同步通信示例
下面通過sync.WaitGroup類型來等待程序的結束,分析多個goroutine之間通信時狀態的轉換。因為創建的channel是unbuffered類型的,所以send和recv都是阻塞的。
package main
import (
"fmt"
"sync"
)
// wg用於等待程序執行完成
var wg sync.WaitGroup
func main() {
count := make(chan int)
// 增加兩個待等待的goroutines
wg.Add(2)
fmt.Println("Start Goroutines")
// 激活一個goroutine,label:"Goroutine-1"
go printCounts("Goroutine-1", count)
// 激活另一個goroutine,label:"Goroutine-2"
go printCounts("Goroutine-2", count)
fmt.Println("Communication of channel begins")
// 向channel中發送初始數據
count <- 1
// 等待goroutines都執行完成
fmt.Println("Waiting To Finish")
wg.Wait()
fmt.Println("\nTerminating the Program")
}
func printCounts(label string, count chan int) {
// goroutine執行完成時,wg的計數器減1
defer wg.Done()
for {
// 從channel中接收數據
// 如果無數據可recv,則goroutine阻塞在此
val, ok := <-count
if !ok {
fmt.Println("Channel was closed:",label)
return
}
fmt.Printf("Count: %d received from %s \n", val, label)
if val == 10 {
fmt.Printf("Channel Closed from %s \n", label)
// Close the channel
close(count)
return
}
// 輸出接收到的數據后,加1,並重新將其send到channel中
val++
count <- val
}
}
上面的程序中,激活了兩個goroutine,激活這兩個goroutine后,向channel中發送一個初始數據值1,然后main goroutine將因為wg.Wait()等待2個goroutine都執行完成而被阻塞。
再看這兩個goroutine,這兩個goroutine執行完全一樣的函數代碼,它們都接收count這個channel的數據,但可能是goroutine1先接收到channel中的初始值1,也可能是goroutine2先接收到初始值1。接收到數據后輸出值,並在輸出后對數據加1,然后將加1后的數據再次send到channel,每次send都會將自己這個goroutine阻塞(因為unbuffered channel),此時另一個goroutine因為等待recv而執行。當加1后發送給channel的數據為10之后,某goroutine將關閉count channel,該goroutine將退出,wg的計數器減1,另一個goroutine因等待recv而阻塞的狀態將因為channel的關閉而失敗,ok狀態碼將讓該goroutine退出,於是wg的計數器減為0,main goroutine因為wg.Wait()而繼續執行后面的代碼。
使用for range迭代channel
前面都是在for無限循環中讀取channel中的數據,但也可以使用range來迭代channel,它會返回每次迭代過程中所讀取的數據,直到channel被關閉。必須注意,只要channel未關閉,range迭代channel就會一直被阻塞。
例如,將上面示例中的printCounts()改為for-range的循環形式。
func printCounts(label string, count chan int) {
defer wg.Done()
for val := range count {
fmt.Printf("Count: %d received from %s \n", val, label)
if val == 10 {
fmt.Printf("Channel Closed from %s \n", label)
close(count)
return
}
val++
count <- val
}
}
多個"管道":輸出作為輸入
channel是goroutine與goroutine之間通信的基礎,一邊產生數據放進channel,另一邊從channel讀取放進來的數據。可以借此實現多個goroutine之間的數據交換,例如goroutine_1->goroutine_2->goroutine_3
,就像bash的管道一樣,上一個命令的輸出可以不斷傳遞給下一個命令的輸入,只不過golang借助channel可以在多個goroutine(如函數的執行)之間傳,而bash是在命令之間傳。
以下是一個示例,第一個函數getRandNum()用於生成隨機整數,並將生成的整數放進第一個channel ch1中,第二個函數addRandNum()用於接收ch1中的數據(來自第一個函數),將其輸出,然后對接收的值加1后放進第二個channel ch2中,第三個函數printRes接收ch2中的數據並將其輸出。
如果將函數認為是Linux的命令,則類似於下面的命令行:ch1相當於第一個管道,ch2相當於第二個管道
getRandNum | addRandNum | printRes
以下是代碼部分:
package main
import (
"fmt"
"math/rand"
"sync"
)
var wg sync.WaitGroup
func main() {
wg.Add(3)
// 創建兩個channel
ch1 := make(chan int)
ch2 := make(chan int)
// 3個goroutine並行
go getRandNum(ch1)
go addRandNum(ch1, ch2)
go printRes(ch2)
wg.Wait()
}
func getRandNum(out chan int) {
// defer the wg.Done()
defer wg.Done()
var random int
// 總共生成10個隨機數
for i := 0; i < 10; i++ {
// 生成[0,30)之間的隨機整數並放進channel out
random = rand.Intn(30)
out <- random
}
close(out)
}
func addRandNum(in,out chan int) {
defer wg.Done()
for v := range in {
// 輸出從第一個channel中讀取到的數據
// 並將值+1后放進第二個channel中
fmt.Println("before +1:",v)
out <- (v + 1)
}
close(out)
}
func printRes(in chan int){
defer wg.Done()
for v := range in {
fmt.Println("after +1:",v)
}
}
指定channel的方向
上面通過兩個channel將3個goroutine連接起來,其中起連接作用的是第二個函數addRandNum()。在這個函數中使用了兩個channel作為參數:一個channel用於接收、一個channel用於發送。
其實channel類的參數變量可以指定數據流向:
in <-chan int
:表示channel in通道只用於接收數據out chan<- int
:表示channel out通道只用於發送數據
只用於接收數據的通道<-chan
不可被關閉,因為關閉通道是針對發送數據而言的,表示無數據再需發送。對於recv來說,關閉通道是沒有意義的。
所以,上面示例中三個函數可改寫為:
func getRandNum(out chan<- int) {
...
}
func addRandNum(in <-chan int, out chan<- int) {
...
}
func printRes(in <-chan int){
...
}
buffered channel異步隊列請求示例
下面是使用buffered channel實現異步處理請求的示例。
在此示例中:
- 有(最多)3個worker,每個worker是一個goroutine,它們有worker ID。
- 每個worker都從一個buffered channel中取出待執行的任務,每個任務是一個struct結構,包含了任務id(JobID),當前任務的隊列號(ID)以及任務的狀態(worker是否執行完成該任務)。
- 在main goroutine中將每個任務struct發送到buffered channel中,這個buffered channel的容量為10,也就是最多只允許10個任務進行排隊。
- worker每次取出任務后,輸出任務號,然后執行任務(run),最后輸出任務id已完成。
- 每個worker執行任務的方式很簡單:隨機睡眠0-1秒鍾,並將任務標記為完成。
以下是代碼部分:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Task struct {
ID int
JobID int
Status string
CreateTime time.Time
}
func (t *Task) run() {
sleep := rand.Intn(1000)
time.Sleep(time.Duration(sleep) * time.Millisecond)
t.Status = "Completed"
}
var wg sync.WaitGroup
// worker的數量,即使用多少goroutine執行任務
const workerNum = 3
func main() {
wg.Add(workerNum)
// 創建容量為10的buffered channel
taskQueue := make(chan *Task, 10)
// 激活goroutine,執行任務
for workID := 0; workID <= workerNum; workID++ {
go worker(taskQueue, workID)
}
// 將待執行任務放進buffered channel,共15個任務
for i := 1; i <= 15; i++ {
taskQueue <- &Task{
ID: i,
JobID: 100 + i,
CreateTime: time.Now(),
}
}
close(taskQueue)
wg.Wait()
}
// 從buffered channel中讀取任務,並執行任務
func worker(in <-chan *Task, workID int) {
defer wg.Done()
for v := range in {
fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
v.run()
fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID)
}
}
select多路監聽
很多時候想要同時操作多個channel,比如從ch1、ch2讀數據。Go提供了一個select語句塊,它像switch一樣工作,里面放一些case語句塊,用來輪詢每個case語句塊的send或recv情況。
select
用法格式示例:
select {
// ch1有數據時,讀取到v1變量中
case v1 := <-ch1:
...
// ch2有數據時,讀取到v2變量中
case v2 := <-ch2:
...
// 所有case都不滿足條件時,執行default
default:
...
}
defalut語句是可選的,不允許fall through行為,但允許case語句塊為空塊。select會被return、break關鍵字中斷:return是退出整個函數,break是退出當前select。
select的行為模式主要是對channel是否可讀進行輪詢,但也可以用來向channel發送數據。它的行為如下:
- 如果所有的case語句塊評估時都被阻塞,則阻塞直到某個語句塊可以被處理
- 如果多個case同時滿足條件,則隨機選擇一個進行處理,對於這一次的選擇,其它的case都不會被阻塞,而是處理完被選中的case后進入下一輪select(如果select在循環中)或者結束select(如果select不在循環中或循環次數結束)
- 如果存在default且其它case都不滿足條件,則執行default。所以default必須要可執行而不能阻塞
如果有所疑惑,后文的"select超時時間"有更有助於理解select的說明和示例。
所有的case塊都是按源代碼書寫順序進行評估的。當select未在循環中時,它將只對所有case評估一次,這次結束后就結束select。某次評估過程中如果有滿足條件的case,則所有其它case都直接結束評估,並退出此次select。
其實如果注意到select語句是在某一個goroutine中評估的,就不難理解只有所有case都不滿足條件時,select所在goroutine才會被阻塞,只要有一個case滿足條件,本次select就不會出現阻塞的情況。
需要注意的是,如果在select中執行send操作,則可能會永遠被send阻塞。所以,在使用send的時候,應該也使用defalut語句塊,保證send不會被阻塞。如果沒有default,或者能確保select不阻塞的語句塊,則遲早會被send阻塞。在后文有一個select中send永久阻塞的分析:雙層channel的一個示例。
一般來說,select會放在一個無限循環語句中,一直輪詢channel的可讀事件。
下面是一個示例,pump1()和pump2()都用於產生數據(一個產生偶數,一個產生奇數),並將數據分別放進ch1和ch2兩個通道,suck()則從ch1和ch2中讀取數據。然后在無限循環中使用select輪詢這兩個通道是否可讀,最后main goroutine在1秒后強制中斷所有goroutine。
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go pump1(ch1)
go pump2(ch2)
go suck(ch1, ch2)
time.Sleep(1e9)
}
func pump1(ch chan int) {
for i := 0; i <= 30; i++ {
if i%2 == 0 {
ch <- i
}
}
}
func pump2(ch chan int) {
for i := 0; i <= 30; i++ {
if i%2 == 1 {
ch <- i
}
}
}
func suck(ch1 chan int, ch2 chan int) {
for {
select {
case v := <-ch1:
fmt.Printf("Recv on ch1: %d\n", v)
case v := <-ch2:
fmt.Printf("Recv on ch2: %d\n", v)
}
}
}