實現延遲消息具體思路我是看的下面這篇文章
https://mp.weixin.qq.com/s/eDMV25YqCPYjxQG-dvqSqQ
實現延遲消息最主要的兩個結構:
環形隊列:通過golang中的數組實現,分成3600個slot。
任務集合:通過map[key]*Task,每個slot一個map,map的值就是我們要執行的任務。
原理圖如下:
實現代碼如下:
package main; import ( "time" "errors" "fmt" ) //延遲消息 type DelayMessage struct { //當前下標 curIndex int; //環形槽 slots [3600]map[string]*Task; //關閉 closed chan bool; //任務關閉 taskClose chan bool; //時間關閉 timeClose chan bool; //啟動時間 startTime time.Time; } //執行的任務函數 type TaskFunc func(args ...interface{}); //任務 type Task struct { //循環次數 cycleNum int; //執行的函數 exec TaskFunc; params []interface{}; } //創建一個延遲消息 func NewDelayMessage() *DelayMessage { dm := &DelayMessage{ curIndex: 0, closed: make(chan bool), taskClose: make(chan bool), timeClose: make(chan bool), startTime: time.Now(), }; for i := 0; i < 3600; i++ { dm.slots[i] = make(map[string]*Task); } return dm; } //啟動延遲消息 func (dm *DelayMessage) Start() { go dm.taskLoop(); go dm.timeLoop(); select { case <-dm.closed: { dm.taskClose <- true; dm.timeClose <- true; break; } }; } //關閉延遲消息 func (dm *DelayMessage) Close() { dm.closed <- true; } //處理每1秒的任務 func (dm *DelayMessage) taskLoop() { defer func() { fmt.Println("taskLoop exit"); }(); for { select { case <-dm.taskClose: { return; } default: { //取出當前的槽的任務 tasks := dm.slots[dm.curIndex]; if len(tasks) > 0 { //遍歷任務,判斷任務循環次數等於0,則運行任務 //否則任務循環次數減1 for k, v := range tasks { if v.cycleNum == 0 { go v.exec(v.params...); //刪除運行過的任務 delete(tasks, k); } else { v.cycleNum--; } } } } } } } //處理每1秒移動下標 func (dm *DelayMessage) timeLoop() { defer func() { fmt.Println("timeLoop exit"); }(); tick := time.NewTicker(time.Second); for { select { case <-dm.timeClose: { return; } case <-tick.C: { fmt.Println(time.Now().Format("2006-01-02 15:04:05")); //判斷當前下標,如果等於3599則重置為0,否則加1 if dm.curIndex == 3599 { dm.curIndex = 0; } else { dm.curIndex++; } } } } } //添加任務 func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error { if dm.startTime.After(t) { return errors.New("時間錯誤"); } //當前時間與指定時間相差秒數 subSecond := t.Unix() - dm.startTime.Unix(); //計算循環次數 cycleNum := int(subSecond / 3600); //計算任務所在的slots的下標 ix := subSecond % 3600; //把任務加入tasks中 tasks := dm.slots[ix]; if _, ok := tasks[key]; ok { return errors.New("該slots中已存在key為" + key + "的任務"); } tasks[key] = &Task{ cycleNum: cycleNum, exec: exec, params: params, }; return nil; } func main() { //創建延遲消息 dm := NewDelayMessage(); //添加任務 dm.AddTask(time.Now().Add(time.Second*10), "test1", func(args ...interface{}) { fmt.Println(args...); }, []interface{}{1, 2, 3}); dm.AddTask(time.Now().Add(time.Second*10), "test2", func(args ...interface{}) { fmt.Println(args...); }, []interface{}{4, 5, 6}); dm.AddTask(time.Now().Add(time.Second*20), "test3", func(args ...interface{}) { fmt.Println(args...); }, []interface{}{"hello", "world", "test"}); dm.AddTask(time.Now().Add(time.Second*30), "test4", func(args ...interface{}) { sum := 0; for arg := range args { sum += arg; } fmt.Println("sum : ", sum); }, []interface{}{1, 2, 3}); //40秒后關閉 time.AfterFunc(time.Second*40, func() { dm.Close(); }); dm.Start(); }
測試結果如下: