consumer 消費失敗,會把消息重新發往 %RETRY% + consumerGroup,這個 retry 消息會在一定時間后,真實送到 retry topic。
broker 處理發送到 retry topic 的消息:
org.apache.rocketmq.broker.processor.SendMessageProcessor#consumerSendMsgBack
消息消費超過最大次數或者客戶端配置了直接發送到死信隊列,則把消息發送到死信隊列,否則把消息發送 retry topic,雖然看起來是把消息直接寫入 %RETRY% + consumerGroup
但其實在 putMessage 的時候,會把消息寫入 SCHEDULE_TOPIC_XXXX
// org.apache.rocketmq.store.CommitLog#putMessage if (msg.getDelayTimeLevel() > 0) { if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } topic = ScheduleMessageService.SCHEDULE_TOPIC; queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); }
SCHEDULE_TOPIC_XXXX 這個 topic 非常有意思,broker 並沒有顯式創建這個 topic,即 nameserver 和 broker 沒有保存這個 broker 的元數據,topic 的數據會正常寫入 commitLog,一個 delay 等級對應一個 queue,queueId = delayLevel - 1,所以 SCHEDULE_TOPIC_XXXX 最多有 18 個 queue。
// org.apache.rocketmq.store.config.MessageStoreConfig#messageDelayLevel private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
一共有 18 個 delayLevel
// org.apache.rocketmq.common.subscription.SubscriptionGroupConfig#retryMaxTimes private int retryMaxTimes = 16;
這個參數 consumer 不可配置,默認 16
ScheduleMessageService 初始化 delayLevelTable,鍵是 delayLevel,值是 delay 的毫秒數,從 1 到 18
// org.apache.rocketmq.store.schedule.ScheduleMessageService#load public boolean load() { boolean result = super.load(); result = result && this.parseDelayLevel(); return result; } public boolean parseDelayLevel() { HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); timeUnitTable.put("s", 1000L); timeUnitTable.put("m", 1000L * 60); timeUnitTable.put("h", 1000L * 60 * 60); timeUnitTable.put("d", 1000L * 60 * 60 * 24); String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); try { String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { String value = levelArray[i]; String ch = value.substring(value.length() - 1); Long tu = timeUnitTable.get(ch); int level = i + 1; if (level > this.maxDelayLevel) { this.maxDelayLevel = level; } long num = Long.parseLong(value.substring(0, value.length() - 1)); long delayTimeMillis = tu * num; this.delayLevelTable.put(level, delayTimeMillis); } } catch (Exception e) { log.error("parseDelayLevel exception", e); log.info("levelString String = {}", levelString); return false; } return true; }
ScheduleMessageService 針對每一個 level 創建一個定時任務,遍歷 consume queue,判斷消息是否到期,到期則把消息寫入真實 topic
// org.apache.rocketmq.store.schedule.ScheduleMessageService#start public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } // 定期持久化處理完的 queue offset 到 delayOffset.json 文件中 this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }