死信隊列的作用
死信交換機有什么用呢? 在創建隊列的時候 可以給這個隊列附帶一個交換機, 那么這個隊列作廢的消息就會被重新發到附帶的交換機,然后讓這個交換機重新路由這條消息。
死信消息產生的來源
- 消息被拒絕(basic.reject或basic.nack)並且requeue=false
- 消息TTL過期
- 隊列達到最大長度(隊列滿了,無法再添加數據到mq中)
死信隊列處理的方式
- 丟棄,如果不是很重要,可以選擇丟棄
- 記錄死信入庫,然后做后續的業務分析或處理
- 通過死信隊列,由負責監聽死信的應用程序進行處理
消息超時進入死信隊列
通俗的說,就是消息產生之后,因為設置了超時時間,在這段時間內消息沒有被消費就會被扔到死信隊列里面。
// 交換機名稱
private static final String DESTINATION_NAME = "rabbitMq_topic";
//消息隊列
private static final String queueName = "topic_queue";
//routingKey
private static final String routingKey = "topic.#";
//配置死信隊列
private static final String dlxExchangeName = "dlx.exchange";
private static final String dlxQueueName = "dlx.queue";
private static final String dlxRoutingKey = "#";
@Test
public void producer() throws IOException, TimeoutException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 為隊列設置隊列交換器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
// 設置隊列中的消息 60s 鍾后過期
arguments.put("x-message-ttl", 60000);
//正常生產者綁定交換機 參數1 交換機名稱 參數2 交換機類型
channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
//消費聲明隊列
channel.queueDeclare(queueName, true, false, false, arguments);
//消費者隊列綁定交換機 綁定路由件 路由鍵
channel.queueBind(queueName, DESTINATION_NAME, routingKey);
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 測試消息超時,傳遞到死信隊列";
// 創建死信交換器和隊列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
//生產者發送消息者
channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.err.println("消息發送完成......");
}
只監聽了死信隊列的消息,正常消息無需監聽接收
/**
* 監聽死信隊列
*
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創建通道
Channel channel = connection.createChannel();
System.out.println("死信消費者啟動 ..........");
Thread.sleep(65000);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("死信隊列接收到消息:" + new String(body));
System.err.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(dlxQueueName, consumer);
}
消息被退回
這個我在之前的整合SpringBoot的時候有實驗過。
channel.basicNack(envelope.getDeliveryTag(),false,false);
隊列達到最大長度
這個和消息超時差不多,只不過是設置了隊列的最大容量而已。
只需要把上面的代碼修改一下就可以了。
@Test
public void producer() throws IOException, TimeoutException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創建通道
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 為隊列設置隊列交換器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
//設置隊列長度為3
arguments.put("x-max-length", 3);
//正常生產者綁定交換機 參數1 交換機名稱 參數2 交換機類型
channel.exchangeDeclare(DESTINATION_NAME, "topic", true, false, null);
//消費聲明隊列
channel.queueDeclare(queueName, true, false, false, arguments);
//消費者隊列綁定交換機 綁定路由件 路由鍵
channel.queueBind(queueName, DESTINATION_NAME, routingKey);
String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 測試消息超時,傳遞到死信隊列";
// 創建死信交換器和隊列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);
//生產者發送消息者
for (int i = 0; i < 5; i++) {
channel.basicPublish(DESTINATION_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());
}
System.out.println("消息發送完成......");
}
@Test
public void consumer() throws IOException, TimeoutException, InterruptedException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創建通道
Channel channel = connection.createChannel();
//此處設置一次只消費1個,且必須是ASK之后的消息才能算
channel.basicQos(1);
System.out.println("消費者啟動 ..........");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("正常隊列:" + new String(body));
System.out.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, consumer);
}
/**
* 監聽死信隊列
*
* @throws IOException
* @throws TimeoutException
* @throws InterruptedException
*/
@Test
public void dlxConsumer() throws IOException, TimeoutException, InterruptedException {
//獲取連接
Connection connection = MQConnectionUtils.newConnection();
//創建通道
Channel channel = connection.createChannel();
System.out.println("死信消費者啟動 ..........");
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("死信隊列接收到消息:" + new String(body));
System.err.println("deliveryTag:" + envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(dlxQueueName, consumer);
}