前言
Go語言雖然開並發Goroutine特別簡單,但是實際中如果不控制並發的數量會導致資源的浪費以及同時占用大量服務資源(http連接、數據庫連接、文件句柄等)導致服務性能下降!
筆者之前總結過一篇在業務代碼中控制並發數量的文章:Go控制協裎並發數量的用法及實際中的一個案例
ants庫實現鏈接池的效果控制並發量
今天介紹另外一個控制並發數量的第三方庫:ants
簡而言之,ants庫通過實現“Goroutine鏈接池”來限制Goroutine的數量:通過NewPool函數創建一個goroutine pool實現具體效果。
創建完 goroutine pool 后,通過 pool.Submit
方法向 pool 中提交任務。
如果 pool 中尚有空閑的 goroutine worker,則 pool.Submit
立即返回;否則根據 pool 的配置,pool.Submit
立即返回錯誤或等待有空閑 goroutine worker 成功接收任務后返回。
使用案例
使用之前記當然是 go get一下:
go get github.com/panjf2000/ants
基本使用
最基本的使用場景是:提交任務,等待任務完成並獲取結果。

package test1 import ( "fmt" "github.com/panjf2000/ants/v2" "sync" "testing" ) func sum(a, b int) int { return a + b } func wrapSum(i int, ch chan int, wg *sync.WaitGroup) func() { return func() { defer wg.Done() ch <- sum(i, i) } } func TestT1(t *testing.T) { var wg sync.WaitGroup ch := make(chan int, 10) // ants.Release 相當於調用 defaultPool.Release,停止 defaultPool 中所有的 goroutine worker. defer ants.Release() for i := 0; i < 10; i++ { wg.Add(1) // ants.Submit 相當於調用 defaultPool.Submit,而 defaultPool 是在 package 初始化時 ants 庫創建的 if err := ants.Submit(wrapSum(i, ch, &wg)); err != nil { return } } wg.Wait() close(ch) for v := range ch { fmt.Println(v) } }
需要注意以下幾點:
1、這里使用的是ants包默認的鏈接池(ants.Submit方法),打開源碼可以看到鏈接池的容量大小為: math.MaxInt32*(2147483647),所以實際中推薦大家自己控制鏈接池的容量大小。
2、Submit 方法只接受 func() 類型的參數,如果提交的任務有參數,需要自己 wrap。
3、ants 沒有提供返回值機制,任務的執行結果需要自己進行處理,例子中用了一個帶 buffer 的 channel。需要注意的是,當 pool 中有多個任務時,任務的返回值不是根據任務的提交順序進行排序的,任務的返回順序取決於調用時機,可以認為是隨機的。
4、ants 沒有提供等待所有任務完成的機制,例子中用了 sync.WaitGroup 實現了等待所有任務完成的機制,否則 main goroutine 可能會在任務執行結束前退出。
配置pool為nonblocking狀態的情況
以下示例將 pool 配置為 nonblocking。在這種情況下,當 pool 中沒有 可用的 goroutine worker 時,Submit 會直接返回錯誤 ants.ErrPoolOverload
,而不會等待提交任務成功才返回。
另外這里可以配置鏈接池的大小(ants.NewPool方法):

package test1 import ( "fmt" "github.com/panjf2000/ants/v2" "testing" ) func hangForever() { ch := make(chan int) ch <- 10 } func TestT2(t *testing.T) { pool, err := ants.NewPool(10, ants.WithNonblocking(true)) if err != nil { return } defer pool.Release() for i := 0; i < 10; i++ { if err := pool.Submit(hangForever); err != nil { return } } if err := pool.Submit(func() { fmt.Println("hello") }); err != nil { fmt.Printf("err=ErrPoolOverload:%t\n", err == ants.ErrPoolOverload) } }
關於超時任務的處理 ***
在實際中我們往往會希望在摸一個Goroutine執行任務超時或者其他一些情況下退出而不是一直占用着資源!
但是由於線程才是操作系統可調度的最小的單位,Goroutine是代碼級別的並發,由於GMP模型的限制,我們並不能確定開啟的子Goroutine什么時候執行,Go中也沒有像epoll那樣的“輪詢機制”——專門開一個協程去輪詢其他的子Goroutine管理它們,所以想要真正的去實現子Goroutine的超時退出需要程序員們在業務代碼中做相應的邏輯處理。
我這里使用context去簡單處理超時的Goroutine:

package test1 import ( "context" "fmt" "github.com/panjf2000/ants/v2" "testing" "time" ) func expensiveTask2(ctx context.Context, a, b int) (int, error) { select { // simulate an expensive task case <-time.After(10 * time.Second): return a + b, nil case <-ctx.Done(): return 0, ctx.Err() } } func wrap2(ctx context.Context) func() { return func() { sum, err := expensiveTask2(ctx, 10, 20) if err != nil { fmt.Printf("error is %v\n", err) } else { fmt.Printf("sum is %d\n", sum) } } } func TestT31(t *testing.T) { pool, err := ants.NewPool(1) if err != nil { return } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() if err := pool.Submit(wrap2(ctx)); err != nil { return } // wait other goroutines. for i := 0; i < 20; i++ { time.Sleep(time.Second) fmt.Printf("main waits for %d seconds\n", i+1) } }
以上的例子中,在提交任務時,向任務傳遞了一個 3 秒鍾超時的 context。
在任務函數的邏輯中,通過 Done()
方法等待停止信號(超時或被 main goroutine 主動 cancel),從而使任務函數在一定的時機結束,避免一直執行下去。
需要注意的是,expensiveTask
函數中用了 select
來等待 Done()
的返回,在業務邏輯的哪個時機等待 Done
,需要開程序員自己去設計!