嚴格意義上說,本文是我另外一片文章《Golang Funny: Play with Channel》的中文版本。不過,畢竟是用中文當母語的,所以就不翻譯了,重新按照那個內容寫過吧。
channel 是 golang 里相當有趣的一個功能,在我使用 golang 編碼的經驗里,大部分事件都會是在享受 channel 和 goroutine 配合的樂趣。所以本文主要介紹 channel 的一些有趣的用法。
這里有 Oling Cat 翻譯的Go編程語言規范里關於 channel(信道)的描述:
信道提供了一種機制,它在兩個並發執行的函數之間進行同步,並通過傳遞(與該信道元素類型相符的)值來進行通信。
這個個描述又乏味、又枯燥。在我第一次閱讀的時候,完全不明白這到底是個什么玩意。事實上,可以認為 channel 是一個管道或者先進先出隊列,非常簡單且輕量。channel 並不是 Golang 首創的。它同樣作為內置功能出現在其他語言中。在大多數情況下,它是一個又大、又笨、又復雜的消息隊列系統的一個功能。
下面就來一起找點樂子吧!
最常見的方式:生產者/消費者
生產者產生一些數據將其放入 channel;然后消費者按照順序,一個一個的從 channel 中取出這些數據進行處理。這是最常見的 channel 的使用方式。當 channel 的緩沖用盡時,生產者必須等待(阻塞)。換句話說,若是 channel 中沒有數據,消費者就必須等待了。
這個例子的源代碼在這里。最好下載到本地運行。
生產者
func producer(c chan int64, max int) { defer close(c) for i:= 0; i < max; i ++ { c <- time.Now().Unix() } }
生產者生成“max”個 int64 的數字,並且將其放入 channel “c” 中。需要注意的是,這里用 defer 在函數推出的時候關閉了 channel。
消費者
func consumer(c chan int64) { var v int64 ok := true for ok { if v, ok = <-c; ok { fmt.Println(v) } } }
從 channel 中一個一個的讀取 int64 的數字,然后將其打印在屏幕上。當 channel 被關閉后,變量“ok”將被設置為“false”。
自增長 ID 生成器
當生讓產者可以順序的生成整數。它就是一個自增長 ID 生成器。我將這個功能封裝成了一個包。並將其代碼托管在這里。使用示例可以參考這里的代碼。
type AutoInc struct { start, step int queue chan int running bool } func New(start, step int) (ai *AutoInc) { ai = &AutoInc{ start: start, step: step, running: true, queue: make(chan int, 4), } go ai.process() return } func (ai *AutoInc) process() { defer func() {recover()}() for i := ai.start; ai.running ; i=i+ai.step { ai.queue <- i } } func (ai *AutoInc) Id() int { return <-ai.queue } func (ai *AutoInc) Close() { ai.running = false close(ai.queue) }
信號量
信號量也是 channel 的一個有趣的應用。這里有一個來自“高效Go編程”的例子。你應當讀過了吧?如果還沒有,現在就開始讀吧……
我在 Gearman 服務的 API 包 gearman-go 中使用了信號量。在 worker/worker.go 的 232 行,在並行的 Worker.exec 的數量達到 Worker.limit 時,將被阻塞。
var sem = make(chan int, MaxOutstanding) func handle(r *Request) { sem <- 1 // 等待放行; process(r) // 可能需要一個很長的處理過程; <-sem // 完成,放行另一個過程。 } func Serve(queue chan *Request) { for { req := <-queue go handle(req) // 無需等待 handle 完成。 } }
隨機序列生成器
當然可以修改自增長 ID 生成器。讓生產者生成隨機數放入 channel。不過這挺無聊的,不是嗎?
這里是隨機序列生成器的另一個實現。靈感來自語言規范。它會隨機的生成 0/1 序列:
func producer(c chan int64, max int) { defer close(c) for i:= 0; i < max; i ++ { select { // randomized select case c <- 0: case c <- 1: } } }
超時定時器
當一個 channel 被 read/write 阻塞時,它會被永遠阻塞下去,直到 channel 被關閉,這時會產生一個 panic。channel 沒有內建用於超時的定時器。並且似乎也沒有計划向 channel 添加一個這樣的功能。但在大多數情況下,我們需要一個超時機制。例如,由於生產者執行的時候發生了錯誤,所以沒有向 channel 放入數據。消費者會被阻塞到 channel 被關閉。每次出錯都關閉 channel?這絕對不是一個好主意。
這里有一個解決方案:
c := make(chan int64, 5) defer close(c) timeout := make(chan bool) defer close(timeout) go func() { time.Sleep(time.Second) // 等一秒 timeout <- true // 向超時隊列中放入標志 }() select { case <-timeout: // 超時 fmt.Println("timeout...") case <-c: // 收到數據 fmt.Println("Read a date.") }
你注意到 select 語句了嗎?哪個 channel 先有數據,哪個分支先執行。因此……還需要更多的解釋嗎?
這同樣被使用在gearman-go 的客戶端 API 實現中,第 238 行。
在本文的英文版本發布后,@mjq 提醒我說可以用 time.After。在項目中,這確實是更好的寫法。我得向他道謝!同時我也閱讀了 src/pkg/time/sleep.go 第 74 行,time.After 的實現。其內部實現與上面的代碼完全一致。
還有更多……
上面提到的各種有趣的應用當然也可以在其他消息隊列中實現,不過由於 channel 的簡單和輕量,使得 golang 的 channel 來實現這些有趣的功能具有實際意義,並有真實的應用場景。其實,我覺得有趣的 channel 用法遠不止這些。如果你發現了其他有趣的玩法,請務必告訴我。謝謝啦!