工作池與消息隊列框架
Woker Pool:工作池中有固定數量的協程,每一個協程對應一個消息任務隊列。
消息任務隊列:消息任務隊列,本質就是go中的緩沖信道,任務在緩沖信道中傳輸,等待被處理。
TaskQueue:消息任務隊列的集合,本質就是
client Handler Reader:在這里假設為客戶端請求的處理方法,將請求對象或者任務傳到某一個消息任務隊列。
clent Handler Writer:客戶端返回響應的執行程序。
簡單工作池與消息隊列的實現
package main import ( "fmt" "time" ) //任務:大致思路,所有任務類都必須實現Task接口,所以用戶需要創建task類 type ITask interface { write() reader() } type task struct { c string } func (t *task)write(){ fmt.Println("寫入字符",t.c) } func (t *task)reader(){ fmt.Println("接收字符",t.c) } func Newtask(c string)*task { return &task{ c: c, } } //限制 //1.worker工作池的任務隊列的最大值 //2.任務隊列中任務的最大數量 //協程池 type WorkerPool struct { cap int//工作池中協程的數量 tasksSize int//任務隊列中最大任務的容量 TaskQueue []chan ITask //信道集合 } //啟動一個worker工作池,開啟工作池只能發生一次 func (W *WorkerPool)StartWorkPool(){ //根據任務隊列的大小,分別開啟worker,每個worker用go來承載,每一個worker對應一個任務隊列 for i:=0;i<W.cap;i++{ //為每個worker開辟緩沖信道(任務隊列) W.TaskQueue[i] = make(chan ITask,W.tasksSize) //啟動worker,阻塞等待任務從channel中到來 go W.StartOneWorker(i,W.TaskQueue[i]) } } func (W *WorkerPool)StartOneWorker(id int,taskqueue chan ITask){ for{ select { case request :=<- taskqueue: //如果有消息過來,則處理業務 request.write() request.reader() default: continue } } } func (W *WorkerPool)Put(task ITask){ W.TaskQueue[0] <- task } func New(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan ITask,cap), } } func main(){ b := make(chan bool) Pool := New(1,10) //創建工作池,池中只有一個協程,每個協程對應最大任務數為10個 go Pool.StartWorkPool() task1 :=Newtask("hello1") //task2 :=Newtask("hello2") //task3 :=Newtask("hello3") //task4 :=Newtask("hello4") time.Sleep(time.Second) //第一種方法測試 //Pool.Put(task1) //Pool.Put(task2) //Pool.Put(task3) //Pool.Put(task4) //第二種方式測試 go func(){ for{ Pool.Put(task1) time.Sleep(time.Second) } }() <-b }
執行結果:
寫入字符 hello1 接收字符 hello1 寫入字符 hello1 接收字符 hello1 寫入字符 hello1 接收字符 hello1 寫入字符 hello1 接收字符 hello1 寫入字符 hello1 接收字符 hello1 寫入字符 hello1 接收字符 hello1 寫入字符 hello1 接收字符 hello1
。。。。。。
Tcp服務器使用工作池與消息隊列
初始化
初始化工作池對象,cap消息隊列數量,前面說了,消息隊列與work數量一致,也限制了工作池中協程的數量,tasksSize為每一個消息隊列的最大容量,TaskQueue:make([]chan IConnection,cap)創建了消息隊列的集合。
func NewWorkerPool(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan IConnection,cap), } }
啟動工作池與消息隊列
每一個任務隊列對應一個協程,在這里我們為每一個worker開辟了任務隊列。
//啟動一個worker工作池,開啟工作池只能發生一次 func (W *WorkerPool)StartWorkPool(){ //根據任務隊列的大小,分別開啟worker,每個worker用go來承載,每一個worker對應一個任務隊列 for i:=0;i<W.cap;i++{ //為每個worker開辟緩沖信道(任務隊列) W.TaskQueue[i] = make(chan IConnection,W.tasksSize) //啟動worker,阻塞等待任務從channel中到來 go W.StartOneWorker(i,W.TaskQueue[i]) } }
等待任務執行
func (W *WorkerPool)StartOneWorker(id int,taskqueue chan IConnection){ for{ select { case request :=<- taskqueue: //如果有消息過來,則處理業務 request.Start() default: continue } } }
源碼
package znet //限制 //1.worker工作池的任務隊列的最大值 //2.任務隊列中任務的最大數量 //協程池 type WorkerPool struct { cap int tasksSize int TaskQueue []chan IConnection //信道集合 } //啟動一個worker工作池,開啟工作池只能發生一次 func (W *WorkerPool)StartWorkPool(){ //根據任務隊列的大小,分別開啟worker,每個worker用go來承載,每一個worker對應一個任務隊列 for i:=0;i<W.cap;i++{ //為每個worker開辟緩沖信道(任務隊列) W.TaskQueue[i] = make(chan IConnection,W.tasksSize) //啟動worker,阻塞等待任務從channel中到來 go W.StartOneWorker(i,W.TaskQueue[i]) } } func (W *WorkerPool)StartOneWorker(id int,taskqueue chan IConnection){ for{ select { case request :=<- taskqueue: //如果有消息過來,則處理業務 request.Start() default: continue } } } //將任務公平的分發,使用取模(客戶端鏈接與工作池的協程數) func (W *WorkerPool)Put(Connection IConnection){ index := Connection.GetConnID()%uint32(W.cap) W.TaskQueue[index] <- Connection } func NewWorkerPool(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan IConnection,cap), } }