Golang 任務隊列策略 -- 讀《JOB QUEUES IN GO》


Golang 在異步處理上有着上佳的表現。因為 goroutines 和 channels 是非常容易使用且有效的異步處理手段。下面我們一起來看一看 Golang 的簡易任務隊列

一種"非任務隊列"的任務隊列

有些時候,我們需要做異步處理但是並不需要一個任務對列,這類問題我們使用 Golang 可以非常簡單的實現。如下:

go process(job)

這的確是很多場景下的絕佳選擇,比如操作一個HTTP請求等待結果。然而,在一些相對復雜高並發的場景下,你就不能簡單的使用該方法來實現異步處理。這時候,你需要一個隊列來管理需要處理的任務,並且按照一定的順序來處理這些任務。

最簡單的任務隊列

接下來看一個最簡單的任務隊列和工作者模型。

func worker(jobChan <-chan Job) {
    for job := range jobChan {
        process(job)
    }
}

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

// start the worker
go worker(jobChan)

// enqueue a job
jobChan <- job

代碼中創建了一個 Job 對象的 channel , 容量為100。然后開啟一個工作者協程從 channel 中去除任務並執行。任務的入隊操作就是將一個 Job 對象放入任務 channel 中。

雖然上面只有短短的幾行代碼,卻完成了很多的工作。我們實現了一個簡易的線程安全的、支持並發的、可靠的任務隊列。

限流

上面的例子中,我們初始化了一個容量為 100 的任務 channel。

// make a channel with a capacity of 100.
jobChan := make(chan Job, 100)

這意味着任務的入隊操作十分簡單,如下:

// enqueue a job
jobChan <- job

這樣一來,當 job channel 中已經放入 100 個任務的時候,入隊操作將會阻塞,直至有任務被工作者處理完成。這通常不是一個好的現象,因為我們通常不希望程序出現阻塞等待。這時候,我們通常希望有一個超時機制來告訴服務調用方,當前服務忙,稍后重試。我之前的博文--我讀《通過Go來處理每分鍾達百萬的數據請求》介紹過類似的限流策略。這里方法類似,就是當隊列滿的時候,返回503,告訴調用方服務忙。代碼如下:

// TryEnqueue tries to enqueue a job to the given job channel. Returns true if
// the operation was successful, and false if enqueuing would not have been
// possible without blocking. Job is not enqueued in the latter case.
func TryEnqueue(job Job, jobChan <-chan Job) bool {
    select {
    case jobChan <- job:
        return true
    default:
        return false
    }
}

這樣一來,我們嘗試入隊的時候,如果入隊失敗,放回一個 false ,這樣我們再對這個返回值處理如下:

if !TryEnqueue(job, chan) {
    http.Error(w, "max capacity reached", 503)
    return
}

這樣就簡單的實現了限流操作。當 jobChan 滿的時候,程序會走到 default 返回 false ,從而告知調用方當前的服務器情況。

關閉工作者

到上面的步驟,限流已經可以解決,那么我們接下來考慮,怎么才能優雅的關閉工作者?假設我們決定不再向任務隊列插入任務,我們希望讓所有的已入隊任務執行完成,我們可以非常簡單的實現:

close(jobChan)

沒錯,就是這一行代碼,我們就可以讓任務隊列不再接收新任務(仍然可以從 channel 讀取 job ),如果我們想執行隊列里的已經存在的任務,只需要:

for job := range jobChan {...}

所有已經入隊的 job 會正常被 woker 取走執行。但是,這樣實際上還存在一個問題,就是主協成不會等待工作者執行完工作就會退出。它不知道工作者協成什么時候能夠處理完以上的任務。可以運行的例子如下:

package main

import (
	"fmt"
)

var jobChan chan int

func worker(jobChan <- chan int)  {
	for job := range jobChan{
		fmt.Printf("執行任務 %d \n", job)
	}
}

func main() {
	jobChan = make(chan int, 100)
	//入隊
	for i := 1; i <= 10; i++{
		jobChan <- i
	}
	
	close(jobChan)
	go worker(jobChan)

}

運行發現,woker 無法保證執行完 channel 中的 job 就退出了。那我們怎么解決這個問題?

等待 woker 執行完成

使用 sysc.WaitGroup:

package main

import (
	"fmt"
	"sync"
)

var jobChan chan int
var wg sync.WaitGroup

func worker(jobChan <- chan int)  {
	defer wg.Done()
	for job := range jobChan{
		fmt.Printf("執行任務 %d \n", job)
	}
}

func main() {
	jobChan = make(chan int, 100)
	//入隊
	for i := 1; i <= 10; i++{
		jobChan <- i
	}

	wg.Add(1)
	close(jobChan)

	go worker(jobChan)
	wg.Wait()
}

使用這種協程間同步的方法,協成會等待 worker 執行完 job 才會退出。運行結果:

執行任務 1 
執行任務 2 
執行任務 3 
執行任務 4 
執行任務 5 
執行任務 6 
執行任務 7 
執行任務 8 
執行任務 9 
執行任務 10 

Process finished with exit code 0

這樣是完美的么?在設計功能的時候,為了防止協程假死,我們應該給協程設置一個超時。

超時設置

上面的例子中 wg.Wait() 會一直等待,直到 wg.Done() 被調用。但是如果這個操作假死,無法調用,將永遠等待。這是我們不希望看到的,因此,我們可以給他設置一個超時時間。方法如下:

package main

import (
	"fmt"
	"sync"
	"time"
)

var jobChan chan int
var wg sync.WaitGroup

func worker(jobChan <-chan int) {
	defer wg.Done()
	for job := range jobChan {
		fmt.Printf("執行任務 %d \n", job)
		time.Sleep(1 * time.Second)
	}
}

func main() {
	jobChan = make(chan int, 100)
	//入隊
	for i := 1; i <= 10; i++ {
		jobChan <- i
	}

	wg.Add(1)
	close(jobChan)

	go worker(jobChan)
	res := WaitTimeout(&wg, 5*time.Second)
	if res {
		fmt.Println("執行完成退出")
	} else {
		fmt.Println("執行超時退出")
	}
}

//超時機制
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	ch := make(chan struct{})
	go func() {
		wg.Wait()
		close(ch)
	}()
	select {
	case <-ch:
		return true
	case <-time.After(timeout):
		return false
	}
}

執行結果如下:

執行任務 1 
執行任務 2 
執行任務 3 
執行任務 4 
執行任務 5 
執行超時退出

Process finished with exit code 0

這樣,5s 超時生效,雖然不是所有的任務被執行,由於超時,也會退出。

有時候我們希望 woker 丟棄在執行的工作,也就是 cancel 操作,怎么處理?

Cancel Worker

我們可以借助 context.Context 實現。如下:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var jobChan chan int
var ctx context.Context
var cancel context.CancelFunc

func worker(jobChan <-chan int, ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case job := <-jobChan:
			fmt.Printf("執行任務 %d \n", job)
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	jobChan = make(chan int, 100)
	//帶有取消功能的 contex
	ctx, cancel = context.WithCancel(context.Background())
	//入隊
	for i := 1; i <= 10; i++ {
		jobChan <- i
	}

	close(jobChan)

	go worker(jobChan, ctx)
	time.Sleep(2 * time.Second)
	//調用cancel
	cancel()
}

結果如下:

執行任務 1 
執行任務 2 

Process finished with exit code 0

可以看出,我們等待2s后,我們主動調用了取消操作,woker 協程主動退出。

這是借助 context 包實現了取消操作,實質上也是監聽一個 channel 的操作,那我們有沒有可能不借助 context 實現取消操作呢?

不使用 context 的超時機制實現取消:

package main

import (
	"fmt"
	"time"
)

var jobChan chan int

func worker(jobChan <-chan int, cancelChan <-chan struct{}) {
	for {
		select {
		case <-cancelChan:
			return
		case job := <-jobChan:
			fmt.Printf("執行任務 %d \n", job)
			time.Sleep(1 * time.Second)
		}
	}
}

func main() {
	jobChan = make(chan int, 100)
	//通過chan 取消操作
	cancelChan := make(chan struct{})
	//入隊
	for i := 1; i <= 10; i++ {
		jobChan <- i
	}

	close(jobChan)

	go worker(jobChan, cancelChan)
	time.Sleep(2 * time.Second)
	//關閉chan
	close(cancelChan)
}

這樣,我們使用一個關閉 chan 的信號實現了取消操作。原因是無緩沖 chan 讀取會阻塞,當關閉后,可以讀取到空,因此會執行 select 里的 return.

總結

照例總結一波,本文介紹了 golang 協程間的同步和通信的一些方法,任務隊列的最簡單實現。關於工作者池的實現,我在其他博文也寫到了,這里不多寫。本文更多是工具性的代碼,寫功能時候可以借用,比如超時、取消、chan的操作等。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM