Golang:有趣的 channel 應用


嚴格意義上說,本文是我另外一片文章《Golang Funny: Play with Channel》的中文版本。不過,畢竟是用中文當母語的,所以就不翻譯了,重新按照那個內容寫過吧。

channel 是 golang 里相當有趣的一個功能,在我使用 golang 編碼的經驗里,大部分事件都會是在享受 channel 和 goroutine 配合的樂趣。所以本文主要介紹 channel 的一些有趣的用法。

這里有 Oling Cat 翻譯的Go編程語言規范里關於 channel(信道)的描述:

信道提供了一種機制,它在兩個並發執行的函數之間進行同步,並通過傳遞(與該信道元素類型相符的)值來進行通信。

這個個描述又乏味、又枯燥。在我第一次閱讀的時候,完全不明白這到底是個什么玩意。事實上,可以認為 channel 是一個管道或者先進先出隊列,非常簡單且輕量。channel 並不是 Golang 首創的。它同樣作為內置功能出現在其他語言中。在大多數情況下,它是一個又大、又笨、又復雜的消息隊列系統的一個功能。

下面就來一起找點樂子吧!

 

最常見的方式:生產者/消費者

生產者產生一些數據將其放入 channel;然后消費者按照順序,一個一個的從 channel 中取出這些數據進行處理。這是最常見的 channel 的使用方式。當 channel 的緩沖用盡時,生產者必須等待(阻塞)。換句話說,若是 channel 中沒有數據,消費者就必須等待了。

這個例子的源代碼在這里。最好下載到本地運行。

生產者

func producer(c chan int64, max int) {
    defer close(c)
    for i:= 0; i < max; i ++ {
        c <- time.Now().Unix()
    }
}

生產者生成“max”個 int64 的數字,並且將其放入 channel “c” 中。需要注意的是,這里用 defer 在函數推出的時候關閉了 channel。

消費者

func consumer(c chan int64) {
    var v int64
    ok := true
    for ok {
        if v, ok = <-c; ok {
            fmt.Println(v)
        }
    }
}

從 channel 中一個一個的讀取 int64 的數字,然后將其打印在屏幕上。當 channel 被關閉后,變量“ok”將被設置為“false”。

自增長 ID 生成器

當生讓產者可以順序的生成整數。它就是一個自增長 ID 生成器。我將這個功能封裝成了一個包。並將其代碼托管在這里。使用示例可以參考這里的代碼。

type AutoInc struct {
    start, step int
    queue chan int
    running bool
}

func New(start, step int) (ai *AutoInc) {
    ai = &AutoInc{
        start: start,
        step: step,
        running: true,
        queue: make(chan int, 4),
    }
    go ai.process()
    return
}

func (ai *AutoInc) process() {
    defer func() {recover()}()
    for i := ai.start; ai.running ; i=i+ai.step {
        ai.queue <- i
    }
}

func (ai *AutoInc) Id() int {
    return <-ai.queue
}

func (ai *AutoInc) Close() {
    ai.running = false
    close(ai.queue)
}

信號量

信號量也是 channel 的一個有趣的應用。這里有一個來自“高效Go編程”的例子。你應當讀過了吧?如果還沒有,現在就開始讀吧……

我在 Gearman 服務的 API 包 gearman-go 中使用了信號量。在 worker/worker.go 的 232 行,在並行的 Worker.exec 的數量達到 Worker.limit 時,將被阻塞。

var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
    sem <- 1    // 等待放行;
    process(r)  // 可能需要一個很長的處理過程;
    <-sem       // 完成,放行另一個過程。
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req)  // 無需等待 handle 完成。
    }
}

隨機序列生成器

當然可以修改自增長 ID 生成器。讓生產者生成隨機數放入 channel。不過這挺無聊的,不是嗎?

這里是隨機序列生成器的另一個實現。靈感來自語言規范。它會隨機的生成 0/1 序列:

func producer(c chan int64, max int) {
    defer close(c)
    for i:= 0; i < max; i ++ {
        select { // randomized select
            case c <- 0:
            case c <- 1:
        }
    }
}

超時定時器

當一個 channel 被 read/write 阻塞時,它會被永遠阻塞下去,直到 channel 被關閉,這時會產生一個 panic。channel 沒有內建用於超時的定時器。並且似乎也沒有計划向 channel 添加一個這樣的功能。但在大多數情況下,我們需要一個超時機制。例如,由於生產者執行的時候發生了錯誤,所以沒有向 channel 放入數據。消費者會被阻塞到 channel 被關閉。每次出錯都關閉 channel?這絕對不是一個好主意。

這里有一個解決方案:

    c := make(chan int64, 5)
    defer close(c)
    timeout := make(chan bool)
    defer close(timeout)
    go func() {
        time.Sleep(time.Second) // 等一秒
        timeout <- true // 向超時隊列中放入標志
    }()
    select {
        case <-timeout: // 超時
            fmt.Println("timeout...")
        case <-c: // 收到數據
            fmt.Println("Read a date.")
    }

你注意到 select 語句了嗎?哪個 channel 先有數據,哪個分支先執行。因此……還需要更多的解釋嗎?

這同樣被使用在gearman-go 的客戶端 API 實現中,第 238 行。

在本文的英文版本發布后,@mjq 提醒我說可以用 time.After。在項目中,這確實是更好的寫法。我得向他道謝!同時我也閱讀了 src/pkg/time/sleep.go 第 74 行,time.After 的實現。其內部實現與上面的代碼完全一致。

還有更多……

上面提到的各種有趣的應用當然也可以在其他消息隊列中實現,不過由於 channel 的簡單和輕量,使得 golang 的 channel 來實現這些有趣的功能具有實際意義,並有真實的應用場景。其實,我覺得有趣的 channel 用法遠不止這些。如果你發現了其他有趣的玩法,請務必告訴我。謝謝啦!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM