七、消息發送重試機制
1 說明
Producer對發送失敗的消息進行重新發送的機制,稱為消息發送重試機制,也稱為消息重投機制。
對於消息重投,需要注意以下幾點:
-
生產者在發送消息時,若采用同步或異步發送方式,發送失敗會重試,但oneway消息發送方式 發送失敗是沒有重試機制的
-
只有普通消息有發送重試機制,順序消息是沒有的(只有默認自帶的發送選擇才有這個功能,若手動實現選擇器,則無法實現重試避錯機制,也不需要)
-
消息重投機制可以保證消息盡可能發送成功、不丟失,但可能會造成消息重復。消息重復在 RocketMQ中是無法避免的問題
-
消息重復在一般情況下不會發生,當出現消息量大、網絡抖動,消息重復就會成為大概率事件
-
producer主動重發、consumer負載變化(發生Rebalance,不會導致消息重復,但可能出現重復 消費)也會導致重復消息
-
消息重復無法避免,但要避免消息的重復消費。
-
避免消息重復消費的解決方案是,為消息添加唯一標識(例如消息key),使消費者對消息進行消 費判斷來避免重復消費
-
消息發送重試有三種策略可以選擇:同步發送失敗策略、異步發送失敗策略、消息刷盤失敗策略
2 同步發送失敗策略
對於普通消息,消息發送默認采用round-robin策略來選擇所發送到的隊列。如果發送失敗,默認重試2 次。但在重試時是不會選擇上次發送失敗的Broker,而是選擇其它Broker。當然,若只有一個Broker其 也只能發送到該Broker,但其會盡量發送到該Broker上的其它Queue。
// 創建一個producer,參數為Producer Group名稱
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定nameServer地址
producer.setNamesrvAddr("rocketmqOS:9876");
// 設置同步發送失敗時重試發送的次數,默認為2次
producer.setRetryTimesWhenSendFailed(3);
// 設置發送超時時限為5s,默認3s
producer.setSendMsgTimeout(5000);
同時,Broker還具有失敗隔離功能,使Producer盡量選擇未發生過發送失敗的Broker作為目標 Broker。其可以保證其它消息盡量不發送到問題Broker,可以降低發送失敗的概率,提升消息發送效率,降低消息發送耗時。
思考:讓我們自己實現失敗隔離功能,如何來做?
1)方案一:Producer中維護某JUC的Map集合,其key是發生失敗的時間戳,value為Broker實 例。Producer中還維護着一個Set集合,其中存放着所有未發生發送異常的Broker實例。選擇目 標Broker是從該Set集合中選擇的。再定義一個定時任務,定期從Map集合中將長期未發生發送 異常的Broker清理出去,並添加到Set集合。
2)方案二:為Producer中的Broker實例添加一個標識,例如是一個AtomicBoolean屬性。只要該 Broker上發生過發送異常,就將其置為true。選擇目標Broker就是選擇該屬性值為false的 Broker。再定義一個定時任務,定期將Broker的該屬性置為false。
3)方案三:為Producer中的Broker實例添加一個標識,例如是一個AtomicLong屬性。只要該 Broker上發生過發送異常,就使其值增一。選擇目標Broker就是選擇該屬性值最小的Broker。若 該值相同,采用輪詢方式選擇。
如果超過重試次數,則拋出異常,由Producer去保證消息不丟。當然當生產者出現 RemotingException、MQClientException和MQBrokerException時,Producer會自動重投消息。
3 異步發送失敗策略
異步發送失敗重試時,異步重試不會選擇其他broker,僅在同一個broker上做重試,所以該策略無法保 證消息不丟。
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.setRetryTimesWhenSendAsyncFailed(3);
4 消息刷盤失敗策略
消息刷盤超時(Master或Slave)或slave不可用(slave在做數據同步時向master返回狀態不是 SEND_OK)時,默認是不會將消息嘗試發送到其他Broker的。不過,對於重要消息可以通過在Broker 的配置文件設置retryAnotherBrokerWhenNotStoreOK
屬性為true來開啟。
八、消息消費重試機制
1 順序消息的消費重試
對於順序消息,需要嚴格保證拉取消費的順序性,所以不會錯過任何一個消息的消費,所以當Consumer消費消息失敗后,為了保證消息的順序性,其會自動不斷地進行消息重 試,直到消費成功。消費重試默認間隔時間為1000毫秒。重試期間應用會出現消息消費被阻塞的情 況。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
// 順序消息消費失敗的消費重試時間間隔,單位毫秒,默認為1000,其取值范圍為[10,30000]
consumer.setSuspendCurrentQueueTimeMillis(100);
由於對順序消息的重試是無休止的,不間斷的,直至消費成功,所以,對於順序消息的消費, 務必要保證應用能夠及時監控並處理消費失敗的情況,避免消費被永久性阻塞。
注意,順序消息沒有發送失敗重試機制,但具有消費失敗重試機制
2 無序消息的消費重試
對於無序消息(普通消息、延時消息、事務消息),當Consumer消費消息失敗時,可以通過設置返回狀態達到消息重試的效果。不過需要注意,無序消息的重試只對集群消費方式生效,廣播消費方式不提供失敗重試特性。即對於廣播消費,消費失敗后,失敗消息不再重試,繼續消費后續消息。
對於無序消息集群消費下的重試消費,每條消息默認最多重試16次,但每次重試的間隔時間是不同的,會逐漸變長。每次重試的間隔時間如下表。
重試次數 | 與上次重試的間隔時間 | 重試次數 | 與上次重試的間隔時間 |
---|---|---|---|
1 | 10秒 | 9 | 7分鍾 |
2 | 30秒 | 10 | 8分鍾 |
3 | 1分鍾 | 11 | 9分鍾 |
4 | 2分鍾 | 12 | 10分鍾 |
5 | 3分鍾 | 13 | 20分鍾 |
6 | 4分鍾 | 14 | 30分鍾 |
7 | 5分鍾 | 15 | 1小時 |
8 | 6分鍾 | 16 | 2小時 |
若一條消息在一直消費失敗的前提下,將會在正常消費后的第4小時46分后進行第16次重試。 若仍然失敗,則將消息投遞到死信隊列
修改消費重試次數
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg"); // 修改消費重試次數 consumer.setMaxReconsumeTimes(10);
對於修改過的重試次數,將按照以下策略執行:
- 若修改值小於16,則按照指定間隔進行重試
- 若修改值大於16,則超過16次的重試時間間隔均為2小時
對於Consumer Group,若僅修改了一個Consumer的消費重試次數,則會應用到該Group中所有 其它Consumer實例。若出現多個Consumer均做了修改的情況,則采用覆蓋方式生效。即最后被 修改的值會覆蓋前面設置的值。
3. 重試隊列
對於需要重試消費的消息,並不是Consumer在等待了指定時長后再次去拉取原來的消息進行消費,而 是將這些需要重試消費的消息放入到了一個特殊Topic的隊列中,而后進行再次消費的。這個特殊的隊 列就是重試隊列。
當出現需要進行重試消費的消息時,Broker會為每個消費組都設置一個Topic名稱 為%RETRY%consumerGroup@consumerGroup
的重試隊列。
1)這個重試隊列是針對消息才組的,而不是針對每個Topic設置的(一個Topic的消息可以讓多 個消費者組進行消費,所以會為這些消費者組各創建一個重試隊列)
2)只有當出現需要進行重試消費的消息時,才會為該消費者組創建重試隊列
注意,消費重試的時間間隔與延時消費的延時等級十分相似,除了沒有延時等級的前兩個時間 外,其它的時間都是相同的
Broker對於重試消息的處理是通過延時消息實現的。先將消息保存到SCHEDULE_TOPIC_XXXX
延遲隊 列中,延遲時間到后,會將消息投遞到%RETRY%consumerGroup@consumerGroup
重試隊列中。
4. 消費重試配置方式
集群消費方式下,消息消費失敗后若希望消費重試,則需要在消息監聽器接口的實現中明確進行如下三 種方式之一的配置:
- 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推薦)
- 方式2:返回Null
- 方式3:拋出異常
5. 消費不重試配置方式
集群消費方式下,消息消費失敗后若不希望消費重試,則在捕獲到異常后同樣也返回與消費成功后的相 同的結果,即ConsumeConcurrentlyStatus.CONSUME_SUCCESS,則不進行消費重試。
九、死信隊列
1 什么是死信隊列
當一條消息初次消費失敗,消息隊列會自動進行消費重試;達到最大重試次數后,若消費依然失敗,則 表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送 到該消費者對應的特殊隊列中。這個隊列就是死信隊列(Dead-Letter Queue,DLQ),而其中的消息 則稱為死信消息(Dead-Letter Message,DLM)。
死信隊列是用於處理無法被正常消費的消息的。
2 死信隊列的特征
死信隊列具有如下特征:
-
死信隊列中的消息不會再被消費者正常消費,即DLQ對於消費者是不可見的
-
死信存儲有效期與正常消息相同,均為 3 天(commitlog文件的過期時間),3 天后會被自動刪除
-
死信隊列就是一個特殊的Topic,名稱為
%DLQ%consumerGroup@consumerGroup
,即每個消 費者組都有一個死信隊列 -
如果⼀個消費者組未產生死信消息,則不會為其創建相應的死信隊列
3 死信消息的處理
實際上,當⼀條消息進入死信隊列,就意味着系統中某些地方出現了問題,從而導致消費者無法正常消 費該消息,比如代碼中原本就存在Bug。因此,對於死信消息,通常需要開發人員進行特殊處理。最關 鍵的步驟是要排查可疑因素,解決代碼中可能存在的Bug,然后再將原來的死信消息再次進行投遞消 費