33. 如何手動實現一個協程池?


Hi,大家好,我是明哥。

在自己學習 Golang 的這段時間里,我寫了詳細的學習筆記放在我的個人微信公眾號 《Go編程時光》,對於 Go 語言,我也算是個初學者,因此寫的東西應該會比較適合剛接觸的同學,如果你也是剛學習 Go 語言,不防關注一下,一起學習,一起成長。

我的在線博客:http://golang.iswbm.com
我的 Github:github.com/iswbm/GolangCodingTime


在 Golang 中要創建一個協程是一件無比簡單的事情,你只要定義一個函數,並使用 go 關鍵字去執行它就行了。

如果你接觸過其他語言,會發現你在使用使用線程時,為了減少線程頻繁創建銷毀還來的開銷,通常我們會使用線程池來復用線程。

池化技術就是利用復用來提升性能的,那在 Golang 中需要協程池嗎?

在 Golang 中,goroutine 是一個輕量級的線程,他的創建、調度都是在用戶態進行,並不需要進入內核,這意味着創建銷毀協程帶來的開銷是非常小的。

因此,我認為大多數情況下,開發人員是不太需要使用協程池的。

但也不排除有某些場景下是需要這樣做,因為我還沒有遇到就不說了。

拋開是否必要這個問題,單純從技術的角度來看,我們可以怎樣實現一個通用的協程池呢?

下面就來一起學習一下我的寫法

首先定義一個協程池(Pool)結構體,包含兩個屬性,都是 chan 類型的。

一個是 work,用於接收 task 任務

一個是 sem,用於設置協程池大小,即可同時執行的協程數量

type Pool struct {
    work chan func()   // 任務
    sem  chan struct{} // 數量
}

然后定義一個 New 函數,用於創建一個協程池對象,有一個細節需要注意

work 是一個無緩沖通道

而 sem 是一個緩沖通道,size 大小即為協程池大小

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

最后給協程池對象綁定兩個函數

1、NewTask:往協程池中添加任務

當第一次調用 NewTask 添加任務的時候,由於 work 是無緩沖通道,所以會一定會走第二個 case 的分支:使用 go worker 開啟一個協程。

func (p *Pool) NewTask(task func()) { 
    select {
        case p.work <- task:
        case p.sem <- struct{}{}:
            go p.worker(task)
    }
}

2、worker:用於執行任務

為了能夠實現協程的復用,這個使用了 for 無限循環,使這個協程在執行完任務后,也不退出,而是一直在接收新的任務。

func (p *Pool) worker(task func()) { 
    defer func() { <-p.sem }()
    for {
        task()
        task = <-p.work
    }
}

這兩個函數是協程池實現的關鍵函數,里面的邏輯很值得推敲:

1、如果設定的協程池數大於 2,此時第二次傳入往 NewTask 傳入task,select case 的時候,如果第一個協程還在運行中,就一定會走第二個case,重新創建一個協程執行task

2、如果傳入的任務數大於設定的協程池數,並且此時所有的任務都還在運行中,那此時再調用 NewTask 傳入 task ,這兩個 case 都不會命中,會一直阻塞直到有任務執行完成,worker 函數里的 work 通道才能接收到新的任務,繼續執行。

以上便是協程池的實現過程。

使用它也很簡單,看下面的代碼你就明白了

func main()  {
    pool := New(128)
    pool.NewTask(func(){
        fmt.Println("run task")
    })
}

為了讓你看到效果,我設置協程池數為 2,開啟四個任務,都是 sleep 2 秒后,打印當前時間。

func main()  {
    pool := New(2)

    for i := 1; i <5; i++{
        pool.NewTask(func(){
            time.Sleep(2 * time.Second)
            fmt.Println(time.Now())
        })
    }
    
    // 保證所有的協程都執行完畢
    time.Sleep(5 * time.Second)
}

執行結果如下,可以看到總共 4 個任務,由於協程池大小為 2,所以 4 個任務分兩批執行(從打印的時間可以看出)

2020-05-24 23:18:02.014487 +0800 CST m=+2.005207182
2020-05-24 23:18:02.014524 +0800 CST m=+2.005243650
2020-05-24 23:18:04.019755 +0800 CST m=+4.010435443
2020-05-24 23:18:04.019819 +0800 CST m=+4.010499440


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM