摘自:https://www.cnblogs.com/toov5/p/10288260.html
關於RabbitMQ死信隊列
死信隊列 聽上去像 消息“死”了 其實也有點這個意思,死信隊列 是 當消息在一個隊列 因為下列原因:
消息被拒絕(basic.reject/ basic.nack)並且不再重新投遞 requeue=false
消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
隊列超載
變成了 “死信” 后 被重新投遞(publish)到另一個Exchange 該Exchange 就是DLX 然后該Exchange 根據綁定規則 轉發到對應的 隊列上 監聽該隊列 就可以重新消費 說白了 就是 沒有被消費的消息 換個地方重新被消費
生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者
什么是死信呢?什么樣的消息會變成死信呢?
消息被拒絕(basic.reject或basic.nack)並且requeue=false.
消息TTL過期
隊列達到最大長度(隊列滿了,無法再添加數據到mq中)
應用場景分析
在定義業務隊列的時候,可以考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被發送到該死信隊列上,這樣就方便我們查看消息失敗的原因了
死信隊列 聽上去像 消息“死”了 ,其實也有點這個意思,
死信隊列 是 當消息在一個隊列 因為下列原因:
1.消息被拒絕(basic.reject或basic.nack)並且requeue=false.
2.消息TTL過期
3.隊列達到最大長度(隊列滿了,無法再添加數據到mq中)
應用場景分析
在定義業務隊列的時候,可以考慮指定一個死信交換機,並綁定一個死信隊列,當消息變成死信時,該消息就會被發送到該死信隊列上,這樣就方便我們查看消息失敗的原因了
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丟棄消息
如何使用死信交換機呢?
定義業務(普通)隊列的時候指定參數
x-dead-letter-exchange: 用來設置死信后發送的交換機
x-dead-letter-routing-key:用來設置死信的routingKey
如果高並發情況到來 某一個隊列比如郵件隊列滿了 或者異常 或者消息過期 或者消費者拒絕消息

郵件隊列 綁定一個死信交換機 一旦郵件隊列滿了的情況下 為了防止數據丟失情況 消息不再郵件隊列存放了 放到死信交換機 然后交給死信郵件隊列 最終交給 死信消費者
步驟:
1、 創建 死信交換機 死信隊列 以及綁定
之前的隊列沒有綁定死信隊列和死信交換機 不能做更改綁定死信交互機
之前創建好的郵件隊列 刪除掉 已經創建好的隊列不能做更改 交換機也清理掉
config:
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
//Fanout 類型 發布訂閱模式
@Component
public class FanoutConfig {
/**
* 定義死信隊列相關信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信隊列 交換機標識符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信隊列交換機綁定鍵標識符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
// 郵件隊列
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
// 短信隊列
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
// fanout 交換機
private String EXCHANGE_NAME = "fanoutExchange";
// 1.定義郵件隊列
@Bean
public Queue fanOutEamilQueue() {
// 將普通隊列綁定到死信隊列交換機上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(FANOUT_EMAIL_QUEUE, true, false, false, args);
return queue;
}
// 2.定義短信隊列
@Bean
public Queue fanOutSmsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
// 2.定義交換機
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME);
}
// 3.隊列與交換機綁定郵件隊列
@Bean
Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange);
}
// 4.隊列與交換機綁定短信隊列
@Bean
Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange);
}
/**
* 創建配置死信郵件隊列
*
* @return
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}
/*
* 創建死信交換機
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}
/*
* 死信隊列與死信交換機綁定
*/
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}
}
生產者 timestamp 設置為0
package com.itmayiedu.rabbitmq;
import java.util.UUID;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
@Component
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(String queueName) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("email", "xx@163.com");
jsonObject.put("timestamp", 0);
String jsonString = jsonObject.toJSONString();
System.out.println("jsonString:" + jsonString);
// 設置消息唯一id 保證每次重試消息id唯一
Message message = MessageBuilder.withBody(jsonString.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "").build(); //消息id設置在請求頭里面 用UUID做全局ID
amqpTemplate.convertAndSend(queueName, message);
}
}

@RabbitListener(queues = "fanout_email_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 獲取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("郵件消費者獲取生產者消息msg:"+msg+",消息id"+messageId);
JSONObject jsonObject = JSONObject.parseObject(msg);
Integer timestamp = jsonObject.getInteger("timestamp");
try {
int result = 1/timestamp;
System.out.println("result"+result);
// // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動簽收
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//拒絕消費消息(丟失消息) 給死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
}
System.out.println("執行結束....");
}

添加死信隊列的消費者,並啟動后:
package com.itmayiedu.rabbitmq;
import java.util.Map;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
//死信隊列
@Component
public class FanoutDeadEamilConsumer {
@RabbitListener(queues = "dead_queue")
public void process(Message message, @Headers Map<String, Object> headers, Channel channel) throws Exception {
// 獲取消息Id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(), "UTF-8");
System.out.println("死信郵件消費者獲取生產者消息msg:"+msg+",消息id"+messageId);
// // 手動ack
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動簽收
channel.basicAck(deliveryTag, false);
System.out.println("執行結束....");
}
}

