golang 固定worker 工作池


服務器編程使用最多的就是通過線程池提升服務的並發執行能力,在go語言中,一樣可以輕松的構建固定數目的goroutine作為線程池,下面通過計算多個整數的和來說明這種並發范式。

設計思路:

 除了主要的main goroutine ,還需開啟以下幾類goroutine

   1,初始化任務的goroutine

   2,分發任務的goroutine

   3,等到所有work結束,然后關閉所有通道的goroutine

main主要負責拉起以上的goroutine 冰火取結果

 程序還需要三個通道

  1,傳遞task任務的通道

  2,傳遞task結果的通道

 3 ,接收workder處理完任務后所發送通知的通道

具體代碼如下

package go_worker

import (
    "fmt"
)

//定義工作數量
const (
    WORKS=5
)

//定義工作任務結構體,可根據需求改變
type task struct {
    begin int
    end int
    result  chan<- int
}
//定義執行任務的方法,可根據需求更改
func (t *task) do(){
    sum:=0
    for i:=t.begin;i<=t.end;i++{
        sum++
    }
    t.result<-sum
}

//入口函數

func main(){
    works:=WORKS
    //定義工作通道
    taskchan:=make(chan task,10)
    //定義結果通道
    resultchan:=make(chan int,10)
    //work工作信號通道
    done:=make(chan struct{},10)
    //初始化task的goroutine
    go initTask(taskchan,resultchan,100);
    //分發任務到協程池
    distributeTask(taskchan,works,done)
    //獲取goroutine處理完成任務通知,並關閉通道
    go closeResult(done,resultchan,works)
    //通過結果通道,獲取結果並匯總
    sum:=processResult(resultchan)
    fmt.Println("sum=",sum)
}
//初始化task chan
func initTask(taskchan chan<-task,r chan int ,p int){
    qu:=p/10
    mod:=p%10
    high:=qu*10;
    for j:=0;j<qu;j++{
        b:=10*j+1
        e:=10*(j+1)
        task:=task{
            begin:b,
            end:e,
            result:r,
        }
        taskchan<-task
    }
    if mod!=0{
        task:=task{
            begin:high+1,
            end:p,
            result:r,
        }
        taskchan<-task
    }
    close(taskchan)
}

//讀取taskchan 並分發到worker goRoutine處理,總數量為workers
func distributeTask(taskchan<-chan task,workers int ,done chan struct{}){
    for i:=0;i<workers;i++{
        go processTask(taskchan,done)
    }
}
//工作goroutine處理的具體內容,並將處理的結果發送到結果chan
func processTask(taskchan <-chan task,done chan struct{}){
    for t:=range taskchan{
        t.do()
    }
    done<- struct{}{}
}

//通過done channel同步等待所有工作goroutine的結束,然后關閉結果chan
func closeResult(done chan struct{},resultchan chan int ,workers int ){
    for i:=0;i<workers;i++{
        <-done
    }
    close(done)
    close(resultchan)
}

//讀取結果通道匯聚結果

func processResult(resultchan chan int )int{
    sum:=0
    for r:=range resultchan{
        sum+=r
    }
    return sum
}

ok ,至此,程序結束,可將task和task.do() 替換成自己的已有邏輯執行

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM