RabbitMQ筆記-死信隊列與延時隊列


死信隊列:

DLX(Dead-Letter-Exchange),可以稱為死信交換器。當消息在一個隊列中變成死信(dead message)之后,它能被重新發送到另一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱為死信隊列。

消息變成死信隊列有下面幾個情況:

  • 消息被拒絕(channel.basicNack或channel.basicReject),並且設置requeue參數為false;
  • 消息過期;
  • 隊列達到最大長度。

死信隊列案例:

 private void btnBasicPublish_Click(object sender, EventArgs e)
        {
            string exchangeName = "myexchange1";
            string queueName = "myqueue3";
            string bindingKey = "myroutingkey3";
            string routingKey = "myroutingkey3";
            //創建死信隊列
            this.CreateDeadQueue();
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                channel.QueueDeclare(queueName, true, false, false,
                    new Dictionary<string, object> {
                        { "x-message-ttl", 5000 },//過期時間5s
                    { "x-dead-letter-exchange", DEAD_EXCHANGE_NAME },//死信隊列交換機
                    { "x-dead-letter-routing-key", DEAD_QUEUE_NAME }//死信隊列routingKey
                    });
                channel.QueueBind(queueName, exchangeName, bindingKey, null);

                //消息持久化
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;//相當於設置DeliveryMode=2

                //5:發布消息
                for (int i = 0; i < 5; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                    channel.BasicPublish(exchangeName, routingKey, properties, msg);
                }
            }
        }


/// <summary>
        /// 創建死信交換機、死信隊列、並綁定
        /// 死信交換機、隊列就是一個普通交換機、隊列
        /// </summary>
        private void CreateDeadQueue()
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(DEAD_EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
                channel.QueueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
                channel.QueueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME, null);
            }
        }

延時隊列

延時隊列存儲的對象是對應的延遲消息,延遲消息指當消息被發送到隊列后,並不立即被消費者拿到消息,而是等待特定的時間后,消費者才能拿到消息。
延時隊列需要安裝一個插件,下載地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

延時隊列應用場景有:

  • 在訂單系統中,用戶下單后,通常有30分鍾的付款時間。如果30分鍾后沒有付款,則訂單被取消,這里可以使用延時隊列處理訂單。
  • 用戶通過遠程控制家里的智能設備在指定時間進行工作,可以將用戶指定發送到延時隊列,到達時間后再推送到智能設備。
  • 在RabbitMQ中,延時隊列通過前面的DLX和TTL共同作用可以模擬出延遲隊列功能。

通過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性
x-delayed-message是插件提供的類型,並不是rabbitmq本身的

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

發送消息的時候通過在header添加”x-delay”參數來控制消息的延時時間

byte[] messages= "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", properties.build(), messages);

和死信隊列消費者監聽正常隊列不同,延時隊列中消費者監聽的是死信隊列。當消費在設置DLX和TTL后,發送到隊列中經過指定時候后變成死信,死信重新發送到死信隊列,而消費者監聽到死信隊列中有消息而進行消費,這樣就達到了消息的延時。

參考:
https://blog.csdn.net/yaomingyang/article/details/102753004


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM