原文:http://www.cnblogs.com/williamwsj/p/8108970.html
參考文獻:https://www.rabbitmq.com/dlx.html
死信,顧名思義,就是死掉的消息,死掉的消息是會被一般的隊列丟棄的。如果這些消息很重要,而我們又需要,怎么辦?凡事都有一個退路,現在就有一種方法可將這些死信消息存下來,那就是DLX(Dead Letter Exchanges)。DLX是專門用來存儲死信消息到指定隊列中的一種交換機。需要在聲明隊列時指定DLX和死信存放隊列的路由Key,因為RabbitMQ是通過Exchange去匹配路由key尋找隊列的。
死信消息是怎么產生的呢?
1、消息被拒(basic.reject or basic.nack)並且沒有重新入隊(requeue=false);
2、消息在隊列中過期,即當前消息在隊列中的存活時間已經超過了預先設置的TTL(Time To Live)時間;
3、當前隊列中的消息數量已經超過最大長度。
消息進入死信隊列的過程:消息 -> 隊列 (觸發以上條件)-> DLX交換機 -> DLK隊列
以上是對死信及其去向的介紹,接下來上代碼!
var factory = new ConnectionFactory() { HostName = "localhost",UserName="ty2017",Password="123456",VirtualHost="log" };
using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) {
//聲明一個direct類型的交換機,用來當做某個隊列的死信交換機(DLX)
channel.ExchangeDeclare("e.log.dead", //交換機名稱
"direct"); //交換機類型
//聲明一個隊列,用來存放死信消息
channel.QueueDeclare(queue: "q.log.dead",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
//聲明一個隊列,並指定該隊列的DLX和死信路由key,且還需要設置TTL(消息存活時間)
channel.QueueDeclare(queue: "q.log.error",
durable: false,
exclusive: false,
autoDelete: false,
arguments: new Dictionary<string, object> {
{ "x-dead-letter-exchange","e.log.dead"}, //設置當前隊列的DLX
{ "x-dead-letter-routing-key","dead"}, //設置DLX的路由key,DLX會根據該值去找到死信消息存放的隊列
{ "x-message-ttl",10000} //設置消息的存活時間,即過期時間
});
//將DLX和死信存放隊列綁定,並產生一個路由key
channel.QueueBind("q.log.dead", "e.log.dead", "dead");
//綁定消息隊列
channel.QueueBind("q.log.error", //隊列名稱
"e.log", //交換機名稱
"log.error"); //自定義的RoutingKey
//需要發送的消息
var body = Encoding.UTF8.GetBytes(param.Data);
var properties = channel.CreateBasicProperties();
//設置消息持久化
properties.SetPersistent(true);
//發布消息
channel.BasicPublish(exchange: "e.log",
routingKey: "log.error",
basicProperties: properties,
body: body);
}
}
解釋一下代碼:
1.有一個名為q.log.error的消息隊列,該隊列中消息的過期時間為10000毫秒;
2.有一個名為e.log.dead的交換機(Exchange),其類型是direct;
3.有一個名為q.log.dead的消息隊列,等會我們會將死信消息存入該隊列中;
4.將e.log.dead交換機和q.log.dead隊列綁定,並產生一個名為dead的RoutingKey;
5.指定e.log.dead交換機為隊列q.log.error的DLX(死信交換機);
6.指定dead為DLX的RoutingKey;
7.如果q.log.error隊列中產生了死信,RabbitMQ則會將死信消息發送給e.log.dead交換機,該交換機再通過dead(設置的DLX的RoutingKey)去找到q.log.dead隊列,並將該死信消息存放進去。
其實DLX是一個普通的交換機,只是我們在聲明隊列的時候,將其指定為該隊列的死信交換機(x-dead-letter-exchange)而已,也就是死信消息的去處。有交換機就需要有一個RoutingKey,因此,我們在設置DLX的時候,也需要一並設置x-dead-letter-routing-key參數。
讓我們看一下web管理工具中的情況:
消息進入死信隊列的過程:消息 -> q.log.error隊列 (過期TTL)-> e.log.dead交換機DLX (通過DLK) -> q.log.dead隊列
將q.log.dead隊列綁定到e.log.dead交換機
在web管理工具中創建q.log.error隊列時設置屬性 :
x-message-ttl(消息超時時間TTL)
x-dead-letter-exchange=e.log.dead(對應死信交換機DXL)
x-dead-letter-routing-key=q.log.dead(進入死信交換機后的路由鍵,即死信最終路由到的隊列DLK)
消息過期之前,消息保留在q.log.error隊列中。TTL表示當前隊列設置了消息過期時間,DLX表示當前隊列設置了死信交換機,DLK表示當前隊列設置了死信交換機對應的RoutingKey(消息發到死信交換機后,路由到哪個queue中)。具體信息如下圖所示:
消息過期之后就變為了死信消息,消息就通過e.log.dead交換機跑到了q.log.dead隊列中。如下圖所示:
web管理工具中測試發送接收
發送:
Exchange頁面點擊相應的交換機,進入交換機信息頁面,找到 Publish message,設置 路由鍵+內容(Payload)發送消息進入此交換機
接收:
Queues頁面點擊路由鍵對應的接收隊列(先綁定交換機),進入隊列信息頁面,找到 Get messages,設置ack模式點擊Get message,顯示接收的消息內容(四種ack模式依次:Nack+重新入隊;ACK;拒絕+重新入隊;拒絕+不重新入隊)
重新入隊會放回隊列首,需要謹慎避免無法消費造成死循環
https://blog.csdn.net/shanchahua123456/article/details/84328729