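The second key design point of scheduled (delayed) messages: when a message is stored and its delay-level property delayLevel > 0, the original topic and original queue are backed up into the message properties under the keys PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID. A separate scheduling task is created for each delay level. When its time arrives, the task runs: it uses the delay queue's consume offset to pull entries from the delay queue, loads the full message from the commitlog, clears the delay-level property, and restores the original topic and queue. It then writes a new message to the commitlog, from where it is dispatched to the consume queue for consumers to consume.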
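The backup-and-restore step described above can be sketched as follows. This is a minimal illustration, not broker code: the Msg class, the property key strings, the schedule topic name, and the "level minus one" queue mapping are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

final class DelayRewriteSketch {
    // Illustrative stand-ins (assumed values), not the broker's real constants.
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";
    static final String DELAY_LEVEL = "DELAY";

    /** Hypothetical, stripped-down message: destination plus string properties. */
    static final class Msg {
        String topic;
        int queueId;
        final Map<String, String> properties = new HashMap<>();

        Msg(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    /** On store: back up the real destination, then redirect to the delay queue. */
    static void redirectToDelayQueue(Msg msg, int delayLevel) {
        msg.properties.put(DELAY_LEVEL, String.valueOf(delayLevel));
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel - 1; // assumed level-to-queue mapping
    }

    /** On time-up: build a new message with the delay level cleared and the destination restored. */
    static Msg restore(Msg delayed) {
        Msg out = new Msg(delayed.properties.get(REAL_TOPIC),
                Integer.parseInt(delayed.properties.get(REAL_QUEUE_ID)));
        out.properties.putAll(delayed.properties);
        out.properties.remove(DELAY_LEVEL); // otherwise it would be delayed again
        return out;
    }

    public static void main(String[] args) {
        Msg m = new Msg("OrderTopic", 5);
        redirectToDelayQueue(m, 3);
        System.out.println(m.topic + " / queue " + m.queueId);
        Msg r = restore(m);
        System.out.println(r.topic + " / queue " + r.queueId);
    }
}
```

Clearing the delay level before re-storing matters: if it stayed set, the restored message would be redirected into the delay queue a second time.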
Updated 2021-05-19
1 A delayed message is sent just like a normal one, including the choice of broker; it only gets special handling once it reaches the CommitLog.
org.apache.rocketmq.store.CommitLog
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // ...
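This is the heart of the design: at store time, messages are already routed to different queues by delay level, one queue per level. A 1s delay and a 10s delay, for example, land in different queues. The benefit is clear.
Take the 10s level: on a given run, the scheduling task computes delayTime, the time remaining until the next message in that queue is due, and then schedules another task with a delay of delayTime.
Because every message in the queue has the same delay, messages appended later by other threads are always due no earlier than the one at the head, so their remaining delay is never smaller than delayTime. Even in a multithreaded environment, the delayTime computed for the next run is therefore always correct.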
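To make the ordering argument concrete, here is a small sketch that compares one shared queue against one queue per delay. It assumes messages are appended in non-decreasing store-time order; all class and method names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PerLevelOrderingSketch {
    /** True if deliver times never decrease, i.e. the head of the queue is always due first. */
    static boolean isSorted(List<Long> deliverTimes) {
        for (int i = 1; i < deliverTimes.size(); i++) {
            if (deliverTimes.get(i) < deliverTimes.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    /** Deliver times in arrival order when all delays share a single queue. */
    static List<Long> singleQueue(long[] storeTimes, long[] delays) {
        List<Long> queue = new ArrayList<>();
        for (int i = 0; i < storeTimes.length; i++) {
            queue.add(storeTimes[i] + delays[i]);
        }
        return queue;
    }

    /** Deliver times in arrival order, with one queue per distinct delay. */
    static Map<Long, List<Long>> perLevelQueues(long[] storeTimes, long[] delays) {
        Map<Long, List<Long>> queues = new HashMap<>();
        for (int i = 0; i < storeTimes.length; i++) {
            queues.computeIfAbsent(delays[i], d -> new ArrayList<>())
                  .add(storeTimes[i] + delays[i]);
        }
        return queues;
    }

    public static void main(String[] args) {
        long[] storeTimes = {0, 1000, 2000, 3000};
        long[] delays = {10000, 1000, 10000, 1000};
        System.out.println("single queue sorted: " + isSorted(singleQueue(storeTimes, delays)));
        perLevelQueues(storeTimes, delays).forEach((delay, queue) ->
            System.out.println("delay " + delay + " sorted: " + isSorted(queue)));
    }
}
```

With mixed delays in one queue, a 1s message can sit behind a 10s message that is due later, so the scheduler would have to search or sort. With one queue per level, checking the head is enough.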
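2 Creating a scheduler per delay level
When a broker starts, a scheduled timer task is created up front for each of the 18 delay levels, which I think is a bit wasteful.
ScheduleMessageService#parseDelayLevel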
public boolean load() {
    boolean result = super.load();
    result = result && this.parseDelayLevel();
    return result;
}

public boolean parseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}
MessageStoreConfig#messageDelayLevel
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
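As a worked example, the following standalone sketch applies the same parsing rule as parseDelayLevel to the default string and shows which millisecond value each level maps to. The class and method names are hypothetical, and the sketch keeps none of the broker's error handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DelayLevelParseSketch {
    /** Parses "1s 5s 10s ..." into level (1-based) to delay in milliseconds. */
    static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> unitMillis = Map.of(
            "s", 1000L,
            "m", 1000L * 60,
            "h", 1000L * 60 * 60,
            "d", 1000L * 60 * 60 * 24);

        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levelString.split(" ");
        for (int i = 0; i < parts.length; i++) {
            String value = parts[i];
            // The last character is the unit, everything before it is the number.
            String unit = value.substring(value.length() - 1);
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            table.put(i + 1, unitMillis.get(unit) * num);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table =
            parse("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h");
        System.out.println("levels: " + table.size());
        System.out.println("level 3 delay (ms): " + table.get(3));
        System.out.println("level 18 delay (ms): " + table.get(18));
    }
}
```

The level is simply the 1-based position in the string, so level 3 is the 10s entry and level 18 is the 2h entry.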
Next, the timer is started.
ScheduleMessageService#start
public void start() {
    if (started.compareAndSet(false, true)) {
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
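Note that start() puts every level's task on one java.util.Timer, and a Timer runs all of its tasks on a single background thread. The sketch below (hypothetical names, not RocketMQ code) schedules one task per level on a single daemon Timer and records which thread ran each of them.

```java
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class SingleTimerSketch {
    /**
     * Schedules one one-shot task per level on a single daemon Timer and
     * returns, for each level that ran, the name of the thread that ran it.
     */
    static Map<Integer, String> runOncePerLevel(Map<Integer, Long> delayLevelTable,
                                                long firstDelayMillis) {
        Map<Integer, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch done = new CountDownLatch(delayLevelTable.size());
        Timer timer = new Timer("SketchTimerThread", true);

        for (Integer level : delayLevelTable.keySet()) {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    ranOn.put(level, Thread.currentThread().getName());
                    done.countDown();
                }
            }, firstDelayMillis);
        }

        try {
            // Wait until every level's task has fired (bounded, so the sketch cannot hang).
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.cancel();
        }
        return ranOn;
    }

    public static void main(String[] args) {
        Map<Integer, Long> table = Map.of(1, 1000L, 2, 5000L, 3, 10000L);
        System.out.println(runOncePerLevel(table, 10L));
    }
}
```

Since all levels share that one thread, a slow delivery run for one level postpones the others, which is worth keeping in mind when reading the task logic.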
3 The scheduled task
ScheduleMessageService has one more important member variable:
private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable = new ConcurrentHashMap<Integer, Long>(32);
It records, for each delay level, the offset to resume from on the next run.
DeliverDelayedMessageTimerTask is where the core logic lives.
@Override
public void run() {
    try {
        if (isStarted()) {
            this.executeOnTimeup();
        }
    } catch (Exception e) {
        // XXX: warn and notify me
        log.error("ScheduleMessageService, executeOnTimeup exception", e);
        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
            this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    }
}
public void executeOnTimeup() {
    // Find the consume queue that belongs to this delay level
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        // Load the actual consume queue file data, starting at this offset
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                // CQ_STORE_UNIT_SIZE is 20: the fixed size in bytes of each entry in the delay queue
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // Time still to wait before this message is due
                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                        msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                        + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                        + offsetPy + ",sizePy=" + sizePy, e);
                            }
                        }
                    } else {
                        // This branch handles an entry that is not yet due: schedule a new task
                        // delayed by countdown, save the offset, and return immediately.
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                    + cqMinOffset + ", queueId=" + cq.getQueueId());
            }
        }
    } // end of if (cq != null)

    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
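The control flow of executeOnTimeup can be reduced to a small deterministic sketch. It assumes each 20-byte entry is laid out as an 8-byte commitlog offset, a 4-byte size, and an 8-byte tagsCode holding the deliver timestamp, as read in the loop above. The class, the ScanResult type, and the 100 ms value for DELAY_FOR_A_WHILE are assumptions for illustration; the timestamp correction, the extended consume queue lookup, and failure handling are left out.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class DelayQueueScanSketch {
    static final int UNIT_SIZE = 20;            // 8 (commitlog offset) + 4 (size) + 8 (tagsCode)
    static final long DELAY_FOR_A_WHILE = 100L; // assumed wait when every entry was delivered

    /** Outcome of one scan: what was due, where to resume, and how long to wait. */
    static final class ScanResult {
        final List<Long> deliveredCommitLogOffsets = new ArrayList<>();
        long nextOffset;
        long nextDelayMillis;
    }

    /** Encodes rows of {commitLogOffset, size, deliverTimestamp} as 20-byte entries. */
    static ByteBuffer entries(long[][] rows) {
        ByteBuffer buf = ByteBuffer.allocate(rows.length * UNIT_SIZE);
        for (long[] row : rows) {
            buf.putLong(row[0]).putInt((int) row[1]).putLong(row[2]);
        }
        buf.flip();
        return buf;
    }

    /** Scans entries that start at logical index offset and stops at the first one not yet due. */
    static ScanResult scan(ByteBuffer buf, long offset, long now) {
        ScanResult result = new ScanResult();
        int i = 0;
        for (; i < buf.limit(); i += UNIT_SIZE) {
            long offsetPy = buf.getLong();
            buf.getInt(); // message size, not needed in this sketch
            long deliverTimestamp = buf.getLong();

            long countdown = deliverTimestamp - now;
            if (countdown > 0) {
                // Head entry is not due: resume here after exactly countdown milliseconds.
                result.nextOffset = offset + i / UNIT_SIZE;
                result.nextDelayMillis = countdown;
                return result;
            }
            result.deliveredCommitLogOffsets.add(offsetPy);
        }
        // Everything was due: move past the scanned entries and poll again shortly.
        result.nextOffset = offset + i / UNIT_SIZE;
        result.nextDelayMillis = DELAY_FOR_A_WHILE;
        return result;
    }

    public static void main(String[] args) {
        long[][] rows = {{0, 50, 1000}, {100, 60, 2000}, {200, 70, 5000}};
        ScanResult r = scan(entries(rows), 7, 2500);
        System.out.println(r.deliveredCommitLogOffsets + " next=" + r.nextOffset
            + " delay=" + r.nextDelayMillis);
    }
}
```

The scan can stop at the first entry that is not due only because each queue holds a single delay level, so every later entry is due no earlier than the current one.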