前言
經過上一篇的介紹,相信大家對RabbitMQ 的各種概念有了一定的了解,及如何使用RabbitMQ.Client 去發送和消費消息。
特性及使用場景
1. TTL 過期時間
TTL可以用來指定queue 和message多久會被去掉;在短期message數量很大,或者訂單需要特定失效(例如15min支付)等場景,設置消息的過期時間可以減輕rabbitmq的壓力,后者可以幫助方便的實現業務。
那么如何設置消息過期時間呢?
- 為queue中的消息整體設置
var ttlSetting = new Dictionary<string, Object>(); ttlSetting.Add("x-message-ttl", 10000); queueName = _channel.QueueDeclare(arguments: ttlSetting).QueueName;
- 為每條message設置過期時間
var properties = _channel.CreateBasicProperties(); properties.Expiration = "10000"; var messageBytes = ObjectToByteArray(message); _channel.BasicPublish(TicketExchangeName, optType, false, properties, messageBytes);
隊列TTL 設置
我們不但可以對message 設置過期時間,還可以對消息隊列設置生存時間,當超過設定時間未被使用,該消息隊列可以被自動刪除。
設置方法同樣是在聲明隊列的時候傳入參數即可:
var ttlSetting = new Dictionary<string, Object>(); ttlSetting.Add("x-expires", 6000); queueName = _channel.QueueDeclare(arguments: ttlSetting).QueueName;
2. 死信交換器和死信隊列
何為死信?
- 被拒絕的消息
- 過期的消息
- 消息隊列達到最大長度
當死信發生時,死信會通過死信交換機進入到聲明的死信隊列,可以通過參數聲明:
_channel.ExchangeDeclare(DLExchangeName, ExchangeType.Direct); var arguments = new Dictionary<string, Object>(); arguments.Add("x-dead-letter-exchange", DLExchangeName); _channel.QueueDeclare("dlqueue", arguments: arguments);
死信消息的路由,如果在消息初始被投送到的隊列上設置了 x-dead-letter-routing-key 參數,則死信消息路由時以這個為准,否則按照它被產生時的路由key為准。
例如,如果您將消息發布到具有路由密鑰 foo 的交換,並且該消息是死信的,它將被發布到其具有路由密鑰 foo 的死信交換。如果消息最初登陸的隊列已聲明為 x-dead-letter-routing-key 設置為 bar,則消息將發布到其帶有路由鍵 bar 的死信交換。
查看死信隊列中的消息可以幫助我們了解分析程序的運行健康情況,明確異常發生的原因。
3. 交換機,隊列,消息的持久化
交換機的持久化是我們在使用rabbitmq經常需要做的事情,聲明交換器時將 durable 參數設置為 true 來實現的。如果不設置持久化屬性的話,當 RabbitMQ 服務重啟后交換器的數據就會丟失,需要注意的是,是交換器的數據丟失,消息不會丟失,只是不能將消息發送到這個交換器中了,一般生產環境使用都會把該屬性設置為持久化。
隊列的持久化:先介紹隊列的持久化,是因為消息的持久化是依賴在隊列持久化之上的。通過在聲明時將durable 參數設置為 true 達到目的。
_channel.QueueDeclare(“myqueue”, true, false, arguments: ttlSetting)
消息的持久化:通過在publish 消息時,設置基本屬性 DeliveryMode 來實現消息的持久性。
var properties = _channel.CreateBasicProperties(); properties.DeliveryMode = 2;
4. 消息的確認機制
相比於http請求和RPC,message queue的應用確實帶來了不少便利,但引入新的application不可避免的增加了可靠性的考慮;如何確保消息從生產者准確的投遞到了rabbitmq broker,又如何確保消費者真的消費了消息,又或者消費者沒有重復消費消息?
這些都是我們在使用消息隊列時候要考慮的問題,如何你使用何種技術(RabbitMQ或者是Kafka)。
其中,在保證消息的准確投遞和消費上,RabbitMQ提供了確認機制幫助我們確保消息的可靠性。
生產端的消息確認機制:
消息確認的處理辦法分為:單個消息同步確認(吞吐量低),批量消息同步確認,消息異步確認。這里只介紹異步confirm的情況。
var channel = connection.CreateModel(); channel.ConfirmSelect(); channel.BasicAcks += (sender, ea) => { // code when message is confirmed }; channel.BasicNacks += (sender, ea) => { //code when message is nack-ed };
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
如果希望在投遞失敗后,重新投遞,可以用 ConcurrentDictionary 存儲deliverTag 和 消息體,在投遞失敗后,重新投遞。
消費端的消息確認機制:
1. 自動應答
消費者在消費消息的時候,如果設定應答模式為自動,則消費者收到消息后,消息就會立即被 RabbitMQ 從 隊列中刪除掉。
因此,在實際開發者,我們基本上是將消費應答模式設置為手動確認更為妥當一些。
_channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
2. 手動應答
消費者在收到消息后:
- 可以在既定的正常情況下進行確認(告訴 RabbitMQ,我已經消費過該消息了,你可以刪除該條數據了);
- 可以在既定的異常情況下不進行確認(RabbitMQ 會繼續保留該條數據),這樣下一次可以繼續消費該條數據。
_channel.BasicAck(ea.DeliveryTag, false);
本文介紹了過期時間,死信隊列,應答機制和持久化的設置和使用場景,后續還會繼續介紹RabbitMQ的其它特性和集群搭建,歡迎大家關注和提出意見。
---------------------------------------------------