消息消費被阻塞的 消息重試 消費失敗如何重新消費消息


消息重試_高級特性_功能與特性_產品簡介_消息隊列 RocketMQ 版-阿里雲 https://help.aliyun.com/document_detail/43490.html

 

本文介紹消息隊列 RocketMQ 版的消息重試機制和配置方式。

順序消息的重試

對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。

 

本文介紹消息隊列 RocketMQ 版的消息重試機制和配置方式。

順序消息的重試

對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。

無序消息的重試

對於無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。

無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。

 
注意 以下內容都只針對無序消息生效。

重試次數

消息隊列 RocketMQ 版默認允許每條消息最多重試 16 次,每次重試的間隔時間如下。

 
第幾次重試 與上次重試的間隔時間 第幾次重試 與上次重試的間隔時間
1 10 秒 9 7 分鍾
2 30 秒 10 8 分鍾
3 1 分鍾 11 9 分鍾
4 2 分鍾 12 10 分鍾
5 3 分鍾 13 20 分鍾
6 4 分鍾 14 30 分鍾
7 5 分鍾 15 1 小時
8 6 分鍾 16 2 小時

如果消息重試 16 次后仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鍾之內進行 16 次重試,超過這個時間范圍消息將不再重試投遞。

 
注意 一條消息無論重試多少次,這些重試消息的 Message ID 不會改變。

配置方式

  • 消費失敗后,重試配置方式

    集群消費方式下,消息消費失敗后期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):

    • 方式 1:返回 Action.ReconsumeLater(推薦)
    • 方式 2:返回 Null
    • 方式 3:拋出異常

    示例代碼

     
    public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //消息處理邏輯拋出異常,消息將重試 doConsumeMessage(message); //方式 1:返回 Action.ReconsumeLater,消息將重試 return Action.ReconsumeLater; //方式 2:返回 null,消息將重試 return null; //方式 3:直接拋出異常,消息將重試 throw new RuntimeException("Consumer Message exception"); } }
  • 消費失敗后,無需重試的配置方式

    集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage,此后這條消息將不會再重試。

    示例代碼

     
    public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕獲消費邏輯中的所有異常,並返回 Action.CommitMessage; return Action.CommitMessage; } //消息處理正常,直接返回 Action.CommitMessage; return Action.CommitMessage; } }
  • 自定義消息最大重試次數
     
    說明 自定義 消息隊列 RocketMQ 版的客戶端日志配置,請升級 TCP Java SDK 版本到 1.2.2 及以上。

    消息隊列 RocketMQ 版允許 Consumer 啟動的時候設置最大重試次數,重試時間間隔將按照以下策略:

    • 最大重試次數小於等於 16 次,則重試時間間隔同上表描述。
    • 最大重試次數大於 16 次,超過 16 次的重試時間間隔均為每次 2 小時。

    配置方式如下:

     
    Properties properties = new Properties(); //配置對應 Group ID 的最大消息重試次數為 20 次,最大重試次數為字符串類型 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); Consumer consumer =ONSFactory.createConsumer(properties);
     
    注意
    • 消息最大重試次數的設置對相同 Group ID 下的所有 Consumer 實例有效。
    • 如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了 MaxReconsumeTimes,那么該配置對兩個 Consumer 實例均生效。
    • 配置采用覆蓋的方式生效,即最后啟動的 Consumer 實例會覆蓋之前的啟動實例的配置。

獲取消息重試次數

消費者收到消息后,可按照以下方式獲取消息的重試次數:

 
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //獲取消息的重試次數 System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }

 

 

 

阿里雲幫助中心-阿里雲,領先的雲計算服務提供商 https://help.aliyun.com/knowledge_detail/54318.html

消息隊列 RocketMQ 版消費失敗如何重新消費消息?

 

 

集群消費方式

消費業務邏輯代碼如果返回 Action.ReconsumerLater,或者 NULL,或者拋出異常,消息都會走重試流程,至多重試 16 次,如果重試 16 次后,仍然失敗,則消息丟棄。每次重試的間隔時間如下:

 
第幾次重試 每次重試間隔時間
1 10 秒
2 30 秒
3 1 分鍾
4 2 分鍾
5 3 分鍾
6 4 分鍾
7 5 分鍾
8 6 分鍾
9 7 分鍾
10 8 分鍾
11 9 分鍾
12 10 分鍾
13 20 分鍾
14 30 分鍾
15 1 小時
16 2 小時

可以通過調用 message.getReconsumeTimes() 方法來獲取消息的重試次數。

廣播消費方式

廣播消費方式仍然能保證一條消息至少被消費一次,但消費失敗后不做重試操作。

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM