golang实现协程池(GoroutinePool)并控制池中的协程大小


转https://blog.csdn.net/boyhandsome7/article/details/80284880

package pool
 
import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "strconv"
    "syscall"
)
 
//参考模型:工厂流水线->流水线员工->待加工产品
type Payload struct {
    Name string
}
 
func (p *Payload) Play() {
    log.Printf("%s加工完成。\n", p.Name)
}
 
//任务
type Job struct {
    Payload Payload
}
 
type Worker struct {
    WorkerId        string        //员工ID
    Workbench       chan Job      //员工加工产品的工作台,即来即走(无缓冲)。
    GWorkbenchQueue chan chan Job //等待分配加工产品的员工工作台队列
    Finished        chan bool     //员工结束工作通知通道,无缓冲
}
 
// 新建一条工厂流水线
func NewWorker(WorkbenchQueue chan chan Job, Id string) *Worker {
    log.Printf("新建流水线:%s \n", Id)
    return &Worker{
        WorkerId:        Id,              //员工ID
        Workbench:       make(chan Job),  //员工加工产品的工作台,即来即走(无缓冲)。
        GWorkbenchQueue: WorkbenchQueue,  //等待分配加工产品的员工工作台队列
        Finished:        make(chan bool), //无缓冲
    }
}
 
// 工人开始工作
func (w *Worker) Start() {
    //开一个新的协程
    go func() {
        for {
            //将当前未分配待加工产品的工作台添加到工作台队列中
            w.GWorkbenchQueue <- w.Workbench
            log.Printf("把[%s]的工作台添加到工作台队列中,当前工作台队列长度:%d\n", w.WorkerId, len(w.GWorkbenchQueue))
            select {
            //接收到了新的WorkerJob
            case wJob := <-w.Workbench:
                wJob.Payload.Play()
            case bFinished := <-w.Finished:
                if true == bFinished {
                    log.Printf("%s 结束工作!\n", w.WorkerId)
                    return
                }
            }
        }
    }()
}
 
func (w *Worker) Stop() {
    //w.QuitChannel <- true
    go func() {
        w.Finished <- true
    }()
}
 
type Dispatcher struct {
    DispatcherId    string         //流水线ID
    MaxWorkers      int            //流水线上的员工(Worker)最大数量
    Workers         []*Worker      //流水线上所有员工(Worker)对象集合
    Closed          chan bool      //流水线工作状态通道
    EndDispatch     chan os.Signal //流水线停止工作信号
    GJobQueue       chan Job       //流水线上的所有代加工产品(Job)队列通道
    GWorkbenchQueue chan chan Job  //流水线上的所有操作台队列通道
}
 
func NewDispatcher(maxWorkers, maxQueue int) *Dispatcher {
    Closed := make(chan bool)
    EndDispatch := make(chan os.Signal)
    JobQueue := make(chan Job, maxQueue)
    WorkbenchQueue := make(chan chan Job, maxWorkers)
    signal.Notify(EndDispatch, syscall.SIGINT, syscall.SIGTERM)
    return &Dispatcher{
        DispatcherId:    "调度者",
        MaxWorkers:      maxWorkers,
        Closed:          Closed,
        EndDispatch:     EndDispatch,
        GJobQueue:       JobQueue,
        GWorkbenchQueue: WorkbenchQueue,
    }
}
 
func (d *Dispatcher) Run() {
    // 开始运行
    for i := 0; i < d.MaxWorkers; i++ {
        worker := NewWorker(d.GWorkbenchQueue, fmt.Sprintf("work-%s", strconv.Itoa(i)))
        d.Workers = append(d.Workers, worker)
        //开始工作
        worker.Start()
    }
    //监控
    go d.Dispatch()
}
 
func (d *Dispatcher) Dispatch() {
FLAG:
    for {
        select {
        case endDispatch := <-d.EndDispatch:
            log.Printf("流水线关闭命令[%v]已发出...\n", endDispatch)
            close(d.GJobQueue)
        case wJob, Ok := <-d.GJobQueue:
            if true == Ok {
                log.Println("从流水线获取一个待加工产品(Job)")
                go func(wJob Job) {
                    //获取未分配待加工产品的工作台
                    Workbench := <-d.GWorkbenchQueue
                    //将待加工产品(Job)放入工作台进行加工
                    Workbench <- wJob
                }(wJob)
            } else {
                for _, w := range d.Workers {
                    w.Stop()
                }
                d.Closed <- true
                break FLAG
            }
        }
    }
}
 
type WorkFlow struct {
    GDispatch *Dispatcher
}
 
func (wf *WorkFlow) StartWorkFlow(maxWorkers, maxQueue int) {
    //初始化一个调度器(流水线),并指定该流水线上的员工(Worker)和待加工产品(Job)的最大数量
    wf.GDispatch = NewDispatcher(maxWorkers, maxQueue)
    //启动流水线
    wf.GDispatch.Run()
}
 
func (wf *WorkFlow) AddJob(wJob Job) {
    //向流水线中放入待加工产品(Job)
    wf.GDispatch.GJobQueue <- wJob
}
 
func (wf *WorkFlow) CloseWorkFlow() {
    closed := <-wf.GDispatch.Closed
    if true == closed {
        log.Println("调度器(流水线)已关闭.")
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM