RocketMQ's message retry covers two sides: retries when the producer sends a message, and retries when the consumer consumes one.
Producer send retries
When a send fails, RocketMQ retries it automatically. The retry loop lives in DefaultMQProducerImpl#sendDefaultImpl:
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// Total number of send attempts
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
// Some code omitted...
}
// ... rest of the if block omitted ...
}
// ... rest of the method omitted ...
}
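How many retries?
As the code shows, the number of attempts depends on the send mode:
- Synchronous send: org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1 attempts in total. retryTimesWhenSendFailed defaults to 2, so besides the initial attempt a failed send is retried up to 2 more times.
- Asynchronous and oneway sends: no retry in this loop (the total number of attempts is 1). Asynchronous sends do have a separate retry path in the client API layer, controlled by DefaultMQProducer#retryTimesWhenSendAsyncFailed.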
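When does it retry?
When an exception occurs. Note, however, that the send loop does not catch every exception: only RocketMQ's own internal exceptions are caught and trigger a retry.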
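How does it retry?
Each retry redoes load balancing, taking the earlier failure into account: a new MessageQueue is selected, preferably on a different broker than the one that just failed, which improves the chance that the send succeeds.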
How long between retries?
The retry happens immediately; there is no wait between attempts.
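The producer-side rules above can be condensed into a small, self-contained Java simulation. It is a sketch under stated assumptions, not RocketMQ code: Mode, Queue, Transport and RetrySketch are hypothetical stand-ins, queue selection is a simple round robin that skips the broker that failed last (the real client's selection is more elaborate), and every exception is treated as retryable, whereas the real loop retries only on certain RocketMQ exceptions. It shows the attempt count from timesTotal, re-selecting a queue on every attempt, and retrying without any pause.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, not RocketMQ classes: a send mode, a queue identified
// by broker name and id, and a pluggable transport that may fail.
enum Mode { SYNC, ASYNC, ONEWAY }

record Queue(String brokerName, int queueId) {}

interface Transport {
    String send(Queue q) throws Exception;
}

class RetrySketch {
    // Same rule as the timesTotal line in sendDefaultImpl.
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Hypothetical simplification of fault-aware selection: round robin that
    // skips the broker that failed last time whenever another broker exists.
    static Queue select(List<Queue> queues, int start, String lastFailedBroker) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get((start + i) % queues.size());
            if (!q.brokerName().equals(lastFailedBroker)) {
                return q;
            }
        }
        // Every queue lives on the failed broker: use it anyway.
        return queues.get(start % queues.size());
    }

    // Tries up to timesTotal times and records each chosen queue in `tried`.
    static String send(List<Queue> queues, Mode mode, int retries,
                       Transport transport, List<Queue> tried) {
        String lastFailedBroker = null;
        Exception lastError = null;
        for (int times = 0; times < timesTotal(mode, retries); times++) {
            Queue q = select(queues, times, lastFailedBroker);
            tried.add(q);
            try {
                return transport.send(q);
            } catch (Exception e) {
                // Simplification: the real loop only retries specific RocketMQ exceptions.
                lastError = e;
                lastFailedBroker = q.brokerName();
                // No sleep: the next attempt starts immediately.
            }
        }
        throw new IllegalStateException("all attempts failed", lastError);
    }

    public static void main(String[] args) {
        List<Queue> queues = List.of(
            new Queue("broker-a", 0), new Queue("broker-a", 1), new Queue("broker-b", 0));
        List<Queue> tried = new ArrayList<>();
        // broker-a is unavailable; broker-b accepts the message.
        String result = send(queues, Mode.SYNC, 2, q -> {
            if (q.brokerName().equals("broker-a")) throw new Exception("broker-a unavailable");
            return "SEND_OK via " + q.brokerName();
        }, tried);
        System.out.println(result + " after " + tried.size() + " attempt(s)");
    }
}
```

Running main prints `SEND_OK via broker-b after 2 attempt(s)`: the first attempt hits broker-a, and the immediate retry is steered to broker-b.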
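Consumer consumption retries
A message whose processing failed is handled by the broker just like any other message. Retry is possible because the consumer sends the failed message back to the broker, and the broker treats retried messages specially so that the consumer can consume them again.
The main steps of a consumption retry: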
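- The consumer fails to consume the message and sends it back to the broker
- On receiving the retry message, the broker replaces its topic and stores it
- The consumer pulls messages from its consumer group's retryTopic (%RETRY%+group)
- After pulling a message from the retryTopic, the consumer restores the original topic and hands the message to the listener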
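The consumer sends the retry message to the broker
Taking concurrent (non-orderly) consumption as an example: first, when consumption fails, the consumer sends the message back to the broker.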
// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
public void run() {
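// Some code omitted...
try {
// status is returned by the listener. If the business logic fails, the user can return
// org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER to tell RocketMQ the messages must be consumed again.
// For a batch of messages, the user can also specify from which message onward re-consumption is needed.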
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
long consumeRT = System.currentTimeMillis() - beginTimestamp;
// Determine the outcome from the status
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}
// If the user returns null or throws an unhandled exception, RocketMQ retries by default
if (null == status) {
log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
if (!processQueue.isDropped()) {
// The result above is handled in this method
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
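The method above distinguishes the following consumption results:
- org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#CONSUME_SUCCESS: consumption succeeded. For a batch of messages, the user can still specify from which message onward to retry.
- org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER: all messages are retried. This happens when:
  - the user returns RECONSUME_LATER
  - the user returns null
  - the user's business logic throws an exception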
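How run() turns the listener's outcome into a status can be illustrated with a minimal sketch. Status is a local stand-in for ConsumeConcurrentlyStatus and the Supplier plays the role of listener.consumeMessage; both are assumptions made to keep the example self-contained, and the logging, hooks and timeout check are left out.

```java
import java.util.function.Supplier;

// Local stand-in for ConsumeConcurrentlyStatus (assumption: only its two constants matter here).
enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

class StatusSketch {
    // Runs a listener-like callback (hypothetical Supplier) and normalizes the outcome like run() does.
    static Status effectiveStatus(Supplier<Status> listener) {
        Status status = null;
        try {
            status = listener.get();
        } catch (Throwable e) {
            // run() logs the exception and sets hasException; status stays null.
        }
        // A null status, whether returned or caused by an exception, becomes RECONSUME_LATER.
        return status == null ? Status.RECONSUME_LATER : status;
    }

    public static void main(String[] args) {
        System.out.println(effectiveStatus(() -> Status.CONSUME_SUCCESS));
        System.out.println(effectiveStatus(() -> null));
        System.out.println(effectiveStatus(() -> { throw new IllegalStateException("db down"); }));
    }
}
```

The three calls in main print CONSUME_SUCCESS, RECONSUME_LATER and RECONSUME_LATER: a null return and a thrown exception both end up as a retry.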
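Once it is known whether a retry is needed, the next step determines which messages must be retried, that is, which messages are sent back to the broker: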
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
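// Index of the last acknowledged message; messages after it are retried
// ackIndex defaults to Integer.MAX_VALUE unless the user sets where retries should start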
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
// Even with CONSUME_SUCCESS, some messages may need to be retried
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
// With RECONSUME_LATER every message is retried, so ackIndex is set to -1
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
// Broadcast messages are not retried; failures are only logged
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
// Only messages consumed in clustering mode are retried
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
// Send the message back to the broker
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
// Messages that could not be sent back are resubmitted for consumption later
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// Update the consume offset
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
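The ackIndex arithmetic in processConsumeResult decides which messages of a batch are sent back. The sketch below reproduces only that selection logic; ConsumeStatus, Model and AckIndexSketch are hypothetical names standing in for ConsumeConcurrentlyStatus and MessageModel, and the statistics, the sendMessageBack call and the offset update are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical local stand-ins for ConsumeConcurrentlyStatus and MessageModel.
enum ConsumeStatus { CONSUME_SUCCESS, RECONSUME_LATER }
enum Model { CLUSTERING, BROADCASTING }

class AckIndexSketch {
    // Indexes of the messages in a batch of `size` that would be passed to sendMessageBack.
    // The context's ackIndex defaults to Integer.MAX_VALUE.
    static List<Integer> toSendBack(ConsumeStatus status, int ackIndex, int size, Model model) {
        List<Integer> result = new ArrayList<>();
        if (size == 0) {
            return result;
        }
        if (status == ConsumeStatus.CONSUME_SUCCESS && ackIndex >= size) {
            ackIndex = size - 1;   // whole batch acknowledged
        } else if (status == ConsumeStatus.RECONSUME_LATER) {
            ackIndex = -1;         // nothing acknowledged
        }
        if (model == Model.BROADCASTING) {
            return result;         // failures are only logged, never sent back
        }
        for (int i = ackIndex + 1; i < size; i++) {
            result.add(i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(toSendBack(ConsumeStatus.CONSUME_SUCCESS, Integer.MAX_VALUE, 4, Model.CLUSTERING)); // []
        System.out.println(toSendBack(ConsumeStatus.CONSUME_SUCCESS, 1, 4, Model.CLUSTERING));                 // [2, 3]
        System.out.println(toSendBack(ConsumeStatus.RECONSUME_LATER, Integer.MAX_VALUE, 3, Model.CLUSTERING)); // [0, 1, 2]
    }
}
```

With a batch of 4 and ackIndex set to 1, messages 2 and 3 are sent back; with RECONSUME_LATER the whole batch is; in broadcasting mode nothing is.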
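The path by which the consumer sends a failed message back differs from the path a producer uses for an ordinary send; the key difference lies in the following methods: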
// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
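// If sending the failed message back to the broker fails, try once more. Unlike the call in the try block,
// this path sets the topic to the retry topic directly and sends the message the same way a producer does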
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
}
// org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
public void consumerSendMessageBack(
final String addr,
final MessageExt msg,
final String consumerGroup,
final int delayLevel,
final long timeoutMillis,
final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
// Uses a different RequestCode than an ordinary send, so the broker handles it with a different method
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
requestHeader.setGroup(consumerGroup);
// The broker changes the topic of a retried message, so the original topic is set here
requestHeader.setOriginTopic(msg.getTopic());
// The broker uses this offset to look up the original message
requestHeader.setOffset(msg.getCommitLogOffset());
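// Set delayLevel, which decides whether and how long the message is delayed before it is consumed again.
// The user can set the delay level; the default is 0, in which case the broker uses 3 + reconsumeTimes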
requestHeader.setDelayLevel(delayLevel);
// Set the original msgId
requestHeader.setOriginMsgId(msg.getMsgId());
// Set the maximum number of retries, 16 by default
requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
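Several key properties are set when sending a retry message:
- originTopic: the broker changes the topic of a retried message when it receives it, so the original topic has to be preserved
- delayLevel: whether, and how long, the message is delayed before it is consumed again
- maxReconsumeTimes: the maximum number of times this message may be retried (re-consumed)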
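The broker receives the retry message
The broker handles a retry message slightly differently from an ordinary one:
- Check that a retry queue is configured and that it is writable
- Look up the original message
- If the message has already been retried the maximum number of times, or delayLevel is less than 0, it is not retried; instead it is delivered to the dead-letter queue (and not consumed again), whose topic is %DLQ%+group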
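- If delayLevel is 0, the broker uses 3 + reconsumeTimes, so with the default delay levels the first retry is delayed 10s and each further retry moves up one level (delayed messages are covered in a separate article)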
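- A new message is built from the original and stored; it differs in these fields:
  - topic: %RETRY%+group
  - reconsumeTimes: the original reconsumeTimes + 1, i.e. incremented on every retry
  - queueId: a queueId of the new topic
  - new properties: ORIGIN_MESSAGE_ID and RETRY_TOPIC (if not already present)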
The code is not repeated here; the main method is:
org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
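The broker-side decision described above can be sketched as a pure function. It assumes the broker's default messageDelayLevel setting (1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h) and the default limit of 16 retries; BrokerRetrySketch and Decision are hypothetical names, and the real consumerSendMsgBack also performs the permission and topic checks, the message lookup and the storage listed above.

```java
import java.util.List;

class BrokerRetrySketch {
    // Assumed default messageDelayLevel setting; delay level N is the N-th entry (1-based).
    static final List<String> DEFAULT_LEVELS = List.of(
        "1s", "5s", "10s", "30s", "1m", "2m", "3m", "4m", "5m",
        "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h");

    // Hypothetical result type: target topic, delay level and new reconsumeTimes.
    record Decision(String topic, int delayLevel, int reconsumeTimes) {}

    static Decision decide(String group, int reconsumeTimes, int delayLevel, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            // Dead-letter queue: the message will not be consumed again.
            return new Decision("%DLQ%" + group, 0, reconsumeTimes + 1);
        }
        if (delayLevel == 0) {
            delayLevel = 3 + reconsumeTimes; // level 3 (10s) first, then one level more per retry
        }
        return new Decision("%RETRY%" + group, delayLevel, reconsumeTimes + 1);
    }

    static String delayOf(int level) {
        return DEFAULT_LEVELS.get(level - 1);
    }

    public static void main(String[] args) {
        // Retry schedule for a consumer that never sets a delay level, with the default max of 16.
        for (int times = 0; times <= 16; times++) {
            Decision d = decide("demo_group", times, 0, 16);
            String when = d.topic().startsWith("%DLQ%") ? "dead-lettered" : "delayed " + delayOf(d.delayLevel());
            System.out.println("reconsumeTimes=" + times + " -> " + d.topic() + ", " + when);
        }
    }
}
```

With these defaults, the schedule printed by main runs from 10s for the first retry up to 2h for the 16th, after which the message goes to %DLQ%demo_group.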
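The consumer pulls the retry message
Following the normal consumption flow, once the message is stored on the broker the consumer can pull and consume it. Unlike an ordinary message, it is pulled not from the topic the consumer originally subscribed to but from %RETRY%+group.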
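So far we have assumed that the retryTopic already exists. Here is where it comes from; the retryTopic is created at the following points:
- After the consumer starts, it sends heartbeat data to the broker. If the broker does not yet have the corresponding SubscriptionGroupConfig, it creates the group's retryTopic: org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat
- When the broker receives a retry message sent back by the consumer and has no topicConfig for the retryTopic yet, it creates one: org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck
- When the broker processes a retry message sent back by the consumer, it creates the retryTopic: org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack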
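Once the broker has created the retryTopic, it syncs the topic configuration to the namesrv like any other topic, and the consumer can then obtain the retryTopic configuration from the namesrv.
The consumer therefore pulls messages from %RETRY%+group: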
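- After the consumer sends a retry message to the broker, the broker stores it under the retryTopic as a separate topic, and the consumer pulls messages from that topic
- After pulling a message from the retryTopic, the consumer switches its topic back to the original one (org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#resetRetryTopic) and then hands it to the consumer's listener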
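What resetRetryTopic does can be illustrated with a minimal sketch. PulledMsg is a hypothetical stand-in for MessageExt, and the property key "RETRY_TOPIC" is assumed to match MessageConst.PROPERTY_RETRY_TOPIC. The sketch only rewrites messages whose current topic is this group's own retry topic; that check reflects my reading of the client code and should be treated as an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal message type standing in for MessageExt.
class PulledMsg {
    String topic;
    final Map<String, String> properties = new HashMap<>();

    PulledMsg(String topic) {
        this.topic = topic;
    }
}

class ResetRetryTopicSketch {
    // Assumed value of the property key MessageConst.PROPERTY_RETRY_TOPIC.
    static final String RETRY_TOPIC_KEY = "RETRY_TOPIC";

    // A message pulled from %RETRY%<group> that remembers its original topic gets it
    // back before the listener sees it; all other messages are left unchanged.
    static void resetRetryTopic(List<PulledMsg> msgs, String consumerGroup) {
        String groupRetryTopic = "%RETRY%" + consumerGroup;
        for (PulledMsg m : msgs) {
            String original = m.properties.get(RETRY_TOPIC_KEY);
            if (original != null && groupRetryTopic.equals(m.topic)) {
                m.topic = original;
            }
        }
    }

    public static void main(String[] args) {
        PulledMsg retried = new PulledMsg("%RETRY%order_group");
        retried.properties.put(RETRY_TOPIC_KEY, "order_topic");
        PulledMsg normal = new PulledMsg("order_topic");
        resetRetryTopic(List.of(retried, normal), "order_group");
        System.out.println(retried.topic + ", " + normal.topic); // order_topic, order_topic
    }
}
```

Because the topic is restored before the listener runs, business code sees a retried message under the same topic as the original delivery.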
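Summary
When business processing fails, the message often needs to be processed again. In that case the business code can return RECONSUME_LATER, and RocketMQ sends the message back to the broker so that the consumer retries it. Users can also tune this behavior to their needs, for example the maximum number of retries and whether redelivery is delayed. Note, however, that if the business code throws an exception that does not warrant a retry, it must catch all exceptions and not let them propagate to RocketMQ; otherwise RocketMQ treats the message as needing a retry. For the same reason, the listener must not return null.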
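The advice in the summary can be put into code as a listener that never lets an exception escape and never returns null. This is a sketch with local stand-ins: ConsumeStatus mirrors ConsumeConcurrentlyStatus, while Handler and NonRetryableException are hypothetical. A real listener would implement MessageListenerConcurrently#consumeMessage(List<MessageExt>, ConsumeConcurrentlyContext) and return ConsumeConcurrentlyStatus.

```java
// Local stand-in for ConsumeConcurrentlyStatus.
enum ConsumeStatus { CONSUME_SUCCESS, RECONSUME_LATER }

// Hypothetical marker for failures that a retry cannot fix (e.g. a malformed payload).
class NonRetryableException extends RuntimeException {
    NonRetryableException(String message) {
        super(message);
    }
}

// Hypothetical business handler.
interface Handler {
    void handle(String body) throws Exception;
}

class SafeListenerSketch {
    // Never throws and never returns null, so a retry only happens when explicitly requested.
    static ConsumeStatus consume(String body, Handler handler) {
        try {
            handler.handle(body);
            return ConsumeStatus.CONSUME_SUCCESS;
        } catch (NonRetryableException e) {
            // Log and give up: reporting success stops redelivery.
            return ConsumeStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // Possibly transient failure: ask RocketMQ to redeliver later.
            return ConsumeStatus.RECONSUME_LATER;
        }
    }

    public static void main(String[] args) {
        System.out.println(consume("ok", body -> { }));
        System.out.println(consume("bad", body -> { throw new NonRetryableException("cannot parse"); }));
        System.out.println(consume("later", body -> { throw new java.io.IOException("timeout"); }));
    }
}
```

How failures are split into retryable and non-retryable ones is a business decision; the exception type used here is only one possible convention.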