Goroutine Pool架構
超大規模並發的場景下,不加限制的大規模的goroutine可能造成內存暴漲,給機器帶來極大的壓力,吞吐量下降和處理速度變慢。
而實現一個Goroutine Pool,復用goroutine,減輕runtime的調度壓力以及緩解內存壓力,依托這些優化,在大規模goroutine並發的場景下可以極大地提高並發性能。
Pool類型
type Pool struct { // capacity of the pool. //capacity是該Pool的容量,也就是開啟worker數量的上限,每一個worker需要一個goroutine去執行; //worker類型為任務類。 capacity int32 // running is the number of the currently running goroutines. //running是當前正在執行任務的worker數量 running int32 // expiryDuration set the expired time (second) of every worker. //expiryDuration是worker的過期時長,在空閑隊列中的worker的最新一次運行時間與當前時間之差如果大於這個值則表示已過期,定時清理任務會清理掉這個worker; expiryDuration time.Duration // workers is a slice that store the available workers. //任務隊列 workers []*Worker // release is used to notice the pool to closed itself. //當關閉該Pool支持通知所有worker退出運行以防goroutine泄露 release chan sig // lock for synchronous operation //用以支持Pool的同步操作 lock sync.Mutex //once用在確保Pool關閉操作只會執行一次 once sync.Once }
初始化Pool
// NewPool generates a instance of ants pool func NewPool(size, expiry int) (*Pool, error) { if size <= 0 { return nil, errors.New("Pool Size <0,not Create") } p := &Pool{ capacity: int32(size), release: make(chan sig, 1), expiryDuration: time.Duration(expiry) * time.Second, running: 0, } // 啟動定期清理過期worker任務,獨立goroutine運行, // 進一步節省系統資源 p.monitorAndClear() return p, nil }
獲取Worker
// getWorker returns a available worker to run the tasks. func (p *Pool) getWorker() *Worker { var w *Worker // 標志,表示當前運行的worker數量是否已達容量上限 waiting := false // 涉及從workers隊列取可用worker,需要加鎖 p.lock.Lock() workers := p.workers n := len(workers) - 1 fmt.Println("空閑worker數量:",n+1) fmt.Println("協程池現在運行的worker數量:",p.running) // 當前worker隊列為空(無空閑worker) if n < 0 { //沒有空閑的worker有兩種可能: //1.運行的worker超出了pool容量 //2.當前是空pool,從未往pool添加任務或者一段時間內沒有任務添加,被定期清除 // 運行worker數目已達到該Pool的容量上限,置等待標志 if p.running >= p.capacity { //print("超過上限") waiting = true } else { // 當前無空閑worker但是Pool還沒有滿, // 則可以直接新開一個worker執行任務 p.running++ w = &Worker{ pool: p, task: make(chan functinType), str:make(chan string), } } // 有空閑worker,從隊列尾部取出一個使用 } else { //<-p.freeSignal w = workers[n] workers[n] = nil p.workers = workers[:n] p.running++ } // 判斷是否有worker可用結束,解鎖 p.lock.Unlock() if waiting { //當一個任務執行完以后會添加到池中,有了空閑的任務就可以繼續執行: // 阻塞等待直到有空閑worker for len(p.workers) == 0{ continue } p.lock.Lock() workers = p.workers l := len(workers) - 1 w = workers[l] workers[l] = nil p.workers = workers[:l] p.running++ p.lock.Unlock() } return w }
定期清理過期Worker
func (p *Pool) monitorAndClear() { go func() { for { // 周期性循環檢查過期worker並清理 time.Sleep(p.expiryDuration) currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers n := 0 for i, w := range idleWorkers { // 計算當前時間減去該worker的最后運行時間之差是否符合過期時長 if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i w.stop() idleWorkers[i] = nil } if n > 0 { n++ p.workers = idleWorkers[n:] } p.lock.Unlock() } }() }
復用Worker
// putWorker puts a worker back into free pool, recycling the goroutines. func (p *Pool) putWorker(worker *Worker) { // 寫入回收時間,亦即該worker的最后運行時間 worker.recycleTime = time.Now() p.lock.Lock() p.running -- p.workers = append(p.workers, worker) p.lock.Unlock() }
動態擴容或者縮小容量
// ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { cap := int(p.capacity) if size < cap{ diff := cap - size for i := 0; i < diff; i++ { p.getWorker().stop() } } else if size == cap { return } atomic.StoreInt32(&p.capacity, int32(size)) }
提交Worker
// Submit submit a task to pool func (p *Pool) Submit(task functinType,str string) error { if len(p.release) > 0 { return errors.New("Pool is Close") } //創建或得到一個空閑的worker w := p.getWorker() w.run() //將任務參數通過信道傳遞給它 w.sendarg(str) //將任務通過信道傳遞給它 w.sendTask(task) return nil }
Worker類
package Poolpkg

import (
	"sync/atomic"
	"time"
)

// functinType is the signature of a task: it receives one string argument
// and reports failure through its error result.
type functinType func(string) error

// Worker is the executor of a task. For every task, run starts a goroutine
// that receives the task and its argument, calls the task and then returns
// the worker to its pool.
type Worker struct {
	// pool is the pool that owns this worker.
	pool *Pool

	// task delivers the function to execute.
	task chan functinType

	// recycleTime is set by the pool when the worker is put back into the
	// idle list.
	recycleTime time.Time

	// str delivers the argument for the task.
	str chan string
}

// run starts a goroutine that waits for exactly one argument (on str) and one
// task (on task), in either order, executes the task and recycles the worker.
func (w *Worker) run() {
	go func() {
		// A channel is set to nil once its value has arrived, so the select
		// only keeps waiting for the value that is still missing.
		strCh, taskCh := w.str, w.task
		var (
			arg string
			fn  functinType
		)
		for strCh != nil || taskCh != nil {
			select {
			case s, ok := <-strCh:
				if !ok {
					// str was closed by stop: exit without running anything.
					return
				}
				arg, strCh = s, nil
			case f, ok := <-taskCh:
				if !ok {
					// task was closed: give the slot back and exit. The
					// channel is already closed here and must not be closed
					// again, which would panic.
					atomic.AddInt32(&w.pool.running, -1)
					return
				}
				fn, taskCh = f, nil
			}
		}

		// A nil task carries no work; the worker is still recycled below.
		if fn != nil {
			// Submit has already returned by now, so there is no caller to
			// report the task's error to; it is deliberately discarded.
			_ = fn(arg)
		}

		// Return the worker to the pool for reuse.
		w.pool.putWorker(w)
	}()
}

// stop tells a goroutine started by run that is still waiting for input to
// exit. The nil task is offered without blocking, because a worker only has a
// goroutine between run and the end of its task and a blocking send would
// otherwise never return. Closing str then makes a waiting goroutine return.
//
// stop must be called at most once per worker, and the worker must not be
// used afterwards: str is closed.
func (w *Worker) stop() {
	select {
	case w.task <- nil:
	default:
	}
	close(w.str)
}

// sendTask sends a task to this worker. It blocks until the worker goroutine
// receives it.
func (w *Worker) sendTask(task functinType) {
	w.task <- task
}

// sendarg sends the task argument to this worker. It blocks until the worker
// goroutine receives it.
func (w *Worker) sendarg(str string) {
	w.str <- str
}
總結和實踐
怎么理解Worker、task、Pool的關系
Worker類型其實就是task的載體,它有兩個很重要的字段:
task chan functinType:用來傳遞task。
str chan string:用來傳遞task所需的參數。
task是任務本身,它一般為一個函數,在程序中被定義為函數類型:
type functinType func(string) error
Pool存儲Worker,當用戶要執行一個task時,首先要得到一個Worker,必須從池中獲取,獲取到一個Worker后,就開啟一個協程去處理,在這個協程中接收任務task和參數。
//創建或得到一個空閑的worker w := p.getWorker()
//開協程去處理 w.run() //將任務參數通過信道傳遞給它 w.sendarg(str) //將任務通過信道傳遞給它 w.sendTask(task)
Worker怎么接收task和參數
count記錄已接收數據的個數,一個Worker必須同時接收到task和參數才能開始工作。
工作完后這個Worker被返回到Pool中,下次還可以復用這個Worker,也就是復用Worker這個實例。
go func() { //監聽任務列表,一旦有任務立馬取出運行 count := 1 var str string var f functinType for count <=2{ select { case str_temp, ok := <- w.str: if !ok { return } count ++ str = str_temp case f_temp, ok := <-w.task: if !ok { //如果接收到關閉 atomic.AddInt32(&w.pool.running, -1) close(w.task) return } count ++ f = f_temp } } err := f(str) if err != nil{ //fmt.Println("執行任務失敗") } //回收復用 w.pool.putWorker(w) return }()
Pool怎么處理用戶提交task獲取Worker的請求
1.先得到Pool池中空閑Worker的數量,然后判斷
2.如果空閑Worker的數量為零(即代碼中的 n = len(workers) - 1 小於零),則表示池中沒有空閑的Worker,這里有兩種原因:
- 1.運行中的Worker數量已達到Pool的容量上限:當用戶獲取Worker的請求數量激增時,池中的空閑Worker都來自執行完任務后重新放回池中的Worker,放回的速度跟不上激增的需求。
- 2.當前是空pool,從未往pool添加任務或者一段時間內沒有Worker任務運行,被定期清除。
3.如果 n 大於或者等於零(即至少有一個空閑的Worker),直接從池中獲取最后一個Worker。
4.如果是第二種的第一種情況,則阻塞等待池中有空閑的Worker。
if waiting {
	// The pool is saturated: wait until a finished task puts its worker back
	// into the idle list. The list is only read and modified under the lock,
	// and its length is re-checked on every attempt, so a worker that another
	// caller took first can never cause an out-of-range index.
	for {
		p.lock.Lock()
		if l := len(p.workers) - 1; l >= 0 {
			w = p.workers[l]
			p.workers[l] = nil
			p.workers = p.workers[:l]
			p.running++
			p.lock.Unlock()
			break
		}
		p.lock.Unlock()
		// Pause briefly instead of spinning at full speed.
		time.Sleep(time.Millisecond)
	}
}
5.如果是第二種的第二種情況,直接創建一個Worker實例。
// 當前無空閑worker但是Pool還沒有滿, // 則可以直接新開一個worker執行任務 p.running++ w = &Worker{ pool: p, task: make(chan functinType), str:make(chan string), }
測試
package main

import (
	"fmt"
	"sync"

	"Pool/Poolpkg"
)

// main submits 50 tasks to a pool of 20 workers and waits until all of them
// have finished.
func main() {
	// Pool with a capacity of 20; idle workers expire after 5 seconds
	// (NewPool interprets its second argument in seconds).
	pool, err := Poolpkg.NewPool(20, 5)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Submit returns as soon as a worker has accepted the task, so main has
	// to wait explicitly; otherwise the program could exit before the tasks
	// have run.
	var wg sync.WaitGroup
	task := func(s string) error {
		defer wg.Done()
		return Print_Test1(s)
	}

	for i := 0; i < 50; i++ {
		wg.Add(1)
		if err := pool.Submit(task, "並發測試!"); err != nil {
			// The task was not accepted and will never call Done itself.
			wg.Done()
			fmt.Println(err)
		}
	}
	wg.Wait()
}
源碼
Pool
package Poolpkg

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// sig is the element type of the pool's release channel.
type sig struct{}

// Pool accepts tasks from clients and limits the number of workers that run
// at the same time to a fixed capacity, recycling Worker values between tasks.
type Pool struct {
	// capacity is the upper bound on the number of workers that may be
	// running at once.
	capacity int32

	// running is the number of workers that have been handed out by
	// getWorker and not yet returned through putWorker.
	running int32

	// expiryDuration is how long a worker may stay idle. The periodic cleanup
	// drops idle workers whose recycleTime is older than this.
	expiryDuration time.Duration

	// workers is the list of idle workers available for reuse (it is not a
	// task queue). putWorker appends to it and getWorker takes from the tail,
	// so the oldest idle worker is always at index 0.
	workers []*Worker

	// release marks the pool as closed: Submit rejects tasks while a value is
	// buffered in it. NOTE(review): nothing in the visible code sends on it.
	release chan sig

	// lock serializes access to workers and running.
	lock sync.Mutex

	// once is reserved for making the close operation run only once.
	// NOTE(review): not used by any code shown here.
	once sync.Once
}

// NewPool creates a pool that runs at most size workers at the same time and
// drops workers that have been idle for more than expiry seconds.
//
// It returns an error when size or expiry is not positive. A non-positive
// expiry is rejected because the cleanup goroutine sleeps for that duration
// between passes: with zero it would spin without pausing and treat every
// idle worker as expired.
func NewPool(size, expiry int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Pool Size <0,not Create")
	}
	if expiry <= 0 {
		return nil, errors.New("Pool Expiry <=0,not Create")
	}

	p := &Pool{
		capacity:       int32(size),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
	}

	// The cleanup of expired idle workers runs in its own goroutine.
	p.monitorAndClear()
	return p, nil
}

// Submit hands task, together with its argument str, to a worker of the pool.
//
// It blocks until a worker is available and has received both values; it does
// not wait for the task to finish. It returns an error when the pool has been
// closed or when task is nil. A nil task is rejected here because the worker
// goroutine would otherwise call a nil function and crash the process.
func (p *Pool) Submit(task functinType, str string) error {
	if len(p.release) > 0 {
		return errors.New("Pool is Close")
	}
	if task == nil {
		return errors.New("Pool task is nil")
	}

	// Create a worker or reuse an idle one.
	w := p.getWorker()
	// Start the goroutine that will receive and execute the task.
	w.run()
	// Both channels are unbuffered, so each send returns once the worker
	// goroutine has received the value.
	w.sendarg(str)
	w.sendTask(task)
	return nil
}

// getWorker returns an available worker to run the tasks.
func (p *Pool) getWorker() *Worker { var w *Worker // 標志,表示當前運行的worker數量是否已達容量上限 waiting := false // 涉及從workers隊列取可用worker,需要加鎖 p.lock.Lock() workers := p.workers n := len(workers) - 1 fmt.Println("空閑worker數量:",n+1) fmt.Println("協程池現在運行的worker數量:",p.running) // 當前worker隊列為空(無空閑worker) if n < 0 { //沒有空閑的worker有兩種可能: //1.運行的worker超出了pool容量 //2.當前是空pool,從未往pool添加任務或者一段時間內沒有任務添加,被定期清除 // 運行worker數目已達到該Pool的容量上限,置等待標志 if p.running >= p.capacity { //print("超過上限") waiting = true } else { // 當前無空閑worker但是Pool還沒有滿, // 則可以直接新開一個worker執行任務 p.running++ w = &Worker{ pool: p, task: make(chan functinType), str:make(chan string), } } // 有空閑worker,從隊列尾部取出一個使用 } else { //<-p.freeSignal w = workers[n] workers[n] = nil p.workers = workers[:n] p.running++ } // 判斷是否有worker可用結束,解鎖 p.lock.Unlock() if waiting { //當一個任務執行完以后會添加到池中,有了空閑的任務就可以繼續執行: // 阻塞等待直到有空閑worker for len(p.workers) == 0{ continue } p.lock.Lock() workers = p.workers l := len(workers) - 1 w = workers[l] workers[l] = nil p.workers = workers[:l] p.running++ p.lock.Unlock() } return w } //定期清理過期Worker func (p *Pool) monitorAndClear() { go func() { for { // 周期性循環檢查過期worker並清理 time.Sleep(p.expiryDuration) currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers n := 0 for i, w := range idleWorkers { // 計算當前時間減去該worker的最后運行時間之差是否符合過期時長 if currentTime.Sub(w.recycleTime) <= p.expiryDuration { break } n = i w.stop() idleWorkers[i] = nil p.running-- } if n > 0 { n++ p.workers = idleWorkers[n:] } p.lock.Unlock() } }() } //Worker回收(goroutine復用) // putWorker puts a worker back into free pool, recycling the goroutines. 
func (p *Pool) putWorker(worker *Worker) { // 寫入回收時間,亦即該worker的最后運行時間 worker.recycleTime = time.Now() p.lock.Lock() p.running -- p.workers = append(p.workers, worker) p.lock.Unlock() } //動態擴容或者縮小池容量 // ReSize change the capacity of this pool func (p *Pool) ReSize(size int) { cap := int(p.capacity) if size < cap{ diff := cap - size for i := 0; i < diff; i++ { p.getWorker().stop() } } else if size == cap { return } atomic.StoreInt32(&p.capacity, int32(size)) }
Worker
package Poolpkg

import (
	"sync/atomic"
	"time"
)

// functinType is the signature of a task: it receives one string argument
// and reports failure through its error result.
type functinType func(string) error

// Worker is the executor of a task. For every task, run starts a goroutine
// that receives the task and its argument, calls the task and then returns
// the worker to its pool.
type Worker struct {
	// pool is the pool that owns this worker.
	pool *Pool

	// task delivers the function to execute.
	task chan functinType

	// recycleTime is set by the pool when the worker is put back into the
	// idle list.
	recycleTime time.Time

	// str delivers the argument for the task.
	str chan string
}

// run starts a goroutine that waits for exactly one argument (on str) and one
// task (on task), in either order, executes the task and recycles the worker.
func (w *Worker) run() {
	go func() {
		// A channel is set to nil once its value has arrived, so the select
		// only keeps waiting for the value that is still missing.
		strCh, taskCh := w.str, w.task
		var (
			arg string
			fn  functinType
		)
		for strCh != nil || taskCh != nil {
			select {
			case s, ok := <-strCh:
				if !ok {
					// str was closed by stop: exit without running anything.
					return
				}
				arg, strCh = s, nil
			case f, ok := <-taskCh:
				if !ok {
					// task was closed: give the slot back and exit. The
					// channel is already closed here and must not be closed
					// again, which would panic.
					atomic.AddInt32(&w.pool.running, -1)
					return
				}
				fn, taskCh = f, nil
			}
		}

		// A nil task carries no work; the worker is still recycled below.
		if fn != nil {
			// Submit has already returned by now, so there is no caller to
			// report the task's error to; it is deliberately discarded.
			_ = fn(arg)
		}

		// Return the worker to the pool for reuse.
		w.pool.putWorker(w)
	}()
}

// stop tells a goroutine started by run that is still waiting for input to
// exit. The nil task is offered without blocking, because a worker only has a
// goroutine between run and the end of its task and a blocking send would
// otherwise never return. Closing str then makes a waiting goroutine return.
//
// stop must be called at most once per worker, and the worker must not be
// used afterwards: str is closed.
func (w *Worker) stop() {
	select {
	case w.task <- nil:
	default:
	}
	close(w.str)
}

// sendTask sends a task to this worker. It blocks until the worker goroutine
// receives it.
func (w *Worker) sendTask(task functinType) {
	w.task <- task
}

// sendarg sends the task argument to this worker. It blocks until the worker
// goroutine receives it.
func (w *Worker) sendarg(str string) {
	w.str <- str
}