golang實現協程池(GoroutinePool)並控制池中的協程大小


轉https://blog.csdn.net/boyhandsome7/article/details/80284880

package pool
 
import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "strconv"
    "syscall"
)
 
//參考模型:工廠流水線->流水線員工->待加工產品
type Payload struct {
    Name string
}
 
func (p *Payload) Play() {
    log.Printf("%s加工完成。\n", p.Name)
}
 
//任務
type Job struct {
    Payload Payload
}
 
type Worker struct {
    WorkerId        string        //員工ID
    Workbench       chan Job      //員工加工產品的工作台,即來即走(無緩沖)。
    GWorkbenchQueue chan chan Job //等待分配加工產品的員工工作台隊列
    Finished        chan bool     //員工結束工作通知通道,無緩沖
}
 
// 新建一條工廠流水線
func NewWorker(WorkbenchQueue chan chan Job, Id string) *Worker {
    log.Printf("新建流水線:%s \n", Id)
    return &Worker{
        WorkerId:        Id,              //員工ID
        Workbench:       make(chan Job),  //員工加工產品的工作台,即來即走(無緩沖)。
        GWorkbenchQueue: WorkbenchQueue,  //等待分配加工產品的員工工作台隊列
        Finished:        make(chan bool), //無緩沖
    }
}
 
// 工人開始工作
func (w *Worker) Start() {
    //開一個新的協程
    go func() {
        for {
            //將當前未分配待加工產品的工作台添加到工作台隊列中
            w.GWorkbenchQueue <- w.Workbench
            log.Printf("把[%s]的工作台添加到工作台隊列中,當前工作台隊列長度:%d\n", w.WorkerId, len(w.GWorkbenchQueue))
            select {
            //接收到了新的WorkerJob
            case wJob := <-w.Workbench:
                wJob.Payload.Play()
            case bFinished := <-w.Finished:
                if true == bFinished {
                    log.Printf("%s 結束工作!\n", w.WorkerId)
                    return
                }
            }
        }
    }()
}
 
func (w *Worker) Stop() {
    //w.QuitChannel <- true
    go func() {
        w.Finished <- true
    }()
}
 
type Dispatcher struct {
    DispatcherId    string         //流水線ID
    MaxWorkers      int            //流水線上的員工(Worker)最大數量
    Workers         []*Worker      //流水線上所有員工(Worker)對象集合
    Closed          chan bool      //流水線工作狀態通道
    EndDispatch     chan os.Signal //流水線停止工作信號
    GJobQueue       chan Job       //流水線上的所有代加工產品(Job)隊列通道
    GWorkbenchQueue chan chan Job  //流水線上的所有操作台隊列通道
}
 
func NewDispatcher(maxWorkers, maxQueue int) *Dispatcher {
    Closed := make(chan bool)
    EndDispatch := make(chan os.Signal)
    JobQueue := make(chan Job, maxQueue)
    WorkbenchQueue := make(chan chan Job, maxWorkers)
    signal.Notify(EndDispatch, syscall.SIGINT, syscall.SIGTERM)
    return &Dispatcher{
        DispatcherId:    "調度者",
        MaxWorkers:      maxWorkers,
        Closed:          Closed,
        EndDispatch:     EndDispatch,
        GJobQueue:       JobQueue,
        GWorkbenchQueue: WorkbenchQueue,
    }
}
 
func (d *Dispatcher) Run() {
    // 開始運行
    for i := 0; i < d.MaxWorkers; i++ {
        worker := NewWorker(d.GWorkbenchQueue, fmt.Sprintf("work-%s", strconv.Itoa(i)))
        d.Workers = append(d.Workers, worker)
        //開始工作
        worker.Start()
    }
    //監控
    go d.Dispatch()
}
 
func (d *Dispatcher) Dispatch() {
FLAG:
    for {
        select {
        case endDispatch := <-d.EndDispatch:
            log.Printf("流水線關閉命令[%v]已發出...\n", endDispatch)
            close(d.GJobQueue)
        case wJob, Ok := <-d.GJobQueue:
            if true == Ok {
                log.Println("從流水線獲取一個待加工產品(Job)")
                go func(wJob Job) {
                    //獲取未分配待加工產品的工作台
                    Workbench := <-d.GWorkbenchQueue
                    //將待加工產品(Job)放入工作台進行加工
                    Workbench <- wJob
                }(wJob)
            } else {
                for _, w := range d.Workers {
                    w.Stop()
                }
                d.Closed <- true
                break FLAG
            }
        }
    }
}
 
type WorkFlow struct {
    GDispatch *Dispatcher
}
 
func (wf *WorkFlow) StartWorkFlow(maxWorkers, maxQueue int) {
    //初始化一個調度器(流水線),並指定該流水線上的員工(Worker)和待加工產品(Job)的最大數量
    wf.GDispatch = NewDispatcher(maxWorkers, maxQueue)
    //啟動流水線
    wf.GDispatch.Run()
}
 
func (wf *WorkFlow) AddJob(wJob Job) {
    //向流水線中放入待加工產品(Job)
    wf.GDispatch.GJobQueue <- wJob
}
 
func (wf *WorkFlow) CloseWorkFlow() {
    closed := <-wf.GDispatch.Closed
    if true == closed {
        log.Println("調度器(流水線)已關閉.")
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM