C# RabbitMQ 失敗消息重新投遞 設置消息投遞次數與間隔時間 死信隊列使用方法


1.1、了解 RabbitMQ 失敗消息重新投遞機制

《Rabbit 失敗消息重新投遞機制》

 

1.2、了解什么是消息確認機制

  • MQ消息確認類似於數據庫中用到的 commit 語句,用於告訴broker本條消息是被消費成功了還是失敗了;
  • 平時默認消息在被接收后就被自動確認了,需要在創建消費者時、設置 autoAck: false 即可使用手動確認模式;

1.3、了解什么是死信隊列

  • 死信隊列是用於接收普通隊列發生失敗的消息,其原理與普通隊列相同;

> 失敗消息如:被消費者拒絕的消息、TTL超時的消息、隊列達到最大數量無法寫入的消息;

  • 死信隊列創建方法:

> 在創建普通隊列時,在參數"x-dead-letter-exchange"中定義失敗消息轉發的目標交換機;

> 再創建一個臨時隊列,訂閱"x-dead-letter-exchange"中指定的交換機;

> 此時的臨時隊列就能接收到普通隊列失敗的消息了;

> 可在消息的 Properties.headers.x-death 屬性中查詢到消息投遞源信息和消息被投遞的次數;

2、實現業務的模擬代碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
 
public class Program
{
    private static string _exchangeNormal = "Exchange.Normal";  //定義一個用於接收 正常 消息的交換機
    private static string _exchangeRetry = "Exchange.Retry";    //定義一個用於接收 重試 消息的交換機
    private static string _exchangeFail = "Exchange.Fail";      //定義一個用於接收 失敗 消息的交換機
    private static string _queueNormal = "Queue.Noraml";        //定義一個用於接收 正常 消息的隊列
    private static string _queueRetry = "Queue.Retry";          //定義一個用於接收 重試 消息的隊列
    private static string _queueFail = "Queue.Fail";            //定義一個用於接收 失敗 消息的隊列
 
    public static void Main()
    {
        var factory = new ConnectionFactory();
        {
            factory.HostName = "127.0.0.1";
            factory.Port = 5672;
            factory.VirtualHost = "VH1"; //選擇虛擬主機
            factory.UserName = "user1";
            factory.Password = "123456";
            factory.AutomaticRecoveryEnabled = true; //開啟自動恢復連接(默認為false,在異常中斷時無法自動恢復連接)
            //factory.ClientProvidedName = "測試程序",
            factory.ClientProperties.Add("connection_name", "測試程序");     //#為兼容3.5.7顯示客戶端名稱問題
            factory.ClientProperties.Add("tag", "測試程序");                 //#為解決5.2.0版本不顯示客戶端名稱問題
        }
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
 
        //聲明交換機
        channel.ExchangeDeclare(exchange: _exchangeNormal, type: "topic");
        channel.ExchangeDeclare(exchange: _exchangeRetry, type: "topic");
        channel.ExchangeDeclare(exchange: _exchangeFail, type: "topic");
 
        //定義隊列參數
        var queueNormalArgs = new Dictionary<string, object>();
        {
            queueNormalArgs.Add("x-dead-letter-exchange", _exchangeFail);   //指定死信交換機,用於將 Noraml 隊列中失敗的消息投遞給 Fail 交換機
        }
        var queueRetryArgs = new Dictionary<string, object>();
        {
            queueRetryArgs.Add("x-dead-letter-exchange", _exchangeNormal);  //指定死信交換機,用於將 Retry 隊列中超時的消息投遞給 Noraml 交換機
            queueRetryArgs.Add("x-message-ttl", 60000);                     //定義 queueRetry 的消息最大停留時間 (原理是:等消息超時后由 broker 自動投遞給當前綁定的死信交換機)
                                                                            //定義最大停留時間為防止一些 待重新投遞 的消息、沒有定義重試時間而導致內存溢出
        }
        var queueFailArgs = new Dictionary<string, object>();
        {
            //暫無
        }
 
        //聲明隊列
        channel.QueueDeclare(queue: _queueNormal, durable: true, exclusive: false, autoDelete: false, arguments: queueNormalArgs);
        channel.QueueDeclare(queue: _queueRetry, durable: true, exclusive: false, autoDelete: false, arguments: queueRetryArgs);
        channel.QueueDeclare(queue: _queueFail, durable: true, exclusive: false, autoDelete: false, arguments: queueFailArgs);
 
        //為隊列綁定交換機
        channel.QueueBind(queue: _queueNormal, exchange: _exchangeNormal, routingKey: "#");
        channel.QueueBind(queue: _queueRetry, exchange: _exchangeRetry, routingKey: "#");
        channel.QueueBind(queue: _queueFail, exchange: _exchangeFail, routingKey: "#");
 
        #region 創建一個普通消息消費者
        {
            var consumer = new EventingBasicConsumer(channel);
 
            consumer.Received += (sender, e) =>
            {
                var _sender = (EventingBasicConsumer)sender;            //消息傳送者
                var _channel = _sender.Model;                           //消息傳送通道
                var _message = (BasicDeliverEventArgs)e;                //消息傳送參數
                var _headers = _message.BasicProperties.Headers;        //消息頭
                var _content = Encoding.UTF8.GetString(_message.Body);  //消息內容
                var _death = default(Dictionary<string, object>);       //死信參數
 
                if (_headers != null && _headers.ContainsKey("x-death"))
                    _death = (Dictionary<string, object>)(_headers["x-death"] as List<object>)[0];
 
                try
                #region 消息處理
                {
                    Console.WriteLine();
                    Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.0)消息接收:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[consumerID={_message.ConsumerTag}]\r\n\t[exchange={_message.Exchange}]\r\n\t[routingKey={_message.RoutingKey}]\r\n\t[content={_content}]");
 
                    throw new Exception("模擬消息處理失敗效果。");
 
                    //處理成功時
                    Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.1)處理成功:\r\n\t[deliveryTag={_message.DeliveryTag}]");
 
                    //消息確認 (銷毀當前消息)
                    _channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
                }
                #endregion
                catch (Exception ex)
                #region 消息處理失敗時
                {
                    var retryCount = (long)(_death?["count"] ?? default(long)); //查詢當前消息被重新投遞的次數 (首次則為0)
 
                    Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(1.2)處理失敗:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[retryCount={retryCount}]");
 
                    if (retryCount >= 2)
                    #region 投遞第3次還沒消費成功時,就轉發給 exchangeFail 交換機
                    {
                        //消息拒絕(投遞給死信交換機,也就是上邊定義的 ("x-dead-letter-exchange", _exchangeFail))
                        _channel.BasicNack(deliveryTag: _message.DeliveryTag, multiple: false, requeue: false);
                    }
                    #endregion
                    else
                    #region 否則轉發給 exchangeRetry 交換機
                    {
                        var interval = (retryCount + 1) * 10; //定義下一次投遞的間隔時間 (單位:秒)
                                                                //如:首次重試間隔10秒、第二次間隔20秒、第三次間隔30秒
 
                        //定義下一次投遞的間隔時間 (單位:毫秒)
                        _message.BasicProperties.Expiration = (interval * 1000).ToString();
 
                        //將消息投遞給 _exchangeRetry (會自動增加 death 次數)
                        _channel.BasicPublish(exchange: _exchangeRetry, routingKey: _message.RoutingKey, basicProperties: _message.BasicProperties, body: _message.Body);
 
                        //消息確認 (銷毀當前消息)
                        _channel.BasicAck(deliveryTag: _message.DeliveryTag, multiple: false);
                    }
                    #endregion
                }
                #endregion
            };
            channel.BasicConsume(queue: _queueNormal, noAck: false, consumer: consumer);
        }
        #endregion
 
        #region 創建一個失敗消息消費者
        {
            var consumer = new EventingBasicConsumer(channel);
 
            consumer.Received += (sender, e) =>
            {
                var _message = (BasicDeliverEventArgs)e;                //消息傳送參數
                var _content = Encoding.UTF8.GetString(_message.Body);  //消息內容
 
                Console.WriteLine();
                Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t(2.0)發現失敗消息:\r\n\t[deliveryTag={_message.DeliveryTag}]\r\n\t[consumerID={_message.ConsumerTag}]\r\n\t[exchange={_message.Exchange}]\r\n\t[routingKey={_message.RoutingKey}]\r\n\t[content={_content}]");
            };
 
            channel.BasicConsume(queue: _queueFail, noAck: true, consumer: consumer);
        }
        #endregion
 
        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t正在運行中...");
 
        var cmd = default(string);
        while ((cmd = Console.ReadLine()) != "close")
        #region 模擬正常消息發布
        {
            var msgProperties = channel.CreateBasicProperties();
            var msgContent = $"消息內容_{DateTime.Now.ToString("HH:mm:ss.fff")}_{cmd}";
 
            channel.BasicPublish(exchange: _exchangeNormal, routingKey: "亞洲.中國.經濟", basicProperties: msgProperties, body: Encoding.UTF8.GetBytes(msgContent));
 
            Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t發送成功:{msgContent}");
            Console.WriteLine();
        }
        #endregion
 
        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t正在關閉...");
 
        channel.ExchangeDelete(_exchangeNormal);
        channel.ExchangeDelete(_exchangeRetry);
        channel.ExchangeDelete(_exchangeFail);
        channel.QueueDelete(_queueNormal);
        channel.QueueDelete(_queueRetry);
        channel.QueueDelete(_queueFail);
        //channel.Abort();
        channel.Close(200, "Goodbye!");
        channel.Dispose();
        connection.Close(200, "Goodbye!");
        connection.Dispose();
 
        Console.WriteLine($"{DateTime.Now.ToString("HH:mm:ss.fff")}\t運行結束。");
        Console.ReadKey();
    }
}

  

3、查看 RabbitMQ 后台管理端的顯示效果

《新建的3個交換機》
《新建的3個隊列》

 

《最終投遞失敗被轉移到 Queue.Fail 的消息內容》

4、查看程序模擬的顯示效果 

模擬重新投遞效果

 

 

原文地址: https://blog.csdn.net/rcr676/article/details/107687818

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM