go 一步步實現Goroutine Pool


 

 Goroutine Pool架構

在超大規模並發的場景下,不加限制地大量創建goroutine可能造成內存暴漲,給機器帶來極大的壓力,導致吞吐量下降、處理速度變慢。

而實現一個Goroutine Pool,復用goroutine,減輕runtime的調度壓力以及緩解內存壓力,依托這些優化,在大規模goroutine並發的場景下可以極大地提高並發性能。

 

 

 

 

Pool類型

type Pool struct {
	// capacity is the upper bound on the number of workers handed out at the
	// same time; getWorker compares running against it before creating a
	// new worker.
	capacity int32
	// running is the number of workers currently handed out: getWorker
	// increments it and putWorker decrements it.
	running int32
	// expiryDuration is how long a worker may stay idle; the cleaner started
	// by monitorAndClear discards idle workers whose recycleTime is older
	// than this.
	expiryDuration time.Duration
	// workers holds the idle workers that are available for reuse
	// (it is not a task queue).
	workers []*Worker
	// release marks the pool as closed: Submit rejects tasks once this
	// channel holds a value.
	release chan sig
	// lock serialises access to workers and running in getWorker, putWorker
	// and monitorAndClear.
	lock sync.Mutex
	// once is presumably intended to make closing the pool run only once;
	// no use of it is visible in this file — TODO confirm.
	once sync.Once
}

初始化Pool

// NewPool creates a pool that hands out at most size workers at a time and
// discards workers that have been idle for longer than expiry seconds.
//
// It returns an error when size or expiry is not positive. A non-positive
// expiry is rejected because the background cleaner sleeps for that duration
// between sweeps and would otherwise spin continuously on the pool lock.
func NewPool(size, expiry int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Pool Size <0,not Create")
	}
	if expiry <= 0 {
		return nil, errors.New("pool expiry must be positive")
	}
	p := &Pool{
		capacity:       int32(size),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
	}
	// Start the periodic sweep of expired idle workers in its own goroutine.
	p.monitorAndClear()
	return p, nil
}

獲取Worker

// getWorker returns a worker that the caller may use to run one task.
//
// When the pool is below capacity it reuses the most recently recycled idle
// worker if there is one, and otherwise creates a new worker. When the pool
// is at capacity it blocks until a slot is released. The returned worker is
// counted in p.running until putWorker gives it back.
func (p *Pool) getWorker() *Worker {
	// Interval between capacity re-checks while the pool is full. Polling is
	// used because Pool has no condition variable to wait on.
	const retryInterval = time.Millisecond

	p.lock.Lock()
	fmt.Println("空閑worker數量:", len(p.workers))
	fmt.Println("協程池現在運行的worker數量:", p.running)

	// Wait for a free slot. Every check happens under the lock, and the lock
	// is dropped while sleeping so putWorker can make progress. Re-checking
	// the running count (instead of only the idle list) also lets a waiter
	// proceed when a slot is released without an idle worker appearing.
	for p.running >= p.capacity {
		p.lock.Unlock()
		time.Sleep(retryInterval)
		p.lock.Lock()
	}
	defer p.lock.Unlock()

	p.running++
	// Prefer an idle worker, taken from the tail of the list. Nil entries
	// are skipped so a damaged list can never yield a nil worker.
	for last := len(p.workers) - 1; last >= 0; last-- {
		w := p.workers[last]
		p.workers[last] = nil
		p.workers = p.workers[:last]
		if w != nil {
			return w
		}
	}
	// No idle worker available: create a fresh one.
	return &Worker{
		pool: p,
		task: make(chan functinType),
		str:  make(chan string),
	}
}

定期清理過期Worker

// monitorAndClear starts a background goroutine that wakes up every
// expiryDuration and removes idle workers whose recycleTime is older than
// expiryDuration. The goroutine exits once the pool's release channel holds
// a value, which is the same condition Submit uses to reject tasks.
func (p *Pool) monitorAndClear() {
	go func() {
		for {
			time.Sleep(p.expiryDuration)
			if len(p.release) > 0 {
				// Pool closed: stop sweeping so this goroutine does not leak.
				return
			}
			now := time.Now()
			p.lock.Lock()
			idle := p.workers
			// putWorker appends and getWorker pops from the tail, so the
			// longest-idle workers sit at the front; stop at the first
			// worker that has not expired yet.
			expired := 0
			for i, w := range idle {
				if w != nil && now.Sub(w.recycleTime) <= p.expiryDuration {
					break
				}
				// An idle worker has no goroutine (run returns right after
				// putWorker), so there is nothing to signal: dropping the
				// reference is enough. Calling stop here would block forever
				// on the unbuffered task channel while holding the lock.
				idle[i] = nil
				expired = i + 1
			}
			// Counting expired entries (rather than remembering the last
			// index) also trims the list when only the first worker expired.
			p.workers = idle[expired:]
			p.lock.Unlock()
		}
	}()
}

復用Worker

// putWorker returns worker to the idle list so a later getWorker call can
// reuse it, and releases the slot the worker occupied in p.running.
func (p *Pool) putWorker(worker *Worker) {
	// Stamp the moment the worker became idle; the cleaner compares it
	// against expiryDuration.
	worker.recycleTime = time.Now()

	p.lock.Lock()
	defer p.lock.Unlock()
	p.workers = append(p.workers, worker)
	p.running--
}

動態擴容或者縮小容量

// ReSize changes the capacity of the pool to size.
//
// A non-positive size is ignored, matching NewPool which refuses to create
// such a pool. When the pool shrinks, the longest-idle workers are dropped so
// that idle plus running workers fit the new capacity; workers that are
// currently running are left alone.
func (p *Pool) ReSize(size int) {
	if size <= 0 {
		return
	}
	p.lock.Lock()
	defer p.lock.Unlock()
	if int32(size) == p.capacity {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))

	// Number of idle workers that still fit next to the running ones.
	keep := size - int(p.running)
	if keep < 0 {
		keep = 0
	}
	// Idle workers have no goroutine to stop, so releasing the references is
	// all that is needed. Going through getWorker().stop() would block on
	// the unbuffered task channel and leave running permanently inflated.
	if excess := len(p.workers) - keep; excess > 0 {
		for i := 0; i < excess; i++ {
			p.workers[i] = nil
		}
		p.workers = p.workers[excess:]
	}
}

提交Worker

// Submit runs task(str) on a pooled worker.
//
// It returns an error when task is nil or when the pool has been closed
// (its release channel holds a value). Otherwise it blocks until a worker is
// available, starts the worker goroutine, hands it the argument and the task,
// and returns nil; the task's own error is not reported back.
func (p *Pool) Submit(task functinType, str string) error {
	if task == nil {
		// A nil task would only fail later, as a panic inside the worker
		// goroutine, so reject it before a worker is taken.
		return errors.New("task is nil")
	}
	if len(p.release) > 0 {
		return errors.New("Pool is Close")
	}
	// Create a worker or obtain an idle one.
	w := p.getWorker()
	w.run()
	// Both sends block until the worker goroutine receives them.
	w.sendarg(str)
	w.sendTask(task)
	return nil
}

  

Worker類

package Poolpkg

import (
	"sync/atomic"
	"time"
)

// functinType is the signature of a task: it receives one string argument
// and reports failure through its error result.
type functinType func(string) error


// Worker carries one task at a time: run starts a goroutine that receives a
// task and its argument over the channels below and calls the task.
type Worker struct {
	// pool is the Pool this worker is returned to after a task finishes.
	pool *Pool
	// task delivers the function to execute.
	task chan functinType
	// recycleTime is set by putWorker when the worker goes back to the idle
	// list.
	recycleTime time.Time

	// str delivers the string argument passed to the task.
	str chan string
}

// run starts the goroutine that executes one task for this worker.
//
// The goroutine waits until both the argument (w.str) and the task (w.task)
// have arrived, calls the task, returns the worker to its pool and exits.
// It exits early without running anything when w.str is closed, or when the
// task channel is closed or delivers a nil task (the signal sent by stop);
// in the latter case it also releases the worker's slot in pool.running.
func (w *Worker) run() {
	go func() {
		var (
			arg      string
			task     functinType
			haveArg  bool
			haveTask bool
		)
		// The argument and the task travel on separate channels, so accept
		// them in whichever order they arrive.
		for !haveArg || !haveTask {
			select {
			case s, ok := <-w.str:
				if !ok {
					return
				}
				arg, haveArg = s, true
			case f, ok := <-w.task:
				if !ok || f == nil {
					// Stop requested. The channel must not be closed again
					// here: closing a closed channel panics.
					atomic.AddInt32(&w.pool.running, -1)
					return
				}
				task, haveTask = f, true
			}
		}
		// Nothing in this design consumes the task's error, so it is
		// discarded explicitly.
		_ = task(arg)
		// Hand the worker back for reuse.
		w.pool.putWorker(w)
	}()
}

// stop tells the worker to exit and closes its argument channel.
//
// If the worker goroutine is waiting for input it receives a nil task, which
// is the stop signal. If no goroutine is receiving (for example the worker
// is idle) the signal is skipped instead of blocking forever on the
// unbuffered channel. stop must be called at most once per worker, because
// it closes w.str.
func (w *Worker) stop() {
	select {
	case w.task <- nil:
	default:
		// Nobody is receiving; closing str below is enough for a goroutine
		// that starts waiting later.
	}
	close(w.str)
}

// sendTask hands task to the worker over its task channel. The channel is
// created unbuffered by getWorker, so the call blocks until the worker
// goroutine receives the task.
func (w *Worker) sendTask(task functinType) {
	w.task <- task
}

// sendarg hands the task argument str to the worker over its str channel.
// The channel is created unbuffered by getWorker, so the call blocks until
// the worker goroutine receives the value.
func (w *Worker) sendarg(str string) {
	w.str <- str
}

  

總結和實踐

怎么理解Worker、task、Pool的關系

Worker類型其實就是task的載體,Worker類型有兩個很重要的字段:

task chan functinType:用來傳遞task。
str chan string:用來傳遞task所需的參數。

task是任務本身,它一般為一個函數,在程序中被定義為函數類型:

type functinType func(string) error

Pool存儲Worker,當用戶要執行一個task時,首先要得到一個Worker,必須從池中獲取,獲取到一個Worker后,就開啟一個協程去處理,在這個協程中接收任務task和參數。

// Create a worker or obtain an idle one.
w := p.getWorker()
// Start a goroutine to handle the task.
w.run()
// Pass the task argument to the worker through a channel.
w.sendarg(str)
// Pass the task itself to the worker through a channel.
w.sendTask(task)

Worker怎么接收task和參數

count記錄已接收數據的個數,一個Worker必須同時接收到task和參數才能開始工作。
工作完后這個Worker被返回到Pool中,下次還可以復用這個Worker,也就是復用Worker這個實例。
go func() {
		// Wait for input: the loop ends once two values (the argument and
		// the task) have been received, then the task is executed.
		count := 1
		var str string
		var f functinType
		for count <=2{
			select {
			case str_temp, ok := <- w.str:
				if !ok {
					// The argument channel was closed: exit without running.
					return
				}
				count ++
				str = str_temp
			case f_temp, ok := <-w.task:
				if !ok {
					// The task channel was closed: release the slot and exit.
					// NOTE(review): close on an already-closed channel
					// panics — confirm this branch is intended.
					atomic.AddInt32(&w.pool.running, -1)
					close(w.task)
					return
				}
				count  ++
				f = f_temp
			}
		}
		err := f(str)
		if err != nil{
			// The task's error is currently ignored.
			//fmt.Println("執行任務失敗")
		}
		// Return the worker to the pool for reuse.
		w.pool.putWorker(w)
		return
	}()

Pool怎么處理用戶提交task獲取Worker的請求

