RabbitMQ消息發布和消費的確認機制


前言

新公司項目使用的消息隊列是RabbitMQ,之前其實沒有在實際項目上用過RabbitMQ,所以對它的了解都談不上入門。趁着周末休息的時間也猛補習了一波,寫了兩個窗體應用,一個消息發布端和消息消費端。園子里解釋RabbitMQ基礎的很多了,這里就不對RabbitMQ的基礎再做敘述了,來點實際工作中一定會碰到的問題和解決的方案。

RabbitMQ 消息發布確認機制

默認情況下消息發布端執行BasicPublish方法后,消息是否到達指定的隊列的結果發布端是未知的。BasicPublish方法的返回值是void。假設我們想對消息進行監控,針對消息發送失敗后進行補發則需要一個消息確認機制來幫我們實現。

  • 事務機制
  • Confirm確認機制

上面是已知可通過RabbitMQ自帶的特性實現消息確認機制的兩種方式。

事務機制

事務機制依賴三個RabbitMQ提供的方法

  • txSelect()
  • txCommit()
  • txRollback()
    看名字大概知道意思了,特別是Commit和Rollback,使用方式和數據庫的事務使用幾乎一樣,txSelect()聲明事務的開始,txCommit()提交事務,txRollBack()執行提交失敗后的回滾。
    使用代碼如下:
        // 采取RabbitMQ事務方式傳輸消息
        private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與隊列和交換機進行綁定 否則就算事務提交了隊列也不會有數據~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            channel.TxSelect();
                            channel.BasicPublish(exchangeName, routingKey, null, messagebuffer);
                            //if (1 == 1) throw new Exception("沒錯!我是故意拋出異常的!看看最終隊列是否寫入了消息~");
                            channel.TxCommit();
                        }
                        catch (Exception ex)
                        {
                            Rtx_Receive.Text = Rtx_Receive.Text + $"\r 異常產生時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},異常信息:{ex.Message}";
                            channel.TxRollback();
                            // TODO 進行補發OR其他邏輯處理
                        }
                        Rtx_Receive.Text = Rtx_Receive.Text + $"\r 發送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"發送消息失敗!{ex.Message}");
            }
        }

  

需要注意的是 這里的事務其實也只能保證在執行BasicPublish方法后且TxCommit方法執行前但凡出現異常則回滾!
上面是什么意思呢?意思就是我只管消息發送到隊列里,且在我定義的事務內沒有出現異常,出現了異常則將發布的數據給撤銷!
但是,如果事務也提交了,但是消息還是有可能不會送達隊列里去

比如,我將上面的代碼改下

        // 采取RabbitMQ事務方式傳輸消息
        private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與隊列和交換機進行綁定 否則就算事務提交了隊列也不會有數據~
                       //  channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            channel.TxSelect();
                            channel.BasicPublish(exchangeName, routingKey, null, messagebuffer);
                            //if (1 == 1) throw new Exception("沒錯!我是故意拋出異常的!看看最終隊列是否寫入了消息~");
                            channel.TxCommit();
                        }
                        catch (Exception ex)
                        {
                            Rtx_Receive.Text = Rtx_Receive.Text + $"\r 異常產生時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},異常信息:{ex.Message}";
                            channel.TxRollback();
                            // TODO 進行補發OR其他邏輯處理
                        }
                        Rtx_Receive.Text = Rtx_Receive.Text + $"\r 發送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"發送消息失敗!{ex.Message}");
            }
        }

  上面代碼我將`channel.QueueBind(queueName, exchangeName, routingKey);` 這一行代碼注釋掉,不將routingKey進行綁定,然后在RabbitMQ管理頁面將隊列、交換機刪除。如下圖

 

 

 

再執行代碼,發現隊列是創建了,交換器也創建了,但是隊列里沒有數據!

 

當然,問題出在沒有將隊列和交換器以及routingKey進行綁定,我們的消息沒有進入到隊列的路由,最終導致了消息進入了所謂的“黑洞”。

所以上面的隊列不是說能完全保證只要執行了TxRollback()我們的消息隊列就一定會有數據!!!

Confirm確認機制

Confirm確認機制也很容易理解,它要求消息生產端(Producer)對消息發送后RabbitMQ服務端返回一個已接收的指令,Producer收到該指令則認為該消息已經發送成功。同時消費端(Consumer)也有同樣的機制,在從RabbitMQ服務端接收到消息后,需要返回一個已處理的指令給服務端,服務端收到后則會認為該消息已被消費。


下面是采取Confirm確認機制后的發布消息代碼

        // 采取確認機制方式傳輸消息
        private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與隊列和交換機進行綁定 否則就算事務提交了隊列也不會有數據~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        channel.ConfirmSelect(); // 啟用服務器確認機制方式
                        channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer);
                        if (channel.WaitForConfirms())
                        {
                            Rtx_Receive.Text = $"\r 消息發送成功! 發送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"發送消息失敗!{ex.Message}");
            }
        }

  

關鍵代碼在於執行BasicPublish之前調用channel.ConfirmSelect()啟用服務器確認,然后在發布后通過調用 WaitForConfirms()得到消息發布結果,
true則表示消息已發布到了隊列里。 OK,現在試下,到服務器上刪除掉隊列、交換機信息。然后代碼去掉綁定交換機和路由鍵后試下,看看是否和事務方式一樣無法確認消息是否真正抵達隊列。

刪除隊列和交換機,接下來更改代碼,直接把上面的 channel.QueueBind(queueName, exchangeName, routingKey);注釋掉,然后執行下,看看channel.WaitForConfirms()返回true還是false~
哈哈哈~執行結果是true 但是我們隊列是不會有消息進來的,所以確認機制和事務機制對消息的發布是否抵達隊列監控是一樣的,沒有說哪一種方式能絕對保證消息抵達了隊列

針對消息提交到了指定交換機但是最終沒有寫入到隊列的消息如何追蹤

我們有一種方式可以捕獲發布了消息但是該消息最終沒有寫入到隊列的情況,我們需要注冊IModel的BasicReturn事件,更新后的代碼如下:

       // 采取確認機制方式傳輸消息
        private void SendMessageByAck(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName,
         string routingKey, EnumRabbitExchangeType exchangeType, string message)
        {
            try
            {
                ConnectionFactory rabbitMqFactory = new ConnectionFactory()
                {
                    HostName = mqConnection.HostName,
                    UserName = mqConnection.UserName,
                    Password = mqConnection.Password,
                    Port = mqConnection.Port
                };
                using (IConnection conn = rabbitMqFactory.CreateConnection())
                {
                    using (IModel channel = conn.CreateModel())
                    {
                        channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: false, autoDelete: false, arguments: null);
                        channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                        // 必須執行QueueBind 需要將routingKey與隊列和交換機進行綁定 否則就算事務提交了隊列也不會有數據~
                        channel.QueueBind(queueName, exchangeName, routingKey);
                        byte[] messagebuffer = Encoding.UTF8.GetBytes(message);
                        channel.ConfirmSelect(); // 啟用服務器確認機制方式
                        channel.BasicReturn += Channel_BasicReturn;
                        //mandatory為true表示交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列,那么RabbitMQ 會調用Basic.Return 命令將消息返回給生產者
                        channel.BasicPublish(exchangeName, routingKey, mandatory: true, null, messagebuffer);
                        if (channel.WaitForConfirms())
                        {
                            Rtx_Receive.Text = $"\r 消息發送成功! 發送時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}";
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                MessageBox.Show($"發送消息失敗!{ex.Message}");
            }
        }

        /// <summary>
        /// 當消息發送不到隊列時候觸發
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void Channel_BasicReturn(object sender, RabbitMQ.Client.Events.BasicReturnEventArgs e)
        {
            BeginInvoke(new Action(() => { Rtx_Receive.Text = $"\r 消息發送失敗!"; }));
        }  

消息發布確認機制結論

1、事務機制與確認機制都無法百分之百確認消息是否寫入到了緩存,可以理解為兩者都只能確認發布的動作是否成功~但是消息有無進入隊列是無法給予客戶端准確結果;
2、兩者性能比較而言事務的性能損耗更大;
3、注冊IModel的BasicReturn事件可以追蹤到沒有寫入到隊列的消息

RabbitMQ 消息消費確認機制

上面已經知道了如何對消息的發布進行確認,那么消費數據時候我們肯定也想在消費完成后確認該消息已經處理,希望隊列對其進行刪除。
而不是在我們的消費端程序未將消息處理后,隊列就將其刪除了。

在此之前說下RabbitMQ消費者對象的兩種實現方式

  • 繼承DefaultBasicConsumer類
  • 實例化EventingBasicConsumer對象

繼承DefaultBasicConsumer方式

DefaultBasicConsumer是RabbitMQ.Client提供的一個消費者基類,該類實現了IBasicConsumer接口。
繼承DefaultBasicConsumer類后可重寫基類的部分方法來實現消息獲取以及當前消費者各個狀態變更的事件,本文的示例代碼即采用這種方式實現消費者對象。

實例化EventingBasicConsumer對象

這種方式采取注冊事件的方式接受消息發布者推送到隊列的消息,代碼如下:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => {
     string message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine($"接受到消息為:{message}");
 };  

消費端確認消息已處理有兩種方式

  • 自動確認
  • 手動確認

自動確認

自動確認我理解的就是服務端認為你接受到消息后即確認了,但是如果當你拿到消息后,依賴此消息的業務邏輯未處理完畢,但是卻中途異常了的話,此消息也會消失掉!所以建議消費端采取手動確認!

手動確認

手動確認可以完美解決上面自動確認出現的問題,但是它也意味着我們開發者需要對確認的流程進行一個完整的閉環。即所有的消息在消費端獲取到后必須有一個明確的結果返回給服務端(Broker)。 我們對消息的處理結果要么是確認處理,要么是拒絕該消息(返回給Broker,再分發給其他消費者)。如果我們沒有對消息接收后進行任何反饋的話該條消息在隊列的狀態會變成Unacked 直到我們消費端AMQP連接中斷后該消息狀態又會變成Ready。狀態為Unacked的消息會導致所有消費者都無法對該消息進行二次消費(包含當前消費者),所以此類消息越多則占用的內存資源也會越多。當消息變回Ready也會很煩人,因為我們已經對該消息進行過一次處理了,如若我們沒有對消息進行校驗則又會執行一遍。 所以手動確認必須執行回執!!!!!!

下面是手動確認消息的代碼:

        private void ReceiveMessage(RabbitMQConnectionDTO connectionDTO, string exchangeName, string queueName, string routtingKey)
        {
            try
            {
                var factory = new ConnectionFactory
                {
                    HostName = connectionDTO.HostName,
                    Password = connectionDTO.Password,
                    UserName = connectionDTO.UserName,
                    Port = connectionDTO.Port,
                };

                UseDefaultBasicConsumerType(factory, queueName);
                //DirectAcceptExchangeEvent(factory, exchangeName, queueName, routtingKey);
            }
            catch (Exception ex)
            {
                Rtx_SendContext.Text = $"出現異常:{ex.Message}";
            }
        }

        private void UseDefaultBasicConsumerType(ConnectionFactory factory, string queueName)
        {
            var connection = factory.CreateConnection();
            _channel = connection.CreateModel();
            // accept only one unack-ed message at a time
            // uint prefetchSize, ushort prefetchCount, bool global
            _channel.BasicQos(0, 1, false);

            //定義一個繼承了DefaultBasicConsumer類的消費類(DefaultBasicConsumer是繼承了IBasicConsumer接口的一個基類,里面存在許多可重寫的方法)
            MessageReceiver messageReceiver = new MessageReceiver(_channel, (string msg, ulong deliveryTag) =>
            {
                string key = Txt_Key.Text.Trim();
                string keyNoReturn = Txt_KeyNoReturn.Text.Trim();
                bool isExecFlag = false;
                if (!string.IsNullOrWhiteSpace(key) && msg.StartsWith(key)) // 這里要小心 如果只有當前1個消費者那你懂的~~~~~~
                    _channel.BasicReject(deliveryTag, requeue: true); //requeue表示消息被拒絕后是否重新放回queue中
                else if (!string.IsNullOrWhiteSpace(keyNoReturn) && msg.StartsWith(keyNoReturn))
                    _channel.BasicReject(deliveryTag, requeue: false); //requeue表示消息被拒絕后是否重新放回queue中
                else
                {
                    _channel.BasicAck(deliveryTag, multiple: false); //確認已處理消息 multiple表示是否確認多條
                    isExecFlag = true;
                }
                BeginInvoke(new Action(() => { Rtx_SendContext.Text = Rtx_SendContext.Text + "\r" + $"處理標識{isExecFlag.ToString()} " + string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msg); }));
            });
            _channel.BasicConsume(queueName, false, messageReceiver); //不開啟自動確認回執
        }

這里的消費者對象通過繼承DefaultBasicConsumer對象而實現

代碼如下:

    public class MessageReceiver : DefaultBasicConsumer
    { 
        private readonly Logger _logger;
        private readonly Action<string, ulong> _action;
        public MessageReceiver(IModel channel, Action<string, ulong> action)
        {
            _action = action; 
            _logger = LogManager.GetCurrentClassLogger();
        }

        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
        {
            string msg = Encoding.UTF8.GetString(body);
            _logger.Debug($"***************************Consuming Topic Message  時間:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}*********************************");
            _logger.Debug(string.Concat("Message received from the exchange ", exchange));
            _logger.Debug(string.Concat("Consumer tag: ", consumerTag));
            _logger.Debug(string.Concat("Delivery tag: ", deliveryTag));
            _logger.Debug(string.Concat("Routing tag: ", routingKey));
            _logger.Debug(string.Concat("Message: ", msg));
            _action?.Invoke(msg, deliveryTag);
        }

        /// <summary>
        /// 捕獲通道連接的關閉事件
        /// </summary>
        /// <param name="model"></param>
        /// <param name="reason"></param>
        public override void HandleModelShutdown(object model, ShutdownEventArgs reason)
        {
            _logger.Debug($"進入MessageReceiver.HandleModelShutdown方法");
            base.HandleModelShutdown(model, reason);
        }

        public override void HandleBasicConsumeOk(string consumerTag)
        {
            _logger.Debug($"進入MessageReceiver.HandleBasicConsumeOk方法 consumerTag:{consumerTag}");
            base.HandleBasicConsumeOk(consumerTag);
        }

        /// <summary>
        ///  刪除隊列 會進入
        /// </summary>
        /// <param name="consumerTag"></param>
        public override void HandleBasicCancel(string consumerTag)
        {
            _logger.Debug($"進入MessageReceiver.HandleBasicCancel方法 consumerTag:{consumerTag}");
            base.HandleBasicCancel(consumerTag);
        }
    }

上面有引用到的_logger和_action先不用管,重點是_channel.BasicReject方法和_channel.BasicAck方法。他們分別是代表拒絕消費和確認消費。

上面的代碼是消費者消費數據后給予了Broker明確的回執,我們試下將回執代碼注釋掉后看下隊列里的消息變成什么樣子了。 先刪除掉交換器和隊列,然后再發布數據,看看消費數據后不回執的消息狀態~
這是發布消息到隊列后,Ready狀態的消息為1條

 接下來,我們去消費數據但是不進行回執確認,看看結果如何

 如上圖,還是那條數據,狀態從Ready變成了Unacked,這時候是因為我的消費端應用還沒關閉,AMQP的鏈接也還在。我們到任務管理器內將消費應用關閉~

關閉后又變成了Ready, 意味着我們再次開啟消費端程序又可以從隊列獲取到之前的消息了~
我們將上面的回執代碼部分注釋取消,看看回執成功后隊列內的消息狀態是什么樣?

 可以看到回執確認后,我們的消息就從隊列里移除了~

消息消費確認機制結論

1、自動確認雖然省代碼但是可能會出現消息丟失業務未處理完畢的情況;
2、手動確認消息則是在獲取到消息后,在沒有返回回執前,消息會一致存儲在隊列

 

本文對應的代碼已上傳至Github,地址:https://github.com/QQ897878763/RabbitMQ_Sample

 程序的運行截圖如下:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM