一、死信隊列
死信,顧名思義就是無法被消費的消息,一般來說 Producer 將消息投遞到 broker 或者直接丟到 queue 中,Consumer 從 Queue 中取出消息進行消費,但是某些時候由於特定的原因導致 Queue 中的某些消息無法被消費,這樣的消息如果沒有后續的處理就變成了死信,有死信自然就有了死信隊列
死信隊列有其特殊的應用場景,例如用戶在商城下單成功並點擊去支付的時候,如果在指定的時間內未支付,那么就可以將該下單消息投遞到死信隊列中,至於后續怎么處理死信隊列需要結合具體的應用場景
二、死信的來源
通常死信的來源有下面幾種方式
1、消息 TTL (Time To Live) 過期
2、隊列達到了最大長度,無法再添加消息到 MQ 中了
3、消息被拒,並且沒有重新入隊(basic.reject || basic.Nack) && (requeue = false)

三、消息 TTL 過期
1、Consumer01
public class Consumer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String NORMAL_ROUTING_KEY = "normal";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_ROUTING_KEY = "dead";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道
Channel channel = RabbitmqUtils.getChannel();
// 聲明正常消息的交換機(類型為 direct)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 正常隊列關聯死信交換機(正常隊列出現了故障之后,消息就會通過死信交換機傳遞到死信隊列中)
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
// 正常消息交換機綁定正常消息隊列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);
// 消息成功之后的回調
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
// 取消消費者的回調
CancelCallback cancelCallback = consumerTag -> {
System.out.println("取消消費者時的回調接口");
};
// 消費者消費消息
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
System.out.println("Consumer01 開始消費消息");
}
}
2、Consumer02
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_QUEUE = "dead_queue";
private static final String DEAD_ROUTING_KEY = "dead";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道對象
Channel channel = RabbitmqUtils.getChannel();
// 聲明死信交換機(topic 類型)
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 聲明死信隊列
channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
// 死信交換機綁定死信隊列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
// 消息成功之后的回調
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
// 取消消費者的回調
CancelCallback cancelCallback = consumerTag -> {
System.out.println("取消消費者時的回調接口");
};
// 消費者消費消息
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);
System.out.println("Consumer02 開始消費消息");
}
}
3、Producer
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_ROUTING_KEY = "normal";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道
Channel channel = RabbitmqUtils.getChannel();
// 聲明一個 direct 類型的交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 消息發送 10 s 之后,如果沒有消費者進行消費,那么該消息就稱為死信,它就會進入死信隊列中
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
// 待發送的消息
String message = "我是一只機智的小毛毛,很可愛,很機智";
for (int i = 1; i < 11; i++) {
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, properties, (message + i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("Producer send message successfully...");
}
}
4、測試過程及結果
啟動 Consumer01 將普通交換機、普通隊列注冊到 RabbitMQ 上,啟動 Consumer02 將死信交換機、死信隊列注冊到 RabbitMQ 上


然后為了演示消息超時之后可以進入死信隊列,我們關閉 Consumer01,模擬其接收不到消息,為了不讓死信消息被消費者消費掉,我們關閉 Consumer02,然后啟動生產者 Producer

10 s 之后普通隊列里的消息進入死信隊列中

接着啟動消費者 Consumer02 消費掉死信隊列中的消息

四、隊列達到最大長度
1、Consumer01
public class Consumer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String NORMAL_ROUTING_KEY = "normal";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_ROUTING_KEY = "dead";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道
Channel channel = RabbitmqUtils.getChannel();
// 聲明正常消息的交換機(類型為 direct)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 正常隊列關聯死信交換機(正常隊列出現了故障之后,消息就會通過死信交換機傳遞到死信隊列中)
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
// 設置正常隊列的最大長度
arguments.put("x-max-length",6);
channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
// 正常消息交換機綁定正常消息隊列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);
// 消息成功之后的回調
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
String msg = new String(message.getBody());
System.out.println(msg);
};
// 取消消費者的回調
CancelCallback cancelCallback = consumerTag -> {
System.out.println("取消消費者時的回調接口");
};
// 消費者消費消息
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, cancelCallback);
System.out.println("Consumer01 開始消費消息");
}
}
2、Consumer02 代碼不變
3、Producer
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_ROUTING_KEY = "normal";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道
Channel channel = RabbitmqUtils.getChannel();
// 聲明一個 direct 類型的交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 待發送的消息
String message = "我是一只機智的小毛毛,很可愛,很機智";
for (int i = 1; i < 11; i++) {
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("Producer send message successfully...");
}
}
4、測試過程及結果
刪除掉原先的正常交換機、正常隊列、死信交換機、死信隊列,然后按照上面的方式啟動 Consumer01、Consumer02 重新注冊正常交換機、正常隊列、死信交換機、死信隊列,接着關閉 Consumer01、Consumer02,最后啟動 Producer 發送消息(如果 Consumer01 是一直打開的情況下,正常隊列的消息就不會堆積到 6 條)

啟動 Consumer01、Consumer02,發現 Consumer01 消費了 6 條消息,Consumer02 消費了四條消息

五、消息被拒
1、Consumer01
public class Consumer01 {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_QUEUE = "normal_queue";
private static final String NORMAL_ROUTING_KEY = "normal";
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_ROUTING_KEY = "dead";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道
Channel channel = RabbitmqUtils.getChannel();
// 聲明正常消息的交換機(類型為 direct)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 正常隊列關聯死信交換機(正常隊列出現了故障之后,消息就會通過死信交換機傳遞到死信隊列中)
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
channel.queueDeclare(NORMAL_QUEUE, true, false, false, arguments);
// 正常消息交換機綁定正常消息隊列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, arguments);
// 消息成功之后的回調
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
String msg = new String(message.getBody());
if (msg.contains("很機智4")) {
System.out.println("Consumer01 接收到消息" + msg + "並拒絕簽收該消息");
//requeue 設置為 false 代表拒絕重新入隊 該隊列如果配置了死信交換機將發送到死信隊列中
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("Consumer01 接收到消息" + msg);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
// 取消消費者的回調
CancelCallback cancelCallback = consumerTag -> {
System.out.println("取消消費者時的回調接口");
};
// 消費者消費消息(一定要開啟手動應答,如果你開啟了自動應答,根本不存在拒絕消息的情況)
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);
System.out.println("Consumer01 開始消費消息");
}
}
2、Consumer02
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_QUEUE = "dead_queue";
private static final String DEAD_ROUTING_KEY = "dead";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道對象
Channel channel = RabbitmqUtils.getChannel();
// 聲明死信交換機(topic 類型)
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 聲明死信隊列
channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
// 死信交換機綁定死信隊列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
// 消息成功之后的回調
DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
String msg = new String(message.getBody());
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
System.out.println(msg);
};
// 取消消費者的回調
CancelCallback cancelCallback = consumerTag -> {
System.out.println("取消消費者時的回調接口");
};
// 消費者消費消息
channel.basicConsume(DEAD_QUEUE, false, deliverCallback, cancelCallback);
System.out.println("Consumer02 開始消費消息");
}
}
3、Producer
public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String NORMAL_ROUTING_KEY = "normal";
public static void main(String[] args) throws Exception {
// 自定義工具類獲取信道
Channel channel = RabbitmqUtils.getChannel();
// 聲明一個 direct 類型的交換機
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 待發送的消息
String message = "我是一只機智的小毛毛,很可愛,很機智";
for (int i = 1; i < 11; i++) {
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes(StandardCharsets.UTF_8));
}
System.out.println("Producer send message successfully...");
}
}
4、測試過程及結果
刪除掉原先的正常交換機、正常隊列、死信交換機、死信隊列,然后重新啟動 Consumer01、Consumer02 注冊正常交換機、正常隊列、死信交換機、死信隊列,接着關閉 Consumer02,啟動 Producer 發送消息
這里有幾點需要注意一下
1、因為只有被拒絕的消息才能進入死信隊列中,所以 Consumer01 不能關閉,為了能看到死信隊列里的消息,不讓它被消費掉,所以需要關閉 Consumer02
2、Consumer01 一定要開啟手動確認,因為自動確認的場景下根本不存在消息被拒絕的情況

打開死信隊列查看被拒絕的消息
啟動 Consumer02 消費死信消息

