MQ任意延時消息(一)實現原理概述


前置知識

  • 以RocketMQ為例

  • 使用IDR啟動RocketMQ,參考Eclpse啟動RocketMq

  • 大多數消息中間件都支持固定延時隊列,比如RocketMQ支持的默認延時等級messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。參考RocketMQ延時配置

  • Netty和Kafka中用到了時間輪的算法,自行百度

實現思路

  • 配置MQ的延時等級為1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192,16384, 32768, 65536, 131072,單位s
  • 對應1~18延時等級
  • 當發送一個任意延時的消息時,把消息丟到固定等級的延時隊列去,保留剩余延時時間到消息屬性中;消費時檢查剩余延時時間,如果大於0,繼續丟回延時隊列,重復上訴步驟直至剩余延時為0,此時真正的消費這條消息
  • 舉例,需要發送27s延時的消息,16<27<64,於是先丟到固定延時16s的隊列中,然后設置剩余延時為11s,16s后這個消息被消費,檢查剩余延時,此時8<11<16,再丟到8s延時隊列中,重復上訴步驟直至剩余延時為0,把消息交給真正的消費中

注意

  • 延時等級的設置可以使用1)2的倍數,2)斐波那契數列,3)1~59s+1~59m+1-23h+1-29d等形式的一百多個等級,時間輪降級會很快
  • 延時最大的等級最好是一個比較大的值,比如支持最大30天,如果最大的是1天,那么30天的延時需要投遞30次才會降級,所以最后的三個等級設置為1天,7天,15天比較合理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM