消費者確認解決的問題是確認消息是否被消費者"成功消費".
它有個前提條件,那就是生產者發布的消息已經"成功"發送出去了.
因此還需要一個機制來告訴生產者,你發送的消息真的"成功"發送了.
在標准的AMQP 0-9-1,保證消息不會丟失的唯一方法是使用事務:在通道上開啟事務,發布消息,提交事務.但是事務是非常重量級的,它使得RabbitMQ的吞吐量降低250倍.為了解決這個問題,RabbitMQ 引入了 發布者確認(Publisher Confirms) 機制,它是模仿AMQP協議中的消費者消息確認機制.
事務機制
生產者部分代碼:
try { channel.TxSelect();//開啟事務機制 channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes("hello world")); channel.TxCommit();//提交 Console.WriteLine($"send {msg}"); } catch (Exception e) { channel.TxRollback();//回滾 Console.WriteLine(e); }
發布者確認
一旦在信道上使用 confirm.select 方法,就認為該信道處於Publisher Confirms模式.事務信道不能進入Publisher Confirms模式,一旦信道處於Publisher Confirms模式,不能開啟事務.即事務和Publisher Confirms模式只能二選一.
發布的消息什么時候會被broker確認?
對於不可路由的消息,broker 將在 exchange 驗證消息不會路由到任何隊列(發回一個空的隊列列表)后發出確認;如果消息被設置為"必需消息"發布,即 BasicPublish() 方法的 "mandatory" 入參為true,那么 BasicReturn 事件將在 BasicAcks 事件之前觸發.否定確認 BasicNacks 事件也是如此.
對於可路由消息,當所有隊列都接受消息時才觸發 BasicAcks 事件,對於路由到持久話隊列的持久性消息,這意味着持久化到磁盤后才會觸發 BasicAcks 事件;對於消息的鏡像隊列,這意味着所有鏡像都已接受該消息后才會觸發 BasicAcks 事件.
發布者確認分為同步和異步兩種.
一.同步
生產者部分代碼
//開啟confirm機制 channel.ConfirmSelect(); string msg = "hello world "; for (int i = 0; i < 10; i++) { channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i)); } //可以發送一批消息后,調用該方法;也可以每發一條調用一次. if (channel.WaitForConfirms()) { Console.WriteLine("send is success"); } else { Console.WriteLine("send is failed");
//實際應用中,這里需要添加發送消息失敗的處理邏輯.
//如果是發送一批消息,那么只要有一條失敗,則所有的消息發送都會失敗. }
二.異步
生產者部分代碼
channel.ConfirmSelect();
//肯定確認 channel.BasicAcks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最后成功的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 成功發送 "); } }; //否定確認 channel.BasicNacks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最后失敗的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 發送失敗 "); } };
發布者的否定確認(BasicNacks)
- 在特殊情況下,當 broker 無法成功處理消息而不是 BasicAck 時,broker 將發送 BasicNack.在這種情況下,BasicNack 的字段與 BasicAck 相對應的字段意義相同,並且 requeue 字段是沒有意義的.是否重發消息由發送者自己決定;
- 將channel設置為發布者確認模式后,所有后續發布的消息都只會被 confirm 一次或者 nack 一次;
- 沒有機制保證消息需要多久被 confirmed;
- 消息不會同時被confirmed和nack`d;
- BasicNacks 事件只在負責隊列的Erlang進程中發生內部錯誤時才會觸發;
持久化消息的延遲肯定確認
前面說到,
如果是持久化的消息,要等到消息持久化到磁盤后才會觸發 BasicAcks 事件;對於消息的鏡像隊列,要等到所有鏡像都已接受該消息后才會觸發 BasicAcks 事件.
而為了保證持久化效率, RabbitMQ不是來一條存一條,而是定時批量地持久化消息到磁盤.RabbitMQ 消息存儲一段時間(幾百毫秒)之后或者當隊列空閑時,才會批量寫到磁盤.
這意味着在恆定負載下,BasicAck 的延遲可以達到幾百毫秒.如果隊列支持鏡像隊列,則延遲時間更大.
所以,為了提高吞吐量,強烈建議應用程序采用異步確認方式,或者發布批量消息后等待確認.
發布者確認的注意事項
在大多數情況下,RabbitMQ將以與發布時相同的順序向發布者確認消息(這適用於在單個頻道上發布的消息).但是,發布者確認是異步發出的,可以確認單個消息或一組消息.發出確認的確切時刻取決於消息的傳遞模式(持久性與瞬態)以及消息路由到的隊列的屬性.也就是說,RabbitMQ可能不以消息發布的順序向發布者發送確認消息.生產者端盡量不要依賴消息確認的順序處理業務邏輯.