rabbitmq延遲隊列的實現(利用死信隊列)


普通的延遲隊列不細說了,無論是設置統一的隊列TTL,還是設置消息的TTL,全都是利用DeadLetterQueue:消息失效后扔到死信隊列,消費者從死信隊列里讀消息。但在消息失效的過程中存在一個問題,比如如下場景:

延遲隊列中依次收到如下消息 Message A: TTL 2000 Message B: TTL 100 Message C: TTL 5000 當延遲隊列中消息超時后,移至死信隊列

實際執行情況是rabbitMQ從隊列頭取到消息A,等待2秒,超時,發至死信隊列 -> 判斷消息B,發現已經超時,立即發至死信隊列。原因是TTL被存在消息的內部,rabbitMQ一直去掃描每條消息的TTL,而是只判斷隊列頭消息是否失效,於是消息B實際失效時間是2000ms。

目前rabbitMQ是不支持任意超時時間的(據說rocketMQ提供有限支持,沒用過),但可以通過安裝一個插件來解決:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安裝過程很簡單,下載下來,在rabbitMQ的sbin目錄下執行安裝

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件使用Exchange交換機來實現的TTL,而不是rabbitMQ那樣通過死信隊列:首先設置一個可延遲的Exchange,該Exchange會將收到的消息存放,按照延遲時間排序,直到達到延遲,才被轉發到實際的執行隊列。

啟用插件后,進入rabbitMQ的管理頁面進行配置:

新建一個Exchange,Type必須選擇x-delayed-message,添加參數x-delayed-type=direct,然后綁定到執行隊列

$msg = new AMQPMessage(json_encode([ 'event' => $eventName, 'params' => $params ]), [ 'delivery_mode' => 2, ]); $headers = new AMQPTable(['x-delay' => $delay]); $msg->set('application_headers', $headers); $this->channel->basic_publish($msg, self::EXCHANGE_DELAY_ANY_NAME, self::EXCHANGE_KEY_NOW);

接下來,為投遞到該Exchange的消息添加插件所需的header:x-delay,值就是要延遲的時間,單位毫秒,如果不添加這個頭,所有流入該交換機的消息都會立即被轉發到執行隊列。注意如果使用php-amqplib的話,需要用AMQPTable設置這個header

接下來按照剛才的三條消息測試一下,發現B消息成功比A消息進入隊列了。搞定

 

轉載:http://www.xuyanzhe.cn/?p=115

具體實現參考鏈接: https://juejin.im/post/5a12ffd451882578da0d7b3a

高級特性:https://zhuanlan.zhihu.com/p/60141062


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM