RabbitMQ (十二) 消息確認機制 - 發布者確認


消費者確認解決的問題是確認消息是否被消費者"成功消費".

它有個前提條件,那就是生產者發布的消息已經"成功"發送出去了.

因此還需要一個機制來告訴生產者,你發送的消息真的"成功"發送了.

在標准的AMQP 0-9-1,保證消息不會丟失的唯一方法是使用事務:在通道上開啟事務,發布消息,提交事務.但是事務是非常重量級的,它使得RabbitMQ的吞吐量降低250倍.為了解決這個問題,RabbitMQ 引入了 發布者確認(Publisher Confirms) 機制,它是模仿AMQP協議中的消費者消息確認機制.

事務機制

生產者部分代碼:

         try
            {               
                channel.TxSelect();//開啟事務機制        
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes("hello world"));              
 channel.TxCommit();//提交
                Console.WriteLine($"send {msg}");
            }
            catch (Exception e)
            {                
 channel.TxRollback();//回滾
                Console.WriteLine(e);
            }

 

發布者確認

一旦在信道上使用 confirm.select 方法,就認為該信道處於Publisher Confirms模式.事務信道不能進入Publisher Confirms模式,一旦信道處於Publisher Confirms模式,不能開啟事務.即事務和Publisher Confirms模式只能二選一.

發布的消息什么時候會被broker確認?

對於不可路由的消息,broker 將在 exchange 驗證消息不會路由到任何隊列(發回一個空的隊列列表)后發出確認;如果消息被設置為"必需消息"發布,即 BasicPublish() 方法的 "mandatory" 入參為true,那么 BasicReturn 事件將在 BasicAcks 事件之前觸發.否定確認 BasicNacks 事件也是如此.

對於可路由消息,當所有隊列都接受消息時才觸發 BasicAcks 事件,對於路由到持久話隊列的持久性消息,這意味着持久化到磁盤后才會觸發 BasicAcks 事件;對於消息的鏡像隊列,這意味着所有鏡像都已接受該消息后才會觸發 BasicAcks 事件.

發布者確認分為同步和異步兩種.

一.同步

生產者部分代碼

            //開啟confirm機制
            channel.ConfirmSelect();
            string msg = "hello world ";           
            for (int i = 0; i < 10; i++)
            {
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i));
            }
          
        //可以發送一批消息后,調用該方法;也可以每發一條調用一次.       
            if (channel.WaitForConfirms())
            {
                Console.WriteLine("send is success");
            }
            else
            {
                Console.WriteLine("send is failed");
         //實際應用中,這里需要添加發送消息失敗的處理邏輯.
         //如果是發送一批消息,那么只要有一條失敗,則所有的消息發送都會失敗. }

 

二.異步

生產者部分代碼

          channel.ConfirmSelect();  

//肯定確認 channel.BasicAcks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最后成功的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 成功發送 "); } }; //否定確認 channel.BasicNacks += (s, e) => { //多條 if (e.Multiple) { Console.WriteLine("最后失敗的一條是 : " + e.DeliveryTag); } //單條 else { Console.WriteLine(e.DeliveryTag + " 發送失敗 "); } };

 

發布者的否定確認(BasicNacks)

  • 在特殊情況下,當 broker 無法成功處理消息而不是 BasicAck 時,broker 將發送 BasicNack.在這種情況下,BasicNack 的字段與 BasicAck 相對應的字段意義相同,並且 requeue 字段是沒有意義的.是否重發消息由發送者自己決定;
  • 將channel設置為發布者確認模式后,所有后續發布的消息都只會被 confirm 一次或者 nack 一次;
  • 沒有機制保證消息需要多久被 confirmed;
  • 消息不會同時被confirmed和nack`d;
  • BasicNacks 事件只在負責隊列的Erlang進程中發生內部錯誤時才會觸發;

持久化消息的延遲肯定確認

前面說到,

如果是持久化的消息,要等到消息持久化到磁盤后才會觸發 BasicAcks 事件;對於消息的鏡像隊列,要等到所有鏡像都已接受該消息后才會觸發 BasicAcks 事件.

而為了保證持久化效率, RabbitMQ不是來一條存一條,而是定時批量地持久化消息到磁盤.RabbitMQ 消息存儲一段時間(幾百毫秒)之后或者當隊列空閑時,才會批量寫到磁盤.

這意味着在恆定負載下,BasicAck 的延遲可以達到幾百毫秒.如果隊列支持鏡像隊列,則延遲時間更大.

所以,為了提高吞吐量,強烈建議應用程序采用異步確認方式,或者發布批量消息后等待確認.

發布者確認的注意事項

在大多數情況下,RabbitMQ將以與發布時相同的順序向發布者確認消息(這適用於在單個頻道上發布的消息).但是,發布者確認是異步發出的,可以確認單個消息或一組消息.發出確認的確切時刻取決於消息的傳遞模式(持久性與瞬態)以及消息路由到的隊列的屬性.也就是說,RabbitMQ可能不以消息發布的順序向發布者發送確認消息.生產者端盡量不要依賴消息確認的順序處理業務邏輯. 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM