消息隊列(七)--- RocketMQ延時發送和消息重試(半原創)


本文圖片和部分總結來自於參考資料,半原創,侵刪

問題

  • Rocketmq 重試是否有超時問題,假如超時了如何解決,是重新發送消息呢?還是一直等待
  • 假如某個 msg 進入了重試隊列(%RETRY_XXX%),然后成功消費了

概述

    文章介紹了RocketMQ 的重試機制和消息重試的機制。

定時任務

定時任務概述

    rocketmq為定時任務創建一個單獨的 topic ,而 rocketmq的定時任務是定的時間是分等級的,而不同等級對應topic內不同的隊列,然后通過一個“執行定時任務的服務”定時執行多個隊列內的任務,執行時需要更改該定時任務實際要發送的 topic 和 tag 。

發送例子

發送例子

Message msg =
    new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
msg.setDelayTimeLevel(i + 1);

    時間等級

public class MessageStoreConfig {

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
}

寫入定時任務

    寫入的時候是在寫入commitLog 的時候寫入的,這一點很重要,因為這也是實現消費失敗重試的基礎。 CommitLog 會將這條消息的話題和隊列 ID 替換成專門用於定時的話題和相應的級別對應的隊列 ID。真實的話題和隊列 ID 會作為屬性放置到這條消息中,后面處理的時候會自己從這個隊列id 進行發送消息。

    public class CommitLog {

        public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) {

                topic = ScheduleMessageService.SCHEDULE_TOPIC;
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                // 替換 Topic 和 QueueID
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
            
        }
        
    }

處理定時任務

    執行定時任務的服務,ScheduleMessageService 的 start 方法

    public void start() {

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                //Timer 持有多個定時任務,然后時間到了就執行該任務,
                // 但是 Timer 內部只有一個線程在執行任務,也就不能保證時間的正確性(因為當一個線程在執行的時候,某個任務的時間已經到了)
                // 注意,是為每個延時時間等級建一個任務Task
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

class DeliverDelayedMessageTimerTask extends TimerTask {

    public void executeOnTimeup() {
        // ...
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            // 是否到時間
            long countdown = deliverTimestamp - now;

            if (countdown <= 0) {
                // 取出消息
                MessageExt msgExt =
                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                // 修正消息,設置上正確的話題和隊列 ID
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                // 重新存儲消息
                PutMessageResult putMessageResult =
                    ScheduleMessageService.this.defaultMessageStore
                    .putMessage(msgInner);
            } else {
                // countdown 后投遞此消息
                ScheduleMessageService.this
                    .timer
                    .schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                // 更新偏移量
            }
        } // end of for

        // 更新偏移量
    }
    
}

    同時該定時任務也進行持久化,一個是消費進度,一個消息對應的位移量

1297993-20200106151538670-2043955717.png

1297993-20200106151602350-1881388344.png

消息消費重試

    RocketMQ中遇到以下情況就會進行消息重試 :

  • 拋出異常
  • 返回 NULL 狀態
  • 返回 RECONSUME_LATER 狀態
  • 超時 15 分鍾沒有響應

1297993-20200107155108508-1498228666.png

consumer 注冊訂閱重試隊列

    consumer 在啟動的時候就會訂閱“%RETRY_XXX%”的topic,為的就是當某個topic消費失敗處理重試消息。如下圖所示 :

1297993-20200106162252037-330088757.png

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
        case CREATE_JUST:
            // ...
            this.copySubscription();
            // ...
        
            this.serviceState = ServiceState.RUNNING;
            break;
        }
    }

    private void copySubscription() throws MQClientException {
        switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            break;
            
        case CLUSTERING:
            // 重試話題組
            final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                                                                                retryTopic, SubscriptionData.SUB_ALL);
            this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
            break;
            
        default:
            break;
        }
    }
    
}

超時消費

    我們思考一個問題,假如消費者掉線了,那么消息直接發不過去了,而要是消費者的消費邏輯執行了太久的業務邏輯,那么應該有一個動作來觸發 消費超時,進行重試.

ConsumeMessageConcurrentlyService 的 start 方法。

    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

這個定時周期任務每過 getConsumeTimeout 時間就會掃描消費超時的任務,調用 sendMessageBack 方法,該方法會調用 RPC發送消息給 broker ,消費失敗進行重試。

    上一篇我們講到消息消費的過程,當集群模式下,消息消費成功會本地的消息消費進度,而失敗了會調用RPC 發送消息給broker ,而broker 處理的邏輯在 SendMessageProcessor

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {

            //消費者消費失敗的情況
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:

                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

  需要注意的是 consumerTimeOut 的時間是 15 分鍾,生產的時候可以配置短點。 

批量處理的問題

    批量處理一批數據要是返回 RECONSUME_LATER ,那么這批數據就會重新發給 broker ,進行消息重試,所以在業務邏輯的時候就要考慮消費者重新消費的冪等性。

    ConsumeRequest的 run 方法

        @Override
        public void run() {
            ....

            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                //NO.1 業務實現
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                        RemotingHelper.exceptionSimpleDesc(e),
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                hasException = true;
            }

            ...

            if (!processQueue.isDropped()) {
                //NO.2 處理消息消費的結果
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }

我們可以設置最大批量處理的數量為 1 ,那么就會針對每一條消息進行重試,但是那樣的話就會性能相對於批量處理肯定差一些。 

ack 機制

    public void processConsumeResult(
            final ConsumeConcurrentlyStatus status,
            final ConsumeConcurrentlyContext context,
            final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                //發送給broker , 該批數據進行消息重試
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        //刪除消息樹中的已消費的消息節點,並返回消息樹中最小的節點,更新最小的節點為當前進度!!
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

可以看到消息消費完成后,更新的進度都是對應的 processqueue中對應的消息樹里的最小節點(即偏移量最小的節點),那么有可能存在這樣的問題,下面來自 參考

 

這鍾方式和傳統的一條message單獨ack的方式有本質的區別。性能上提升的同時,會帶來一個潛在的重復問題——由於消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,后面99條都消費結束了,只有2101消費一直沒有結束的情況。

在這種情況下,RocketMQ為了保證消息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才能標記2200消費結束了(注:consumerOffset=2201)。

在這種設計下,就有消費大量重復的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的實例的時候,新的實例從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。

總結

    從參考資料中我學習到了自己學習與別人的差異是總結的能力,通過濃縮代碼片段,總結核心的邏輯步驟,加深對邏輯的理解。

參考資料

  • https://www.jianshu.com/p/5843cdcd02aa
  • http://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM