Broker 過期文件刪除機制
RocketMQ 中主要保存了 CommitLog、Consume Queue、Index File 三種數據文件。由於內存和磁盤都是有限的資源,Broker 不可能永久地保存所有數據,所以一些超過保存期限的數據會被定期刪除。RocketMQ 通過設置數據過期時間來刪除額外的數據文件,具體的實現邏輯是通過 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.start() 方法啟動的周期性執行方法 cleanFilesPeriodically()方法,該方法的代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java 來實現的。
一、CommitLog 文件的刪除過程
CommitLog 文件由 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.CleanCommitLogService 類提供的一個線程服務周期執行刪除操作,代碼路徑:
1 class CleanCommitLogService { 2 3 private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; 4 private final double diskSpaceWarningLevelRatio = 5 Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90")); 6 7 private final double diskSpaceCleanForciblyRatio = 8 Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85")); 9 private long lastRedeleteTimestamp = 0; 10 11 private volatile int manualDeleteFileSeveralTimes = 0; 12 13 private volatile boolean cleanImmediately = false; 14 15 public void excuteDeleteFilesManualy() { 16 this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES; 17 DefaultMessageStore.log.info("executeDeleteFilesManually was invoked"); 18 } 19 20 public void run() { 21 try { 22 this.deleteExpiredFiles(); #刪除過期文件 23 24 this.redeleteHangedFile(); 25 } catch (Throwable e) { 26 DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); 27 } 28 } 29 30 private void deleteExpiredFiles() { 31 int deleteCount = 0; 32 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); 33 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); 34 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); 35 36 boolean timeup = this.isTimeToDelete(); 37 boolean spacefull = this.isSpaceToDelete(); 38 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; 39 40 if (timeup || spacefull || manualDelete) { 41 42 if (manualDelete) 43 this.manualDeleteFileSeveralTimes--; 44 45 boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; 46 47 log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", 48 fileReservedTime, 49 timeup, 50 spacefull, 51 manualDeleteFileSeveralTimes, 52 cleanAtOnce); 53 54 fileReservedTime *= 60 * 60 * 1000; 55 56 deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, 57 destroyMapedFileIntervalForcibly, cleanAtOnce); 58 if (deleteCount > 0) { 59 } else if (spacefull) { 60 log.warn("disk space will be full soon, but delete file failed."); 61 } 62 } 63 } 64 65 private void redeleteHangedFile() { 66 int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval(); 67 long currentTimestamp = System.currentTimeMillis(); 68 if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) { 69 this.lastRedeleteTimestamp = currentTimestamp; 70 int destroyMapedFileIntervalForcibly = 71 DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); 72 if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) { 73 } 74 } 75 } 76 77 public String getServiceName() { 78 return CleanCommitLogService.class.getSimpleName(); 79 } 80 81 private boolean isTimeToDelete() { 82 String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen(); 83 if (UtilAll.isItTimeToDo(when)) { 84 DefaultMessageStore.log.info("it's time to reclaim disk space, " + when); 85 return true; 86 } 87 88 return false; 89 } 90 91 private boolean isSpaceToDelete() { 92 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; 93 94 cleanImmediately = false; 95 96 { 97 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic()); 98 if (physicRatio > diskSpaceWarningLevelRatio) { 99 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); 100 if (diskok) { 101 DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); 102 } 103 104 cleanImmediately = true; 105 } else if (physicRatio > diskSpaceCleanForciblyRatio) { 106 cleanImmediately = true; 107 } else { 108 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); 109 if (!diskok) { 110 DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); 111 } 112 } 113 114 if (physicRatio < 0 || physicRatio > ratio) { 115 DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); 116 return true; 117 } 118 } 119 120 { 121 String storePathLogics = StorePathConfigHelper 122 .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir()); 123 double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics); 124 if (logicsRatio > diskSpaceWarningLevelRatio) { 125 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); 126 if (diskok) { 127 DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full"); 128 } 129 130 cleanImmediately = true; 131 } else if (logicsRatio > diskSpaceCleanForciblyRatio) { 132 cleanImmediately = true; 133 } else { 134 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); 135 if (!diskok) { 136 DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok"); 137 } 138 } 139 140 if (logicsRatio < 0 || logicsRatio > ratio) { 141 DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio); 142 return true; 143 } 144 } 145 146 return false; 147 } 148 149 public int getManualDeleteFileSeveralTimes() { 150 return manualDeleteFileSeveralTimes; 151 } 152 153 public void setManualDeleteFileSeveralTimes(int manualDeleteFileSeveralTimes) { 154 this.manualDeleteFileSeveralTimes = manualDeleteFileSeveralTimes; 155 } 156 public boolean isSpaceFull() { 157 String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); 158 double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); 159 double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; 160 if (physicRatio > ratio) { 161 DefaultMessageStore.log.info("physic disk of commitLog used: " + physicRatio); 162 } 163 if (physicRatio > this.diskSpaceWarningLevelRatio) { 164 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); 165 if (diskok) { 166 DefaultMessageStore.log.error("physic disk of commitLog maybe full soon, used " + physicRatio + ", so mark disk full"); 167 } 168 169 return true; 170 } else { 171 boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); 172 173 if (!diskok) { 174 DefaultMessageStore.log.info("physic disk space of commitLog OK " + physicRatio + ", so mark disk ok"); 175 } 176 177 return false; 178 } 179 } 180 }
this.deleteExpiredFiles(),當滿足3個條件時執行刪除操作:
- 第一,當前時間等於已經配置的刪除時間。
- 第二,磁盤使用空間超過85%。
- 第三,手動執行刪除
上面代碼,第56行,DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,destroyMapedFileIntervalForcibly, cleanAtOnce),代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java,該方法調用的了return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately),代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\CommitLog.java。我么講講this.mappedFileQueue.deleteExpiredFileByTime()方法是如何刪除 CommitLog文件的,代碼如下:
1 public int deleteExpiredFile( 2 final long expiredTime, 3 final int deleteFilesInterval, 4 final long intervalForcibly, 5 final boolean cleanImmediately 6 ) { 7 return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately); 8 }
1 public int deleteExpiredFileByTime(final long expiredTime, 2 final int deleteFilesInterval, 3 final long intervalForcibly, 4 final boolean cleanImmediately) { 5 Object[] mfs = this.copyMappedFiles(0); #全部 commitLog 文件 6 7 if (null == mfs) 8 return 0; 9 10 int mfsLength = mfs.length - 1; 11 int deleteCount = 0; 12 List<MappedFile> files = new ArrayList<MappedFile>(); #已經刪除的文件 13 if (null != mfs) { 14 for (int i = 0; i < mfsLength; i++) { 15 MappedFile mappedFile = (MappedFile) mfs[i]; 16 long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; 17 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { #刪除條件:過期或者必須立即刪除 18 if (mappedFile.destroy(intervalForcibly)) { #關閉文件映射,刪除物理文件 19 files.add(mappedFile); 20 deleteCount++; 21 22 if (files.size() >= DELETE_FILES_BATCH_MAX) { 23 break; 24 } 25 26 if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { 27 try { 28 Thread.sleep(deleteFilesInterval); 29 } catch (InterruptedException e) { 30 } 31 } 32 } else { 33 break; 34 } 35 } else { 36 //avoid deleting files in the middle 37 break; 38 } 39 } 40 } 41 42 deleteExpiredFile(files); #刪除內存中的文件信息 43 44 return deleteCount; 45 }
deleteExpiredFileByTime()方法的實現分為如下兩步:
- 克隆全部的 CommitLog 文件。CommitLog 文件可能隨時有數據寫入,為了不影響正常寫入,所以可能一份來操作。
- 檢查每一個 CommitLog 文件是否過期,如果已過期則立即通過調用 destroy() 方法進行刪除。在刪除前會做一系列檢查:檢查文件被引用的次數、清理映射的所有內存數據對象、釋放內存。清理完成后,刪除物理文件。
二、Consume Queue、Index File 文件的刪除過程
Consume Queue 和 Index File 都是索引文件,在 CommitLog 文件被刪除后,對應的索引文件其實沒有存在的意義,並且占用磁盤空間,所以這些文件應該被刪除。
RocketMQ 的刪除策略是定時檢查,滿足刪除條件時會刪除過期或者無意義的文件。
程序調用 D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\DefaultMessageStore.java 中 deleteExpiredFiles(),代碼如下:
1 private void deleteExpiredFiles() { 2 int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); 3 4 long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); #CommitLog 全部文件中的最小物理位點。 5 if (minOffset > this.lastPhysicalMinOffset) { #上次檢查到的最小物理位點。當 if (minOffset > this.lastPhysicalMinOffset) 條件成立時,說明當前有新數據沒有被檢查過,就會 6 調用 org.apache.rocketmq.store.MappedFileQueue.deleteExpiredFileByOffset()方法進行檢查及刪除。 7 this.lastPhysicalMinOffset = minOffset; 8 9 ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; 10 11 for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { 12 for (ConsumeQueue logic : maps.values()) { 13 int deleteCount = logic.deleteExpiredFile(minOffset); 14 15 if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { 16 try { 17 Thread.sleep(deleteLogicsFilesInterval); 18 } catch (InterruptedException ignored) { 19 } 20 } 21 } 22 } 23 24 DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); 25 } 26 }
org.apache.rocketmq.store.MappedFileQueue.deleteExpiredFileByOffset() 的代碼路徑:D:\rocketmq-master\store\src\main\java\org\apache\rocketmq\store\MappedFileQueue.java,代碼如下:
1 public int deleteExpiredFileByOffset(long offset, int unitSize) { 2 Object[] mfs = this.copyMappedFiles(0); 3 4 List<MappedFile> files = new ArrayList<MappedFile>(); 5 int deleteCount = 0; 6 if (null != mfs) { 7 8 int mfsLength = mfs.length - 1; 9 10 for (int i = 0; i < mfsLength; i++) { 11 boolean destroy; 12 MappedFile mappedFile = (MappedFile) mfs[i]; 13 SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize); 14 if (result != null) { 15 long maxOffsetInLogicQueue = result.getByteBuffer().getLong(); 16 result.release(); 17 destroy = maxOffsetInLogicQueue < offset; #maxOffsetInLogicQueue:Consume Queue 中最大的位點值。 offset:檢查的最小位點。如果maxOffsetInLogicQueue < offset 成立,則說明 Consume Queue 已經過期了,可以刪除。 18 if (destroy) { 19 log.info("physic min offset " + offset + ", logics in current mappedFile max offset " 20 + maxOffsetInLogicQueue + ", delete it"); 21 } 22 } else if (!mappedFile.isAvailable()) { // Handle hanged file. 說明存儲服務已經被關閉(或者該文件曾經被刪除,但是刪除失敗) 23 log.warn("Found a hanged consume queue file, attempting to delete it."); 24 destroy = true; 25 } else { 26 log.warn("this being not executed forever."); 27 break; 28 } 29 30 if (destroy && mappedFile.destroy(1000 * 60)) { 31 files.add(mappedFile); 32 deleteCount++; 33 } else { 34 break; 35 } 36 } 37 } 38 39 deleteExpiredFile(files); 40 41 return deleteCount; 42 }