基於redis的延遲消息隊列設計


需求背景

  • 用戶下訂單成功之后隔20分鍾給用戶發送上門服務通知短信
  • 訂單完成一個小時之后通知用戶對上門服務進行評價
  • 業務執行失敗之后隔10分鍾重試一次

    類似的場景比較多 簡單的處理方式就是使用定時任務 假如數據比較多的時候 有的數據可能延遲比較嚴重,而且越來越多的定時業務導致任務調度很繁瑣不好管理。

隊列設計

目前可以考慮使用rabbitmq來滿足需求 但是不打算使用,因為目前太多的業務使用了另外的MQ中間件。

開發前需要考慮的問題?

  • 及時性 消費端能按時收到
  • 同一時間消息的消費權重
  • 可靠性 消息不能出現沒有被消費掉的情況
  • 可恢復 假如有其他情況 導致消息系統不可用了 至少能保證數據可以恢復
  • 可撤回 因為是延遲消息 沒有到執行時間的消息支持可以取消消費
  • 高可用 多實例 這里指HA/主備模式並不是多實例同時一起工作
  • 消費端如何消費

    當然初步選用redis作為數據緩存的主要原因是因為redis自身支持zset的數據結構(score 延遲時間毫秒) 這樣就少了排序的煩惱而且性能還很高,正好我們的需求就是按時間維度去判定執行的順序 同時也支持map list數據結構。

簡單定義一個消息數據結構

 
private String topic;/***topic**/
private String id;/***自動生成 全局惟一 snowflake**/
private String bizKey;
private long delay;/***延時毫秒數**/
private int priority;//優先級
private long ttl;/**消費端消費的ttl**/
private String body;/***消息體**/
private long createTime=System.currentTimeMillis();
private int status= Status.WaitPut.ordinal();

運行原理:

  1. Map來存儲元數據。id作為key,整個消息結構序列化(json/…)之后作為value,放入元消息池中。
  2. id放入其中(有N個)一個zset有序列表中,以createTime+delay+priority作為score。修改狀態為正在延遲中
  3. 使用timer實時監控zset有序列表中top 10的數據 。 如果數據score<=當前時間毫秒就取出來,根據topic重新放入一個新的可消費列表(list)中,在zset中刪除已經取出來的數據,並修改狀態為待消費
  4. 客戶端獲取數據只需要從可消費隊列中獲取就可以了。並且狀態必須為待消費 運行時間需要<=當前時間的 如果不滿足 重新放入zset列表中,修改狀態為正在延遲。如果滿足修改狀態為已消費。或者直接刪除元數據。

客戶端

因為涉及到不同程序語言的問題,所以當前默認支持http訪問方式。

  1. 添加延時消息添加成功之后返回消費唯一ID POST /push {…..消息體}
  2. 刪除延時消息 需要傳遞消息ID GET /delete?id=
  3. 恢復延時消息 GET /reStore?expire=true|false expire是否恢復已過期未執行的消息。
  4. 恢復單個延時消息 需要傳遞消息ID GET /reStore/id
  5. 獲取消息 需要長連接 GET /get/topic

用nginx暴露服務,配置為輪詢 在添加延遲消息的時候就可以流量平均分配。

目前系統中客戶端並沒有采用HTTP長連接的方式來消費消息,而是采用MQ的方式來消費數據這樣客戶端就可以不用關心延遲消息隊列。只需要在發送MQ的時候攔截一下 如果是延遲消息就用延遲消息系統處理。

消息可恢復

實現恢復的原理 正常情況下一般都是記錄日志,比如mysqlbinlog等。

這里我們直接采用mysql數據庫作為記錄日志。

目前打算創建以下2張表:

  1. 消息表 字段包括整個消息體
  2. 消息流轉表 字段包括消息ID、變更狀態、變更時間、zset掃描線程Name、host/ip

定義zset掃描線程Name是為了更清楚的看到消息被分發到具體哪個zset中。前提是zset的key和監控zset的線程名稱要有點關系 這里也可以是zset key。

舉個栗子

假如redis服務器宕機了,重啟之后發現數據也沒有了。所以這個恢復是很有必要的,只需要從表1也就是消息表中把消息狀態不等於已消費的數據全部重新分發到延遲隊列中去,然后同步一下狀態就可以了。

當然恢復單個任務也可以這么干。

關於高可用

分布式協調還是選用zookeeper吧。

如果有多個實例最多同時只能有1個實例工作 這樣就避免了分布式競爭鎖帶來的壞處,當然如果業務需要多個實例同時工作也是支持的,也就是一個消息最多只能有1個實例處理,可以選用zookeeper或者redis就能實現分布式鎖了。

最終做了一下測試多實例同時運行,可能因為會涉及到鎖的問題性能有所下降,反而單機效果很好。所以比較推薦基於docker的主備部署模式。

擴展

支持zset隊列個數可配置 避免大數據帶來高延遲的問題。

目前存在日志和redis元數據有可能不一致的問題 如mysql掛了,寫日志不會成功。

設計圖:

 

 

另外分享一個不完整簡陋的開源版本  https://github.com/peachyy/sdmq.git  后期會進行模塊拆分 優化

轉自:https://www.cnblogs.com/peachyy/p/7398430.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM