Golang 在異步處理上有着上佳的表現。因為 goroutines 和 channels 是非常容易使用且有效的異步處理手段。下面我們一起來看一看 Golang 的簡易任務隊列
一種"非任務隊列"的任務隊列
有些時候,我們需要做異步處理但是並不需要一個任務對列,這類問題我們使用 Golang 可以非常簡單的實現。如下:
go process(job)
這的確是很多場景下的絕佳選擇,比如操作一個HTTP請求等待結果。然而,在一些相對復雜高並發的場景下,你就不能簡單的使用該方法來實現異步處理。這時候,你需要一個隊列來管理需要處理的任務,並且按照一定的順序來處理這些任務。
最簡單的任務隊列
接下來看一個最簡單的任務隊列和工作者模型。
func worker(jobChan <-chan Job) {
for job := range jobChan {
process(job)
}
}
// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)
// start the worker
go worker(jobChan)
// enqueue a job
jobChan <- job
代碼中創建了一個 Job 對象的 channel , 容量為100。然后開啟一個工作者協程從 channel 中去除任務並執行。任務的入隊操作就是將一個 Job 對象放入任務 channel 中。
雖然上面只有短短的幾行代碼,卻完成了很多的工作。我們實現了一個簡易的線程安全的、支持並發的、可靠的任務隊列。
限流
上面的例子中,我們初始化了一個容量為 100 的任務 channel。
// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)
這意味着任務的入隊操作十分簡單,如下:
// enqueue a job
jobChan <- job
這樣一來,當 job channel 中已經放入 100 個任務的時候,入隊操作將會阻塞,直至有任務被工作者處理完成。這通常不是一個好的現象,因為我們通常不希望程序出現阻塞等待。這時候,我們通常希望有一個超時機制來告訴服務調用方,當前服務忙,稍后重試。我之前的博文--我讀《通過Go來處理每分鍾達百萬的數據請求》介紹過類似的限流策略。這里方法類似,就是當隊列滿的時候,返回503,告訴調用方服務忙。代碼如下:
// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job, jobChan <-chan Job) bool {
select {
case jobChan <- job:
return true
default:
return false
}
}
這樣一來,我們嘗試入隊的時候,如果入隊失敗,放回一個 false ,這樣我們再對這個返回值處理如下:
if !TryEnqueue(job, chan) {
http.Error(w, "max capacity reached", 503)
return
}
這樣就簡單的實現了限流操作。當 jobChan 滿的時候,程序會走到 default 返回 false ,從而告知調用方當前的服務器情況。
關閉工作者
到上面的步驟,限流已經可以解決,那么我們接下來考慮,怎么才能優雅的關閉工作者?假設我們決定不再向任務隊列插入任務,我們希望讓所有的已入隊任務執行完成,我們可以非常簡單的實現:
close(jobChan)
沒錯,就是這一行代碼,我們就可以讓任務隊列不再接收新任務(仍然可以從 channel 讀取 job ),如果我們想執行隊列里的已經存在的任務,只需要:
for job := range jobChan {...}
所有已經入隊的 job 會正常被 woker 取走執行。但是,這樣實際上還存在一個問題,就是主協成不會等待工作者執行完工作就會退出。它不知道工作者協成什么時候能夠處理完以上的任務。可以運行的例子如下:
package main
import (
"fmt"
)
var jobChan chan int
func worker(jobChan <- chan int) {
for job := range jobChan{
fmt.Printf("執行任務 %d \n", job)
}
}
func main() {
jobChan = make(chan int, 100)
//入隊
for i := 1; i <= 10; i++{
jobChan <- i
}
close(jobChan)
go worker(jobChan)
}
運行發現,woker 無法保證執行完 channel 中的 job 就退出了。那我們怎么解決這個問題?
等待 woker 執行完成
使用 sysc.WaitGroup:
package main
import (
"fmt"
"sync"
)
var jobChan chan int
var wg sync.WaitGroup
func worker(jobChan <- chan int) {
defer wg.Done()
for job := range jobChan{
fmt.Printf("執行任務 %d \n", job)
}
}
func main() {
jobChan = make(chan int, 100)
//入隊
for i := 1; i <= 10; i++{
jobChan <- i
}
wg.Add(1)
close(jobChan)
go worker(jobChan)
wg.Wait()
}
使用這種協程間同步的方法,協成會等待 worker 執行完 job 才會退出。運行結果:
執行任務 1
執行任務 2
執行任務 3
執行任務 4
執行任務 5
執行任務 6
執行任務 7
執行任務 8
執行任務 9
執行任務 10
Process finished with exit code 0
這樣是完美的么?在設計功能的時候,為了防止協程假死,我們應該給協程設置一個超時。
超時設置
上面的例子中 wg.Wait() 會一直等待,直到 wg.Done() 被調用。但是如果這個操作假死,無法調用,將永遠等待。這是我們不希望看到的,因此,我們可以給他設置一個超時時間。方法如下:
package main
import (
"fmt"
"sync"
"time"
)
var jobChan chan int
var wg sync.WaitGroup
func worker(jobChan <-chan int) {
defer wg.Done()
for job := range jobChan {
fmt.Printf("執行任務 %d \n", job)
time.Sleep(1 * time.Second)
}
}
func main() {
jobChan = make(chan int, 100)
//入隊
for i := 1; i <= 10; i++ {
jobChan <- i
}
wg.Add(1)
close(jobChan)
go worker(jobChan)
res := WaitTimeout(&wg, 5*time.Second)
if res {
fmt.Println("執行完成退出")
} else {
fmt.Println("執行超時退出")
}
}
//超時機制
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
ch := make(chan struct{})
go func() {
wg.Wait()
close(ch)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
}
}
執行結果如下:
執行任務 1
執行任務 2
執行任務 3
執行任務 4
執行任務 5
執行超時退出
Process finished with exit code 0
這樣,5s 超時生效,雖然不是所有的任務被執行,由於超時,也會退出。
有時候我們希望 woker 丟棄在執行的工作,也就是 cancel 操作,怎么處理?
Cancel Worker
我們可以借助 context.Context 實現。如下:
package main
import (
"context"
"fmt"
"sync"
"time"
)
var jobChan chan int
var ctx context.Context
var cancel context.CancelFunc
func worker(jobChan <-chan int, ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case job := <-jobChan:
fmt.Printf("執行任務 %d \n", job)
time.Sleep(1 * time.Second)
}
}
}
func main() {
jobChan = make(chan int, 100)
//帶有取消功能的 contex
ctx, cancel = context.WithCancel(context.Background())
//入隊
for i := 1; i <= 10; i++ {
jobChan <- i
}
close(jobChan)
go worker(jobChan, ctx)
time.Sleep(2 * time.Second)
//調用cancel
cancel()
}
結果如下:
執行任務 1
執行任務 2
Process finished with exit code 0
可以看出,我們等待2s后,我們主動調用了取消操作,woker 協程主動退出。
這是借助 context 包實現了取消操作,實質上也是監聽一個 channel 的操作,那我們有沒有可能不借助 context 實現取消操作呢?
不使用 context 的超時機制實現取消:
package main
import (
"fmt"
"time"
)
var jobChan chan int
func worker(jobChan <-chan int, cancelChan <-chan struct{}) {
for {
select {
case <-cancelChan:
return
case job := <-jobChan:
fmt.Printf("執行任務 %d \n", job)
time.Sleep(1 * time.Second)
}
}
}
func main() {
jobChan = make(chan int, 100)
//通過chan 取消操作
cancelChan := make(chan struct{})
//入隊
for i := 1; i <= 10; i++ {
jobChan <- i
}
close(jobChan)
go worker(jobChan, cancelChan)
time.Sleep(2 * time.Second)
//關閉chan
close(cancelChan)
}
這樣,我們使用一個關閉 chan 的信號實現了取消操作。原因是無緩沖 chan 讀取會阻塞,當關閉后,可以讀取到空,因此會執行 select 里的 return.
總結
照例總結一波,本文介紹了 golang 協程間的同步和通信的一些方法,任務隊列的最簡單實現。關於工作者池的實現,我在其他博文也寫到了,這里不多寫。本文更多是工具性的代碼,寫功能時候可以借用,比如超時、取消、chan的操作等。
