broker過期文件刪除機制
broker啟動1分鐘後,RocketMQ會每隔cleanResourceInterval(默認10秒)執行一次文件清理任務
// Schedule periodic cleanup of expired store files: the first run happens one minute after
// start-up, then every cleanResourceInterval milliseconds. The anonymous Runnable forwards to
// the enclosing store, hence the qualified DefaultMessageStore.this receiver.
this.scheduledExecutorService.scheduleAtFixedRate(
    new Runnable() {
        @Override
        public void run() {
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    },
    60 * 1000,
    this.messageStoreConfig.getCleanResourceInterval(),
    TimeUnit.MILLISECONDS);
主要是刪除commitlog和consumequeue文件(清理consumequeue時也會一並刪除過期的index文件)
/**
 * Periodic cleanup entry point. Runs the commit-log cleaner first, then the consume-queue
 * cleaner, which decides what to delete from the commit log's current minimum offset.
 */
private void cleanFilesPeriodically() {
    this.cleanCommitLogService.run();
    this.cleanConsumeQueueService.run();
}
commitlog文件刪除
觸發過期文件刪除的條件:
1)達到配置的時間點
2)磁盤空間使用率超過85%
3)手動執行
public void run() { try { //刪除已經失效的 this.deleteExpiredFiles(); // 為啥會有掛起的文件呢? /** * 第一次刪除有可能失敗,比如有線程引用該過期文件,內存映射清理失敗,都可能導致失敗 * 如果文件已經關閉,刪除前檢查沒有通過,可以通過第二次刪除 */ this.redeleteHangedFile(); } catch (Throwable e) { DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e); } }
private void deleteExpiredFiles() { int deleteCount = 0; // 文件保留時長 72 long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime(); // 100 int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval(); // 1000*120 int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly(); // 判斷有沒到凌晨4點 boolean timeup = this.isTimeToDelete(); // 空間是否上限 boolean spacefull = this.isSpaceToDelete(); // 手動刪除 經過20次的調度 boolean manualDelete = this.manualDeleteFileSeveralTimes > 0; if (timeup || spacefull || manualDelete) { if (manualDelete) this.manualDeleteFileSeveralTimes--; boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately; log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", // fileReservedTime, // timeup, // spacefull, // manualDeleteFileSeveralTimes, // cleanAtOnce); fileReservedTime *= 60 * 60 * 1000; deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce); if (deleteCount > 0) { } else if (spacefull) { // 刪除文件失敗 log.warn("disk space will be full soon, but delete file failed."); } } }
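默認刪除最後修改時間在72小時(即3天,由fileReservedTime配置)之前的文件;如果觸發了立即清理(cleanAtOnce,例如磁盤占用已超過85%且開啟了cleanFileForciblyEnable),則不論文件是否過期都會刪除。正在寫入的最後一個文件不會被刪除。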
public int deleteExpiredFileByTime(final long expiredTime, // 72h final int deleteFilesInterval, // 0.1s final long intervalForcibly, // 120s final boolean cleanImmediately) { // commitlog文件可能隨時有寫入,copy一份不影響寫入 Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return 0; int mfsLength = mfs.length - 1; int deleteCount = 0; // 存放要刪除的MappedFile List<MappedFile> files = new ArrayList<MappedFile>(); if (null != mfs) { for (int i = 0; i < mfsLength; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime; // 如果文件最新修改已經超過三天或者是磁盤空間達到85%以上 而要在此之前需要滿足3個條件之一,時間,容量,和手動觸發 if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) { // 真正的刪除邏輯 if (mappedFile.destroy(intervalForcibly)) { files.add(mappedFile); deleteCount++; // 當刪除的文件達到10的時候 結束 if (files.size() >= DELETE_FILES_BATCH_MAX) { break; } // 如果沒達到十個 並且 還沒掃描完所有文件 if (deleteFilesInterval > 0 && (i + 1) < mfsLength) { try { //等待0.1s Thread.sleep(deleteFilesInterval); } catch (InterruptedException e) { } } } else { break; } } } } deleteExpiredFile(files); return deleteCount; }
接下來具體看MappedFile的destroy過程
public boolean destroy(final long intervalForcibly) { this.shutdown(intervalForcibly); if (this.isCleanupOver()) { try { // 關閉文件通道 this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); long beginTime = System.currentTimeMillis(); // 刪除文件 boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + this.getFlushedPosition() + ", " + UtilAll.computeEclipseTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } return false; }
MappedFile的shutdown,並釋放內存的過程:
public void shutdown(final long intervalForcibly) { if (this.available) { this.available = false; this.firstShutdownTimestamp = System.currentTimeMillis(); this.release(); } else if (this.getRefCount() > 0) { // 說明已經shutdown過了 但是還有引用並且時間已經超過了intervalForcibly if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) { // 強制回收內存了 this.refCount.set(-1000 - this.getRefCount()); this.release(); } } } //如果還存在引用,返回再等等 public void release() { long value = this.refCount.decrementAndGet(); if (value > 0) return; // value <= 0 ,表示已經沒有引用了 或者需要強制cleanup的時候 synchronized (this) { //清理映射的所有內存數據對象,釋放內存 this.cleanupOver = this.cleanup(value); } }
ConsumeQueue文件刪除
private void deleteExpiredFiles() { // 0.1s int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval(); //得到commitlog中第一個文件的起始物理offset long minOffset = DefaultMessageStore.this.commitLog.getMinOffset(); if (minOffset > this.lastPhysicalMinOffset) { //發現上次的已經變小了 說明commitlog已經發生過刪除操作了 this.lastPhysicalMinOffset = minOffset; ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable; for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) { for (ConsumeQueue logic : maps.values()) { // 對某一個消費隊列做刪除 參數是commitlog最小的物理點位 int deleteCount = logic.deleteExpiredFile(minOffset); if (deleteCount > 0 && deleteLogicsFilesInterval > 0) { try { // 當上一個ConsumeQueue成功刪除之后,下一個ConsumeQueue刪除需要等待0.1s Thread.sleep(deleteLogicsFilesInterval); } catch (InterruptedException ignored) { } } } } // 刪除索引文件 DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset); } }
public int deleteExpiredFile(long offset) { int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE); // 重新計算最小的邏輯點位 this.correctMinOffset(offset); return cnt; }
deleteExpiredFileByOffset方法從第一個consumeQueue文件開始遍歷,讀取該文件最後一個條目記錄的物理點位(commitlog offset),並與當前commitlog中最小的物理點位比較。如果更小,說明該文件中所有條目指向的消息都已被刪除,則把這個consumeQueue文件刪除。
其次(correctMinOffset),從剩餘的第一個consumeQueue文件的第一個條目開始遍歷,直到找到第一個物理點位>=當前commitlog最小物理點位的條目,並以它作為該隊列新的最小邏輯點位(minLogicOffset)。
Index文件刪除
Index文件的刪除原理和consumeQueue一樣。
public void deleteExpiredFile(long offset) { Object[] files = null; try { this.readWriteLock.readLock().lock(); if (this.indexFileList.isEmpty()) { return; } long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset(); if (endPhyOffset < offset) { // 這個判斷說明存在了無效的index數據 files = this.indexFileList.toArray(); } } catch (Exception e) { log.error("destroy exception", e); } finally { this.readWriteLock.readLock().unlock(); } // 遍歷索引文件 如果發現存在一個索引文件其最后一個物理offset比當前最小的物理ofset還小,說明這個index文件可以刪除了 if (files != null) { List<IndexFile> fileList = new ArrayList<IndexFile>(); for (int i = 0; i < (files.length - 1); i++) { IndexFile f = (IndexFile) files[i]; if (f.getEndPhyOffset() < offset) { fileList.add(f); } else { break; } } this.deleteExpiredFile(fileList); } }
broker存儲文件恢復
/**
 * Recovers store state on start-up: consume queues first, then the commit log (using the
 * normal or abnormal strategy depending on how the broker last exited), and finally the
 * per-queue write-progress table.
 *
 * @param lastExitOK whether the previous shutdown was clean
 */
private void recover(final boolean lastExitOK) {
    this.recoverConsumeQueue();

    if (!lastExitOK) {
        this.commitLog.recoverAbnormally();
    } else {
        this.commitLog.recoverNormally();
    }

    // Rebuild and save each consume queue's write progress.
    this.recoverTopicQueueTable();
}
恢復consumequeue的邏輯其實很簡單:從倒數第三個文件開始(不足三個則從第一個開始),逐條遍歷條目,如果取出的物理點位大於等於0並且消息size大於0,說明數據有效;遇到第一條無效條目即停止,並把其之後的髒數據截斷。
/**
 * Recovers this consume queue after a restart.
 *
 * Scans entries starting from the third-to-last mapped file (or the first one when there are
 * fewer than three) and moves file by file while every entry is valid. The scan stops at the
 * first entry whose physical offset is negative or whose size is not positive. That point
 * becomes the flushed/committed position, and everything after it is truncated as dirty data.
 * Along the way it records the last valid entry's physical offset in maxPhysicOffset. When the
 * extension file is enabled, it also truncates that file past the highest extension address seen.
 */
public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        // Start from the third-to-last file, clamped to the first file.
        int index = mappedFiles.size() - 3;
        if (index < 0)
            index = 0;

        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // Logical start offset of the file currently being scanned.
        long processOffset = mappedFile.getFileFromOffset();
        // Bytes of valid entries found so far within the current file.
        long mappedFileOffset = 0;
        // NOTE(review): initial value 1 looks like a sentinel for "no ext address seen"; confirm against isExtAddr.
        long maxExtAddr = 1;
        while (true) {
            // Each entry is read as long + int + long (presumably CQ_STORE_UNIT_SIZE == 20 bytes).
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                long tagsCode = byteBuffer.getLong();

                if (offset >= 0 && size > 0) {
                    // Valid entry: advance the valid-data mark past it.
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                    this.maxPhysicOffset = offset;
                    if (isExtAddr(tagsCode)) {
                        maxExtAddr = tagsCode;
                    }
                } else {
                    // First invalid entry ends the valid data in this file.
                    log.info("recover current consume queue file over, " + mappedFile.getFileName() + " " + offset + " " + size + " " + tagsCode);
                    break;
                }
            }

            // Finished reading one consume queue file.
            if (mappedFileOffset == mappedFileSizeLogics) {
                // The whole file was valid: continue with the next one, or stop after the last.
                index++;
                if (index >= mappedFiles.size()) {

                    log.info("recover last consume queue file over, last maped file " + mappedFile.getFileName());
                    break;
                } else {
                    // Read the next file.
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next consume queue file, " + mappedFile.getFileName());
                }
            } else {
                // The file ended early at mappedFileOffset: this is where valid data stops.
                log.info("recover current consume queue queue over " + mappedFile.getFileName() + " " + (processOffset + mappedFileOffset));
                break;
            }
        }

        // Absolute position just past the last valid entry.
        processOffset += mappedFileOffset;
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        // Drop any data written beyond the last valid entry.
        this.mappedFileQueue.truncateDirtyFiles(processOffset);

        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
        }
    }
}
恢復commitlog分正常退出和非正常退出。
正常退出的commitlog所有數據都是flush完成的,所以只要從倒數第三個文件開始恢復即可,遍歷每一個message,並校驗其CRC。
非正常退出則從最後一個文件開始往前查找(一般出現問題的都是最後一個文件):獲取文件中的第一個message,判斷其存儲時間是否小於checkpoint時間點中最小的一個,如果是,表示其就是需要恢復的起始文件,否則繼續檢查前一個文件。然後從該文件開始檢驗每一個message的CRC,並將通過校驗的數據dispatch到consumequeue和index文件中。
恢復每個consumequeue當前的寫入進度(最大邏輯點位)並保存到commitlog的topicQueueTable中,同時根據commitlog的最小物理點位修正各隊列的最小邏輯點位:
private void recoverTopicQueueTable() { HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024); // 第一個commit log中的起始點位 long minPhyOffset = this.commitLog.getMinOffset(); for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) { for (ConsumeQueue logic : maps.values()) { String key = logic.getTopic() + "-" + logic.getQueueId(); // consumerlog寫進度保存 table.put(key, logic.getMaxOffsetInQueue()); // commitlog中最小的物理點位 來計算出consumelog中最小的邏輯點位minLogicOffset logic.correctMinOffset(minPhyOffset); } } // <String/* topic-queueid */, Long/* logicOffset */> this.commitLog.setTopicQueueTable(table); }