死信隊列是什么
死信,Dead Letter,一種消息機制,當消費者去消費隊列中的消息時,如果隊列中的消息出現了以下的情況:
- 消費端執行nack或者reject時,設置requeue=false;
- 消息在隊列中的時間超過設置的TTL(Time To Live)時間;
- 隊列中消息的數量超過設置的最大數量;
那么這些消息就可以被稱之為死信消息,在配置了死信隊列的情況下,死信消息會進入死信隊列,如果沒有配置死信隊列,這些死信消息會被丟棄。
理解死信隊列
死信隊列並不僅僅只是一個queue,還包含死信交換機(Dead Letter Exchange),關於死信隊列和死信交換機要說明幾點:
死信交換機可以是fanout、direct、topic等類型,和普通交換機並無不同;
死信交換機要綁定要業務隊列上才會生效;
給死信交換機綁定的隊列稱之為死信隊列,其實就是普通的隊列,沒有任何特殊之處;
並不是整個項目只能設置一個死信交換機和死信隊列,可以根據業務需要設置多個或者單個死信交換機使用不同的routing-key;
代碼示例
配置文件
spring:
rabbitmq:
addresses: 127.0.0.1:5672
username: lzm
password: lzm
virtual-host: test
listener:
simple:
acknowledge-mode: manual # 手動ack
default-requeue-rejected: false # 設置為false,requeue或reject
創建交換機和隊列以及綁定
/**
* 死信交換機
*/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(dlxExchangeName);
}
/**
* 死信隊列
*/
@Bean
public Queue dlxQueue(){
return new Queue(dlxQueueName);
}
/**
* 死信隊列綁定死信交換機
*/
@Bean
public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey);
}
/**
* 業務隊列
*/
@Bean
public Queue queue(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",dlxExchangeName);//聲明當前隊列綁定的死信交換機
params.put("x-dead-letter-routing-key",dlxRoutingKey);//聲明當前隊列的死信路由鍵
params.put("x-message-ttl",10000);//設置隊列消息的超時時間,單位毫秒,超過時間進入死信隊列
params.put("x-max-length", 10);//生命隊列的最大長度,超過長度的消息進入死信隊列
return QueueBuilder.durable(queueName).withArguments(params).build();
}
/**
* 業務交換機
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(exchangeName,true,false);
}
/**
* 業務隊列和業務交換機的綁定
*/
@Bean
public Binding binding(Queue queue, FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
注意創建業務隊列的部分,設置業務隊列的超時時間是10s,隊列中消息最大數量為10。
上面代碼中,業務交換機為fanout類型的交換機,死信交換機為Direct類型的交換機。
生產者
public void send(){
for (int i = 0; i < 5; i++) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchangeName,"","消息==>"+i,message -> {
message.getMessageProperties().setExpiration(3000+"");//發送消息時設置消息的超時時間
return message;
},correlationData);
}
}
注意:
隊列中消息的超時時間可以是在創建隊列時設置,表示對隊列中所有的消息生效,也可以在發送消息時設置,兩者相比取最小值作為TTL的值。
先不啟動消費者,此時啟動生產者並向其中發送消息,剛發送完消息時如下所示:
三秒后消息自動進入死信隊列中
這也就驗證了上述所說的,當消息在隊列中的時間超過TTL的時間時,消息會自動進入死信隊列。針對這一特性,可以給消息設置過期時間后發送到某個隊列,從而來進行延遲消費
注意看上圖的紅框中的內容:
Lim:表示設置了隊列中消息數量x-max-length參數
DLX:表示設置了死信交換機x-dead-letter-exchange參數
DLK:表示設置了死信路由鍵x-dead-letter-routing-key參數,不設置該值時,消息在進入死信隊列后,路由鍵保持原來的不變,設置了該值,消息的路由鍵就變為新設置的值。
下面我們啟動消費者,並且模擬在某些情況下執行nack操作,先看消費者代碼
@RabbitHandler
@RabbitListener(queues = {"${platform.queue-name}"},concurrency = "1")
public void msgConsumer(String msg, Channel channel, Message message) throws IOException {
try {
if(msg.indexOf("5")>-1){
throw new RuntimeException("拋出異常");
}
log.info("消息{}消費成功",msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("接收消息過程中出現異常,執行nack");
//第三個參數為true表示異常消息重新返回隊列,會導致一直在刷新消息,且返回的消息處於隊列頭部,影響后續消息的處理
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.error("消息{}異常",message.getMessageProperties().getHeaders());
}
}
當消息中包含5時拋出異常,執行nack,其他消息都執行ack,生產者發送0-9共10條消息,執行結果如下:
同時查看死信隊列中的數據,確實只有1條
並且消息的交換機以及路由鍵都是我們在代碼中設置好的值
同時消息的headers中也會將進入死信隊列的原因以及次數等進行說明
也就是說在執行nack,同時設置requeue=false時,消息會自動進入死信隊列
最后我們再測試一下最大數量的問題,前面我們設置隊列中最大數量是10,此時關閉消費者,同時刪除隊列的TTL,然后發送20條數據到業務隊列中
可以看到業務隊列和死信隊列各有10條數據,也就是說隊列中的消息數量超過設置的最大數量時,消息會進入死信隊列
總結
死信交換機和死信隊列都只是普通的交換機和隊列,只不過被用來處理死信消息,而死信消息的產生是由於TTL過期或者隊列中的消息數超過最大消息數,再或者時消費端reject或者nack消息時設置了requeue=false,消息變為死信后,由死信交換機路由到死信隊列,再由專門的消費者消費死信隊列中的消息。
死信隊列更多的是用來保證消息的可靠性,主要用於比較重要的隊列,用以確保未被正確消費的消息不會丟失,其實也可以不用死信隊列,在消費端出現異常時,可以將消息從當前隊列ack掉,再將其發送到其他隊列,然后再單獨處理其他隊列,這都是可以的。
本節測試代碼參考碼雲.