Implementing a goroutine pool (workpool) in Golang


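Background

Because this is work-related, all the data in this article has been changed, but the logic is the same.

The author's service, ServerA, requests data from another service, ServerH. ServerH's API allows at most M requests within N seconds and rejects anything over the limit with a 103 error (modeled later in this article as returning false). The instantaneous request volume of the author's service is far greater than M, so a goroutine pool is used: when a 103 error is received, the workers stop running for N seconds and then start again.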

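The concept of a goroutine pool

A goroutine pool consists of a pool of fixed capacity that stores the tasks to be executed, plus a number of worker goroutines that execute them.

The pool must support starting, stopping, and sleeping.

What follows records, from scratch, the thought process and the problems encountered along the way.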

Basic version

This version implements a basic goroutine pool with everything except the sleep feature.

// workpool.go
package workpool

import (
	"context"
	"sync"
)

type TaskFunc func()

type Task struct {
	f TaskFunc
}

type WorkPool struct {
	pool        chan *Task
	workerCount int

	stopCtx        context.Context
	stopCancelFunc context.CancelFunc
	wg             sync.WaitGroup
}

func (t *Task) Execute() {
	t.f()
}

func New(workerCount, poolLen int) *WorkPool {
	return &WorkPool{
		workerCount: workerCount,
		pool:        make(chan *Task, poolLen),
	}
}

func (w *WorkPool) PushTask(t *Task) {
	w.pool <- t
}

func (w *WorkPool) PushTaskFunc(f TaskFunc) {
	w.pool <- &Task{
		f: f,
	}
}

func (w *WorkPool) work() {
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case t := <-w.pool:
			t.Execute()
		}
	}
}

func (w *WorkPool) Start() *WorkPool {
	w.wg.Add(w.workerCount)
	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
	for i := 0; i < w.workerCount; i++ {
		go w.work()
	}
	return w
}

func (w *WorkPool) Stop() {
	w.stopCancelFunc()
	w.wg.Wait()
}

It looks fine, and fairly concise too. It is not...

The program below creates a workpool with a capacity of 50 and prints 100 numbers through 3 workers.

// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
)

func TestWorkPool_Start(t *testing.T) {
	wg := sync.WaitGroup{}
	wp := New(3, 50).Start()
	length := 100
	wg.Add(length)
	for i := 0; i < length; i++ {
		wp.PushTaskFunc(func() {
			defer wg.Done()
			fmt.Print(i, " ")
		})
	}
	wg.Wait()
}

Running it produces the following output:

50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 50 51 51 51 51 69 72 78 78 80 81 81 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 83 84 84 84 84 50 84
100 100 100 100 100 100 100 100 100 100 50 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 100 84 100 100 100 

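This is a long way from the expected output of 0-99.

The cause is that a closure captures outer variables by reference, so by the time the function actually runs, the value of i has long since changed. (Since Go 1.22, each loop iteration gets its own i in modules that declare go 1.22 or later, so this output only reproduces with the older loop semantics.) Below is a simple closure example.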

x := 1
f := func() {
	println(x)
}
x = 2
x = 3
f() // 3

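The moment f() is called corresponds to t.Execute() in the goroutine pool.

Solving the closure reference problem

Since the problem is caused by closure references, the simplest answer is to stop relying on the closure.

The values can be passed into the function as arguments instead. However, the number and types of the arguments a task will need are not known in advance, so TaskFunc has to take a variadic interface{} parameter, and each task uses type assertions when reading its arguments.

Only the parts of workpool.go that changed are listed below: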
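Because every argument arrives as interface{}, a wrong type or a missing argument turns into a runtime panic if the task uses a bare assertion such as args[0].(int). The following is a minimal sketch of a defensive alternative using the comma-ok form of the assertion. The helper name intArg is hypothetical and not part of the workpool; it is only here to illustrate the idea.

```go
package main

import "fmt"

// intArg is a hypothetical helper: it returns args[idx] as an int, or an
// error if the index is out of range or the value has a different type.
func intArg(args []interface{}, idx int) (int, error) {
	if idx < 0 || idx >= len(args) {
		return 0, fmt.Errorf("argument %d missing (got %d arguments)", idx, len(args))
	}
	n, ok := args[idx].(int) // comma-ok form: no panic on a type mismatch
	if !ok {
		return 0, fmt.Errorf("argument %d has type %T, want int", idx, args[idx])
	}
	return n, nil
}

func main() {
	// task has the same shape as the variadic TaskFunc described above.
	task := func(args ...interface{}) {
		n, err := intArg(args, 0)
		if err != nil {
			fmt.Println("bad task arguments:", err)
			return
		}
		fmt.Println("got", n)
	}
	task(42)   // got 42
	task("42") // reports a type mismatch instead of panicking
	task()     // reports a missing argument instead of panicking
}
```

Whether the extra checks are worth it depends on who pushes tasks; for a pool used only inside one package, a bare assertion may be acceptable.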

// workpool.go
type TaskFunc func(args ...interface{})

type Task struct {
	f    TaskFunc
	args []interface{}
}

func (t *Task) Execute() {
	t.f(t.args...)
}

func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
	w.pool <- &Task{
		f:    f,
		args: args,
	}
}

The test program follows:

// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
)

func TestWorkPool_Start(t *testing.T) {
	wg := sync.WaitGroup{}
	wp := New(3, 50).Start()
	length := 100
	wg.Add(length)
	for i := 0; i < length; i++ {
		wp.PushTaskFunc(func(args ...interface{}) {
			defer wg.Done()
			fmt.Print(args[0].(int), " ")
		}, i)
	}
	wg.Wait()
}

The output is as follows:

0 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 2 1 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 26 48 49 51 52 53 54 55 56 50 58 59 57 61 62 63 64 65 66 25 68
69 70 71 72 73 67 75 76 77 74 79 78 81 82 83 84 60 86 87 88 89 90 91 92 85 94 95 96 97 98 99 80 93

The order is scrambled, but that is expected with several workers running concurrently. The closure reference problem is solved.
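For comparison, there is another common way to avoid the shared loop variable that keeps the original func() signature: copy the variable inside the loop body so that each closure captures its own copy. The sketch below is independent of the workpool and uses plain goroutines; the function name collect is only illustrative. With Go 1.22 or later loop semantics the copy is redundant but harmless.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect starts n goroutines, each recording the loop value it captured,
// and returns the recorded values in sorted order.
func collect(n int) []int {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		got []int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		i := i // per-iteration copy: each closure captures its own variable
		go func() {
			defer wg.Done()
			mu.Lock()
			got = append(got, i)
			mu.Unlock()
		}()
	}
	wg.Wait()
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(collect(5)) // [0 1 2 3 4]
}
```

Passing the value as an explicit argument, as done above, has the advantage that the dependency is visible in the PushTaskFunc call itself; the copy approach keeps tasks type-safe because no interface{} is involved.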

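Adding the sleep feature

Going back to the scenario at the start: once any task executed by a worker receives a 103 error from ServerH, all workers should stop for a while, because there is no point in continuing to send requests.

This version is nearly identical to the one the author currently uses.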

// workpool.go
package workpool

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type Flag int64

const (
	FLAG_OK    Flag = 1 << iota
	FLAG_RETRY Flag = 1 << iota
)

type TaskFunc func(w *WorkPool, args ...interface{}) Flag

type Task struct {
	f    TaskFunc
	args []interface{}
}

type WorkPool struct {
	pool        chan *Task
	workerCount int

	// stop-related fields
	stopCtx        context.Context
	stopCancelFunc context.CancelFunc
	wg             sync.WaitGroup

	// sleep-related fields
	sleepCtx        context.Context
	sleepCancelFunc context.CancelFunc
	sleepSeconds    int64
	sleepNotify     chan bool
}

func (t *Task) Execute(w *WorkPool) Flag {
	return t.f(w, t.args...)
}

func New(workerCount, poolLen int) *WorkPool {
	return &WorkPool{
		workerCount: workerCount,
		pool:        make(chan *Task, poolLen),
		sleepNotify: make(chan bool),
	}
}

func (w *WorkPool) PushTask(t *Task) {
	w.pool <- t
}

func (w *WorkPool) PushTaskFunc(f TaskFunc, args ...interface{}) {
	w.pool <- &Task{
		f:    f,
		args: args,
	}
}

func (w *WorkPool) work(i int) {
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
		case <-w.sleepCtx.Done():
			// Load atomically: SleepNotify updates sleepSeconds with a CAS.
			time.Sleep(time.Duration(atomic.LoadInt64(&w.sleepSeconds)) * time.Second)
		case t := <-w.pool:
			flag := t.Execute(w)
			if flag&FLAG_RETRY != 0 {
				w.PushTask(t)
				fmt.Printf("work %v PushTask,pool length %v\n", i, len(w.pool))
			}
		}
	}
}

func (w *WorkPool) Start() *WorkPool {
	fmt.Printf("workpool run %d worker\n", w.workerCount)
	w.wg.Add(w.workerCount + 1)
	w.stopCtx, w.stopCancelFunc = context.WithCancel(context.Background())
	w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
	go w.sleepControl()
	for i := 0; i < w.workerCount; i++ {
		go w.work(i)
	}
	return w
}

func (w *WorkPool) Stop() {
	w.stopCancelFunc()
	w.wg.Wait()
}

func (w *WorkPool) sleepControl() {
	fmt.Println("sleepControl start...")
	for {
		select {
		case <-w.stopCtx.Done():
			w.wg.Done()
			return
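		case <-w.sleepNotify:
			fmt.Printf("receive sleep notify start...\n")
			// Cancel the current sleep context so that workers selecting on
			// sleepCtx.Done() start sleeping.
			w.sleepCancelFunc()
			seconds := atomic.LoadInt64(&w.sleepSeconds)
			fmt.Printf("sleepControl will star sleep %v s\n", seconds)
			time.Sleep(time.Duration(seconds) * time.Second)
			// Install a fresh context; otherwise Done() stays closed and the
			// workers keep hitting the sleep case with a zero duration.
			w.sleepCtx, w.sleepCancelFunc = context.WithCancel(context.Background())
			// Reset with an atomic store to pair with the CAS in SleepNotify.
			atomic.StoreInt64(&w.sleepSeconds, 0)
			fmt.Println("sleepControl was end sleep")
		}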
	}
}


func (w *WorkPool) SleepNotify(seconds int64) {
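	// sleepSeconds is an int64 rather than a time.Duration because it is
	// updated with a CAS operation.
	// The notification is sent only after the value has been set successfully.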
	if atomic.CompareAndSwapInt64(&w.sleepSeconds, 0, seconds) {
		fmt.Printf("sleepSeconds set %v\n", seconds)
		w.sleepNotify <- true
	}
}
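The compare-and-swap in SleepNotify is what keeps several failing tasks from each triggering a sleep: only the caller that changes the value from 0 wins, and the others see a non-zero value and return. The following standalone sketch illustrates just that property. The function name countWinners and the fixed value 5 are assumptions made for the demonstration, not part of the workpool.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countWinners lets n goroutines race to change a shared value from 0 to 5
// and reports how many of them succeeded.
func countWinners(n int) int64 {
	var (
		sleepSeconds int64 // 0 means "no sleep pending"
		winners      int64
		wg           sync.WaitGroup
	)
	wg.Add(n)
	for g := 0; g < n; g++ {
		go func() {
			defer wg.Done()
			// Succeeds only for the goroutine that still observes 0.
			if atomic.CompareAndSwapInt64(&sleepSeconds, 0, 5) {
				atomic.AddInt64(&winners, 1)
			}
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&winners)
}

func main() {
	fmt.Println(countWinners(100)) // 1
}
```

The guarantee only holds while every access to the variable goes through sync/atomic, which is why the value needs to be reset atomically once the sleep is over.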

The test program below (workpool_test.go) simulates ServerH; the usage is much the same as in the author's real work.

// workpool_test.go
package workpool

import (
	"fmt"
	"sync"
	"testing"
	"time"
)

// Simulates the rate limiting done by ServerH.
var serverh = &server{max: 10, interval: 5}

type server struct {
	count    int
	max      int
	lasttime time.Time
	interval int64
	mu       sync.Mutex
}

func (s *server) Access(i int) bool {
	now := time.Now()

	s.mu.Lock()
	defer s.mu.Unlock()

	time.Sleep(100 * time.Millisecond)

	if s.lasttime.Unix() <= 0 || s.count >= s.max {
		if now.After(s.lasttime) {
			s.count = 1
			s.lasttime = time.Unix(now.Unix()+s.interval, 0)
			return true
		}
		fmt.Printf("Access false,i=%d \n", i)
		return false
	} else {
		s.count++
		fmt.Printf("Access true,i=%d s.count %d\n", i, s.count)
		return true
	}
}

// The logic of the author's service.
func TestWorkPool_Start(t *testing.T) {
	wp := New(3, 100).Start()
	for i := 0; i < 100; i++ {
		time.Sleep(100 * time.Millisecond)
		wp.PushTaskFunc(func(w *WorkPool, args ...interface{}) Flag {
			if !serverh.Access(args[0].(int)) {
				// Send a notification to sleep for 5 seconds.
				w.SleepNotify(5)
				// This attempt failed, so put the task back into the pool.
				return FLAG_RETRY
			}
			return FLAG_OK
		}, i)
	}
	time.Sleep(100 * time.Second)
}

The output is as follows:

workpool run 3 worker
sleepControl start...
Access true,i=1 s.count 2
Access true,i=2 s.count 3
Access true,i=3 s.count 4
Access true,i=4 s.count 5
Access true,i=5 s.count 6
Access true,i=6 s.count 7
Access true,i=7 s.count 8
Access true,i=8 s.count 9
Access true,i=9 s.count 10
Access false,i=10 
sleepSeconds set 5
work 1 PushTask,pool length 0
receive sleep notify start...
sleepControl will star sleep 5 s
Access false,i=10 
work 0 PushTask,pool length 1
Access false,i=10 
work 0 PushTask,pool length 2
Access false,i=11 
work 2 PushTask,pool length 3
Access false,i=12 
work 1 PushTask,pool length 5
Access false,i=13 
work 0 PushTask,pool length 6
Access false,i=14 
work 0 PushTask,pool length 7
Access false,i=10 
work 1 PushTask,pool length 8
Access false,i=15 
work 1 PushTask,pool length 9
Access false,i=11 
work 0 PushTask,pool length 11
Access false,i=12 
work 0 PushTask,pool length 11
Access false,i=16 
work 0 PushTask,pool length 12
sleepControl was end sleep
Access true,i=17 s.count 2
Access true,i=14 s.count 3
Access true,i=18 s.count 4
Access true,i=10 s.count 5
Access true,i=15 s.count 6
Access true,i=20 s.count 7
Access true,i=19 s.count 8
Access true,i=12 s.count 9
Access true,i=11 s.count 10
Access false,i=21 
sleepSeconds set 5
work 0 PushTask,pool length 53
receive sleep notify start...
sleepControl will star sleep 5 s
Access false,i=16 
work 1 PushTask,pool length 54
Access false,i=22 
work 2 PushTask,pool length 55
Access false,i=23 
work 0 PushTask,pool length 57
Access false,i=24 
...........

To be added

Retry-count logic
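The author has not yet written this part. As a starting point, here is one possible sketch of retry accounting, under stated assumptions: the fields retries and maxRetries and the method shouldRequeue are hypothetical, and the task function signature is simplified (it omits the *WorkPool parameter) so that the example is self-contained.

```go
package main

import "fmt"

type Flag int64

const (
	FLAG_OK Flag = 1 << iota
	FLAG_RETRY
)

// Task mirrors the article's Task and adds two hypothetical fields for
// retry accounting.
type Task struct {
	f          func(args ...interface{}) Flag
	args       []interface{}
	retries    int // how many times the task has been re-queued so far
	maxRetries int // hypothetical cap on re-queues
}

// shouldRequeue runs the task once and reports whether the caller should
// push it back into the pool.
func (t *Task) shouldRequeue() bool {
	if t.f(t.args...)&FLAG_RETRY == 0 {
		return false // finished, nothing to retry
	}
	if t.retries >= t.maxRetries {
		return false // retry budget exhausted: give up
	}
	t.retries++
	return true
}

func main() {
	calls := 0
	t := &Task{
		f:          func(args ...interface{}) Flag { calls++; return FLAG_RETRY },
		maxRetries: 3,
	}
	for t.shouldRequeue() {
		// In the pool, this is where the worker would call PushTask(t).
	}
	fmt.Println(calls, t.retries) // 4 3
}
```

In the worker loop, the check `flag&FLAG_RETRY != 0` could then be replaced by a call like this, with a dropped task reported through a log line or a callback.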

