筆者在《Golang 入門 : 競爭條件》一文中介紹了 Golang 並發編程中需要面對的競爭條件。本文我們就介紹如何使用 Golang 提供的 channel(通道) 消除競爭條件。
Channel 是 Golang 在語言級別提供的 goroutine 之間的通信方式,可以使用 channel 在兩個或多個 goroutine 之間傳遞消息。Channel 是進程內的通信方式,因此通過 channel 傳遞對象的過程和調用函數時的參數傳遞行為比較一致,比如也可以傳遞指針等。使用通道發送和接收所需的共享資源,可以在 goroutine 之間消除競爭條件。
當一個資源需要在 goroutine 之間共享時,channel 在 goroutine 之間架起了一個管道,並提供了確保同步交換數據的機制。Channel 是類型相關的,也就是說,一個 channel 只能傳遞一種類型的值,這個類型需要在聲明 channel 時指定。可以通過 channel 共享內置類型、命名類型、結構類型和引用類型的值或者指針。
基本語法
聲明 channel 的語法格式為:
var ChannelName chan ElementType
與一般變量聲明的不同之處僅僅是在類型前面添加了一個 chan 關鍵字。ElementType 則指明這個 channel 能夠傳遞的數據的類型。比如聲明一個傳遞 int 類型的 channel:
var ch chan int
或者是聲明一個 map,其元素是 bool 型的 channel:
var m map[string] chan bool
在 Golang 中需要使用內置的 make 函數類創建 channel 的實例:
ch := make(chan int)
這樣就聲明並初始化了一個名為 ch 的 int 型 channel。使用 channel 發送和接收數據的語法也很直觀,比如下面的代碼把數據發送到 channel 中:
ch <- value
向 channel 中寫入數據通常會導致程序阻塞,直到有其它 goroutine 從這個 channel 中讀取數據。下面的代碼把數據從 channel 讀取到變量中:
value := <-ch
注意,如果 channel 中沒有數據,那么從 channel 中讀取數據也會導致程序阻塞,直到 channel 中被寫入數據為止。
根據 channel 是否有緩沖區可以簡單地把 channel 分為無緩沖區的 channel 和帶緩沖區的 channel,在本文接下來的篇幅中會詳細的介紹這兩類 channel 的用法。
select
Linux 系統中的 select 函數用來監控一系列的文件句柄,一旦其中一個文件句柄發生了 I/O 動作,select 函數就會返回。該函數主要被用來實現高並發的 socket 服務器程序。Golang 中的 select 關鍵字和 linux 中的 select 函數功能有點相似,它主要用於處理異步 I/O 問題。
select 的語法與 switch 的語法非常相似,由 select 開始一個新的選擇塊,每個選擇條件有 case 語句來描述。與 switch 語句可以選擇任何可使用相等比較的條件相比,select 有比較多的限制,其中最大的一條限制就是每個 case 語句里必須是一個 I/O 操作。其大致的結構如下:
select { case <-chan1: // 如果 chan1 成功讀取到數據,則執行該 case 語句 case chan2 <- 1: // 如果成功向 chan2 寫入數據,則執行該 case 語句 default: // 如果上面的條件都沒有成功,則執行 default 流程 }
可以看出,select 不像 switch,后面並沒有條件判斷,而是直接去查看 case 語句。每個 case 語句都必須是一個面向 channel 的操作。比如上面的例子中,第一個 case 試圖從 chan1 讀取一個數據並直接忽略讀取到的數據,而第二個 case 則試圖向 chan2 中寫入一個整數 1,如果這兩者都沒有成功,則執行 default 語句。
無緩沖的 channel
無緩沖的 channel(unbuffered channel) 是指在接收前沒有能力保存任何值的 channel。這種類型的 channel 要求發送 goroutine 和接收 goroutine 同時准備好,才能完成發送和接收操作。如果兩個 goroutine 沒有同時准備好,channel 會導致先執行發送或接收操作的 goroutine 阻塞等待。這種對通道進行發送和接收的交互行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。我們可以通過下面的圖示形象地理解兩個 goroutine 如何利用無緩沖的 channel 來共享一個值(下圖來自互聯網):
下面詳細地解釋一下上圖:
- 在第 1 步,兩個 goroutine 都到達通道,但兩個都沒有開始執行數據的發送或接收。
- 在第 2 步,左側的 goroutine 將它的手伸進了通道,這模擬了向通道發送數據的行為。這時,這個 goroutine 會在通道中被鎖住,直到交換完成。
- 在第 3 步,右側的 goroutine 將它的手放入通道,這模擬了從通道里接收數據。這個 goroutine一樣也會在通道中被鎖住,直到交換完成。
- 在第 4 步和第 5 步,進行數據交換。
- 在第 6 步,兩個 goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 得到釋放。兩個 goroutine 現在都可以去做別的事情了。
下面的例子模擬一場網球比賽。在網球比賽中,兩位選手會把球在兩個人之間來回傳遞。選手總是處在以下兩種狀態之一:要么在等待接球,要么將球打向對方。可以使用兩個goroutine來模擬網球比賽,並使用無緩沖的通道來模擬球的來回:
// 這個示例程序展示如何用無緩沖的通道來模擬 //2個goroutine間的網球比賽 package main import( "math/rand" "sync" "time" "fmt" ) // wg用來等待程序結束 var wg sync.WaitGroup func init() { rand.Seed(time.Now().UnixNano()) } // main是所有Go程序的入口 func main() { // 創建一個無緩沖的通道 court := make(chan int) // 計數加2,表示要等待兩個goroutine wg.Add(2) // 啟動兩個選手 go player("Nick", court) go player("Jack", court) // 發球 court <- 1 // 等待游戲結束 wg.Wait() } // player 模擬一個選手在打網球 func player(name string, court chan int) { // 在函數退出時調用Done來通知main函數工作已經完成 defer wg.Done() for{ // 等待球被擊打過來 ball, ok := <-court if !ok { // 如果通道被關閉,我們就贏了 fmt.Printf("Player %s Won\n", name) return } // 選隨機數,然后用這個數來判斷我們是否丟球 n := rand.Intn(100) if n%5 == 0 { fmt.Printf("Player %s Missed\n", name) // 關閉通道,表示我們輸了 close(court) return } // 顯示擊球數,並將擊球數加1 fmt.Printf("Player %s Hit %d\n", name, ball) ball++ // 將球打向對手 court <- ball } }
運行上面的代碼,會輸出類似下面的信息:
Player Jack Hit 1 Player Nick Hit 2 Player Jack Hit 3 Player Nick Hit 4 Player Jack Missed Player Nick Won
簡單解釋一下上面的代碼:
在 main 函數中創建了一個 int 類型的無緩沖的通道,使用該通道讓兩個 goroutine 在擊球時能夠互相同步。然后創建了參與比賽的兩個 goroutine。在這個時候,兩個 goroutine 都阻塞住等待擊球。court <- 1 模擬發球,將球發到通道里,程序開始執行這個比賽,直到某個 goroutine 輸掉比賽。
在 player 函數里,主要是運行一個無限循環的 for 語句。在這個循環里,是玩游戲的過程。goroutine 從通道接收數據,用來表示等待接球。這個接收動作會鎖住 goroutine,直到有數據發送到通道里。通道的接收動作返回時,會檢測 ok 標志是否為 false。如果這個值是 false,表示通道已經被關閉,游戲結束。在這個模擬程序中,使用隨機數來決定 goroutine 是否擊中了球。如果擊中了球,就把 ball 的值遞增 1,並將 ball 作為球重新放入通道,發送給另一位選手。在這個時刻,兩個 goroutine 都會被鎖住,直到交換完成。最終,引某個 goroutine 沒有打中球會把通道關閉。之后兩個 goroutine 都會返回,通過 defer 聲明的 Done 會被執行,程序終止。
帶緩沖的 channel
帶緩沖的 channel(buffered channel) 是一種在被接收前能存儲一個或者多個值的通道。這種類型的通道並不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也會不同。只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。這導致有緩沖的通道和無緩沖的通道之間的一個很大的不同:無緩沖的通道保證進行發送和接收的 goroutine 會在同一時間進行數據交換;有緩沖的通道沒有這種保證。可以通過下面的圖示形象地理解兩個 goroutine 分別向帶緩沖的通道里增加一個值和從帶緩沖的通道里移除一個值(下圖來自互聯網):
下面詳細地解釋一下上圖:
- 在第 1 步,右側的 goroutine 正在從通道接收一個值。
- 在第 2 步,右側的這個 goroutine 獨立完成了接收值的動作,而左側的 goroutine 正在發送一個新值到通道里。
- 在第 3 步,左側的 goroutine 還在向通道發送新值,而右側的 goroutine 正在從通道接收另外一個值。這個步驟里的兩個操作既不是同步的,也不會互相阻塞。
- 最后,在第 4 步,所有的發送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。
創建帶緩沖區的 channel 非常簡單,只需要再添加一個緩沖區的大小就可以了,比如創建一個傳遞 int 類型數據,緩沖區為 10 的 channel:
ch := make(chan int, 10)
下面的 demo 使用一組 goroutine 來接收並完成任務,帶緩沖區的通道提供了一種清晰而直觀的方式來實現這個功能:
// 這個示例程序展示如何使用 // 有緩沖的通道和固定數目的 // goroutine來處理一堆工作 package main import( "math/rand" "sync" "time" "fmt" ) const( numberGoroutines = 2 // 要使用的goroutine的數量 taskLoad = 5 // 要處理的工作的數量 ) // wg用來等待程序結束 var wg sync.WaitGroup func init() { rand.Seed(time.Now().UnixNano()) } // main是所有Go程序的入口 func main() { // 創建一個有緩沖的通道來管理工作 tasks := make(chan string, taskLoad) // 啟動goroutine來處理工作 wg.Add(numberGoroutines) for gr := 1; gr <= numberGoroutines; gr++ { go worker(tasks, gr) } // 增加一組要完成的工作 for post := 1; post <= taskLoad; post++ { tasks <- fmt.Sprintf("Task: %d", post) } // 當所有工作都處理完時關閉通道 // 以便所有goroutine退出 close(tasks) // 等待所有工作完成 wg.Wait() } // worker作為goroutine啟動來處理 // 從有緩沖的通道傳入的工作 func worker(tasks chan string, worker int) { // 通知函數已經返回 defer wg.Done() for{ // 等待分配工作 task, ok := <-tasks if !ok{ // 這意味着通道已經空了,並且已被關閉 fmt.Printf("Worker: %d: Shutting Down\n", worker) return } // 顯示我們開始工作了 fmt.Printf("Worker: %d: Started %s\n", worker, task) // 隨機等一段時間來模擬工作 sleep := rand.Int63n(100) time.Sleep(time.Duration(sleep)* time.Millisecond) // 顯示我們完成了工作 fmt.Printf("Worker: %d: Completed %s\n", worker, task) } }
運行上面的程序,輸出結果大致如下:
Worker: 2: Started Task: 1 Worker: 1: Started Task: 2 Worker: 1: Completed Task: 2 Worker: 1: Started Task: 3 Worker: 1: Completed Task: 3 Worker: 1: Started Task: 4 Worker: 2: Completed Task: 1 Worker: 2: Started Task: 5 Worker: 1: Completed Task: 4 Worker: 1: Shutting Down Worker: 2: Completed Task: 5 Worker: 2: Shutting Down
代碼里有很詳細的注釋,因此不再贅言,只解釋一下通道的關閉:
關閉通道的代碼非常重要。當通道關閉后,goroutine 依舊可以從通道接收數據,但是不能再向通道里發送數據。能夠從已經關閉的通道接收數據這一點非常重要,因為這允許通道關閉后依舊能取出其中緩沖的全部值,而不會有數據丟失。從一個已經關閉且沒有數據的通道里獲取數據,總會立刻返回,並返回一個通道類型的零值。如果在獲取通道時還加入了可選的標志,就能得到通道的狀態信息。
處理超時
使用 channel 時需要小心,比如對於下面的簡單用法:
i := <-ch
碰到永遠沒有往 ch 中寫入數據的情況,那么這個讀取動作將永遠也無法從 ch 中讀取到數據,導致的結果就是整個 goroutine 永遠阻塞並且沒有挽回的機會。如果 channel 只是被同一個開發者使用,那樣出問題的可能性還低一些。但如果一旦對外公開,就必須考慮到最差情況並對程序進行維護。
Golang 沒有提供直接的超時處理機制,但可以利用 select 機制變通地解決。因為 select 的特點是只要其中一個 case 已經完成,程序就會繼續往下執行,而不會考慮其它的 case。基於此特性我們來實現一個 channel 的超時機制:
ch := make(chan int) // 首先實現並執行一個匿名的超時等待函數 timeout := make(chan bool, 1) go func() { time.Sleep(1e9) // 等待 1 秒 timeout <- true }() // 然后把 timeout 這個 channel 利用起來 select { case <-ch: // 從 ch 中讀取到數據 case <- timeout: // 一直沒有從 ch 中讀取到數據,但從 timeout 中讀取到了數據 fmt.Println("Timeout occurred.") }
執行上面的代碼,輸出的結果為:
Timeout occurred.
關閉 channel
關閉 channel 非常簡單,直接調用 Golang 內置的 close() 函數就可以了:
close(ch)
在關閉了 channel 之后我們要面對的問題是:如何判斷一個 channel 是否已關閉?
其實在從 channel 中讀取數據的同時,還可以獲得一個布爾類型的值,該值表示 channel 是否已關閉:
x, ok := <-ch
如果 ok 的值為 false,則表示 ch 已經被關閉。
參考:
《Go語言實戰》
《Go語言編程入門與實戰技巧》