1.先得到Pool池中空閑Worker的數量,然后判斷

2.如果空閑Worker的數量為零(代碼中 n = len(workers) - 1 小於零),則表示池中沒有空閑的Worker,這里有兩種原因:

  • 1.運行的Worker數量已達到Pool容量上限:當用戶獲取Worker的請求數量激增時,池中的空閑Worker大多是執行完任務后重新放回池中的,Worker的歸還速度跟不上激增的需求。
  • 2.當前是空pool,從未往pool添加任務或者一段時間內沒有Worker任務運行,被定期清除。

3.如果空閑Worker的數量大於零(代碼中 n 大於或者等於零),說明有空閑的Worker,直接從池中取出最后一個Worker。

4.如果是第2步中的第一種情況(運行的Worker數量已達容量上限),則阻塞等待,直到池中出現空閑的Worker。

if waiting {
		// A worker is appended to the idle list when its task finishes;
		// spin here until the list is non-empty.
		// NOTE(review): this loop reads p.workers without holding the lock
		// and burns CPU while waiting — confirm this is acceptable.
		for len(p.workers) == 0{
			continue
		}
		// Take the last idle worker under the lock and count it as running.
		// NOTE(review): the list may be empty again by the time the lock is
		// acquired, which would make workers[l] index -1 — verify.
		p.lock.Lock()
		workers = p.workers
		l := len(workers) - 1
		w = workers[l]
		workers[l] = nil
		p.workers = workers[:l]
		p.running++
		p.lock.Unlock()
	}

5.如果是第2步中的第二種情況(Pool還沒有滿),則直接創建一個Worker實例。

// No idle worker is available but the pool is not full yet,
// so a new worker is created and counted as running.
p.running++
w = &Worker{
	pool: p,
	task: make(chan functinType),
	str:make(chan string),
}

測試

package main

import (
	"Pool/Poolpkg"
	"fmt"
)

func main(){
     //開20個大小的Pool池,過期清除時間5分鍾 Pool,err := Poolpkg.NewPool(20,5) i :=0 for i < 50 { err = Pool.Submit(Print_Test1,"並發測試!") if err != nil{ fmt.Println(err) } i++ } }

 

 

 

 

源碼

Pool

package Poolpkg

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// sig is the empty element type of the pool's release channel.
type sig struct{}



// Pool accepts tasks from clients and limits how many workers are handed
// out at the same time, reusing Worker values between tasks.
type Pool struct {
	// capacity is the upper bound on the number of workers handed out at the
	// same time; getWorker compares running against it before creating a
	// new worker.
	capacity int32
	// running is the number of workers currently handed out: getWorker
	// increments it and putWorker decrements it.
	running int32
	// expiryDuration is how long a worker may stay idle; the cleaner started
	// by monitorAndClear discards idle workers whose recycleTime is older
	// than this.
	expiryDuration time.Duration
	// workers holds the idle workers that are available for reuse
	// (it is not a task queue).
	workers []*Worker
	// release marks the pool as closed: Submit rejects tasks once this
	// channel holds a value.
	release chan sig
	// lock serialises access to workers and running in getWorker, putWorker
	// and monitorAndClear.
	lock sync.Mutex
	// once is presumably intended to make closing the pool run only once;
	// no use of it is visible in this file — TODO confirm.
	once sync.Once
}

// NewPool creates a pool that hands out at most size workers at a time and
// discards workers that have been idle for longer than expiry seconds.
//
// It returns an error when size or expiry is not positive. A non-positive
// expiry is rejected because the background cleaner sleeps for that duration
// between sweeps and would otherwise spin continuously on the pool lock.
func NewPool(size, expiry int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Pool Size <0,not Create")
	}
	if expiry <= 0 {
		return nil, errors.New("pool expiry must be positive")
	}
	p := &Pool{
		capacity:       int32(size),
		release:        make(chan sig, 1),
		expiryDuration: time.Duration(expiry) * time.Second,
	}
	// Start the periodic sweep of expired idle workers in its own goroutine.
	p.monitorAndClear()
	return p, nil
}

// Submit runs task(str) on a pooled worker.
//
// It returns an error when task is nil or when the pool has been closed
// (its release channel holds a value). Otherwise it blocks until a worker is
// available, starts the worker goroutine, hands it the argument and the task,
// and returns nil; the task's own error is not reported back.
func (p *Pool) Submit(task functinType, str string) error {
	if task == nil {
		// A nil task would only fail later, as a panic inside the worker
		// goroutine, so reject it before a worker is taken.
		return errors.New("task is nil")
	}
	if len(p.release) > 0 {
		return errors.New("Pool is Close")
	}
	// Create a worker or obtain an idle one.
	w := p.getWorker()
	w.run()
	// Both sends block until the worker goroutine receives them.
	w.sendarg(str)
	w.sendTask(task)
	return nil
}

// getWorker returns a worker that the caller may use to run one task.
//
// When the pool is below capacity it reuses the most recently recycled idle
// worker if there is one, and otherwise creates a new worker. When the pool
// is at capacity it blocks until a slot is released. The returned worker is
// counted in p.running until putWorker gives it back.
func (p *Pool) getWorker() *Worker {
	// Interval between capacity re-checks while the pool is full. Polling is
	// used because Pool has no condition variable to wait on.
	const retryInterval = time.Millisecond

	p.lock.Lock()
	fmt.Println("空閑worker數量:", len(p.workers))
	fmt.Println("協程池現在運行的worker數量:", p.running)

	// Wait for a free slot. Every check happens under the lock, and the lock
	// is dropped while sleeping so putWorker can make progress. Re-checking
	// the running count (instead of only the idle list) also lets a waiter
	// proceed when a slot is released without an idle worker appearing.
	for p.running >= p.capacity {
		p.lock.Unlock()
		time.Sleep(retryInterval)
		p.lock.Lock()
	}
	defer p.lock.Unlock()

	p.running++
	// Prefer an idle worker, taken from the tail of the list. Nil entries
	// are skipped so a damaged list can never yield a nil worker.
	for last := len(p.workers) - 1; last >= 0; last-- {
		w := p.workers[last]
		p.workers[last] = nil
		p.workers = p.workers[:last]
		if w != nil {
			return w
		}
	}
	// No idle worker available: create a fresh one.
	return &Worker{
		pool: p,
		task: make(chan functinType),
		str:  make(chan string),
	}
}

// monitorAndClear starts a background goroutine that wakes up every
// expiryDuration and removes idle workers whose recycleTime is older than
// expiryDuration. The goroutine exits once the pool's release channel holds
// a value, which is the same condition Submit uses to reject tasks.
func (p *Pool) monitorAndClear() {
	go func() {
		for {
			time.Sleep(p.expiryDuration)
			if len(p.release) > 0 {
				// Pool closed: stop sweeping so this goroutine does not leak.
				return
			}
			now := time.Now()
			p.lock.Lock()
			idle := p.workers
			// putWorker appends and getWorker pops from the tail, so the
			// longest-idle workers sit at the front; stop at the first
			// worker that has not expired yet.
			expired := 0
			for i, w := range idle {
				if w != nil && now.Sub(w.recycleTime) <= p.expiryDuration {
					break
				}
				// An idle worker has no goroutine (run returns right after
				// putWorker), so there is nothing to signal: dropping the
				// reference is enough. Calling stop here would block forever
				// on the unbuffered task channel while holding the lock.
				// running is left untouched because putWorker already
				// decremented it when the worker became idle.
				idle[i] = nil
				expired = i + 1
			}
			// Counting expired entries (rather than remembering the last
			// index) also trims the list when only the first worker expired.
			p.workers = idle[expired:]
			p.lock.Unlock()
		}
	}()
}

// putWorker returns worker to the idle list so a later getWorker call can
// reuse it, and releases the slot the worker occupied in p.running.
func (p *Pool) putWorker(worker *Worker) {
	// Stamp the moment the worker became idle; the cleaner compares it
	// against expiryDuration.
	worker.recycleTime = time.Now()

	p.lock.Lock()
	defer p.lock.Unlock()
	p.workers = append(p.workers, worker)
	p.running--
}

// ReSize changes the capacity of the pool to size.
//
// A non-positive size is ignored, matching NewPool which refuses to create
// such a pool. When the pool shrinks, the longest-idle workers are dropped so
// that idle plus running workers fit the new capacity; workers that are
// currently running are left alone.
func (p *Pool) ReSize(size int) {
	if size <= 0 {
		return
	}
	p.lock.Lock()
	defer p.lock.Unlock()
	if int32(size) == p.capacity {
		return
	}
	atomic.StoreInt32(&p.capacity, int32(size))

	// Number of idle workers that still fit next to the running ones.
	keep := size - int(p.running)
	if keep < 0 {
		keep = 0
	}
	// Idle workers have no goroutine to stop, so releasing the references is
	// all that is needed. Going through getWorker().stop() would block on
	// the unbuffered task channel and leave running permanently inflated.
	if excess := len(p.workers) - keep; excess > 0 {
		for i := 0; i < excess; i++ {
			p.workers[i] = nil
		}
		p.workers = p.workers[excess:]
	}
}

Worker

package Poolpkg

import (
	"sync/atomic"
	"time"
)

// functinType is the signature of a task: it receives one string argument
// and reports failure through its error result.
type functinType func(string) error


// Worker carries one task at a time: run starts a goroutine that receives a
// task and its argument over the channels below and calls the task.
type Worker struct {
	// pool is the Pool this worker is returned to after a task finishes.
	pool *Pool
	// task delivers the function to execute.
	task chan functinType
	// recycleTime is set by putWorker when the worker goes back to the idle
	// list.
	recycleTime time.Time

	// str delivers the string argument passed to the task.
	str chan string
}

// run starts the goroutine that executes one task for this worker.
//
// The goroutine waits until both the argument (w.str) and the task (w.task)
// have arrived, calls the task, returns the worker to its pool and exits.
// It exits early without running anything when w.str is closed, or when the
// task channel is closed or delivers a nil task (the signal sent by stop);
// in the latter case it also releases the worker's slot in pool.running.
func (w *Worker) run() {
	go func() {
		var (
			arg      string
			task     functinType
			haveArg  bool
			haveTask bool
		)
		// The argument and the task travel on separate channels, so accept
		// them in whichever order they arrive.
		for !haveArg || !haveTask {
			select {
			case s, ok := <-w.str:
				if !ok {
					return
				}
				arg, haveArg = s, true
			case f, ok := <-w.task:
				if !ok || f == nil {
					// Stop requested. The channel must not be closed again
					// here: closing a closed channel panics.
					atomic.AddInt32(&w.pool.running, -1)
					return
				}
				task, haveTask = f, true
			}
		}
		// Nothing in this design consumes the task's error, so it is
		// discarded explicitly.
		_ = task(arg)
		// Hand the worker back for reuse.
		w.pool.putWorker(w)
	}()
}

// stop tells the worker to exit and closes its argument channel.
//
// If the worker goroutine is waiting for input it receives a nil task, which
// is the stop signal. If no goroutine is receiving (for example the worker
// is idle) the signal is skipped instead of blocking forever on the
// unbuffered channel. stop must be called at most once per worker, because
// it closes w.str.
func (w *Worker) stop() {
	select {
	case w.task <- nil:
	default:
		// Nobody is receiving; closing str below is enough for a goroutine
		// that starts waiting later.
	}
	close(w.str)
}

// sendTask hands task to the worker over its task channel. The channel is
// created unbuffered by getWorker, so the call blocks until the worker
// goroutine receives the task.
func (w *Worker) sendTask(task functinType) {
	w.task <- task
}

// sendarg hands the task argument str to the worker over its str channel.
// The channel is created unbuffered by getWorker, so the call blocks until
// the worker goroutine receives the value.
func (w *Worker) sendarg(str string) {
	w.str <- str
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM