Goroutine Pool Architecture
In scenarios with very large-scale concurrency, spawning goroutines without limit can make memory usage explode, putting heavy pressure on the machine, lowering throughput, and slowing processing down.
A goroutine pool addresses this by reusing goroutines: it relieves scheduling pressure on the runtime and eases memory pressure, and with these optimizations it can greatly improve concurrency performance under massive goroutine workloads.
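Before looking at the pool itself, the following is a minimal, hypothetical sketch of the simplest way to bound concurrency, using a buffered channel as a semaphore. It only caps the number of goroutines; the pool described below goes further by also reusing the goroutines instead of creating one per task.
package main

import (
    "fmt"
    "sync"
)

func main() {
    // sem caps the number of concurrently running goroutines at 20.
    sem := make(chan struct{}, 20)
    var wg sync.WaitGroup
    for i := 0; i < 50; i++ {
        wg.Add(1)
        sem <- struct{}{} // acquire a slot before starting the goroutine
        go func(n int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot when done
            fmt.Println("task", n)
        }(i)
    }
    wg.Wait()
}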

The Pool Type
type Pool struct {
    // capacity is the maximum number of workers the pool may start;
    // each worker runs its task in its own goroutine.
    capacity int32
    // running is the number of workers currently executing tasks.
    running int32
    // expiryDuration is the expiry time of an idle worker: if the gap
    // between now and a worker's last run time exceeds this value, the
    // worker is considered expired and the periodic cleanup removes it.
    expiryDuration time.Duration
    // workers is the queue of currently idle (available) workers.
    workers []*Worker
    // release is used to tell the pool to close itself, so that every
    // worker exits and no goroutine is leaked.
    release chan sig
    // lock guards the pool's shared state.
    lock sync.Mutex
    // once ensures the pool-close operation runs only once.
    once sync.Once
}
Initializing the Pool
// NewPool generates an instance of an ants-style pool.
func NewPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("pool size must be greater than 0")
    }
    p := &Pool{
        capacity:       int32(size),
        release:        make(chan sig, 1),
        expiryDuration: time.Duration(expiry) * time.Second,
        running:        0,
    }
    // Start the periodic cleanup of expired workers in its own goroutine,
    // which frees idle resources over time.
    p.monitorAndClear()
    return p, nil
}
Getting a Worker
// getWorker returns an available worker to run the task.
func (p *Pool) getWorker() *Worker {
    var w *Worker
    // waiting marks whether the number of running workers has reached the pool's capacity.
    waiting := false
    // Taking a worker from the idle queue touches shared state, so lock first.
    p.lock.Lock()
    workers := p.workers
    n := len(workers) - 1
    fmt.Println("idle workers:", n+1)
    fmt.Println("running workers:", p.running)
    // The idle queue is empty (no idle worker).
    if n < 0 {
        // There are two possible reasons for having no idle worker:
        // 1. the running workers have reached the pool's capacity;
        // 2. the pool is empty: no task has ever been submitted, or no task
        //    was submitted for a while and the idle workers were cleaned up.
        if p.running >= p.capacity {
            // The pool is full: set the waiting flag.
            waiting = true
        } else {
            // No idle worker, but the pool is not full yet,
            // so start a new worker directly to run the task.
            p.running++
            w = &Worker{
                pool: p,
                task: make(chan functinType),
                str:  make(chan string),
            }
        }
    } else {
        // There is an idle worker: take one from the tail of the queue.
        w = workers[n]
        workers[n] = nil
        p.workers = workers[:n]
        p.running++
    }
    // Done deciding whether a worker is available; unlock.
    p.lock.Unlock()
    if waiting {
        // Workers that finish their task are put back into the pool,
        // so spin until an idle worker appears.
        for len(p.workers) == 0 {
            continue
        }
        p.lock.Lock()
        workers = p.workers
        l := len(workers) - 1
        w = workers[l]
        workers[l] = nil
        p.workers = workers[:l]
        p.running++
        p.lock.Unlock()
    }
    return w
}
Periodically Cleaning Up Expired Workers
func (p *Pool) monitorAndClear() {
    go func() {
        for {
            // Periodically check for expired workers and clean them up.
            time.Sleep(p.expiryDuration)
            currentTime := time.Now()
            p.lock.Lock()
            idleWorkers := p.workers
            n := 0
            for i, w := range idleWorkers {
                // A worker is expired if the time since its last run
                // exceeds expiryDuration; the idle queue is ordered by
                // recycle time, so stop at the first non-expired worker.
                if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                    break
                }
                n = i
                w.stop()
                idleWorkers[i] = nil
            }
            if n > 0 {
                n++
                p.workers = idleWorkers[n:]
            }
            p.lock.Unlock()
        }
    }()
}
Reusing Workers
// putWorker puts a worker back into the free queue, recycling its goroutine.
func (p *Pool) putWorker(worker *Worker) {
    // Record the recycle time, i.e. the worker's last run time.
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.running--
    p.workers = append(p.workers, worker)
    p.lock.Unlock()
}
Dynamically Resizing the Pool
// ReSize changes the capacity of this pool.
func (p *Pool) ReSize(size int) {
    cap := int(p.capacity)
    if size < cap {
        // Shrinking: stop the surplus workers.
        diff := cap - size
        for i := 0; i < diff; i++ {
            p.getWorker().stop()
        }
    } else if size == cap {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))
}
Submitting a Task
// Submit submits a task to the pool.
func (p *Pool) Submit(task functinType, str string) error {
    if len(p.release) > 0 {
        return errors.New("pool is closed")
    }
    // Create or obtain an idle worker.
    w := p.getWorker()
    w.run()
    // Pass the task's argument to the worker over its channel.
    w.sendarg(str)
    // Pass the task itself to the worker over its channel.
    w.sendTask(task)
    return nil
}
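For illustration only, a caller can pass any function literal that matches functinType. The snippet below is a hypothetical usage sketch (it assumes the package is imported as Poolpkg and that fmt is imported), not part of the pool itself:
p, err := Poolpkg.NewPool(10, 30) // capacity 10, idle expiry 30 seconds
if err != nil {
    panic(err)
}
err = p.Submit(func(s string) error {
    fmt.Println("handling:", s)
    return nil
}, "hello")
if err != nil {
    fmt.Println(err)
}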
The Worker Type
package Poolpkg

import (
    "sync/atomic"
    "time"
)

type functinType func(string) error

// Worker is the actual executor that runs the tasks:
// it starts a goroutine that accepts a task and
// performs the function call.
type Worker struct {
    // pool is the pool that owns this worker.
    pool *Pool
    // task is the job that should be done.
    task chan functinType
    // recycleTime is updated when the worker is put back into the queue.
    recycleTime time.Time
    // str carries the argument of the task.
    str chan string
}

// run starts a goroutine that performs the function call.
func (w *Worker) run() {
    go func() {
        // Listen on both channels until the worker has received the
        // argument and the task (count reaches 2), then run the task.
        count := 1
        var str string
        var f functinType
        for count <= 2 {
            select {
            case str_temp, ok := <-w.str:
                if !ok {
                    return
                }
                count++
                str = str_temp
            case f_temp, ok := <-w.task:
                if !ok {
                    // The task channel was closed: the worker is being stopped.
                    atomic.AddInt32(&w.pool.running, -1)
                    close(w.task)
                    return
                }
                count++
                f = f_temp
            }
        }
        err := f(str)
        if err != nil {
            //fmt.Println("task execution failed")
        }
        // Put the worker back into the pool for reuse.
        w.pool.putWorker(w)
        return
    }()
}

// stop stops this worker.
func (w *Worker) stop() {
    w.sendTask(nil)
    close(w.str)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task functinType) {
    w.task <- task
}

// sendarg sends the task's argument to this worker.
func (w *Worker) sendarg(str string) {
    w.str <- str
}
Summary and Practice
How to understand the relationship between Worker, task, and Pool
The Worker type is essentially the carrier of a task. It has two important fields:
task chan functinType: used to pass the task to the worker.
str chan string: used to pass the task's argument.
The task is the job itself; it is simply a function, declared in the program as a function type:
type functinType func(string) error
The Pool stores Workers. When a user wants to run a task, they must first obtain a Worker from the pool; once a Worker is obtained, a goroutine is started to handle it, and inside that goroutine the Worker receives the task and its argument (a stripped-down illustration of this hand-off follows the snippet below).
// Create or obtain an idle worker.
w := p.getWorker()
// Start a goroutine to handle it.
w.run()
// Pass the task's argument to the worker over its channel.
w.sendarg(str)
// Pass the task itself to the worker over its channel.
w.sendTask(task)
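To make the hand-off concrete, here is a stripped-down, standalone sketch (hypothetical, not part of the package) of the same pattern the Worker uses: a goroutine receives a function and its argument over channels and then calls it. For simplicity it receives in a fixed order rather than with the select used in Worker.run.
package main

import "fmt"

// taskFunc mirrors functinType from the pool package.
type taskFunc func(string) error

func main() {
    taskCh := make(chan taskFunc)
    argCh := make(chan string)
    done := make(chan struct{})

    // A bare-bones "worker": receive the task and its argument over
    // channels, then perform the call, just as Worker.run does.
    go func() {
        defer close(done)
        f := <-taskCh
        s := <-argCh
        if err := f(s); err != nil {
            fmt.Println("task failed:", err)
        }
    }()

    taskCh <- func(s string) error {
        fmt.Println("got argument:", s)
        return nil
    }
    argCh <- "hello"
    <-done // wait for the worker goroutine to finish
}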
How a Worker receives its task and argument
count tracks how many values the worker has received; a Worker must receive both the task and its argument before it can start working.
When the work is done, the Worker is put back into the Pool so that the same Worker instance can be reused the next time.
go func() {
    // Listen on both channels until the worker has received the
    // argument and the task (count reaches 2), then run the task.
    count := 1
    var str string
    var f functinType
    for count <= 2 {
        select {
        case str_temp, ok := <-w.str:
            if !ok {
                return
            }
            count++
            str = str_temp
        case f_temp, ok := <-w.task:
            if !ok {
                // The task channel was closed: the worker is being stopped.
                atomic.AddInt32(&w.pool.running, -1)
                close(w.task)
                return
            }
            count++
            f = f_temp
        }
    }
    err := f(str)
    if err != nil {
        //fmt.Println("task execution failed")
    }
    // Put the worker back into the pool for reuse.
    w.pool.putWorker(w)
    return
}()
How the Pool handles a request for a Worker when a task is submitted
1. First compute n, the index of the last idle Worker in the pool (the number of idle Workers minus one), then branch on it.
2. If n is negative, the pool has no idle Worker. There are two possible reasons:
- 1. The number of running Workers has reached the Pool's capacity: when requests for Workers surge, Workers only come back into the pool after finishing a task, and the rate at which they are returned cannot keep up with the surge in demand.
- 2. The pool is empty: no task has ever been submitted, or no task has run for a while and the idle Workers were removed by the periodic cleanup.
3. If n is greater than or equal to zero, there is an idle Worker; take the last one from the pool.
4. In case 2.1, block and wait until the pool has an idle Worker again:
if waiting {
    // Workers that finish their task are put back into the pool,
    // so spin until an idle worker appears.
    for len(p.workers) == 0 {
        continue
    }
    p.lock.Lock()
    workers = p.workers
    l := len(workers) - 1
    w = workers[l]
    workers[l] = nil
    p.workers = workers[:l]
    p.running++
    p.lock.Unlock()
}
5. In case 2.2, create a new Worker instance directly:
// No idle worker, but the pool is not full yet,
// so start a new worker directly to run the task.
p.running++
w = &Worker{
    pool: p,
    task: make(chan functinType),
    str:  make(chan string),
}
Test
package main

import (
    "Pool/Poolpkg"
    "fmt"
)

func main() {
    // A pool with capacity 20 and an idle-worker expiry of 5 seconds.
    Pool, err := Poolpkg.NewPool(20, 5)
    if err != nil {
        fmt.Println(err)
        return
    }
    i := 0
    for i < 50 {
        err = Pool.Submit(Print_Test1, "concurrency test!")
        if err != nil {
            fmt.Println(err)
        }
        i++
    }
}
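The test calls Print_Test1, which is not shown anywhere above. A minimal, hypothetical definition that satisfies functinType might look like the following. Note also that main does not wait for the submitted tasks to finish, so in practice you may want to block (for example with time.Sleep or a sync.WaitGroup) before exiting.
// Print_Test1 is a hypothetical task used by the test above;
// any func(string) error can serve as a task.
func Print_Test1(str string) error {
    fmt.Println(str)
    return nil
}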


Source Code
Pool
package Poolpkg

import (
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type sig struct{}

// Pool accepts tasks from clients; it limits the total number of
// goroutines to a given number by recycling goroutines.
type Pool struct {
    // capacity is the maximum number of workers the pool may start;
    // each worker runs its task in its own goroutine.
    capacity int32
    // running is the number of workers currently executing tasks.
    running int32
    // expiryDuration is the expiry time of an idle worker: if the gap
    // between now and a worker's last run time exceeds this value, the
    // worker is considered expired and the periodic cleanup removes it.
    expiryDuration time.Duration
    // workers is the queue of currently idle (available) workers.
    workers []*Worker
    // release is used to tell the pool to close itself, so that every
    // worker exits and no goroutine is leaked.
    release chan sig
    // lock guards the pool's shared state.
    lock sync.Mutex
    // once ensures the pool-close operation runs only once.
    once sync.Once
}
// NewPool generates an instance of an ants-style pool.
func NewPool(size, expiry int) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("pool size must be greater than 0")
    }
    p := &Pool{
        capacity:       int32(size),
        release:        make(chan sig, 1),
        expiryDuration: time.Duration(expiry) * time.Second,
        running:        0,
    }
    // Start the periodic cleanup of expired workers in its own goroutine,
    // which frees idle resources over time.
    p.monitorAndClear()
    return p, nil
}
// Submit submits a task to the pool.
func (p *Pool) Submit(task functinType, str string) error {
    if len(p.release) > 0 {
        return errors.New("pool is closed")
    }
    // Create or obtain an idle worker.
    w := p.getWorker()
    w.run()
    // Pass the task's argument to the worker over its channel.
    w.sendarg(str)
    // Pass the task itself to the worker over its channel.
    w.sendTask(task)
    return nil
}
// getWorker returns an available worker to run the task.
func (p *Pool) getWorker() *Worker {
    var w *Worker
    // waiting marks whether the number of running workers has reached the pool's capacity.
    waiting := false
    // Taking a worker from the idle queue touches shared state, so lock first.
    p.lock.Lock()
    workers := p.workers
    n := len(workers) - 1
    fmt.Println("idle workers:", n+1)
    fmt.Println("running workers:", p.running)
    // The idle queue is empty (no idle worker).
    if n < 0 {
        // There are two possible reasons for having no idle worker:
        // 1. the running workers have reached the pool's capacity;
        // 2. the pool is empty: no task has ever been submitted, or no task
        //    was submitted for a while and the idle workers were cleaned up.
        if p.running >= p.capacity {
            // The pool is full: set the waiting flag.
            waiting = true
        } else {
            // No idle worker, but the pool is not full yet,
            // so start a new worker directly to run the task.
            p.running++
            w = &Worker{
                pool: p,
                task: make(chan functinType),
                str:  make(chan string),
            }
        }
    } else {
        // There is an idle worker: take one from the tail of the queue.
        w = workers[n]
        workers[n] = nil
        p.workers = workers[:n]
        p.running++
    }
    // Done deciding whether a worker is available; unlock.
    p.lock.Unlock()
    if waiting {
        // Workers that finish their task are put back into the pool,
        // so spin until an idle worker appears.
        for len(p.workers) == 0 {
            continue
        }
        p.lock.Lock()
        workers = p.workers
        l := len(workers) - 1
        w = workers[l]
        workers[l] = nil
        p.workers = workers[:l]
        p.running++
        p.lock.Unlock()
    }
    return w
}
// monitorAndClear periodically cleans up expired workers.
func (p *Pool) monitorAndClear() {
    go func() {
        for {
            // Periodically check for expired workers and clean them up.
            time.Sleep(p.expiryDuration)
            currentTime := time.Now()
            p.lock.Lock()
            idleWorkers := p.workers
            n := 0
            for i, w := range idleWorkers {
                // A worker is expired if the time since its last run
                // exceeds expiryDuration; the idle queue is ordered by
                // recycle time, so stop at the first non-expired worker.
                if currentTime.Sub(w.recycleTime) <= p.expiryDuration {
                    break
                }
                n = i
                w.stop()
                idleWorkers[i] = nil
                p.running--
            }
            if n > 0 {
                n++
                p.workers = idleWorkers[n:]
            }
            p.lock.Unlock()
        }
    }()
}
// putWorker puts a worker back into the free queue, recycling its goroutine.
func (p *Pool) putWorker(worker *Worker) {
    // Record the recycle time, i.e. the worker's last run time.
    worker.recycleTime = time.Now()
    p.lock.Lock()
    p.running--
    p.workers = append(p.workers, worker)
    p.lock.Unlock()
}

// ReSize dynamically grows or shrinks the pool's capacity.
func (p *Pool) ReSize(size int) {
    cap := int(p.capacity)
    if size < cap {
        // Shrinking: stop the surplus workers.
        diff := cap - size
        for i := 0; i < diff; i++ {
            p.getWorker().stop()
        }
    } else if size == cap {
        return
    }
    atomic.StoreInt32(&p.capacity, int32(size))
}
Worker
package Poolpkg

import (
    "sync/atomic"
    "time"
)

type functinType func(string) error

// Worker is the actual executor that runs the tasks:
// it starts a goroutine that accepts a task and
// performs the function call.
type Worker struct {
    // pool is the pool that owns this worker.
    pool *Pool
    // task is the job that should be done.
    task chan functinType
    // recycleTime is updated when the worker is put back into the queue.
    recycleTime time.Time
    // str carries the argument of the task.
    str chan string
}

// run starts a goroutine that performs the function call.
func (w *Worker) run() {
    go func() {
        // Listen on both channels until the worker has received the
        // argument and the task (count reaches 2), then run the task.
        count := 1
        var str string
        var f functinType
        for count <= 2 {
            select {
            case str_temp, ok := <-w.str:
                if !ok {
                    return
                }
                count++
                str = str_temp
            case f_temp, ok := <-w.task:
                if !ok {
                    // The task channel was closed: the worker is being stopped.
                    atomic.AddInt32(&w.pool.running, -1)
                    close(w.task)
                    return
                }
                count++
                f = f_temp
            }
        }
        err := f(str)
        if err != nil {
            //fmt.Println("task execution failed")
        }
        // Put the worker back into the pool for reuse.
        w.pool.putWorker(w)
        return
    }()
}

// stop stops this worker.
func (w *Worker) stop() {
    w.sendTask(nil)
    close(w.str)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task functinType) {
    w.task <- task
}

// sendarg sends the task's argument to this worker.
func (w *Worker) sendarg(str string) {
    w.str <- str
}
