消息重試_高級特性_功能與特性_產品簡介_消息隊列 RocketMQ 版-阿里雲 https://help.aliyun.com/document_detail/43490.html
本文介紹消息隊列 RocketMQ 版的消息重試機制和配置方式。
順序消息的重試
對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。
本文介紹消息隊列 RocketMQ 版的消息重試機制和配置方式。
順序消息的重試
對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 版會自動不斷地進行消息重試(每次間隔時間為 1 秒),這時,應用會出現消息消費被阻塞的情況。因此,建議您使用順序消息時,務必保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。
無序消息的重試
對於無序消息(普通、定時、延時、事務消息),當消費者消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。
無序消息的重試只針對集群消費方式生效;廣播方式不提供失敗重試特性,即消費失敗后,失敗消息不再重試,繼續消費新的消息。
重試次數
消息隊列 RocketMQ 版默認允許每條消息最多重試 16 次,每次重試的間隔時間如下。
第幾次重試 | 與上次重試的間隔時間 | 第幾次重試 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10 秒 | 9 | 7 分鍾 |
2 | 30 秒 | 10 | 8 分鍾 |
3 | 1 分鍾 | 11 | 9 分鍾 |
4 | 2 分鍾 | 12 | 10 分鍾 |
5 | 3 分鍾 | 13 | 20 分鍾 |
6 | 4 分鍾 | 14 | 30 分鍾 |
7 | 5 分鍾 | 15 | 1 小時 |
8 | 6 分鍾 | 16 | 2 小時 |
如果消息重試 16 次后仍然失敗,消息將不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,將會在接下來的 4 小時 46 分鍾之內進行 16 次重試,超過這個時間范圍消息將不再重試投遞。
配置方式
- 消費失敗后,重試配置方式
集群消費方式下,消息消費失敗后期望消息重試,需要在消息監聽器接口的實現中明確進行配置(三種方式任選一種):
- 方式 1:返回 Action.ReconsumeLater(推薦)
- 方式 2:返回 Null
- 方式 3:拋出異常
示例代碼
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //消息處理邏輯拋出異常,消息將重試 doConsumeMessage(message); //方式 1:返回 Action.ReconsumeLater,消息將重試 return Action.ReconsumeLater; //方式 2:返回 null,消息將重試 return null; //方式 3:直接拋出異常,消息將重試 throw new RuntimeException("Consumer Message exception"); } }
- 消費失敗后,無需重試的配置方式
集群消費方式下,消息失敗后期望消息不重試,需要捕獲消費邏輯中可能拋出的異常,最終返回 Action.CommitMessage,此后這條消息將不會再重試。
示例代碼
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { //捕獲消費邏輯中的所有異常,並返回 Action.CommitMessage; return Action.CommitMessage; } //消息處理正常,直接返回 Action.CommitMessage; return Action.CommitMessage; } }
- 自定義消息最大重試次數
說明 自定義 消息隊列 RocketMQ 版的客戶端日志配置,請升級 TCP Java SDK 版本到 1.2.2 及以上。
消息隊列 RocketMQ 版允許 Consumer 啟動的時候設置最大重試次數,重試時間間隔將按照以下策略:
- 最大重試次數小於等於 16 次,則重試時間間隔同上表描述。
- 最大重試次數大於 16 次,超過 16 次的重試時間間隔均為每次 2 小時。
配置方式如下:
Properties properties = new Properties(); //配置對應 Group ID 的最大消息重試次數為 20 次,最大重試次數為字符串類型 properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); Consumer consumer =ONSFactory.createConsumer(properties);
注意- 消息最大重試次數的設置對相同 Group ID 下的所有 Consumer 實例有效。
- 如果只對相同 Group ID 下兩個 Consumer 實例中的其中一個設置了 MaxReconsumeTimes,那么該配置對兩個 Consumer 實例均生效。
- 配置采用覆蓋的方式生效,即最后啟動的 Consumer 實例會覆蓋之前的啟動實例的配置。
獲取消息重試次數
消費者收到消息后,可按照以下方式獲取消息的重試次數:
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { //獲取消息的重試次數 System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }
阿里雲幫助中心-阿里雲,領先的雲計算服務提供商 https://help.aliyun.com/knowledge_detail/54318.html
消息隊列 RocketMQ 版消費失敗如何重新消費消息?
集群消費方式
消費業務邏輯代碼如果返回 Action.ReconsumerLater
,或者 NULL
,或者拋出異常,消息都會走重試流程,至多重試 16 次,如果重試 16 次后,仍然失敗,則消息丟棄。每次重試的間隔時間如下:
第幾次重試 | 每次重試間隔時間 |
---|---|
1 | 10 秒 |
2 | 30 秒 |
3 | 1 分鍾 |
4 | 2 分鍾 |
5 | 3 分鍾 |
6 | 4 分鍾 |
7 | 5 分鍾 |
8 | 6 分鍾 |
9 | 7 分鍾 |
10 | 8 分鍾 |
11 | 9 分鍾 |
12 | 10 分鍾 |
13 | 20 分鍾 |
14 | 30 分鍾 |
15 | 1 小時 |
16 | 2 小時 |
可以通過調用 message.getReconsumeTimes() 方法來獲取消息的重試次數。
廣播消費方式
廣播消費方式仍然能保證一條消息至少被消費一次,但消費失敗后不做重試操作。