一、什么是死信隊列
當消息在一個隊列中變成一個死信之后,它將被重新publish到另一個交換機上,這個交換機我們就叫做死信交換機,私信交換機將死信投遞到一個隊列上就是死信隊列。具體原理如下圖:

消息變成死信的三種情況:
- 消息被拒絕(basic.reject / basic.nack),並且requeue = false
- 消息TTL過期
- 隊列達到最大長度
二、手動簽收應答模式
應答模式分為兩種,手動簽收和自動簽收,自動應答就是消費者消費了一條消息就自動告訴隊列刪除消息。這樣的弊端就是不管消費邏輯有沒有成功,都會將消息刪除,這樣就會造成消息丟失。而使用手動簽收后,就是在消費邏輯處理成功后,手動告訴隊列消費成功,然后隊列再去刪除這條消息。
- 再消費者配置文件中開啟手動簽收模式
spring.rabbitmq.listener.simple.acknowledge-mode = manual
- 在消費邏輯處理成功后手動簽收,修改消費者代碼
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(Message message,@Headers Map<String,Object> headers, Channel channel) throws Exception {
Jedis jedis = new Jedis("localhost", 6379);
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"UTF-8");
System.out.println("接收導的消息為:"+msg+"==消息id為:"+messageId);
String messageIdRedis = jedis.get("messageId");
if(messageId == messageIdRedis){
return;
}
JSONObject jsonObject = JSONObject.parseObject(msg);
String email = jsonObject.getString("email");
String content = jsonObject.getString("timestamp");
String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;
// 如果發生異常則返回null
String body = HttpUtils.httpGet(httpUrl, "utf-8");
//
if(body == null){
throw new Exception();
}
jedis.set("messageId",messageId);
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
// 手動簽收
channel.basicAck(deliveryTag,false);
}
三、產生死信的三種情況演示
- 消息被拒絕
我們繼續修改一下消費者代碼,嘗試讓消費者消費的時候發生異常。然后在catch塊中拒絕消息。
// 拒絕消息,給死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
我們運行程序后發現,當消息消費異常后在隊列”zb-byte1“中的消息被消費了,同時發現在死信隊列”dead-byte-zb“中有一條未被消費的消息。消息到死信隊列后,然后我們在創建一個消費者去消費消息就可以了。當然死信隊列也需要去手動簽收消息。
- 消息TTL過期
這種模式我們也叫做延遲消費,有一種特別經典的案例就是用戶在一個商品搶購系統中,用戶搶到商品后需要在30分鍾時間內支付,不然訂單無效。這時候我們就可以通過消息TTL過期來實現,設置隊列消息過期時間為30分鍾,30分鍾后publish到死信隊列,我們在死信隊列中消費訂單狀態是否支付成功來判斷該訂單是否有效。
非常簡單,我們只需要在配置死信交換機的時候設置有效時間就可以了
@Bean
public Queue queue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
map.put("x-message-ttl",7200); // 隊列過期時間
Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
return queue;
}
- 隊列達到最大長度
設置隊列長度即可:
@Bean
public Queue queue(){
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
map.put("x-max-length",3);
// map.put("x-message-ttl",7200); // 隊列過期時間
Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
return queue;
}
設置好之后,我們先不要啟動消費者,然后調用生成者往隊列中發送消息,當消息長度大於3時,我們發現消息進入了死信隊列。
注意:前文中也提到過,隊列不能被修改,也就是說已經創建好的隊列設置了過期時常為7200s,然后我們注釋掉,增加隊列長度是3的代碼,這樣運行會報錯,必須在rabbitmq中將該隊列刪除,然后重新生成隊列才可以。
如果文章對您有幫助,請記得點贊關注喲~
歡迎大家關注我的公眾號:字節傳說,每日推送技術文章供大家學習參考。
