消息重試分為兩種:Producer發送消息的重試 和 Consumer消息消費的重試。
一、Producer端重試
Producer端重試是指: Producer往MQ上發消息沒有發送成功,比如網絡原因導致生產者發送消息到MQ失敗。
部分源碼解析:
/**
* 說明 抽取部分代碼
*/
private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) {
//1、獲取當前時間
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev ;
//2、去服務器看下有沒有主題消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
//3、通過這里可以很明顯看出 如果不是同步發送消息 那么消息重試只有1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//4、根據設置的重試次數,循環再去獲取服務器主題消息
for (times = 0; times < timesTotal; times++) {
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
beginTimestampPrev = System.currentTimeMillis();
long costTime = beginTimestampPrev - beginTimestampFirst;
//5、前后時間對比 如果前后時間差 大於 設置的等待時間 那么直接跳出for循環了 這就說明連接超時是不進行多次連接重試的
if (timeout < costTime) {
callTimeout = true;
break;
}
//6、如果超時直接報錯
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
}
}
通過這段源碼很明顯可以看出以下幾點
如果是異步發送 那么重試次數只有1次
對於同步而言,超時異常也是不會再去重試。
如果發生重試是在一個for 循環里去重試,所以它是立即重試而不是隔一段時間去重試。
實踐出真知!!!
二、 Consumer端重試
消費端比較有意思,而且在實際開發過程中,我們也更應該考慮的是消費端的重試。
消費者端的失敗主要分為2種情況,Exception 和 Timeout。
1、Exception
@Slf4j
@Component
public class Consumer {
/**
* 消費者實體對象
*/
private DefaultMQPushConsumer consumer;
/**
* 消費者組
*/
public static final String CONSUMER_GROUP = "test_consumer";
/**
* 通過構造函數 實例化對象
*/
public Consumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr("47.99.203.55:9876;47.99.203.55:9877");
//訂閱topic和 tags( * 代表所有標簽)下信息
consumer.subscribe("topic_family", "*");
//注冊消費的監聽 並在此監聽中消費信息,並返回消費的狀態信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//1、獲取消息
Message msg = msgs.get(0);
try {
//2、消費者獲取消息
String body = new String(msg.getBody(), "utf-8");
//3、獲取重試次數
int count = ((MessageExt) msg).getReconsumeTimes();
log.info("當前消費重試次數為 = {}", count);
//4、這里設置重試大於3次 那么通過保存數據庫 人工來兜底
if (count >= 2) {
log.info("該消息已經重試3次,保存數據庫。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//直接拋出異常
throw new Exception("=======這里出錯了============");
//return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
//啟動監聽
consumer.start();
}
}
這里的代碼意思很明顯: 主動拋出一個異常,然后如果超過3次,那么就不繼續重試下去,而是將該條記錄保存到數據庫由人工來兜底。
看下運行結果
img
注意 消費者和生產者的重試還是有區別的,主要有兩點
1、默認重試次數:Product默認是2次,而Consumer默認是16次。
2、重試時間間隔:Product是立刻重試,而Consumer是有一定時間間隔的。它照1S,5S,10S,30S,1M,2M····2H進行重試。
3、Product在異步情況重試失效,而對於Consumer在廣播情況下重試失效。
2、Timeout
說明 這里的超時異常並非真正意義上的超時,它指的是指獲取消息后,因為某種原因沒有給RocketMQ返回消費的狀態,即沒有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 return ConsumeConcurrentlyStatus.RECONSUME_LATER。
那么 RocketMQ會認為該消息沒有發送,會一直發送。因為它會認為該消息根本就沒有發送給消費者,所以肯定沒消費。
做這個測試很簡單。
//1、消費者獲得消息
String body = new String(msg.getBody(), "utf-8");
//2、獲取重試次數
int count = ((MessageExt) msg).getReconsumeTimes();
log.info("當前消費重試次數為 = {}", count);
//3、這里睡眠60秒
Thread.sleep(60000);
log.info("休眠60秒 看還能不能走到這里。topic={},keys={},msg={}", msg.getTopic(), msg.getKeys(), body);
//返回成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
當獲得 當前消費重試次數為 = 0 后 , 關掉該進程。再重新啟動該進程,那么依然能夠獲取該條消息
consumer消費者 當前消費重試次數為 = 0
休眠60秒 看還能不能走到這里。topic=topic_family,keys=1a2b3c4d5f,msg=小小今年3歲
1
2
其他理解
首先,我們需要明確,只有當消費模式為 MessageModel.CLUSTERING(集群模式) 時,Broker 才會自動進行重試,對於廣播消息是不會重試的。
集群消費模式下,當消息消費失敗,RocketMQ 會通過消息重試機制重新投遞消息,努力使該消息消費成功。
當消費者消費該重試消息后,需要返回結果給 broker,告知 broker 消費成功(ConsumeConcurrentlyStatus.CONSUME*SUCCESS)或者需要重新消費(ConsumeConcurrentlyStatus.RECONSUME*LATER)
這里有個問題,如果消費者業務本身故障導致某條消息一直無法消費成功,難道要一直重試下去嗎?
答案是顯而易見的,並不會一直重試。
事實上,對於一直無法消費成功的消息,RocketMQ 會在達到最大重試次數之后,將該消息投遞至死信隊列。然后我們需要關注死信隊列,並對該死信消息業務做人工的補償操作。
那如何返回消息消費失敗呢?
RocketMQ 規定,以下三種情況統一按照消費失敗處理並會發起重試。
業務消費方返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
業務消費方返回null
業務消費方主動/被動拋出異常
前兩種情況較容易理解,當返回 ConsumeConcurrentlyStatus.RECONSUME_LATER或者 null時,broker 會知道消費失敗,后續就會發起消息重試,重新投遞該消息。
注意 對於拋出異常的情況,只要我們在業務邏輯中顯式拋出異常或者非顯式拋出異常,broker 也會重新投遞消息,如果業務對異常做了捕獲,那么該消息將不會發起重試。因此對於需要重試的業務,消費方在捕獲異常的時候要注意返回 ConsumeConcurrentlyStatus.RECONSUME*LATER 或 null 並輸出異常日志,打印當前重試次數。(推薦返回ConsumeConcurrentlyStatus.RECONSUME*LATER)
死信的業務處理方式
默認的處理機制中,如果我們只對消息做重復消費,達到最大重試次數之后消息就進入死信隊列了。
我們也可以根據業務的需要,定義消費的最大重試次數,每次消費的時候判斷當前消費次數是否等於最大重試次數的閾值。
如:重試三次就認為當前業務存在異常,繼續重試下去也沒有意義了,那么我們就可以將當前的這條消息進行提交,返回 broker 狀態ConsumeConcurrentlyStatus.CONSUME_SUCCES,讓消息不再重發,同時將該消息存入我們業務自定義的死信消息表,將業務參數入庫,相關的運營通過查詢死信表來進行對應的業務補償操作。
RocketMQ 的處理方式為將達到最大重試次數(16 次)的消息標記為死信消息,將該死信消息投遞到 DLQ 死信隊列中,業務需要進行人工干預。實現的邏輯在 SendMessageProcessor的 consumerSendMsgBack方法中,大致思路為首先判斷重試次數是否超過 16 或者消息發送延時級別是否小於 0,如果已經超過 16 或者發送延時級別小於 0,則將消息設置為新的死信。死信 topic 為:%DLQ%+consumerGroup
我們接着看一下死信的源碼實現機制。
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final ConsumerSendMsgBackRequestHeader requestHeader =
(ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
......
// 0.首先判斷重試次數是否大於等於 16,或者消息延遲級別是否小於 0
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) {
// 1. 如果滿足判斷條件,設置死信隊列 topic= %DLQ%+consumerGroup
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
if (null == topicConfig) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("topic[" + newTopic + "] not exist");
return response;
}
} else {
// 如果延遲級別為 0,則設置下一次延遲級別為 3+當前重試消費次數,達到時間衰減效果
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
}
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
msgInner.setBody(msgExt.getBody());
msgInner.setFlag(msgExt.getFlag());
MessageAccessor.setProperties(msgInner, msgExt.getProperties());
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
msgInner.setQueueId(queueIdInt);
msgInner.setSysFlag(msgExt.getSysFlag());
msgInner.setBornTimestamp(msgExt.getBornTimestamp());
msgInner.setBornHost(msgExt.getBornHost());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
// 3.死信消息投遞到死信隊列中並落盤
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
......
return response;
}
我們總結一下死信的處理邏輯:
首先判斷消息當前重試次數是否大於等於 16,或者消息延遲級別是否小於 0
只要滿足上述的任意一個條件,設置新的 topic(死信 topic)為:%DLQ%+consumerGroup
進行前置屬性的添加
將死信消息投遞到上述步驟 2 建立的死信 topic 對應的死信隊列中並落盤,使消息持久化。
PS:
延遲級別有18級,啟動定時任務默認先延遲1S后立馬執行,因此延遲隊列實際為17個 對應第2級到第18級
消費重試直接從第3級開始,一共16次 即10S到2H
————————————————
版權聲明:本文為CSDN博主「Apple_Web」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/belongtocode/article/details/104310781