轉https://blog.csdn.net/boyhandsome7/article/details/80284880
package pool import ( "fmt" "log" "os" "os/signal" "strconv" "syscall" ) //參考模型:工廠流水線->流水線員工->待加工產品 type Payload struct { Name string } func (p *Payload) Play() { log.Printf("%s加工完成。\n", p.Name) } //任務 type Job struct { Payload Payload } type Worker struct { WorkerId string //員工ID Workbench chan Job //員工加工產品的工作台,即來即走(無緩沖)。 GWorkbenchQueue chan chan Job //等待分配加工產品的員工工作台隊列 Finished chan bool //員工結束工作通知通道,無緩沖 } // 新建一條工廠流水線 func NewWorker(WorkbenchQueue chan chan Job, Id string) *Worker { log.Printf("新建流水線:%s \n", Id) return &Worker{ WorkerId: Id, //員工ID Workbench: make(chan Job), //員工加工產品的工作台,即來即走(無緩沖)。 GWorkbenchQueue: WorkbenchQueue, //等待分配加工產品的員工工作台隊列 Finished: make(chan bool), //無緩沖 } } // 工人開始工作 func (w *Worker) Start() { //開一個新的協程 go func() { for { //將當前未分配待加工產品的工作台添加到工作台隊列中 w.GWorkbenchQueue <- w.Workbench log.Printf("把[%s]的工作台添加到工作台隊列中,當前工作台隊列長度:%d\n", w.WorkerId, len(w.GWorkbenchQueue)) select { //接收到了新的WorkerJob case wJob := <-w.Workbench: wJob.Payload.Play() case bFinished := <-w.Finished: if true == bFinished { log.Printf("%s 結束工作!\n", w.WorkerId) return } } } }() } func (w *Worker) Stop() { //w.QuitChannel <- true go func() { w.Finished <- true }() } type Dispatcher struct { DispatcherId string //流水線ID MaxWorkers int //流水線上的員工(Worker)最大數量 Workers []*Worker //流水線上所有員工(Worker)對象集合 Closed chan bool //流水線工作狀態通道 EndDispatch chan os.Signal //流水線停止工作信號 GJobQueue chan Job //流水線上的所有代加工產品(Job)隊列通道 GWorkbenchQueue chan chan Job //流水線上的所有操作台隊列通道 } func NewDispatcher(maxWorkers, maxQueue int) *Dispatcher { Closed := make(chan bool) EndDispatch := make(chan os.Signal) JobQueue := make(chan Job, maxQueue) WorkbenchQueue := make(chan chan Job, maxWorkers) signal.Notify(EndDispatch, syscall.SIGINT, syscall.SIGTERM) return &Dispatcher{ DispatcherId: "調度者", MaxWorkers: maxWorkers, Closed: Closed, EndDispatch: EndDispatch, GJobQueue: JobQueue, GWorkbenchQueue: WorkbenchQueue, } } func (d *Dispatcher) Run() { // 開始運行 for i := 0; i < d.MaxWorkers; i++ { worker := NewWorker(d.GWorkbenchQueue, fmt.Sprintf("work-%s", strconv.Itoa(i))) d.Workers = append(d.Workers, worker) //開始工作 worker.Start() } //監控 go d.Dispatch() } func (d *Dispatcher) Dispatch() { FLAG: for { select { case endDispatch := <-d.EndDispatch: log.Printf("流水線關閉命令[%v]已發出...\n", endDispatch) close(d.GJobQueue) case wJob, Ok := <-d.GJobQueue: if true == Ok { log.Println("從流水線獲取一個待加工產品(Job)") go func(wJob Job) { //獲取未分配待加工產品的工作台 Workbench := <-d.GWorkbenchQueue //將待加工產品(Job)放入工作台進行加工 Workbench <- wJob }(wJob) } else { for _, w := range d.Workers { w.Stop() } d.Closed <- true break FLAG } } } } type WorkFlow struct { GDispatch *Dispatcher } func (wf *WorkFlow) StartWorkFlow(maxWorkers, maxQueue int) { //初始化一個調度器(流水線),並指定該流水線上的員工(Worker)和待加工產品(Job)的最大數量 wf.GDispatch = NewDispatcher(maxWorkers, maxQueue) //啟動流水線 wf.GDispatch.Run() } func (wf *WorkFlow) AddJob(wJob Job) { //向流水線中放入待加工產品(Job) wf.GDispatch.GJobQueue <- wJob } func (wf *WorkFlow) CloseWorkFlow() { closed := <-wf.GDispatch.Closed if true == closed { log.Println("調度器(流水線)已關閉.") } }