二、RabbitMQ 進階特性及使用場景 [.NET]


前言

經過上一篇的介紹,相信大家對RabbitMQ 的各種概念有了一定的了解,及如何使用RabbitMQ.Client 去發送和消費消息。

特性及使用場景

1. TTL 過期時間

TTL可以用來指定queue 和message多久會被去掉;在短期message數量很大,或者訂單需要特定失效(例如15min支付)等場景,設置消息的過期時間可以減輕rabbitmq的壓力,后者可以幫助方便的實現業務。

那么如何設置消息過期時間呢?

  • 為queue中的消息整體設置
 var ttlSetting = new Dictionary<string, Object>();
  ttlSetting.Add("x-message-ttl", 10000);
queueName =  _channel.QueueDeclare(arguments: ttlSetting).QueueName;
  • 為每條message設置過期時間
    var properties = _channel.CreateBasicProperties();
    properties.Expiration = "10000";
     var messageBytes = ObjectToByteArray(message);
    _channel.BasicPublish(TicketExchangeName, optType, false, properties, messageBytes);

     

隊列TTL 設置

我們不但可以對message 設置過期時間,還可以對消息隊列設置生存時間,當超過設定時間未被使用,該消息隊列可以被自動刪除。

設置方法同樣是在聲明隊列的時候傳入參數即可:

var ttlSetting = new Dictionary<string, Object>();         
ttlSetting.Add("x-expires", 6000);
 queueName =  _channel.QueueDeclare(arguments: ttlSetting).QueueName;

 2.  死信交換器和死信隊列

何為死信?

  • 被拒絕的消息
  • 過期的消息
  • 消息隊列達到最大長度

當死信發生時,死信會通過死信交換機進入到聲明的死信隊列,可以通過參數聲明:

_channel.ExchangeDeclare(DLExchangeName, ExchangeType.Direct);            
var arguments = new Dictionary<string, Object>();
arguments.Add("x-dead-letter-exchange", DLExchangeName);
_channel.QueueDeclare("dlqueue", arguments: arguments);

死信消息的路由,如果在消息初始被投送到的隊列上設置了  x-dead-letter-routing-key 參數,則死信消息路由時以這個為准,否則按照它被產生時的路由key為准。例如,如果您將消息發布到具有路由密鑰 foo 的交換,並且該消息是死信的,它將被發布到其具有路由密鑰 foo 的死信交換。如果消息最初登陸的隊列已聲明為 x-dead-letter-routing-key 設置為 bar,則消息將發布到其帶有路由鍵 bar 的死信交換。

查看死信隊列中的消息可以幫助我們了解分析程序的運行健康情況,明確異常發生的原因。

 

3. 交換機,隊列,消息的持久化

 交換機的持久化是我們在使用rabbitmq經常需要做的事情,聲明交換器時將 durable 參數設置為 true 來實現的。如果不設置持久化屬性的話,當 RabbitMQ 服務重啟后交換器的數據就會丟失,需要注意的是,是交換器的數據丟失,消息不會丟失,只是不能將消息發送到這個交換器中了,一般生產環境使用都會把該屬性設置為持久化。

隊列的持久化:先介紹隊列的持久化,是因為消息的持久化是依賴在隊列持久化之上的。通過在聲明時將durable 參數設置為 true 達到目的。

_channel.QueueDeclare(“myqueue”, true, false, arguments: ttlSetting)

消息的持久化:通過在publish 消息時,設置基本屬性 DeliveryMode 來實現消息的持久性。

var properties = _channel.CreateBasicProperties();
properties.DeliveryMode = 2;

 

4.  消息的確認機制

相比於http請求和RPC,message queue的應用確實帶來了不少便利,但引入新的application不可避免的增加了可靠性的考慮;如何確保消息從生產者准確的投遞到了rabbitmq broker,又如何確保消費者真的消費了消息,又或者消費者沒有重復消費消息?

這些都是我們在使用消息隊列時候要考慮的問題,如何你使用何種技術(RabbitMQ或者是Kafka)。

 其中,在保證消息的准確投遞和消費上,RabbitMQ提供了確認機制幫助我們確保消息的可靠性。

生產端的消息確認機制:

消息確認的處理辦法分為:單個消息同步確認(吞吐量低),批量消息同步確認,消息異步確認。這里只介紹異步confirm的情況。

var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
  // code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
  //code when message is nack-ed
};
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

如果希望在投遞失敗后,重新投遞,可以用 ConcurrentDictionary 存儲deliverTag 和 消息體,在投遞失敗后,重新投遞。

 

消費端的消息確認機制:

1. 自動應答

消費者在消費消息的時候,如果設定應答模式為自動,則消費者收到消息后,消息就會立即被 RabbitMQ 從 隊列中刪除掉。
因此,在實際開發者,我們基本上是將消費應答模式設置為手動確認更為妥當一些。

_channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

2. 手動應答

消費者在收到消息后:

  • 可以在既定的正常情況下進行確認(告訴 RabbitMQ,我已經消費過該消息了,你可以刪除該條數據了);
  • 可以在既定的異常情況下不進行確認(RabbitMQ 會繼續保留該條數據),這樣下一次可以繼續消費該條數據。
_channel.BasicAck(ea.DeliveryTag, false);

 本文介紹了過期時間,死信隊列,應答機制和持久化的設置和使用場景,后續還會繼續介紹RabbitMQ的其它特性和集群搭建,歡迎大家關注和提出意見。

---------------------------------------------------


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM