RocketMQ-04、消息存儲(6)


4.8.2.Broker異步刷盤

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (messageExt.isWaitStoreMsgOK()) { GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); if (!flushOK) { log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags() + " client address: " + messageExt.getBornHostString()); putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { service.wakeup(); } } // Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } } 
// Asynchronous flush else { if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) { flushCommitLogService.wakeup(); } else { commitLogService.wakeup(); } } 

異步刷盤根據是否開啟transientStorePoolEnable機制,刷盤實現會有細微差別。如果transientStorePoolEnable 為true , RocketMQ 會單獨申請一個與目標物理文件( commitlog)同樣大小的堆外內存, 該堆外內存將使用內存鎖定,確保不會被置換到虛擬內存中去,消息首先追加到堆外內存,然后提交到與物理文件的內存映射內存中,再flush 到磁盤。如果transientStorePoolEnable 為flalse ,消息直接追加到與物理文件直接映射的內存中,然后刷寫到磁盤中。transientStorePoolEnable 為true 的磁盤刷寫流程如圖4-20 所示。

  • 首先將消息直接追加到ByteBuffer (堆外內存DirectByteBuffer ), wrotePosition 隨着消息的不斷追加向后移動。
  • CommitRealTimeService 線程默認每200ms 將ByteBuffer 新追加的內容( wrotePosihon減去commitedPosition )的數據提交到Ma ppedB yte Buff1巳r 中。
  • MappedByt巳Buffer 在內存中追加提交的內容, wrotePosition 指針向前后移動, 然后返回。
  • commit 操作成功返回,將commitedPosition 向前后移動本次提交的內容長度,此時wrotePosition 指針依然可以向前推進。
  • FlushRealTimeService 線程默認每500ms 將MappedByteBuffer 中新追加的內存( wrotePosition 減去上一次刷寫位置flushedPositiont )通過調用MappedByteBuffer#force()方法將數據刷寫到磁盤。
 
image

CommitRealTimeService 提交線程工作機制

@Override public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } try { boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.commit(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } CommitLog.log.info(this.getServiceName() + " service end"); } 
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); int commitDataThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval(); 

Step1: 首先解釋三個配置參數的含義。

  • commitlnterva!CommitLog: CommitRea!TimeService 線程間隔時間,默認200ms 。
  • commitCommitLogLeastPages : 一次提交任務至少包含頁數, 如果待提交數據不足,小於該參數配置的值,將忽略本次提交任務,默認4 頁。
  • commitDataThoroughinterval :兩次真實提交最大間隔,默認200ms 。
    long begin = System.currentTimeMillis(); if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) { this.lastCommitTimestamp = begin; commitDataLeastPages = 0; } 

Step2 :如果距上次提交間隔超過commitDataThoroughInterval , 則本次提交忽略commitCommitLogLeastPages參數, 也就是如果待提交數據小於指定頁數, 也執行提交操作。

    boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); long end = System.currentTimeMillis(); if (!result) { this.lastCommitTimestamp = end; // result = false means some data committed. //now wake up flush thread. flushCommitLogService.wakeup(); } if (end - begin > 500) { log.info("Commit data to file costs {} ms", end - begin); } this.waitForRunning(interval); 

Step3 :執行提交操作,將待提交數據提交到物理文件的內存映射內存區,如果返回false ,並不是代表提交失敗,而是只提交了一部分數據,喚醒刷盤線程執行刷盤操作。該線程每完成一次提交動作,將等待2 00ms 再繼續執行下一次提交任務。

FlushRealTimeService 刷盤線程工作機制

        public void run() { CommitLog.log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); boolean printFlushProgress = false; // Print flush progress long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } try { if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } if (printFlushProgress) { this.printFlushProgress(); } long begin = System.currentTimeMillis(); CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } long past = System.currentTimeMillis() - begin; if (past > 500) { log.info("Flush data to disk costs {} ms", past); } } catch (Throwable e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); this.printFlushProgress(); } } // Normal shutdown, to ensure that all the flush before exit boolean result = false; for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) { result = CommitLog.this.mappedFileQueue.flush(0); CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK")); } this.printFlushProgress(); CommitLog.log.info(this.getServiceName() + " service end"); } 
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed(); int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval(); 

Step1: 首先解釋四個配置參數的含義。

  • flushCommitLogTimed : 默認為false , 表示await 方法等待;如果為true ,表示使用Thread.sleep 方法等待。
  • flushIntervalCommitLog: FlushRealTimeService 線程任務運行間隔。
  • flushPhysicQueueLeastPages : 一次刷寫任務至少包含頁數, 如果待刷寫數據不足,小於該參數配置的值,將忽略本次刷寫任務,默認4 頁。
  • flushPhysicQueueThoroughInterval :兩次真實刷寫任務最大間隔, 默認10s 。
// Print flush progress long currentTimeMillis = System.currentTimeMillis(); if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) { this.lastFlushTimestamp = currentTimeMillis; flushPhysicQueueLeastPages = 0; printFlushProgress = (printTimes++ % 10) == 0; } 

Step2 :如果距上次提交間隔超過flushPhysicQueueThoroughInterval ,則本次刷盤任務將忽略flushPhysicQueueLeastPages , 也就是如果待刷寫數據小於指定頁數也執行刷寫磁盤操作。

    if (flushCommitLogTimed) { Thread.sleep(interval); } else { this.waitForRunning(interval); } 

Step3 :執行一次刷盤任務前先等待指定時間間隔, 然后再執行刷盤任務。

    long begin = System.currentTimeMillis(); CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp(); if (storeTimestamp > 0) { CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp); } 

Step4 :調用flush 方法將內存中數據刷寫到磁盤,並且更新存儲檢測點文件的comm1tlog 文件的更新時間戳,文件檢測點文件( checkpoint 文件)的刷盤動作在刷盤消息消費隊列線程中執行, 其入口為DefaultMessageStore# FlushConsumeQueueS 巳rvice 。由於消息消費隊列、索引文件的刷盤實現原理與Comm itlog 文件的刷盤機制類同,故本書不再做重復分析。

4.9.過期文件刪除機制

由於RocketMQ 操作CommitLog 、ConsumeQueue 文件是基於內存映射機制並在啟動的時候會加載commitlog 、ConsumeQueue 目錄下的所有文件,為了避免內存與磁盤的浪費,不可能將消息永久存儲在消息服務器上,所以需要引人一種機制來刪除己過期的文件。RocketMQ 順序寫Commitlog 文件、Cons umeQueue 文件,所有寫操作全部落在最后一個CommitLog 或Cons umeQueu e 文件上,之前的文件在下一個文件創建后將不會再被更新。RocketMQ 清除過期文件的方法是:如果非當前寫文件在一定時間間隔內沒有再次被更新,則認為是過期文件,可以被刪除, RocketMQ 不會關注這個文件上的消息是否全部被消費。默認每個文件的過期時間為72 小時,通過在Broker 配置文件中設置fi leReservedTime 來改變過期時間,單位為小時· 。接下來詳細分析RocketMQ 是如何設計與實現上述機制的。

    private void addScheduleTask() { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.checkSelf(); } }, 1, 10, TimeUnit.MINUTES); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { try { if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); if (lockTime > 1000 && lockTime < 10000000) { String stack = UtilAll.jstack(); final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; MixAll.string2FileNotSafe(stack, fileName); } } } catch (Exception e) { } } } }, 1, 1, TimeUnit.SECONDS); // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { // @Override // public void run() { // DefaultMessageStore.this.cleanExpiredConsumerQueue(); // } // }, 1, 1, TimeUnit.HOURS); } 

RocketMQ 會每隔10 s 調度一次cleanFilesPeriodically , 檢測是否需要清除過期文件。執行頻率可以通過設置cleanResourceInterval ,默認為10 s 。

    private void cleanFilesPeriodically() { this.cleanCommitLogService.run(); this.cleanConsumeQueueService.run(); } 

分別執行清除消息存儲文件( Commitlog 文件)與消息消費隊列文件( ConsumeQueue文件) 。由於消息消費隊列文件與消息存儲文件( Commitlo g )共用一套過期文件刪除機制,本書將重點講解消息存儲過期文件刪除。實現方法: DefaultMessage Store$Clean CommitLogService#deleteExpiredFiles 。

        private void deleteExpiredFiles() { int deleteCount = 0; long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); boolean timeup = this.isTimeToDelete(); boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } } 

Step1: 解釋一下這個三個配置屬性的含義。

int deleteCount = 0; long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); 
  • fileReservedTime : 文件保留時間, 也就是從最后一次更新時間到現在, 如果超過了該時間, 則認為是過期文件, 可以被刪除。
  • deletePhysicFilesInterval :刪除物理文件的間隔,因為在一次清除過程中, 可能需要被刪除的文件不止一個,該值指定兩次刪除文件的問隔時間。
  • destroyMapedFilelntervalForcibly : 在清除過期文件時, 如果該文件被其他線程所占用(引用次數大於0 ,比如讀取消息), 此時會阻止此次刪除任務, 同時在第一次試圖刪除該文件時記錄當前時間戳, destroyMapedFile lntervalForcibly 表示第一次拒絕刪除之后能保留的最大時間,在此時間內, 同樣可以被拒絕刪除, 同時會將引用減少1000 個,超過該時間間隔后,文件將被強制刪除。
boolean timeup = this.isTimeToDelete(); boolean spacefull = this.isSpaceToDelete(); boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", fileReservedTime, timeup, spacefull, manualDeleteFileSeveralTimes, cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { log.warn("disk space will be full soon, but delete file failed."); } } 

Step2: RocketMQ 在如下三種情況任意之一滿足的情況下將繼續執行刪除文件操作。

  • 指定刪除文件的時間點, RocketMQ 通過delete When 設置一天的固定時間執行一次刪除過期文件操作, 默認為凌晨4 點。
  • 磁盤空間是否充足,如果磁盤空間不充足,則返回true ,表示應該觸發過期文件刪除操作。
  • 預留,手工觸發,可以通過調用excuteDeleteFilesManualy 方法手工觸發過期文件刪除,目前RocketMQ 暫未封裝手工觸發文件刪除的命令。

