本文圖片和部分總結來自於參考資料,半原創,侵刪
問題
- Rocketmq 重試是否有超時問題,假如超時了如何解決,是重新發送消息呢?還是一直等待
- 假如某個 msg 進入了重試隊列(%RETRY_XXX%),然后成功消費了
概述
文章介紹了RocketMQ 的重試機制和消息重試的機制。
定時任務
定時任務概述
rocketmq為定時任務創建一個單獨的 topic ,而 rocketmq的定時任務是定的時間是分等級的,而不同等級對應topic內不同的隊列,然后通過一個“執行定時任務的服務”定時執行多個隊列內的任務,執行時需要更改該定時任務實際要發送的 topic 和 tag 。
發送例子
發送例子
Message msg =
new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
msg.setDelayTimeLevel(i + 1);
時間等級
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
寫入定時任務
寫入的時候是在寫入commitLog 的時候寫入的,這一點很重要,因為這也是實現消費失敗重試的基礎。 CommitLog 會將這條消息的話題和隊列 ID 替換成專門用於定時的話題和相應的級別對應的隊列 ID。真實的話題和隊列 ID 會作為屬性放置到這條消息中,后面處理的時候會自己從這個隊列id 進行發送消息。
public class CommitLog {
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 替換 Topic 和 QueueID
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
}
處理定時任務
執行定時任務的服務,ScheduleMessageService 的 start 方法
public void start() {
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
Integer level = entry.getKey();
Long timeDelay = entry.getValue();
Long offset = this.offsetTable.get(level);
if (null == offset) {
offset = 0L;
}
if (timeDelay != null) {
//Timer 持有多個定時任務,然后時間到了就執行該任務,
// 但是 Timer 內部只有一個線程在執行任務,也就不能保證時間的正確性(因為當一個線程在執行的時候,某個任務的時間已經到了)
// 注意,是為每個延時時間等級建一個任務Task
this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
}
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
ScheduleMessageService.this.persist();
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
}
class DeliverDelayedMessageTimerTask extends TimerTask {
public void executeOnTimeup() {
// ...
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// 是否到時間
long countdown = deliverTimestamp - now;
if (countdown <= 0) {
// 取出消息
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
// 修正消息,設置上正確的話題和隊列 ID
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
// 重新存儲消息
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
} else {
// countdown 后投遞此消息
ScheduleMessageService.this
.timer
.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
// 更新偏移量
}
} // end of for
// 更新偏移量
}
}
同時該定時任務也進行持久化,一個是消費進度,一個消息對應的位移量
消息消費重試
RocketMQ中遇到以下情況就會進行消息重試 :
- 拋出異常
- 返回 NULL 狀態
- 返回 RECONSUME_LATER 狀態
- 超時 15 分鍾沒有響應
consumer 注冊訂閱重試隊列
consumer 在啟動的時候就會訂閱“%RETRY_XXX%”的topic,為的就是當某個topic消費失敗處理重試消息。如下圖所示 :
public class DefaultMQPushConsumerImpl implements MQConsumerInner {
public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
// ...
this.copySubscription();
// ...
this.serviceState = ServiceState.RUNNING;
break;
}
}
private void copySubscription() throws MQClientException {
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
break;
case CLUSTERING:
// 重試話題組
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
}
}
超時消費
我們思考一個問題,假如消費者掉線了,那么消息直接發不過去了,而要是消費者的消費邏輯執行了太久的業務邏輯,那么應該有一個動作來觸發 消費超時,進行重試.
ConsumeMessageConcurrentlyService 的 start 方法。
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
這個定時周期任務每過 getConsumeTimeout 時間就會掃描消費超時的任務,調用 sendMessageBack 方法,該方法會調用 RPC發送消息給 broker ,消費失敗進行重試。
上一篇我們講到消息消費的過程,當集群模式下,消息消費成功會本地的消息消費進度,而失敗了會調用RPC 發送消息給broker ,而broker 處理的邏輯在 SendMessageProcessor
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
switch (request.getCode()) {
//消費者消費失敗的情況
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.consumerSendMsgBack(ctx, request);
default:
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return null;
}
mqtraceContext = buildMsgContext(ctx, requestHeader);
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
RemotingCommand response;
if (requestHeader.isBatch()) {
response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
this.executeSendMessageHookAfter(response, mqtraceContext);
return response;
}
}
需要注意的是 consumerTimeOut 的時間是 15 分鍾,生產的時候可以配置短點。
批量處理的問題
批量處理一批數據要是返回 RECONSUME_LATER ,那么這批數據就會重新發給 broker ,進行消息重試,所以在業務邏輯的時候就要考慮消費者重新消費的冪等性。
ConsumeRequest的 run 方法
@Override
public void run() {
....
try {
ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
}
}
//NO.1 業務實現
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageConcurrentlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
}
...
if (!processQueue.isDropped()) {
//NO.2 處理消息消費的結果
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
} else {
log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
}
}
我們可以設置最大批量處理的數量為 1 ,那么就會針對每一條消息進行重試,但是那樣的話就會性能相對於批量處理肯定差一些。
ack 機制
public void processConsumeResult(
final ConsumeConcurrentlyStatus status,
final ConsumeConcurrentlyContext context,
final ConsumeRequest consumeRequest
) {
int ackIndex = context.getAckIndex();
if (consumeRequest.getMsgs().isEmpty())
return;
switch (status) {
case CONSUME_SUCCESS:
if (ackIndex >= consumeRequest.getMsgs().size()) {
ackIndex = consumeRequest.getMsgs().size() - 1;
}
int ok = ackIndex + 1;
int failed = consumeRequest.getMsgs().size() - ok;
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
break;
case RECONSUME_LATER:
ackIndex = -1;
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
consumeRequest.getMsgs().size());
break;
default:
break;
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
case BROADCASTING:
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
}
break;
case CLUSTERING:
//發送給broker , 該批數據進行消息重試
List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
MessageExt msg = consumeRequest.getMsgs().get(i);
boolean result = this.sendMessageBack(msg, context);
if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}
}
if (!msgBackFailed.isEmpty()) {
consumeRequest.getMsgs().removeAll(msgBackFailed);
this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
}
break;
default:
break;
}
//刪除消息樹中的已消費的消息節點,並返回消息樹中最小的節點,更新最小的節點為當前進度!!
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
可以看到消息消費完成后,更新的進度都是對應的 processqueue中對應的消息樹里的最小節點(即偏移量最小的節點),那么有可能存在這樣的問題,下面來自 參考
這鍾方式和傳統的一條message單獨ack的方式有本質的區別。性能上提升的同時,會帶來一個潛在的重復問題——由於消費進度只是記錄了一個下標,就可能出現拉取了100條消息如 2101-2200的消息,后面99條都消費結束了,只有2101消費一直沒有結束的情況。
在這種情況下,RocketMQ為了保證消息肯定被消費成功,消費進度職能維持在2101,直到2101也消費結束了,本地的消費進度才能標記2200消費結束了(注:consumerOffset=2201)。
在這種設計下,就有消費大量重復的風險。如2101在還沒有消費完成的時候消費實例突然退出(機器斷電,或者被kill)。這條queue的消費進度還是維持在2101,當queue重新分配給新的實例的時候,新的實例從broker上拿到的消費進度還是維持在2101,這時候就會又從2101開始消費,2102-2200這批消息實際上已經被消費過還是會投遞一次。
總結
從參考資料中我學習到了自己學習與別人的差異是總結的能力,通過濃縮代碼片段,總結核心的邏輯步驟,加深對邏輯的理解。
參考資料
- https://www.jianshu.com/p/5843cdcd02aa
- http://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/




