RocketMQ(11) 消息重試機制和死信隊列


七、消息發送重試機制

1 說明

Producer對發送失敗的消息進行重新發送的機制,稱為消息發送重試機制,也稱為消息重投機制。

對於消息重投,需要注意以下幾點:

  • 生產者在發送消息時,若采用同步或異步發送方式,發送失敗會重試,但oneway消息發送方式 發送失敗是沒有重試機制的

  • 只有普通消息有發送重試機制,順序消息是沒有的(只有默認自帶的發送選擇才有這個功能,若手動實現選擇器,則無法實現重試避錯機制,也不需要)

  • 消息重投機制可以保證消息盡可能發送成功、不丟失,但可能會造成消息重復。消息重復在 RocketMQ中是無法避免的問題

  • 消息重復在一般情況下不會發生,當出現消息量大、網絡抖動,消息重復就會成為大概率事件

  • producer主動重發、consumer負載變化(發生Rebalance,不會導致消息重復,但可能出現重復 消費)也會導致重復消息

  • 消息重復無法避免,但要避免消息的重復消費。

  • 避免消息重復消費的解決方案是,為消息添加唯一標識(例如消息key),使消費者對消息進行消 費判斷來避免重復消費

  • 消息發送重試有三種策略可以選擇:同步發送失敗策略、異步發送失敗策略、消息刷盤失敗策略

2 同步發送失敗策略

對於普通消息,消息發送默認采用round-robin策略來選擇所發送到的隊列。如果發送失敗,默認重試2 次。但在重試時是不會選擇上次發送失敗的Broker,而是選擇其它Broker。當然,若只有一個Broker其 也只能發送到該Broker,但其會盡量發送到該Broker上的其它Queue。

// 創建一個producer,參數為Producer Group名稱
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定nameServer地址
producer.setNamesrvAddr("rocketmqOS:9876");
// 設置同步發送失敗時重試發送的次數,默認為2次
producer.setRetryTimesWhenSendFailed(3);
// 設置發送超時時限為5s,默認3s
producer.setSendMsgTimeout(5000);

同時,Broker還具有失敗隔離功能,使Producer盡量選擇未發生過發送失敗的Broker作為目標 Broker。其可以保證其它消息盡量不發送到問題Broker,可以降低發送失敗的概率,提升消息發送效率,降低消息發送耗時。

思考:讓我們自己實現失敗隔離功能,如何來做?

1)方案一:Producer中維護某JUC的Map集合,其key是發生失敗的時間戳,value為Broker實 例。Producer中還維護着一個Set集合,其中存放着所有未發生發送異常的Broker實例。選擇目 標Broker是從該Set集合中選擇的。再定義一個定時任務,定期從Map集合中將長期未發生發送 異常的Broker清理出去,並添加到Set集合。

2)方案二:為Producer中的Broker實例添加一個標識,例如是一個AtomicBoolean屬性。只要該 Broker上發生過發送異常,就將其置為true。選擇目標Broker就是選擇該屬性值為false的 Broker。再定義一個定時任務,定期將Broker的該屬性置為false。

3)方案三:為Producer中的Broker實例添加一個標識,例如是一個AtomicLong屬性。只要該 Broker上發生過發送異常,就使其值增一。選擇目標Broker就是選擇該屬性值最小的Broker。若 該值相同,采用輪詢方式選擇。

如果超過重試次數,則拋出異常,由Producer去保證消息不丟。當然當生產者出現 RemotingException、MQClientException和MQBrokerException時,Producer會自動重投消息。

3 異步發送失敗策略

異步發送失敗重試時,異步重試不會選擇其他broker,僅在同一個broker上做重試,所以該策略無法保 證消息不丟。

DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.setRetryTimesWhenSendAsyncFailed(3);

4 消息刷盤失敗策略

消息刷盤超時(Master或Slave)或slave不可用(slave在做數據同步時向master返回狀態不是 SEND_OK)時,默認是不會將消息嘗試發送到其他Broker的。不過,對於重要消息可以通過在Broker 的配置文件設置retryAnotherBrokerWhenNotStoreOK屬性為true來開啟。

八、消息消費重試機制

1 順序消息的消費重試

對於順序消息,需要嚴格保證拉取消費的順序性,所以不會錯過任何一個消息的消費,所以當Consumer消費消息失敗后,為了保證消息的順序性,其會自動不斷地進行消息重 試,直到消費成功。消費重試默認間隔時間為1000毫秒。重試期間應用會出現消息消費被阻塞的情 況。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費失敗的消費重試時間間隔,單位毫秒,默認為1000,其取值范圍為[10,30000]
consumer.setSuspendCurrentQueueTimeMillis(100);

由於對順序消息的重試是無休止的,不間斷的,直至消費成功,所以,對於順序消息的消費, 務必要保證應用能夠及時監控並處理消費失敗的情況,避免消費被永久性阻塞。

注意,順序消息沒有發送失敗重試機制,但具有消費失敗重試機制

2 無序消息的消費重試

對於無序消息(普通消息、延時消息、事務消息),當Consumer消費消息失敗時,可以通過設置返回狀態達到消息重試的效果。不過需要注意,無序消息的重試只對集群消費方式生效,廣播消費方式不提供失敗重試特性。即對於廣播消費,消費失敗后,失敗消息不再重試,繼續消費后續消息。

對於無序消息集群消費下的重試消費,每條消息默認最多重試16次,但每次重試的間隔時間是不同的,會逐漸變長。每次重試的間隔時間如下表。

重試次數 與上次重試的間隔時間 重試次數 與上次重試的間隔時間
1 10秒 9 7分鍾
2 30秒 10 8分鍾
3 1分鍾 11 9分鍾
4 2分鍾 12 10分鍾
5 3分鍾 13 20分鍾
6 4分鍾 14 30分鍾
7 5分鍾 15 1小時
8 6分鍾 16 2小時

若一條消息在一直消費失敗的前提下,將會在正常消費后的第4小時46分后進行第16次重試。 若仍然失敗,則將消息投遞到死信隊列

修改消費重試次數

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 修改消費重試次數
consumer.setMaxReconsumeTimes(10);

對於修改過的重試次數,將按照以下策略執行:

  • 若修改值小於16,則按照指定間隔進行重試
  • 若修改值大於16,則超過16次的重試時間間隔均為2小時

對於Consumer Group,若僅修改了一個Consumer的消費重試次數,則會應用到該Group中所有 其它Consumer實例。若出現多個Consumer均做了修改的情況,則采用覆蓋方式生效。即最后被 修改的值會覆蓋前面設置的值。

3. 重試隊列

對於需要重試消費的消息,並不是Consumer在等待了指定時長后再次去拉取原來的消息進行消費,而 是將這些需要重試消費的消息放入到了一個特殊Topic的隊列中,而后進行再次消費的。這個特殊的隊 列就是重試隊列。

當出現需要進行重試消費的消息時,Broker會為每個消費組都設置一個Topic名稱 為%RETRY%consumerGroup@consumerGroup 的重試隊列。

1)這個重試隊列是針對消息才組的,而不是針對每個Topic設置的(一個Topic的消息可以讓多 個消費者組進行消費,所以會為這些消費者組各創建一個重試隊列)

2)只有當出現需要進行重試消費的消息時,才會為該消費者組創建重試隊列

注意,消費重試的時間間隔延時消費的延時等級十分相似,除了沒有延時等級的前兩個時間 外,其它的時間都是相同的

Broker對於重試消息的處理是通過延時消息實現的。先將消息保存到SCHEDULE_TOPIC_XXXX延遲隊 列中,延遲時間到后,會將消息投遞到%RETRY%consumerGroup@consumerGroup重試隊列中。

4. 消費重試配置方式

集群消費方式下,消息消費失敗后若希望消費重試,則需要在消息監聽器接口的實現中明確進行如下三 種方式之一的配置:

  • 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推薦)
  • 方式2:返回Null
  • 方式3:拋出異常

image-20220111232349353

5. 消費不重試配置方式

image-20220111232300838

集群消費方式下,消息消費失敗后若不希望消費重試,則在捕獲到異常后同樣也返回與消費成功后的相 同的結果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,則不進行消費重試。

九、死信隊列

1 什么是死信隊列

當一條消息初次消費失敗,消息隊列會自動進行消費重試;達到最大重試次數后,若消費依然失敗,則 表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送 到該消費者對應的特殊隊列中。這個隊列就是死信隊列(Dead-Letter Queue,DLQ),而其中的消息 則稱為死信消息(Dead-Letter Message,DLM)。

死信隊列是用於處理無法被正常消費的消息的。

2 死信隊列的特征

死信隊列具有如下特征:

  • 死信隊列中的消息不會再被消費者正常消費,即DLQ對於消費者是不可見的

  • 死信存儲有效期與正常消息相同,均為 3 天(commitlog文件的過期時間),3 天后會被自動刪除

  • 死信隊列就是一個特殊的Topic,名稱為%DLQ%consumerGroup@consumerGroup ,即每個消 費者組都有一個死信隊列

  • 如果⼀個消費者組未產生死信消息,則不會為其創建相應的死信隊列

3 死信消息的處理

實際上,當⼀條消息進入死信隊列,就意味着系統中某些地方出現了問題,從而導致消費者無法正常消 費該消息,比如代碼中原本就存在Bug。因此,對於死信消息,通常需要開發人員進行特殊處理。最關 鍵的步驟是要排查可疑因素,解決代碼中可能存在的Bug,然后再將原來的死信消息再次進行投遞消 費


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM