RabbitMQ死信隊列+延時隊列


原文:http://www.cnblogs.com/williamwsj/p/8108970.html

參考文獻:https://www.rabbitmq.com/dlx.html

死信,顧名思義,就是死掉的消息,死掉的消息是會被一般的隊列丟棄的。如果這些消息很重要,而我們又需要,怎么辦?凡事都有一個退路,現在就有一種方法可將這些死信消息存下來,那就是DLX(Dead Letter Exchanges)。DLX是專門用來存儲死信消息到指定隊列中的一種交換機。需要在聲明隊列時指定DLX和死信存放隊列的路由Key,因為RabbitMQ是通過Exchange去匹配路由key尋找隊列的。

死信消息是怎么產生的呢?

1、消息被拒(basic.reject or basic.nack)並且沒有重新入隊(requeue=false);

2、消息在隊列中過期,即當前消息在隊列中的存活時間已經超過了預先設置的TTL(Time To Live)時間;

3、當前隊列中的消息數量已經超過最大長度。

消息進入死信隊列的過程:消息 -> 隊列 (觸發以上條件)-> DLX交換機 -> DLK隊列

以上是對死信及其去向的介紹,接下來上代碼!

 

 var factory = new ConnectionFactory() { HostName = "localhost",UserName="ty2017",Password="123456",VirtualHost="log" };
            using (var connection = factory.CreateConnection()) {
                using (var channel = connection.CreateModel()) {                    
                    //聲明一個direct類型的交換機,用來當做某個隊列的死信交換機(DLX)
                    channel.ExchangeDeclare("e.log.dead", //交換機名稱
                                            "direct"); //交換機類型
                    //聲明一個隊列,用來存放死信消息
                    channel.QueueDeclare(queue: "q.log.dead",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    //聲明一個隊列,並指定該隊列的DLX和死信路由key,且還需要設置TTL(消息存活時間)
                    channel.QueueDeclare(queue: "q.log.error",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: new Dictionary<string, object> {
                                         { "x-dead-letter-exchange","e.log.dead"}, //設置當前隊列的DLX
                                         { "x-dead-letter-routing-key","dead"}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
                                         { "x-message-ttl",10000} //設置消息的存活時間,即過期時間
                                         });
                                         
                    //將DLX和死信存放隊列綁定,並產生一個路由key
                    channel.QueueBind("q.log.dead", "e.log.dead", "dead");
                    //綁定消息隊列
                    channel.QueueBind("q.log.error", //隊列名稱
                                      "e.log",      //交換機名稱
                                      "log.error");  //自定義的RoutingKey
                    //需要發送的消息
                    var body = Encoding.UTF8.GetBytes(param.Data);

                    var properties = channel.CreateBasicProperties();
                    //設置消息持久化
                    properties.SetPersistent(true);
                    //發布消息
                    channel.BasicPublish(exchange: "e.log",
                                         routingKey: "log.error",
                                         basicProperties: properties,
                                         body: body);
                }
            }

 

解釋一下代碼

1.有一個名為q.log.error的消息隊列,該隊列中消息的過期時間為10000毫秒;

2.有一個名為e.log.dead的交換機(Exchange),其類型是direct;

3.有一個名為q.log.dead的消息隊列,等會我們會將死信消息存入該隊列中;

4.將e.log.dead交換機和q.log.dead隊列綁定,並產生一個名為dead的RoutingKey;

5.指定e.log.dead交換機為隊列q.log.error的DLX(死信交換機);

6.指定dead為DLX的RoutingKey;

7.如果q.log.error隊列中產生了死信,RabbitMQ則會將死信消息發送給e.log.dead交換機,該交換機再通過dead(設置的DLX的RoutingKey)去找到q.log.dead隊列,並將該死信消息存放進去。

其實DLX是一個普通的交換機,只是我們在聲明隊列的時候,將其指定為該隊列的死信交換機(x-dead-letter-exchange)而已,也就是死信消息的去處。有交換機就需要有一個RoutingKey,因此,我們在設置DLX的時候,也需要一並設置x-dead-letter-routing-key參數。

 

讓我們看一下web管理工具中的情況:

消息進入死信隊列的過程:消息 -> q.log.error隊列 (過期TTL)-> e.log.dead交換機DLX (通過DLK) -> q.log.dead隊列

將q.log.dead隊列綁定到e.log.dead交換機

在web管理工具中創建q.log.error隊列時設置屬性 : 

x-message-ttl(消息超時時間TTL)

x-dead-letter-exchange=e.log.dead(對應死信交換機DXL)

x-dead-letter-routing-key=q.log.dead(進入死信交換機后的路由鍵,即死信最終路由到的隊列DLK)

 

消息過期之前,消息保留在q.log.error隊列中。TTL表示當前隊列設置了消息過期時間,DLX表示當前隊列設置了死信交換機,DLK表示當前隊列設置了死信交換機對應的RoutingKey(消息發到死信交換機后,路由到哪個queue中)。具體信息如下圖所示:

 

消息過期之后就變為了死信消息,消息就通過e.log.dead交換機跑到了q.log.dead隊列中。如下圖所示:

  web管理工具中測試發送接收

發送:

Exchange頁面點擊相應的交換機,進入交換機信息頁面,找到 Publish message,設置 路由鍵+內容(Payload)發送消息進入此交換機

接收:

Queues頁面點擊路由鍵對應的接收隊列(先綁定交換機),進入隊列信息頁面,找到 Get messages,設置ack模式點擊Get message,顯示接收的消息內容(四種ack模式依次:Nack+重新入隊;ACK;拒絕+重新入隊;拒絕+不重新入隊)

重新入隊會放回隊列首,需要謹慎避免無法消費造成死循環

https://blog.csdn.net/shanchahua123456/article/details/84328729

 

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM