RocketMQ 死信隊列 | 消費者出現異常如何處理?


RocketMQ 重復消費問題 | 訂單系統核心流程引入冪等性機制一文中,我們討論了消息重復消費的問題,比較好的方案是采用在消費側使用業務判斷法來保證接口的冪等性,這樣就能避免消息重復消費的問題。

今天要討論的是消費者代碼執行過程中出現異常,我們應該如何處理?

手動提交 offset

首先來看一段代碼,Consumer 類是一個消費者類,它我們為它注冊了一個監聽器,在處理完消息之后,會將消息的狀態返回給 RocketMQ,執行成功返回的是消息狀態是 CONSUME_SUCCESS

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");

        // 設置 NameServer 地址
        consumer.setNamesrvAddr("");
        // 訂閱 Topic
        consumer.subscribe("TopicTest", "*");
        // 這次回調接口,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 對消息的處理,比如發放優惠券、積分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

畫一張圖來表示向 RocketMQ 提交消息狀態的流程,如圖所示:

消息者業務代碼出現異常怎么辦?

再來看一下消費者的代碼中監聽器的部分,它說如果消息處理成功,那么就返回消息狀態為 CONSUME_SUCCESS,也有可能發放優惠券、積分等操作出現了異常,比如說數據庫掛掉了。這個時候應該怎么處理呢?

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 對消息的處理,比如發放優惠券、積分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

我們可以把代碼改一改,捕獲異常之后返回消息的狀態為 RECONSUME_LATER 表示稍后重試。

// 這次回調接口,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    // 對消息的處理,比如發放優惠券、積分等
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 萬一發生數據庫宕機等異常,返回稍后重試消息的狀態
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            }
        });

重試隊列

這個時候,消息會進入到 RocketMQ 的重試隊列中。

  • 比如說消費者所屬的消息組名稱為AAAConsumerGroup
  • 其重試隊列名稱就叫做%RETRY%AAAConsumerGroup
  • 重試隊列中的消息過一段時間會再次發送給消費者,如果還是無法正常執行會再次進入重試隊列
  • 默認重試16次,還是無法執行,消息就會從重試隊列進入到死信隊列

死信隊列

  • 重試隊列中的消息重試16次任然無法執行,將會進入到死信隊列
  • 死信隊列的名字是 %DLQ%AAAConsumerGroup
  • 死信隊列中的消息可以后台開一個線程,訂閱%DLQ%AAAConsumerGroup,並不停重試

總結

本文從消費者的業務代碼出現異常講起,介紹了 RocketMQ 的重試隊列和死信隊列:

  1. 代碼正常執行返回消息狀態為CONSUME_SUCCESS,執行異常返回RECONSUME_LATER
  2. 狀態為RECONSUME_LATER的消息會進入到重試隊列,重試隊列的名稱為 %RETRY% + ConsumerGroupName
  3. 重試16次消息任然沒有處理成功,消息就會進入到死信隊列%DLQ% + ConsumerGroupName;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM