Redis 實戰 —— 09. 實現任務隊列、消息拉取和文件分發


任務隊列 P133

通過將待執行任務的相關信息放入隊列里面,並在之后對隊列進行處理,可以推遲執行那些耗時對操作,這種將工作交給任務處理器來執行對做法被稱為任務隊列 (task queue) 。 P133

先進先出隊列 P133

可以 Redis 的列表結構存儲任務的相關信息,並使用 RPUSH 將待執行任務的相關信息推入列表右端,使用阻塞版本的彈出命令 BLPOP 從隊列中彈出待執行任務的相關信息(因為任務處理器除了執行任務不需要執行其他工作)。 P134

發送任務

// 將任務參數推入指定任務對應的列表右端
func SendTask(conn redis.Conn, queueName string, param string) (bool, error) {
	count, err := redis.Int(conn.Do("RPUSH", queueName, param))
	if err != nil {
		return false, nil
	}
	// 只有成功推入 1 個才算成功發送
	return count == 1, nil
}

執行任務

// 不斷從任務對應的列表中獲取任務參數,並執行任務
func RunTask(conn redis.Conn, queueName string, taskHandler func(param string)) {
	for ; ; {
		result, err := redis.Strings(conn.Do("BLPOP", queueName, 10))
		// 如果成功獲取任務信息,則執行任務
		if err != nil && len(result) == 2 {
			taskHandler(result[1])
		}
	}
}

以上代碼是任務隊列與 Redis 交互的通用版本,使用方式簡單,只需要將入參信息序列化成字符串傳入即可發送一個任務,提供一個處理任務的方法回調即可執行任務。

任務優先級 P136

在此基礎上可以講原有的先進先出任務隊列改為具有優先級的任務隊列,即高優先級的任務需要在低優先級的任務之前執行。 BLPOP 將彈出第一個非空列表的第一個元素,所以我們只需要將所有任務隊列名數組按照優先級降序排序,讓任務隊列名數組作為 BLPOP 的入參即可實現上述功能(當然這種如果高優先級任務的生成速率大於消費速率,那么低優先級的任務就永遠不會執行)。 P136

優先執行高優先級任務

// 不斷從任務對應的列表中獲取任務參數,並執行任務
// queueNames 從前往后的優先級依次降低
func RunTasks(conn redis.Conn, queueNames []string, queueNameToTaskHandler map[string]func(param string)) {
	// 校驗是否所有任務都有對應的處理方法
	for _, queueName := range queueNames {
		if _, exists := queueNameToTaskHandler[queueName]; !exists {
			panic(fmt.Sprintf("queueName(%v) not in queueNameToTaskHandler", queueName))
		}
	}
	// 將所有入參放入同一個數組
	length := len(queueNames)
	args := make([]interface{}, length + 1)
	for i := 0; i < length; i++ {
		args[i] = queueNames[i]
	}
	args[length] = 10
	for ; ; {
		result, err := redis.Strings(conn.Do("BLPOP", args...))
		// 如果成功獲取任務信息,則執行任務
		if err != nil && len(result) == 2 {
			// 找到對應的處理方法並執行
			taskHandler := queueNameToTaskHandler[result[0]]
			taskHandler(result[1])
		}
	}
}
延遲任務 P136

實際業務場景中還存在某些任務需要在指定時間進行操作,例如:郵件定時發送等。此時還需要存儲任務執行的時間,並將可以執行的任務放入剛剛的任務隊列中。可以使用有序集合進行存儲,時間戳作為分值,任務相關信息及隊列名等信息的 json 串作為鍵。

發送延遲任務

// 存儲延遲任務的相關信息,用於序列化和反序列化
type delayedTaskInfo struct {
	UnixNano  int64  `json:"unixNano"`
	QueueName string `json:"queueName"`
	Param     string `json:"param"`
}
// 發送一個延遲任務
func SendDelayedTask(conn redis.Conn, queueName string, param string, executeAt time.Time) (bool, error) {
	// 如果已到執行時間,則直接發送到任務隊列
	if executeAt.UnixNano() <= time.Now().UnixNano() {
		return SendTask(conn, queueName, param)
	}
	// 還未到執行時間,需要放入有序集合
	// 序列化相關信息
	infoJson, err := json.Marshal(delayedTaskInfo{
		UnixNano: time.Now().UnixNano(),
		QueueName:queueName,
		Param:param,
	})
	if err != nil {
		return false, err
	}
	// 放入有序集合
	count, err := redis.Int(conn.Do("ZADD", "delayed_tasks", infoJson, executeAt.UnixNano()))
	if err != nil {
		return false, err
	}
	// 只有成功加入 1 個才算成功
	return count == 1, nil
}

拉取可執行的延遲任務,放入任務隊列

// 輪詢延遲任務,將可執行的任務放入任務隊列
func PollDelayedTask(conn redis.Conn) {
	for ; ; {
		// 獲取最早需要執行的任務
		infoMap, err := redis.StringMap(conn.Do("ZRANGE", "delayed_tasks", 0, 0, "WITHSCORES"))
		if err != nil || len(infoMap) != 1 {
			// 睡 1ms 再繼續
			time.Sleep(time.Millisecond)
			continue
		}
		for infoJson, unixNano := range infoMap {
			// 已到時間,放入任務隊列
			executeAt, err := strconv.Atoi(unixNano)
			if err != nil {
				log.Errorf("#PollDelayedTask -> convert unixNano to int error, infoJson: %v, unixNano: %v", infoJson, unixNano)
				// 做一些后續處理,例如:刪除該條信息,防止耽誤其他延遲任務
			}
			if int64(executeAt) <= time.Now().UnixNano() {
				// 反序列化
				info := new(delayedTaskInfo)
				err := json.Unmarshal([]byte(infoJson), info)
				if err != nil {
					log.Errorf("#PollDelayedTask -> infoJson unmarshal error, infoJson: %v, unixNano: %v", infoJson, unixNano)
					// 做一些后續處理,例如:刪除該條信息,防止耽誤其他延遲任務
				}
				// 從有序集合刪除該信息,並放入任務隊列
				count, err := redis.Int(conn.Do("ZREM", "delayed_tasks", infoJson))
				if err != nil && count == 1 {
					_, _ = SendTask(conn, info.QueueName, info.Param)
				}
			} else {
				// 未到時間,睡 1ms 再繼續
				time.Sleep(time.Millisecond)
			}
		}
	}
}

有序集合不具備列表的阻塞彈出機制,所以程序需要不斷循環,並嘗試從隊列中獲取要被執行的任務,這一操作會增大網絡和處理器的負載。可以通過在函數里面增加一個自適應方法 (adaptive method) ,讓函數在一段時間內都沒有發現可執行的任務時,自動延長休眠時間,或者根據下一個任務的執行時間來決定休眠的時長,並將休眠時長的最大值限制為 100ms ,從而確保任務可以被及時執行。 P138

消息拉取 P139

兩個或多個客戶端在互相發送和接收消息的時候,通常會使用以下兩種方法來傳遞信息: P139

  • 消息推送 (push messaging) :即由發送者來確保所有接受者已經成功接收到了消息。 Redis 內置了用於進行消息推送的 PUBLISH 命令和 SUBSCRIBE 命令(05. Redis 其他命令簡介 介紹了這兩個命令的用法和缺陷)
  • 消息拉取 (pull messaging) :即由接受者自己去獲取存儲的信息
單個接受者 P140

單個接受者時,只需要將發送的信息保存至每個接收者對應的列表中即可,使用 RPUSH 可以向執行接受者發送消息,使用 LTRIM 可以移除列表中的前幾個元素來獲取收到的消息。 P140

多個接受者 P141

多個接受者的情況類似群組,即群組內的人發消息,其他人都可以收到。我們可以使用以下幾個數據結構存儲所需數據,以便實現我們的所需的功能:

  • STRING: 群組的消息自增 id
    • INCR: 實現 id 自增並獲取
  • ZSET: 存儲該群組中的每一條信息,分值為當前群組內的消息自增 id
    • ZRANGEBYSCORE: 獲得未獲取的消息
  • ZSET: 存儲該群組中每個人獲得的最新一條消息的 id ,所有消息均未獲取時為 0
    • ZCARD: 獲取群組人數
    • ZRANGE: 經過處理后,可實現哪些消息成功被哪些人接收了的功能
    • ZRANGE: 獲取 id 最小數據,可實現刪除被所有人獲取過的消息的功能
  • ZSET: 存儲一個人所有群組獲得的最新一條消息的 id ,離開群組時自動刪除,加入群組時初始化為 0
    • ZCARD: 獲取所在的群組個數
    • ZRANGE: 經過處理后,可實現批量拉取所有群組的未獲取的消息的功能

文件分發 P145

根據地理位置聚合用戶數據 P146

現在擁有每個 ip 每天進行活動的時間和具體操作,現需要計算每天每個城市的人操作數量(類似於統計日活)。

原始數據十分巨大,所以需要分批讀入內存進行聚合統計,而聚合后的數據相對來說很小,所以完全可以在內存中進行聚合統計,完成后再將結果寫入 Redis 中,可以有效減少程序與 Redis 服務的通信次數,縮短任務時間。

日志分發及處理

現在有一台機器的本地日志需要交給多個日志處理器進行不同的分析。

這種場景類似群組,所以我們可以復用上面提到的支持多個接受者的消息拉取組件。

本地機器:

  1. 將所有日志發送至群組,最后再發送一條結束消息
  2. 等待所有日志處理器處理完(群組對應的完成標識 = 群組內的成員數 - 1)
  3. 清理本次發送的所有日志

日志處理器:

  1. 不斷從群組中拉取消息,並進入相關處理,直至拉取到結束消息
  2. 對群組對應的完成標識進行 INCR ,表示當前日志處理器已完成處理

本文首發於公眾號:滿賦諸機(點擊查看原文) 開源在 GitHub :reading-notes/redis-in-action


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM