在RocketMQ 重復消費問題 | 訂單系統核心流程引入冪等性機制一文中,我們討論了消息重復消費的問題,比較好的方案是采用在消費側使用業務判斷法來保證接口的冪等性,這樣就能避免消息重復消費的問題。
今天要討論的是消費者代碼執行過程中出現異常,我們應該如何處理?
手動提交 offset
首先來看一段代碼,Consumer
類是一個消費者類,它我們為它注冊了一個監聽器,在處理完消息之后,會將消息的狀態返回給 RocketMQ,執行成功返回的是消息狀態是 CONSUME_SUCCESS
。
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
// 設置 NameServer 地址
consumer.setNamesrvAddr("");
// 訂閱 Topic
consumer.subscribe("TopicTest", "*");
// 這次回調接口,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 對消息的處理,比如發放優惠券、積分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
畫一張圖來表示向 RocketMQ 提交消息狀態的流程,如圖所示:
消息者業務代碼出現異常怎么辦?
再來看一下消費者的代碼中監聽器的部分,它說如果消息處理成功,那么就返回消息狀態為 CONSUME_SUCCESS
,也有可能發放優惠券、積分等操作出現了異常,比如說數據庫掛掉了。這個時候應該怎么處理呢?
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 對消息的處理,比如發放優惠券、積分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
我們可以把代碼改一改,捕獲異常之后返回消息的狀態為 RECONSUME_LATER
表示稍后重試。
// 這次回調接口,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 對消息的處理,比如發放優惠券、積分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 萬一發生數據庫宕機等異常,返回稍后重試消息的狀態
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
重試隊列
這個時候,消息會進入到 RocketMQ 的重試隊列中。
- 比如說消費者所屬的消息組名稱為
AAAConsumerGroup
- 其重試隊列名稱就叫做
%RETRY%AAAConsumerGroup
- 重試隊列中的消息過一段時間會再次發送給消費者,如果還是無法正常執行會再次進入重試隊列
- 默認重試16次,還是無法執行,消息就會從重試隊列進入到死信隊列
死信隊列
- 重試隊列中的消息重試16次任然無法執行,將會進入到死信隊列
- 死信隊列的名字是
%DLQ%AAAConsumerGroup
- 死信隊列中的消息可以后台開一個線程,訂閱
%DLQ%AAAConsumerGroup
,並不停重試
總結
本文從消費者的業務代碼出現異常講起,介紹了 RocketMQ 的重試隊列和死信隊列:
- 代碼正常執行返回消息狀態為
CONSUME_SUCCESS
,執行異常返回RECONSUME_LATER
- 狀態為
RECONSUME_LATER
的消息會進入到重試隊列,重試隊列的名稱為%RETRY% + ConsumerGroupName
; - 重試16次消息任然沒有處理成功,消息就會進入到死信隊列
%DLQ% + ConsumerGroupName
;