總結:消息隊列的一些特性。
過期時間(TTL)
Time To Live,也就是生存時間,是一條消息在隊列中的最大存活時間,單位是毫秒。了解Redis的朋友應該一看就明白,二者很像。
RabbitMQ可以對消息和隊列設置TTL。
RabbitMQ支持設置消息的過期時間,在消息發送的時候可以進行指定,每條消息的過期時間可以不同。
RabbitMQ支持設置隊列的過期時間,從消息入隊列開始計算,直到超過了隊列的超時時間配置,那么消息會變成死信,自動清除。
如果兩種方式一起使用,則過期時間以兩者中較小的那個數值為准。
當然也可以不設置TTL,不設置表示消息不會過期;如果設置為0,則表示除非此時可以直接將消息投遞到消費者,否則該消息將被立即丟棄。
消息確認
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供了消息確認機制。消費者訂閱隊列的時候,可以指定autoAck參數,當autoAck為true的時候,RabbitMQ采用自動確認模式,RabbitMQ自動把發送出去的消息設置為確認,然后從內存或者硬盤中刪除,而不管消費者是否真正消費到了這些消息。當autoAck為false的時候,RabbitMQ會等待消費者回復的確認信號,收到確認信號之后才從內存或者磁盤中刪除消息。
消息確認機制是RabbitMQ消息可靠性投遞的基礎,只要設置autoAck參數為false,消費者就有足夠的時間處理消息,不用擔心處理消息的過程中消費者進程掛掉后消息丟失的問題。
持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保證消息可靠性的呢?答案就是消息持久化。持久化可以防止在異常情況下丟失數據。RabbitMQ的持久化分為三個部分:交換器持久化、隊列持久化和消息的持久化。
交換器持久化可以通過在聲明隊列時將durable參數設置為true。如果交換器不設置持久化,那么在RabbitMQ服務重啟之后,相關的交換器元數據會丟失,不過消息不會丟失,只是不能將消息發送到這個交換器了。
隊列的持久化能保證其本身的元數據不會因異常情況而丟失,但是不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將其設置為持久化。隊列的持久化可以通過在聲明隊列時將durable參數設置為true。
設置了隊列和消息的持久化,當RabbitMQ服務重啟之后,消息依然存在。如果只設置隊列持久化或者消息持久化,重啟之后消息都會消失。
當然,也可以將所有的消息都設置為持久化,但是這樣做會影響RabbitMQ的性能,因為磁盤的寫入速度比內存的寫入要慢得多。對於可靠性不是那么高的消息可以不采用持久化處理以提高整體的吞吐量。魚和熊掌不可兼得,關鍵在於選擇和取舍。在實際中,需要根據實際情況在可靠性和吞吐量之間做一個權衡。
死信隊列
當消息在一個隊列中變成死信之后,他能被重新發送到另一個交換器中,這個交換器成為死信交換器,與該交換器綁定的隊列稱為死信隊列。。
消息變成死信有下面幾種情況:
- 消息被拒絕。通過調用basic.reject或者basic.nack並且設置requeue=false。
- 消息過期
- 隊列達到最大長度
DLX也是一個正常的交換器,和一般的交換器沒有區別,他能在任何的隊列上面被指定,實際上就是設置某個隊列的屬性。當這個隊列中有死信的時候,RabbitMQ會自動將這個消息重新發送到設置的交換器上,進而被路由到另一個隊列,我們可以監聽這個隊列中消息做相應的處理。
死信隊列設置:
-
設置死信隊列的exchange和queue,然后進行綁定
-
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey:#
-
然后進行正常聲明交換器、隊列、綁定,只不過我們需要在隊列上加一個參數即可:arguments.put(“x-dead-letter-exchange”,“dlx.exchange”)
死信隊列有什么用?
當發生異常的時候,消息不能夠被消費者正常消費,被加入到了死信隊列中。后續的程序可以根據死信隊列中的內容分析當時發生的異常,進而改善和優化系統。
延遲隊列
一般的隊列,消息一旦進入隊列就會被消費者立即消費。延遲隊列就是進入該隊列的消息會被消費者延遲消費,延遲隊列中存儲的對象是的延遲消息,“延遲消息”是指當消息被發送以后,等待特定的時間后,消費者才能拿到這個消息進行消費。
延遲隊列用於需要延遲工作的場景。最常見的使用場景:淘寶或者天貓我們都使用過,用戶在下單之后通常有30分鍾的時間進行支付,如果這30分鍾之內沒有支付成功,那么訂單就會自動取消。除了延遲消費,延遲隊列的典型應用場景還有延遲重試。比如消費者從隊列里面消費消息失敗了,可以延遲一段時間以后進行重試。
基於RabbitMQ實現消息延遲隊列方案
延時隊列使用場景
在很多的業務場景中,延時隊列可以實現很多功能,此類業務中,一般上是非實時的,需要延遲處理的,需要進行重試補償的。
- 訂單超時關閉:在支付場景中,一般上訂單在創建后30分鍾或1小時內未支付的,會自動取消訂單。
- 短信或者郵件通知:在一些注冊或者下單業務時,需要在1分鍾或者特定時間后進行短信或者郵件發送相關資料的。本身此類業務於主業務是無關聯性的,一般上的做法是進行異步發送。
- 重試場景:比如消息通知,在第一次通知出現異常時,會在隔幾分鍾之后進行再次重試發送。
RabbitMQ實現延時隊列
本身在
RabbitMQ
中是未直接提供延時隊列功能的,但可以使用TTL(Time-To-Live,存活時間)
和DLX(Dead-Letter-Exchange,死信隊列交換機)
的特性實現延時隊列的功能。
存活時間(Time-To-Live 簡稱 TTL)
RabbitMQ
中可以對隊列和消息分別設置TTL,TTL表明了一條消息可在隊列中存活的最大時間。當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在TTL時間后死亡
成為Dead Letter
。如果既配置了消息的TTL,又配置了隊列的TTL,那么較小的那個值會被取用。
死信交換(Dead Letter Exchanges 簡稱 DLX)
上個知識點也提到了,設置了
TTL
的消息或隊列最終會成為Dead Letter
,當消息在一個隊列中變成死信之后,它能被重新發送到另一個交換機中,這個交換機就是DLX,綁定此DLX的隊列就是死信隊列。
一個消息變成死信一般上是由於以下幾種情況;
- 消息被拒絕
- 消息過期
- 隊列達到了最大長度。
所以,通過TTL
和DLX
的特性可以模擬實現延時隊列的功能。當隊列中的消息超時成為死信后,會把消息死信重新發送到配置好的交換機中,然后分發到真實的消費隊列。故簡單來說,我們可以創建2個隊列,一個隊列用於發送消息,一個隊列用於消息過期后的轉發的目標隊列。
SpringBoot集成RabbitMQ實現延時隊列實戰
以下使用
SpringBoot
集成RabbitMQ
進行實戰說明,在進行http
消息通知時,若通知失敗(地址不可用或者連接超時)時,將此消息轉入延時隊列中,待特定時間后進行重新發送。
源碼地址:
https://github.com/xie19900123/spring-boot-learning/tree/master/chapter-38
RabbitMQ延遲消息的延遲極限是多少?
問題定位
因為不是所有的消息都出現了沒有延遲消息效果的因素,通過有問題的消息特征,大致猜測可能是延遲時間過長導致了消息延遲失敗。為了驗證這個原因,先拿之前文章中的例子,來測試一下延遲時間是否與問題直接相關。
對之前的延遲消息使用樣例(文末的Git倉庫中可以獲取完整代碼)接口做一下微改,增加了一個請求參數delay
來控制延遲時間:
@GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message, @RequestParam Long delay) { log.info("Send: " + message); testTopic.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", delay).build()); return "ok"; }
然后嘗試發起了兩個請求:
請求1:延遲5000毫秒。消息發送到MQ之后確實延遲了5秒之后才得到了消費,沒有任何問題。
curl localhost:8080/sendMessage?message=hello&delay=5000
請求2:延遲1年(31536000000毫秒)。消息發送到MQ之后馬上就被消費者消費了,完全沒有延遲效果。
curl localhost:8080/sendMessage?message=hello&delay=31536000000
問題小結
在明確了問題原因之后,需要對該功能的時候做一些明確的限定(延遲時間的極限),以避免再次出現類似的問題。深入探索下去,這里的失敗主要與消息的過期時間(TTL)有直接的關系。在RabbitMQ中,消息的過期時間必須是非負 32 位整數,即:0 <= n <= 2^32-1,以毫秒為單位。 其中,2^32-1 = 4294967295。
這里我們可以嘗試下面兩個請求,分別設置延遲時間為4294967295何4294967296:
curl localhost:8080/sendMessage?message=hello&delay=4294967295 curl localhost:8080/sendMessage?message=hello&delay=4294967296
可以發現,當延遲時間為4294967295毫秒的時候,延遲消息工作正常;當延遲時間為4294967296毫秒的時候,消息被直接消費,沒有延遲效果。
所以,我們在使用RabbitMQ的延遲消息功能時候,必須注意它的延遲極限是4294967295毫秒(約 49 天)。如果你的業務需求會超過這個臨界值,就必須避開這個坑,采用其他方法來實現需要延遲或者定時執行的任務了。
參考鏈接:
https://www.cnblogs.com/okong/p/springboot-thirty-eight.html
https://www.cnblogs.com/sgh1023/p/11221364.html
https://www.cnblogs.com/didispace/p/11230308.html
書籍:rabbitmq 實戰