一、死信隊列
死信隊列其實和普通的隊列沒啥大的區別,都需要創建自己的Queue
、Exchange
,然后通過RoutingKey
綁定到Exchange
上去,只不過死信隊列的RoutingKey
和Exchange
要作為參數,綁定到正常的隊列上去,一種應用場景是正常隊列里面的消息被basicNack
或者reject
時,消息就會被路由到正常隊列綁定的死信隊列中,還有一種還有常用的場景就是開啟了自動簽收,然后消費者消費消息時出現異常,超過了重試次數,那么這條消息也會進入死信隊列,如果配置了話,當然還有其他的應用場景,這里不一一討論。
1.1、死信隊列和交換器配置
這里有兩個隊列,正常的業務隊列emailQueue
和與之綁定的死信隊列
,這里只演示,手動簽收,消費者捕獲異常Nack
1.1.2、yml配置
spring:
rabbitmq:
host: 192.168.99.12
port: 5672
username: guest
password: guest
# 發送確認
publisher-confirms: true
# 路由失敗回調
publisher-returns: true
template:
# 必須設置成true 消息路由失敗通知監聽者,false 將消息丟棄
mandatory: true
listener:
simple:
# 每次從RabbitMQ獲取的消息數量
prefetch: 1
default-requeue-rejected: false
# 每個隊列啟動的消費者數量
concurrency: 1
# 每個隊列最大的消費者數量
max-concurrency: 1
# 簽收模式為手動簽收-那么需要在代碼中手動ACK
acknowledge-mode: manual
#郵件隊列
email:
queue:
name: demo.email
#郵件交換器名稱
exchange:
name: demoTopicExchange
#死信隊列
dead:
letter:
queue:
name: demo.dead.letter
exchange:
name: demoDeadLetterTopicExchange
1.1.3、死信隊列配置
/**
* rabbitmq 配置
*
* @author DUCHONG
* @since 2020-08-23 14:05
**/
@Configuration
@Slf4j
public class RabbitmqConfig {
@Value("${email.queue.name}")
private String emailQueue;
@Value("${exchange.name}")
private String topicExchange;
@Value("${dead.letter.queue.name}")
private String deadLetterQueue;
@Value("${dead.letter.exchange.name}")
private String deadLetterExchange;
@Bean
public Queue emailQueue() {
Map<String, Object> arguments = new HashMap<>(2);
// 綁定死信交換機
arguments.put("x-dead-letter-exchange", deadLetterExchange);
// 綁定死信的路由key
arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");
return new Queue(emailQueue,true,false,false,arguments);
}
@Bean
TopicExchange emailExchange() {
return new TopicExchange(topicExchange);
}
@Bean
Binding bindingEmailQueue() {
return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
}
//私信隊列和交換器
@Bean
public Queue deadLetterQueue() {
return new Queue(deadLetterQueue);
}
@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(deadLetterExchange);
}
@Bean
Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
}
}
1.2、消息發送方
@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${exchange.name}")
private String topicExchange;
@Scheduled(cron = "0 0/2 * * * ?")
public void sendEmailMessage() {
String msg = RandomStringUtils.randomAlphanumeric(8);
JSONObject email=new JSONObject();
email.put("content",msg);
email.put("to","duchong@qq.com");
CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
log.info("---發送 email 消息---{}---messageId---{}",email,correlationData.getId());
}
}
1.3、消息消費方
@Component
@Slf4j
public class MessageHandler {
/**
* 郵件消費者
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.email")
@RabbitHandler
public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
log.info("---接受到消息---{}",jsonObject);
//主動異常
int m=1/0;
//手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("handleEmailMessage捕獲到異常,拒絕重新入隊---消息ID---{}",headers.get("spring_returned_message_correlation"));
//異常,ture 重新入隊,或者false,進入死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
/**
* 死信消費者,自動簽收開啟狀態下,超過重試次數,或者手動簽收,reject或者Nack
* @param message
*/
@RabbitListener(queues = "demo.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
//可以考慮數據庫記錄,每次進來查數量,達到一定的數量,進行預警,人工介入處理
log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
//回復ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
1.4、結果
二、延時隊列
延時隊列顧名思義,不是及時的隊列,也就是發送者發給的消息要延時一段時間,消費者才能接受的到,這里有個典型的應用場景就是訂單30分鍾內未支付就關閉訂單,當然死信隊列也是可以實現的,這里只演示消息的延時消費邏輯,訂單邏輯就一個判斷,這里不做討論。
2.1、延時隊列和交換器配置
使用延時隊列之前,需要先安裝延時隊列插件,安裝方法,前面已經介紹過了,這里放個鏈接
2.1.1、yml配置
spring:
rabbitmq:
host: 192.168.99.12
port: 5672
username: guest
password: guest
# 發送確認
publisher-confirms: true
# 路由失敗回調
publisher-returns: true
template:
# 必須設置成true 消息路由失敗通知監聽者,false 將消息丟棄
mandatory: true
#消費端
listener:
simple:
# 每次從RabbitMQ獲取的消息數量
prefetch: 1
default-requeue-rejected: false
# 每個隊列啟動的消費者數量
concurrency: 1
# 每個隊列最大的消費者數量
max-concurrency: 1
# 簽收模式為手動簽收-那么需要在代碼中手動ACK
acknowledge-mode: manual
#郵件隊列
email:
queue:
name: demo.email
#郵件交換器名稱
exchange:
name: demoTopicExchange
#死信隊列
dead:
letter:
queue:
name: demo.dead.letter
exchange:
name: demoDeadLetterTopicExchange
#延時隊列
delay:
queue:
name: demo.delay
exchange:
name: demoDelayTopicExchange
2.1.2、延時隊列配置
/**
* rabbitmq 配置
*
* @author DUCHONG
* @since 2020-08-23 14:05
**/
@Configuration
@Slf4j
public class RabbitmqConfig {
@Value("${email.queue.name}")
private String emailQueue;
@Value("${exchange.name}")
private String topicExchange;
@Value("${dead.letter.queue.name}")
private String deadLetterQueue;
@Value("${dead.letter.exchange.name}")
private String deadLetterExchange;
@Value("${delay.queue.name}")
private String delayQueue;
@Value("${delay.exchange.name}")
private String delayExchange;
@Bean
public Queue emailQueue() {
Map<String, Object> arguments = new HashMap<>(2);
// 綁定死信交換機
arguments.put("x-dead-letter-exchange", deadLetterExchange);
// 綁定死信的路由key
arguments.put("x-dead-letter-routing-key", deadLetterQueue+".#");
return new Queue(emailQueue,true,false,false,arguments);
}
@Bean
TopicExchange emailExchange() {
return new TopicExchange(topicExchange);
}
@Bean
Binding bindingEmailQueue() {
return BindingBuilder.bind(emailQueue()).to(emailExchange()).with(emailQueue+".#");
}
//私信隊列和交換器
@Bean
public Queue deadLetterQueue() {
return new Queue(deadLetterQueue);
}
@Bean
TopicExchange deadLetterExchange() {
return new TopicExchange(deadLetterExchange);
}
@Bean
Binding bindingDeadLetterQueue() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(deadLetterQueue+".#");
}
//延時隊列
@Bean
public Queue delayQueue() {
return new Queue(delayQueue);
}
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic");
//參數二為類型:必須是x-delayed-message
return new CustomExchange(delayExchange, "x-delayed-message", true, false, args);
}
@Bean
Binding bindingDelayQueue() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(delayQueue+".#").noargs();
}
}
2.2、消息發送方
30分鍾時間太久了,這里延時2分鍾來看效果
@Configuration
@EnableScheduling
@Slf4j
public class ScheduleController {
@Autowired
RabbitTemplate rabbitTemplate;
@Value("${exchange.name}")
private String topicExchange;
@Value("${delay.exchange.name}")
private String delayTopicExchange;
@Scheduled(cron = "0 0/1 * * * ?")
public void sendEmailMessage() {
String msg = RandomStringUtils.randomAlphanumeric(8);
JSONObject email=new JSONObject();
email.put("content",msg);
email.put("to","duchong@qq.com");
CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(topicExchange,"demo.email.x",email.toJSONString(),correlationData);
log.info("---發送 email 消息---{}---messageId---{}",email,correlationData.getId());
}
@Scheduled(cron = "0 0/1 * * * ?")
public void sendDelayOrderMessage() throws Exception{
//訂單號 id實際是保存訂單后返回的,這里用uuid代替
String orderId = UUID.randomUUID().toString();
// 模擬訂單信息
JSONObject order=new JSONObject();
order.put("orderId",orderId);
order.put("goodsName","vip充值");
order.put("orderAmount","99.00");
CorrelationData correlationData=new CorrelationData(orderId);
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(orderId);
//30分鍾時間太長,這里延時120s消費
messageProperties.setHeader("x-delay", 120000);
Message message = new Message(order.toJSONString().getBytes(CharEncoding.UTF_8), messageProperties);
rabbitTemplate.convertAndSend(delayTopicExchange,"demo.delay.x",message,correlationData);
log.info("---發送 order 消息---{}---orderId---{}",order,correlationData.getId());
//睡一會,為了看延遲效果
TimeUnit.MINUTES.sleep(10);
}
}
2.3、消息消費方
@Component
@Slf4j
public class MessageHandler {
/**
* 郵件發送
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.email")
@RabbitHandler
public void handleEmailMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
jsonObject.put("messageId",headers.get("spring_returned_message_correlation"));
log.info("---接受到消息---{}",jsonObject);
//主動異常
int m=1/0;
//手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("handleEmailMessage捕獲到異常,拒絕重新入隊---消息ID---{}", headers.get("spring_returned_message_correlation"));
//異常,ture 重新入隊,或者false,進入死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
}
/**
* 死信消費者,自動簽收開啟狀態下,超過重試次數,或者手動簽收,reject或者Nack
* @param message
*/
@RabbitListener(queues = "demo.dead.letter")
public void handleDeadLetterMessage(Message message, Channel channel,@Headers Map<String,Object> headers) throws IOException {
//可以考慮數據庫記錄,每次進來查數量,達到一定的數量,進行預警,人工介入處理
log.info("接收到死信消息:---{}---消息ID---{}", new String(message.getBody()),headers.get("spring_returned_message_correlation"));
//回復ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* 延時隊列消費
* @param message
* @param channel
* @param headers
* @throws IOException
*/
@RabbitListener(queues ="demo.delay")
@RabbitHandler
public void handleOrderDelayMessage(Message message, Channel channel, @Headers Map<String,Object> headers) throws IOException {
try {
String msg=new String(message.getBody(), CharEncoding.UTF_8);
JSONObject jsonObject = JSON.parseObject(msg);
log.info("---接受到訂單消息---orderId---{}",message.getMessageProperties().getMessageId());
log.info("---訂單信息---order---{}",jsonObject);
//業務邏輯,根據訂單id獲取訂單信息,如果還未支付,設置關閉狀態,如果已支付,不做任何處理
//手動簽收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
catch (Exception e) {
log.info("handleOrderDelayMessage捕獲到異常,重新入隊---orderId---{}", headers.get("spring_returned_message_correlation"));
//異常,ture 重新入隊,或者false,進入死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
}
}
}
2.4、結果
運行結果顯示,同一個訂單號的消息,發送過后2分鍾,消費者才接受到,符合預期。