本文圖片和部分總結來自於參考資料,半原創,侵刪
問題
- Rocketmq 重試是否有超時問題,假如超時了如何解決,是重新發送消息呢?還是一直等待
- 假如某個 msg 進入了重試隊列(%RETRY_XXX%),然后成功消費了
概述
文章介紹了RocketMQ 的重試機制和消息重試的機制。
定時任務
定時任務概述
rocketmq為定時任務創建一個單獨的 topic ,而 rocketmq的定時任務是定的時間是分等級的,而不同等級對應topic內不同的隊列,然后通過一個“執行定時任務的服務”定時執行多個隊列內的任務,執行時需要更改該定時任務實際要發送的 topic 和 tag 。
發送例子
發送例子
Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */); msg.setDelayTimeLevel(i + 1);
時間等級
public class MessageStoreConfig { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; }
寫入定時任務
寫入的時候是在寫入commitLog 的時候寫入的,這一點很重要,因為這也是實現消費失敗重試的基礎。 CommitLog 會將這條消息的話題和隊列 ID 替換成專門用於定時的話題和相應的級別對應的隊列 ID。真實的話題和隊列 ID 會作為屬性放置到這條消息中,后面處理的時候會自己從這個隊列id 進行發送消息。
public class CommitLog { public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); // 替換 Topic 和 QueueID msg.setTopic(topic); msg.setQueueId(queueId); } } }
處理定時任務
執行定時任務的服務,ScheduleMessageService 的 start 方法
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { //Timer 持有多個定時任務,然后時間到了就執行該任務, // 但是 Timer 內部只有一個線程在執行任務,也就不能保證時間的正確性(因為當一個線程在執行的時候,某個任務的時間已經到了) // 注意,是為每個延時時間等級建一個任務Task this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
class DeliverDelayedMessageTimerTask extends TimerTask { public void executeOnTimeup() { // ... for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 是否到時間 long countdown = deliverTimestamp - now; if (countdown <= 0) { // 取出消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy); // 修正消息,設置上正確的話題和隊列 ID MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); // 重新存儲消息 PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); } else { // countdown 后投遞此消息 ScheduleMessageService.this .timer .schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); // 更新偏移量 } } // end of for // 更新偏移量 } }
同時該定時任務也進行持久化,一個是消費進度,一個消息對應的位移量
消息消費重試
RocketMQ中遇到以下情況就會進行消息重試 :
- 拋出異常
- 返回 NULL 狀態
- 返回 RECONSUME_LATER 狀態
- 超時 15 分鍾沒有響應
consumer 注冊訂閱重試隊列
consumer 在啟動的時候就會訂閱“%RETRY_XXX%”的topic,為的就是當某個topic消費失敗處理重試消息。如下圖所示 :
public class DefaultMQPushConsumerImpl implements MQConsumerInner { public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: // ... this.copySubscription(); // ... this.serviceState = ServiceState.RUNNING; break; } } private void copySubscription() throws MQClientException { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: // 重試話題組 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } }
超時消費
我們思考一個問題,假如消費者掉線了,那么消息直接發不過去了,而要是消費者的消費邏輯執行了太久的業務邏輯,那么應該有一個動作來觸發 消費超時,進行重試.
ConsumeMessageConcurrentlyService 的 start 方法。
public void start() { this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { cleanExpireMsg(); } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); }
這個定時周期任務每過 getConsumeTimeout 時間就會掃描消費超時的任務,調用 sendMessageBack 方法,該方法會調用 RPC發送消息給 broker ,消費失敗進行重試。
上一篇我們講到消息消費的過程,當集群模式下,消息消費成功會本地的消息消費進度,而失敗了會調用RPC 發送消息給broker ,而broker 處理的邏輯在 SendMessageProcessor
@Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { //消費者消費失敗的情況 case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } this.executeSendMessageHookAfter(response, mqtraceContext); return response; } }
需要注意的是 consumerTimeOut 的時間是 15 分鍾,生產的時候可以配置短點。
批量處理的問題
批量處理一批數據要是返回 RECONSUME_LATER ,那么這批數據就會重新發給 broker ,進行消息重試,所以在業務邏輯的時候就要考慮消費者重新消費的冪等性。
ConsumeRequest的 run 方法
@Override public void run() { .... try { ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs); if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } //NO.1 業務實現 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageConcurrentlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } ... if (!processQueue.isDropped()) { //NO.2 處理消息消費的結果 ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); } }
我們可以設置最大批量處理的數量為 1 ,那么就會針對每一條消息進行重試,但是那樣的話就會性能相對於批量處理肯定差一些。
ack 機制
public void processConsumeResult( final ConsumeConcurrentlyStatus status, final ConsumeConcurrentlyContext context, final ConsumeRequest consumeRequest ) { int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; case CLUSTERING: //發送給broker , 該批數據進行消息重試 List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); boolean result = this.sendMessageBack(msg, context); if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } //刪除消息樹中的已消費的消息節點,並返回消息樹中最小的節點,更新最小的節點為當前進度!! long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); } }
可以看到消息消費完成后,更新的進度都是對應的 processqueue中對應的消息樹里的最小節點(即偏移量最小的節點),那么有可能存在這樣的問題,下面來自 參考
這鍾方式和傳統的一條message單獨ack的方式有本質的區別。性能上提升的同時,會帶來一個潛在的重復問題——由於消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,后面99條都消費結束了,只有2101消費一直沒有結束的情況。
在這種情況下,RocketMQ為了保證消息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才能標記2200消費結束了(注:consumerOffset=2201)。
在這種設計下,就有消費大量重復的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的實例的時候,新的實例從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。
總結
從參考資料中我學習到了自己學習與別人的差異是總結的能力,通過濃縮代碼片段,總結核心的邏輯步驟,加深對邏輯的理解。
參考資料
- https://www.jianshu.com/p/5843cdcd02aa
- http://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/