RocketMQ Source Code, Part 8: RocketMQ Message Retry


Message retry in RocketMQ covers two cases: retry when a producer sends a message, and retry when a consumer consumes a message.

Producer send retry

If a send fails, RocketMQ retries it automatically on the producer side.

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // Total number of send attempts
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // part of the code omitted...
        }
        // part of the code omitted...
    }
    // part of the code omitted...
}

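How many retries?

As the code above shows, the number of attempts depends on the send mode:

  • Synchronous send: org.apache.rocketmq.client.producer.DefaultMQProducer#retryTimesWhenSendFailed + 1. retryTimesWhenSendFailed defaults to 2, so besides the one normal call, a failed send is retried up to 2 more times
  • Asynchronous send: no retry in this loop (the total number of calls equals 1). Async failures are governed by a separate setting, retryTimesWhenSendAsyncFailed, which is handled outside this method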
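
The attempt count above can be isolated into a few lines. This is only a sketch of the timesTotal expression, not RocketMQ code: SendMode and SendAttempts are hypothetical names standing in for CommunicationMode and the producer implementation.

```java
// Stand-in for RocketMQ's CommunicationMode; only the constant names are borrowed.
enum SendMode { SYNC, ASYNC, ONEWAY }

final class SendAttempts {
    // Mirrors the timesTotal expression in sendDefaultImpl:
    // one normal attempt, plus retries only for synchronous sends.
    static int timesTotal(SendMode mode, int retryTimesWhenSendFailed) {
        return mode == SendMode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }
}
```

With the default retryTimesWhenSendFailed of 2, a synchronous send is attempted at most 3 times, while the other modes go through this loop once.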

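When does it retry?

A retry happens when an exception occurs, but not every exception is caught and retried. In the full loop body (omitted above), RemotingException and MQClientException lead to the next attempt, MQBrokerException does so only for certain response codes, and InterruptedException is rethrown immediately.

How does it retry?

Each retry runs load balancing again (taking the previous failure into account) and selects a new MessageQueue, which raises the chance that the send succeeds.

How long between retries?

Immediately. There is no separate wait between attempts.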
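
The three answers above (retry on failure, reselect the target, no pause) can be put together in a small sketch. It is a simplified model rather than the real sendDefaultImpl: RetrySendSketch, Sender and selectBroker are hypothetical names, plain broker names stand in for MessageQueue objects, and a non-empty broker list is assumed.

```java
import java.util.List;

final class RetrySendSketch {
    /** Hypothetical transport hook: returns normally on success, throws on failure. */
    interface Sender { void send(String broker); }

    // Round-robin selection that skips the broker that just failed when another one exists.
    static String selectBroker(List<String> brokers, int index, String lastFailed) {
        for (int i = 0; i < brokers.size(); i++) {
            String candidate = brokers.get((index + i) % brokers.size());
            if (!candidate.equals(lastFailed)) {
                return candidate;
            }
        }
        return brokers.get(index % brokers.size()); // single broker: reuse it
    }

    // Tries up to timesTotal times with no sleep between attempts.
    // Returns the broker that accepted the message, or rethrows the last failure.
    static String sendWithRetry(List<String> brokers, int timesTotal, Sender sender) {
        if (brokers.isEmpty()) {
            throw new IllegalArgumentException("no brokers");
        }
        String lastFailed = null;
        RuntimeException last = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selectBroker(brokers, times, lastFailed);
            try {
                sender.send(broker);
                return broker;
            } catch (RuntimeException e) {
                last = e;
                lastFailed = broker; // remembered so the next selection avoids it
            }
        }
        throw last != null ? last : new IllegalStateException("timesTotal must be > 0");
    }
}
```

With brokers broker-a and broker-b, a sender that always fails on broker-a succeeds on the second attempt, because the selection moves on to broker-b without waiting.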

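Consumer consumption retry

After a message fails to be processed, the broker handles it like any other message. Retry is possible because the consumer sends the failed message back to the broker, and the broker applies some special handling to it so that the consumer can consume it again.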

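The main flow of consumption retry:

  1. The consumer fails to consume a message and sends it back to the broker
  2. The broker receives the retry message, replaces its topic, and stores it
  3. The consumer pulls messages from the retryTopic that corresponds to the original topic
  4. After pulling a retryTopic message, the consumer swaps the topic back to the original one and hands the message to the listener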

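Consumer sends the retry message to the broker

Non-ordered (concurrent) messages are used as the example here. First, after consumption fails, the consumer sends the message back to the broker: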

// org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
public void run() {
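    // part of the code omitted...
    try {
        // status is returned by the listener and chosen by the user. When business code fails to consume
        // a message it can return org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER
        // to tell RocketMQ that the message must be consumed again.
        // For a batch of messages, the user can also specify from which message re-consumption starts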
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
	
    // Work out the result type from the status
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    // If the user returns null or throws an unhandled exception, RocketMQ retries by default
    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
	
    if (!processQueue.isDropped()) {
        // The result determined above is handled in this method
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

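The method above distinguishes the following consumption results:

  • org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#CONSUME_SUCCESS: consumption succeeded. For a batch of messages, the user can specify from which message retry starts
  • org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus#RECONSUME_LATER: all messages are retried. This happens when:
    • the user returns the status RECONSUME_LATER
    • the user returns null
    • the user's business logic throws an exception

Once it is known whether a retry is needed, the next step decides which messages are retried, that is, which messages are sent back to the broker: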

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    // Where retry starts.
    // ackIndex defaults to Integer.MAX_VALUE unless the user specifies from which message retry starts
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    switch (status) {
        case CONSUME_SUCCESS:
            // Even with CONSUME_SUCCESS, some of the messages may still need a retry
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            // With RECONSUME_LATER every message is retried, so ackIndex is set to -1
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            // Broadcast messages are not retried
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            // Only messages consumed in clustering mode are retried
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // Send the message back to the broker
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // Messages that could not be sent back are resubmitted for local consumption after a delay

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // Update the consumption offset
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
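
The ackIndex handling in processConsumeResult can be looked at on its own. The sketch below only reproduces the index arithmetic under the assumption that ackIndex is at least -1; AckIndexSketch and indexesToRetry are hypothetical names, and the local Status enum stands in for ConsumeConcurrentlyStatus.

```java
import java.util.ArrayList;
import java.util.List;

final class AckIndexSketch {
    // Stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Returns the positions in the batch that would be sent back to the broker.
    // Assumes ackIndex >= -1, as in the normal use of the context.
    static List<Integer> indexesToRetry(Status status, int ackIndex, int batchSize) {
        if (status == Status.RECONSUME_LATER) {
            ackIndex = -1;                // the whole batch is retried
        } else if (ackIndex >= batchSize) {
            ackIndex = batchSize - 1;     // default Integer.MAX_VALUE: nothing is retried
        }
        List<Integer> retry = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            retry.add(i);
        }
        return retry;
    }
}
```

For a batch of 3 messages, CONSUME_SUCCESS with the default ackIndex retries nothing, CONSUME_SUCCESS with ackIndex 0 retries positions 1 and 2, and RECONSUME_LATER retries all three.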

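The call path a consumer uses to send back a failed message differs from a normal producer send in its first part. The key difference is in the methods below: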

// org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#sendMessageBack
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
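        // If sending the failed message back to the broker fails, one more attempt is made. Unlike the call
        // in the try block, this path sets the topic to the retry topic directly and then sends the message
        // to the broker the same way a producer sends a normal message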
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    }
}

// org.apache.rocketmq.client.impl.MQClientAPIImpl#consumerSendMessageBack
public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    // The RequestCode differs from a normal send, so the broker handles it with a different method
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);

    requestHeader.setGroup(consumerGroup);
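    // The broker changes the topic of a retried message, so the original topic is set here
    requestHeader.setOriginTopic(msg.getTopic());
    // The broker looks up the original message by this offset
    requestHeader.setOffset(msg.getCommitLogOffset());
    // delayLevel decides whether the message is delayed before it is consumed again, and for how long.
    // The user can set it; the default is 0, in which case the broker substitutes 3 + reconsumeTimes
    requestHeader.setDelayLevel(delayLevel);
    // Set the original msgId
    requestHeader.setOriginMsgId(msg.getMsgId());
    // Set the maximum number of retries; the default is 16
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);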

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQBrokerException(response.getCode(), response.getRemark());
}

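A few key properties are set when a retry message is sent:

originTopic: the broker changes the topic of a retried message before storing it, so the original topic has to be kept

delayLevel: whether the message is delayed before it is consumed again, and by how much

maxReconsumeTimes: the maximum number of times this message may be retried (consumed again)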

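Broker receives the retry message

The broker handles a retry message slightly differently from a normal message:

  1. Check that the retry queue is configured and writable
  2. Look up the original message
  3. If the maximum number of retries has been exceeded or delayLevel is less than 0, the message is not retried. It is delivered to the dead-letter queue instead (and is not consumed again); the topic is %DLQ%+group
  4. If delayLevel is 0, the broker replaces it with 3 + reconsumeTimes. For a first retry that is level 3, which means a 10s delay under the default delay levels (delayed messages are covered in a separate article)
  5. Build a new message from the original one and store it. The fields that differ are:
    1. topic: %RETRY%+group
    2. reconsumeTimes: the previous reconsumeTimes + 1, so it grows by 1 on every retry
    3. queueId: a queueId of the new topic
    4. added properties: ORIGIN_MESSAGE_ID and RETRY_TOPIC (if not already present)
// The code is not repeated here; the main method is
org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack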
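
Steps 3 and 4 above can be sketched as two small decisions. This is an illustration under assumptions, not the broker code: RetryRoutingSketch and its methods are hypothetical names, the dead-letter check is written as reconsumeTimes >= maxReconsumeTimes (my reading of "exceeded"), and the delay table is the default messageDelayLevel setting.

```java
final class RetryRoutingSketch {
    // Default broker delay levels; level N maps to entry N - 1.
    static final String[] DEFAULT_DELAY_LEVELS =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    // Step 3: too many retries or a negative delayLevel sends the message to the dead-letter topic.
    static String targetTopic(String group, int reconsumeTimes, int maxReconsumeTimes, int delayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            return "%DLQ%" + group;
        }
        return "%RETRY%" + group;
    }

    // Step 4: a delayLevel of 0 is replaced by 3 + reconsumeTimes.
    static int effectiveDelayLevel(int delayLevel, int reconsumeTimes) {
        return delayLevel == 0 ? 3 + reconsumeTimes : delayLevel;
    }

    // Looks up the delay for a level in the default table (valid for levels 1 to 18).
    static String delayFor(int level) {
        return DEFAULT_DELAY_LEVELS[level - 1];
    }
}
```

Under these assumptions, a first retry (reconsumeTimes 0, delayLevel 0) goes to %RETRY%+group at level 3, a 10s delay. With the default maximum of 16, the last retry starts from reconsumeTimes 15 and lands on level 18, a 2h delay.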

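Consumer pulls the retry message

Following the normal consumption flow, once the message is stored on the broker the consumer can pull and consume it. Unlike a normal message, it is pulled not from the topic the consumer originally subscribed to but from %RETRY%+group.

So far it has been assumed that the retryTopic already exists. It is created at the following points: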

  1. After starting, the consumer sends heartbeat data to the broker. If the broker does not yet have the corresponding SubscriptionGroupConfig, it creates the retryTopic for the group: org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat

  2. When the broker receives a retry message sent back by the consumer and no topicConfig exists yet for the retryTopic, it creates one: org.apache.rocketmq.broker.processor.AbstractSendMessageProcessor#msgCheck

  3. When the broker processes a retry message sent back by the consumer, it creates the retryTopic: org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack

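After the broker creates the retryTopic, its configuration is synchronized to namesrv like that of any normal topic, and the consumer can then obtain the retryTopic configuration from namesrv.

So the consumer pulls the messages of %RETRY%+group:

  1. After the consumer sends a retry message to the broker, the broker stores it under the retryTopic, which is a separate topic, and the consumer pulls messages from that topic
  2. After pulling a message from the retryTopic, the consumer swaps the topic back to the original one (org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#resetRetryTopic) and then hands the message to the consumer's listener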
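
The topic swap in step 2 can be sketched as follows. The Msg class is a hypothetical stand-in for MessageExt, and the method is a simplified model of what resetRetryTopic does, relying on the RETRY_TOPIC property listed earlier, which holds the original topic.

```java
import java.util.HashMap;
import java.util.Map;

final class ResetRetryTopicSketch {
    // Minimal stand-in for MessageExt: a topic plus string properties.
    static final class Msg {
        String topic;
        final Map<String, String> properties = new HashMap<>();
        Msg(String topic) { this.topic = topic; }
    }

    // Restores the original topic, but only for messages pulled from this group's retry topic.
    static void resetRetryTopic(Msg msg, String consumerGroup) {
        String groupTopic = "%RETRY%" + consumerGroup;
        String originalTopic = msg.properties.get("RETRY_TOPIC");
        if (originalTopic != null && groupTopic.equals(msg.topic)) {
            msg.topic = originalTopic;
        }
    }
}
```

The listener therefore always sees the topic it subscribed to, whether the message arrives for the first time or as a retry.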

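Summary

When business processing fails, the message often needs to be processed again. In that case the business code can return RECONSUME_LATER, and RocketMQ sends the message back to the broker so that the consumer retries it. The user can also tune the behaviour, for example the number of retries and whether consumption is delayed. Note that if the business code throws an exception for which no retry is wanted, it must catch every exception itself and not let it propagate to RocketMQ; otherwise RocketMQ treats the message as needing a retry. For the same reason the listener must not return null.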
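
One way to follow this advice is to wrap the business handler so that it always returns a non-null status. The sketch below is only an illustration of that pattern: SafeListenerSketch, RetryableException and the local Status enum are hypothetical and merely mirror the two ConsumeConcurrentlyStatus values.

```java
import java.util.List;
import java.util.function.Consumer;

final class SafeListenerSketch {
    // Stand-in for ConsumeConcurrentlyStatus.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Hypothetical exception type thrown by business code when a later retry could succeed. */
    static final class RetryableException extends RuntimeException {
        RetryableException(String message) { super(message); }
    }

    // Runs the handler and always returns a non-null status:
    // retry only when the handler asks for it, swallow every other exception.
    static Status consume(List<String> msgs, Consumer<List<String>> handler) {
        try {
            handler.accept(msgs);
            return Status.CONSUME_SUCCESS;
        } catch (RetryableException e) {
            return Status.RECONSUME_LATER;
        } catch (Exception e) {
            // Not worth retrying: log and acknowledge so the batch is not redelivered.
            System.err.println("dropping batch after non-retryable failure: " + e);
            return Status.CONSUME_SUCCESS;
        }
    }
}
```

The design choice here is that a retry is opt-in: only failures explicitly marked as retryable lead to RECONSUME_LATER, and everything else is acknowledged.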

