golang 實現延遲消息原理與方法


實現延遲消息具體思路我是看的下面這篇文章

https://mp.weixin.qq.com/s/eDMV25YqCPYjxQG-dvqSqQ

實現延遲消息最主要的兩個結構:

環形隊列:通過golang中的數組實現,分成3600個slot。

任務集合:通過map[key]*Task,每個slot一個map,map的值就是我們要執行的任務。

原理圖如下:

實現代碼如下:

package main;

import (
	"time"
	"errors"
	"fmt"
)

//延遲消息
type DelayMessage struct {
	//當前下標
	curIndex int;
	//環形槽
	slots [3600]map[string]*Task;
	//關閉
	closed chan bool;
	//任務關閉
	taskClose chan bool;
	//時間關閉
	timeClose chan bool;
	//啟動時間
	startTime time.Time;
}

//執行的任務函數
type TaskFunc func(args ...interface{});

//任務
type Task struct {
	//循環次數
	cycleNum int;
	//執行的函數
	exec   TaskFunc;
	params []interface{};
}

//創建一個延遲消息
func NewDelayMessage() *DelayMessage {
	dm := &DelayMessage{
		curIndex:  0,
		closed:    make(chan bool),
		taskClose: make(chan bool),
		timeClose: make(chan bool),
		startTime: time.Now(),
	};
	for i := 0; i < 3600; i++ {
		dm.slots[i] = make(map[string]*Task);
	}
	return dm;
}

//啟動延遲消息
func (dm *DelayMessage) Start() {
	go dm.taskLoop();
	go dm.timeLoop();
	select {
	case <-dm.closed:
		{
			dm.taskClose <- true;
			dm.timeClose <- true;
			break;
		}
	};
}

//關閉延遲消息
func (dm *DelayMessage) Close() {
	dm.closed <- true;
}

//處理每1秒的任務
func (dm *DelayMessage) taskLoop() {
	defer func() {
		fmt.Println("taskLoop exit");
	}();
	for {
		select {
		case <-dm.taskClose:
			{
				return;
			}
		default:
			{
				//取出當前的槽的任務
				tasks := dm.slots[dm.curIndex];
				if len(tasks) > 0 {
					//遍歷任務,判斷任務循環次數等於0,則運行任務
					//否則任務循環次數減1
					for k, v := range tasks {
						if v.cycleNum == 0 {
							go v.exec(v.params...);
							//刪除運行過的任務
							delete(tasks, k);
						} else {
							v.cycleNum--;
						}
					}
				}
			}
		}
	}
}

//處理每1秒移動下標
func (dm *DelayMessage) timeLoop() {
	defer func() {
		fmt.Println("timeLoop exit");
	}();
	tick := time.NewTicker(time.Second);
	for {
		select {
		case <-dm.timeClose:
			{
				return;
			}
		case <-tick.C:
			{
				fmt.Println(time.Now().Format("2006-01-02 15:04:05"));
				//判斷當前下標,如果等於3599則重置為0,否則加1
				if dm.curIndex == 3599 {
					dm.curIndex = 0;
				} else {
					dm.curIndex++;
				}
			}
		}
	}
}

//添加任務
func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error {
	if dm.startTime.After(t) {
		return errors.New("時間錯誤");
	}
	//當前時間與指定時間相差秒數
	subSecond := t.Unix() - dm.startTime.Unix();
	//計算循環次數
	cycleNum := int(subSecond / 3600);
	//計算任務所在的slots的下標
	ix := subSecond % 3600;
	//把任務加入tasks中
	tasks := dm.slots[ix];
	if _, ok := tasks[key]; ok {
		return errors.New("該slots中已存在key為" + key + "的任務");
	}
	tasks[key] = &Task{
		cycleNum: cycleNum,
		exec:     exec,
		params:   params,
	};
	return nil;
}

func main() {
	//創建延遲消息
	dm := NewDelayMessage();
	//添加任務
	dm.AddTask(time.Now().Add(time.Second*10), "test1", func(args ...interface{}) {
		fmt.Println(args...);
	}, []interface{}{1, 2, 3});
	dm.AddTask(time.Now().Add(time.Second*10), "test2", func(args ...interface{}) {
		fmt.Println(args...);
	}, []interface{}{4, 5, 6});
	dm.AddTask(time.Now().Add(time.Second*20), "test3", func(args ...interface{}) {
		fmt.Println(args...);
	}, []interface{}{"hello", "world", "test"});
	dm.AddTask(time.Now().Add(time.Second*30), "test4", func(args ...interface{}) {
		sum := 0;
		for arg := range args {
			sum += arg;
		}
		fmt.Println("sum : ", sum);
	}, []interface{}{1, 2, 3});

	//40秒后關閉
	time.AfterFunc(time.Second*40, func() {
		dm.Close();
	});
	dm.Start();
}

測試結果如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM