背景
在業務發展過程中,會出現一些需要延時處理的場景,比如:
a.訂單下單之后超過30分鍾用戶未支付,需要取消訂單
b.訂單一些評論,如果48h用戶未對商家評論,系統會自動產生一條默認評論
c.點我達訂單下單后,超過一定時間訂單未派出,需要超時取消訂單等。。。
處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數據量不大的場景下是完全沒問題,但是當數據量大的時候高頻的輪訓數據庫就會比較的耗資源,導致數據庫的慢查或者查詢超時。所以在處理這類需求時候,采用了延時隊列來完成。
幾種延時隊列
延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:
1.Java中java.util.concurrent.DelayQueue
優點:JDK自身實現,使用方便,量小適用
缺點:隊列消息處於jvm內存,不支持分布式運行和消息持久化
2.Rocketmq延時隊列
優點:消息持久化,分布式
缺點:不支持任意時間精度,只支持特定level的延時消息
3.Rabbitmq延時隊列(TTL+DLX實現)
優點:消息持久化,分布式
缺點:延時相同的消息必須扔在同一個隊列
根據自身業務和公司情況,如果實現一個自己的延時隊列服務需要考慮一下幾點:
* 消息存儲
* 過期延時消息實時獲取
* 高可用性
基於Redis實現
1.0版本
功能特性
* 消息可靠性,消息持久化,消息至少被消費一次
* 實時性:存在一定的時間誤差(定時任務間隔)
* 支持指定消息remove
* 高可用性
整體結構
- Messages Pool所有的延時消息存放,結構為KV結構,key為消息ID,value為一個具體的message(這里選擇Redis Hash結構主要是因為hash結構能存儲較大的數據量,數據較多時候會進行漸進式rehash擴容,並且對於HSET和HGET命令來說時間復雜度都是O(1))
- Delayed Queue是16個有序隊列(隊列支持水平擴展),結構為ZSET,value為messages pool中消息ID,score為過期時間(分為多個隊列是為了提高掃描的速度)
- Timed Task定時任務,負責掃描處理每個隊列過期消息
消息結構
每個延時消息必須包括以下參數:
* tags:消息過期之后發送mq的tags
* keys:消息過期之后發送mq的keys
* body:消息過期之后發送mq的body,提供給消費這做具體的消息處理
* delayTime:延時發送時間(默認,delayTime、expectDate有一個即可)
* expectDate:期望發送時間
流程
注:上圖1、2、3或者2、3是一個事務操作
取出過期消息過程是通過一個外部定時任務每隔1min分鍾去查詢隊列中過期的消息,然后發送mq && remove
2.0版本
1.0上有一個可改進的地方就是隊列中過期的消息是通過定時任務觸發查詢。所有有了2.0
2.0版本在1.0上做了一個優化,廢棄掉了1min定時任務觸發過期消息發送,采用了java Lock await/singlal方式實現過期消息的實時發送低延時
多節點部署結構:
- pull job:這里分別為每一個隊列創建了一個pull job thread,功能很簡單,就是負責去隊列中拉取過期的消息數據(這里保證一個隊列有且只有一個pull job)
- worker:pull job拉取到的過期消息會交給一個worker thread去處理,這樣的好處是處理過期的消息實時性更高(pull job不必等去除過期消息全部處理完成在繼續去拉取新的過期數據)
- zookeeper coordinate:通過zk的操作來完成對隊列的重新分配工作,daemon thread監聽zk節點的創建和刪除
主要流程:
服務啟動會注冊zk,獲取分配處理的queues,啟動后台線程監聽zk
為每個分配queue創建一個pull job
pull job首先會去queue中查詢是否有過期消息:
Y:將取出消息交給worker處理
N:查詢queue中最后一個成員(zset結構默認按score遞增排序),如果為空,則await;不為空則await(成員score-System.currentTimeMillis())
由於過期消息發送成功才會從隊列中remove,所以pull job會記錄上一次查詢隊列的一個offset,每次獲取到過期消息會將offset向前偏移,過期消息交給worker處理,當worker由於某些異常原因處理失敗會重置pull job中offset,這樣可以避免消息發送一次失敗之后沒辦法在繼續處理(除了新節點add || remove時候)
當部署服務有新增,延時隊列服務會重新計算得到當前處理隊列,並將之前創建pull job cancel,為新處理隊列重新創建pull job。刪除同理。
</ol>