本節重點分析一下磁盤空間是否充足的實現邏輯。

        private boolean isSpaceToDelete() { double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; cleanImmediately = false; { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); if (physicRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } cleanImmediately = true; } else if (physicRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } if (physicRatio < 0 || physicRatio > ratio) { DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } } { String storePathLogics = StorePathConfigHelper .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); if (logicsRatio > diskSpaceWarningLevelRatio) { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); } cleanImmediately = true; } else if (logicsRatio > diskSpaceCleanForciblyRatio) { cleanImmediately = true; } else { boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); } } if (logicsRatio < 0 || logicsRatio > ratio) { DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio); return true; } } return false; } 

1 )首先解釋一下幾個參數的含義。

  • diskMaxUsedSpaceRatio : 表示commitlog 、consumequeue 文件所在磁盤分區的最大使用量,如果超過該值, 則需要立即清除過期文件。
  • cleanImmediately : 表示是否需要立即執行清除過期文件操作。
  • physicRatio : 當前commitlog 目錄所在的磁盤分區的磁盤使用率,通過File#getTotalSpace()獲取文件所在磁盤分區的總容量,通過File#getFreeSpace() 獲取文件所在磁盤分區剩余容量。
  • diskSpaceWarningLevelRatio : 通過系統參數-Drocketmq.broker.diskSpace WamingLevelRatio設置,默認0.90 。如果磁盤分區使用率超過該闊值,將設置磁盤不可寫,此時會拒絕新消息的寫人。
  • diskSpaceCleanForciblyRatio :通過系統參數-Drocketmq.broker.diskSpaceCleanF orciblyRatio設置, 默認0.85 。如果磁盤分區使用超過該闊值,建議立即執行過期文件清除,但不會拒絕新消息的寫入。

2 ) 如果當前磁盤分區使用率大於diskSpace WarningLeve!Ratio ,設置磁盤不可寫,應該立即啟動過期文件刪除操作;如果當前磁盤分區使用率大於diskSpaceCleanForciblyRatio,建議立即執行過期文件清除;如果磁盤使用率低於diskSpaceCl eanF orcibly Ratio 將恢復磁盤可寫;如果當前磁盤使用率小於diskMax U sedSpaceRatio 則返回false ,表示磁盤使用率正常,否則返回true , 需要執行清除過期文件。

for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } else { //avoid deleting files in the middle break; } } 

執行文件銷毀與刪除。從倒數第二個文件開始遍歷,計算文件的最大存活時間( = 文件的最后一次更新時間+文件存活時間(默認72 小時)) , 如果當前時間大於文件的最大存活時間或需要強制刪除文件(當磁盤使用超過設定的闊值)時則執行MappedFile#destory 方法,清除MappedFile 占有的相關資源,如果執行成功,將該文件加入到待刪除文件列表中,然后統一執行File#delete 方法將文件從物理磁盤中刪除。

4.10.本章小節

RocketMQ 主要存儲文件包含消息文件( commitlog )、消息消費隊列文件(ConsumeQueue)、Hash 索引文件(indexFile)、檢測點文件( checkpoint ) 、abort (關閉異常文件) 。單個消息存儲文件、消息消費隊列文件、Hash 索引文件長度固定以便使用內存映射機制進行文件的讀寫操作。RocketMQ 組織文件以文件的起始偏移量來命名文件,這樣根據偏移量能快速定位到真實的物理文件。RocketMQ 基於內存映射文件機制提供了同步刷盤與異步刷盤兩種機制,異步刷盤是指在消息存儲時先追加到內存映射文件,然后啟動專門的刷盤線程定時將內存中的數據刷寫到磁盤。

Commitlog,消息存儲文件, RocketMQ 為了保證消息發送的高吞吐量,采用單一文件存儲所有主題的消息,保證消息存儲是完全的順序寫,但這樣給文件讀取同樣帶來了不便,為此RocketMQ 為了方便消息消費構建了消息消費隊列文件,基於主題與隊列進行組織, 同時RocketMQ 為消息實現了Hash 索引,可以為消息設置索引鍵,根據索引能夠快速從Commitog 文件中檢索消息。

當消息到達Commitlog 文件后,會通過ReputMessageService 線程接近實時地將消息轉發給消息消費隊列文件與索引文件。為了安全起見, RocketMQ 引人abort 文件,記錄Broker 的停機是正常關閉還是異常關閉,在重啟Broker 時為了保證Commitlog 文件、消息消費隊列文件與Hash 索引文件的正確性,分別采取不同的策略來恢復文件。

RocketMQ 不會永久存儲消息文件、消息消費隊列文件,而是啟用文件過期機制並在磁盤空間不足或默認在凌晨4 點刪除過期文件,文件默認保存72 小時並且在刪除文件時並不會判斷該消息文件上的消息是否被消費。下面一章我們將重點分析有關消息、消費的實現機制。

https://www.jianshu.com/p/8ee482ef6911

https://itzones.cn/2019/07/24/RocketMQ%E5%88%B7%E7%9B%98%E7%AD%96%E7%95%A5/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM