RocketMQ - Storage Mechanism - File Recovery & Expired File Deletion


 

Broker expired-file deletion mechanism

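RocketMQ runs a file-cleanup task every 10 seconds (the default cleanResourceInterval). The first run starts 60 seconds after startup: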

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);

The task mainly deletes CommitLog and ConsumeQueue files (index files are cleaned together with the ConsumeQueues):

    private void cleanFilesPeriodically() {
        this.cleanCommitLogService.run();
        this.cleanConsumeQueueService.run();
    }

CommitLog file deletion

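Deletion of expired files is triggered when any of the following holds:

1) the configured time of day has been reached (4 a.m. by default);

2) disk usage exceeds 85%;

3) deletion has been requested manually.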
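The three triggers can be summarised in a small, self-contained Java sketch. The class DeleteTriggerSketch and all of its parameters are hypothetical; the ";"-separated hour list is an assumption modelled on the deleteWhen setting (4 a.m. by default), and the 0.85 threshold is simply the 85% figure quoted above, not a value read from RocketMQ's configuration.

```java
import java.util.Arrays;

// Hypothetical sketch: names and parameters are illustrative, not RocketMQ's API.
final class DeleteTriggerSketch {

    // True if currentHour appears in a ";"-separated hour list such as "04" or "04;16".
    static boolean isTimeToDelete(String deleteWhen, int currentHour) {
        return Arrays.stream(deleteWhen.split(";"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .mapToInt(Integer::parseInt)
                .anyMatch(h -> h == currentHour);
    }

    // True if the used fraction of the disk is above the threshold (e.g. 0.85).
    static boolean isSpaceToDelete(long usedBytes, long totalBytes, double threshold) {
        return totalBytes > 0 && (double) usedBytes / totalBytes > threshold;
    }

    // Any one of the three conditions is enough to start a deletion round.
    static boolean shouldDelete(String deleteWhen, int currentHour,
                                long usedBytes, long totalBytes, double threshold,
                                int manualDeleteTimesLeft) {
        return isTimeToDelete(deleteWhen, currentHour)
                || isSpaceToDelete(usedBytes, totalBytes, threshold)
                || manualDeleteTimesLeft > 0;
    }

    public static void main(String[] args) {
        System.out.println(shouldDelete("04", 4, 10, 100, 0.85, 0));   // true: time point reached
        System.out.println(shouldDelete("04", 10, 90, 100, 0.85, 0));  // true: disk 90% full
        System.out.println(shouldDelete("04", 10, 10, 100, 0.85, 0));  // false: no trigger
        System.out.println(shouldDelete("04", 10, 10, 100, 0.85, 20)); // true: manual request
    }
}
```

A trigger only starts a deletion round; which files actually go is decided afterwards by their age, or by the forced-cleanup flag.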

      public void run() {
            try {
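                // Delete files that have expired
                this.deleteExpiredFiles();
                // Why can "hanged" files be left over?
                /**
                 * The first deletion attempt can fail, e.g. because a thread still references the
                 * expired file or unmapping its memory failed. If the file has already been shut down
                 * but did not pass the pre-delete check, a second attempt can remove it.
                 */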
                this.redeleteHangedFile();
            } catch (Throwable e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

       private void deleteExpiredFiles() {
            int deleteCount = 0;
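            // File retention time in hours (72 by default)
            long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
            // Interval between deleting two CommitLog files, in ms (100 by default)
            int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
            // Grace period before a still-referenced file is destroyed forcibly, in ms (1000 * 120)
            int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
            // Has the configured deletion time (4 a.m. by default) been reached?
            boolean timeup = this.isTimeToDelete();
            // Is disk usage over the limit?
            boolean spacefull = this.isSpaceToDelete();
            // Manual deletion: once requested, it stays active for 20 scheduling rounds
            boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;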

            if (timeup || spacefull || manualDelete) {

                if (manualDelete)
                    this.manualDeleteFileSeveralTimes--;

                boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;

                log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}", //
                        fileReservedTime, //
                        timeup, //
                        spacefull, //
                        manualDeleteFileSeveralTimes, //
                        cleanAtOnce);

                fileReservedTime *= 60 * 60 * 1000;

                deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval, destroyMapedFileIntervalForcibly, cleanAtOnce);
                if (deleteCount > 0) {
                    // files were deleted; nothing more to do
                } else if (spacefull) {  // the disk is nearly full, but no file could be deleted
                    log.warn("disk space will be full soon, but delete file failed.");
                }
            }
        }

 

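A file is deleted if it was last modified more than three days (72 hours) ago or, when the disk is already more than 85% full, regardless of its age. CommitLog.deleteExpiredFile hands this work to MappedFileQueue.deleteExpiredFileByTime: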

    public int deleteExpiredFileByTime(final long expiredTime,    // 72h
        final int deleteFilesInterval,  // 0.1s
        final long intervalForcibly,   // 120s
        final boolean cleanImmediately) {
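        // The commitlog may be written to at any time; work on a copy of the file list so writes are not affected
        Object[] mfs = this.copyMappedFiles(0);

        if (null == mfs)
            return 0;

        // The last file is still being written, so it is never a deletion candidate
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        // MappedFiles that were destroyed and must be removed from the queue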
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
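                // Delete if the file was last modified more than 72 hours ago, or if immediate cleanup was
                // requested (disk above 85%). Reaching this loop already required one of the three
                // triggers: time, disk space or manual request.
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    // The actual deletion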
                    if (mappedFile.destroy(intervalForcibly)) {
                        files.add(mappedFile);
                        deleteCount++;

                        // Stop once DELETE_FILES_BATCH_MAX (10) files have been deleted in this round
                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }

                        // Fewer than 10 deleted so far, and there are still files left to scan
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                // Wait 0.1 s before deleting the next file
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                }
            }
        }

        deleteExpiredFile(files);

        return deleteCount;
    }
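The selection rules of deleteExpiredFileByTime can be modelled without any files on disk. In this sketch the FakeFile record (Java 16+) and the selectExpired method are hypothetical stand-ins for MappedFile and the loop above; the sleep between deletions is left out, and destroyable merely marks whether destroy() would succeed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the time-based selection loop; not RocketMQ's API.
final class TimeBasedDeleteSketch {
    static final int DELETE_FILES_BATCH_MAX = 10;

    // Stand-in for MappedFile with only the fields the loop needs.
    record FakeFile(String name, long lastModified, boolean destroyable) {}

    // Returns the files that would be deleted in one round.
    static List<FakeFile> selectExpired(List<FakeFile> files, long now,
                                        long expiredTimeMs, boolean cleanImmediately) {
        List<FakeFile> deleted = new ArrayList<>();
        // size() - 1: the newest file is still being written and is never a candidate
        for (int i = 0; i < files.size() - 1; i++) {
            FakeFile f = files.get(i);
            long liveMaxTimestamp = f.lastModified() + expiredTimeMs;
            if (now >= liveMaxTimestamp || cleanImmediately) {
                if (!f.destroyable()) {
                    break; // still referenced: stop and retry in a later round
                }
                deleted.add(f);
                if (deleted.size() >= DELETE_FILES_BATCH_MAX) {
                    break; // batch limit reached
                }
            }
            // files that are not expired yet are skipped, and the scan continues
        }
        return deleted;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long expiredTime = 72 * hour; // 72 h = 259,200,000 ms
        long now = 100 * hour;
        List<FakeFile> files = List.of(
                new FakeFile("a", 0, true),         // 100 h old: expired
                new FakeFile("b", 10 * hour, true), // 90 h old: expired
                new FakeFile("c", 50 * hour, true), // 50 h old: kept
                new FakeFile("d", 99 * hour, true)); // newest file: never a candidate
        System.out.println(selectExpired(files, now, expiredTime, false).size()); // 2
        System.out.println(selectExpired(files, now, expiredTime, true).size());  // 3
    }
}
```

The 72-hour retention is converted by `fileReservedTime *= 60 * 60 * 1000`, giving 72 × 3,600,000 = 259,200,000 ms.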

 

Next, let's look at MappedFile's destroy process in detail:

public boolean destroy(final long intervalForcibly) {
        this.shutdown(intervalForcibly);
        if (this.isCleanupOver()) {
            try {
                // Close the file channel
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");

                long beginTime = System.currentTimeMillis();
                // Delete the file
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeEclipseTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }

MappedFile's shutdown and the process of releasing its memory:

public void shutdown(final long intervalForcibly) {
        if (this.available) {
            this.available = false;
            this.firstShutdownTimestamp = System.currentTimeMillis();
            this.release();
        } else if (this.getRefCount() > 0) {   // Already shut down but still referenced: act once intervalForcibly has elapsed
            if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                // Force-reclaim the memory by driving the reference count negative
                this.refCount.set(-1000 - this.getRefCount());
                this.release();
            }
        }
    }

    // If references remain, return and wait for them to be released
    public void release() {
        long value = this.refCount.decrementAndGet();
        if (value > 0)
            return;

        // value <= 0: no references remain, or a forced cleanup was requested
        synchronized (this) {
            // Unmap the memory-mapped buffer and release its memory
            this.cleanupOver = this.cleanup(value);
        }
    }
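The interplay of shutdown(), release() and the forced interval is easiest to follow with the clock passed in explicitly. RefCountedSketch is a hypothetical simplification of the reference counting shown above: the hold() method, the explicit now parameter, and a cleanup() that only sets a flag instead of unmapping memory are all assumptions of the sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of reference-counted shutdown; not RocketMQ's MappedFile.
final class RefCountedSketch {
    private final AtomicLong refCount = new AtomicLong(1); // the owner's own reference
    private volatile boolean available = true;
    private volatile boolean cleanupOver = false;
    private long firstShutdownTimestamp;

    // A reader takes a reference; this fails once the file has been shut down.
    boolean hold() {
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;
            }
            refCount.getAndDecrement();
        }
        return false;
    }

    void shutdown(long intervalForcibly, long now) {
        if (available) {
            available = false;
            firstShutdownTimestamp = now;
            release(); // drop the owner's reference
        } else if (refCount.get() > 0 && now - firstShutdownTimestamp >= intervalForcibly) {
            // Readers still hold references after the grace period: force the count negative
            refCount.set(-1000 - refCount.get());
            release();
        }
    }

    void release() {
        long value = refCount.decrementAndGet();
        if (value > 0) {
            return; // still referenced; a later release() will clean up
        }
        synchronized (this) {
            cleanupOver = cleanup();
        }
    }

    // Real code would unmap the buffer here; cleanup is refused while still available.
    private boolean cleanup() {
        return !available;
    }

    boolean isCleanupOver() { return cleanupOver; }

    long refCount() { return refCount.get(); }

    public static void main(String[] args) {
        RefCountedSketch file = new RefCountedSketch();
        file.hold();            // a reader is using the file: refCount = 2
        file.shutdown(100, 0);  // first attempt: refCount = 1, not cleaned up
        file.shutdown(100, 50); // grace period not over yet: nothing happens
        System.out.println(file.isCleanupOver()); // false
        file.shutdown(100, 150); // grace period over: forced release
        System.out.println(file.isCleanupOver()); // true
    }
}
```

Calling shutdown() again after the grace period is what lets a file that could not be cleaned up on the first attempt be destroyed later, which is the situation redeleteHangedFile retries.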

 

ConsumeQueue file deletion

        private void deleteExpiredFiles() {
            // 0.1s
            int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();

            // Starting physical offset of the first CommitLog file
            long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            if (minOffset > this.lastPhysicalMinOffset) {  // The minimum offset has grown since last time, so CommitLog files were deleted
                this.lastPhysicalMinOffset = minOffset;

                ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;

                for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                    for (ConsumeQueue logic : maps.values()) {
                        // Delete the expired files of one consume queue, given the CommitLog's minimum physical offset
                        int deleteCount = logic.deleteExpiredFile(minOffset);
                        if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                            try {
                                // After one ConsumeQueue has deleted files, wait 0.1 s before processing the next one
                                Thread.sleep(deleteLogicsFilesInterval);
                            } catch (InterruptedException ignored) {
                            }
                        }
                    }
                }

                // Delete index files
                DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
            }
        }

public int deleteExpiredFile(long offset) {
        int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
        // Recompute the minimum logical offset
        this.correctMinOffset(offset);
        return cnt;
    }

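deleteExpiredFileByOffset walks the ConsumeQueue files starting from the first one, reads the physical offset stored in each file's last entry, and compares it with the CommitLog's current minimum physical offset. If it is smaller, every message the file points to has already been removed from the CommitLog, so the ConsumeQueue file is deleted; the scan stops at the first file that still points to live data.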

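correctMinOffset then scans the first remaining ConsumeQueue file entry by entry, starting at its first offset, until it finds an entry whose physical offset is >= the CommitLog's minimum physical offset. That position becomes the queue's new minimum logical offset (minLogicOffset).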
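Both steps operate on the 20-byte ConsumeQueue entry (8-byte CommitLog offset, 4-byte message size, 8-byte tags code). The sketch keeps each "file" in a ByteBuffer; ConsumeQueueDeleteSketch and its methods are hypothetical, file i is assumed to start at byte i * 60, and minLogicOffset is returned as a byte position, so dividing by 20 gives the entry index.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical sketch of offset-based ConsumeQueue cleanup; not RocketMQ's API.
final class ConsumeQueueDeleteSketch {
    // One entry: CommitLog offset (8 bytes) + message size (4) + tags code (8)
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Builds an in-memory "file" whose entries point at the given CommitLog offsets.
    static ByteBuffer file(long... physicalOffsets) {
        ByteBuffer buf = ByteBuffer.allocate(physicalOffsets.length * CQ_STORE_UNIT_SIZE);
        for (long offset : physicalOffsets) {
            buf.putLong(offset).putInt(100).putLong(0L); // size and tags code are dummies
        }
        buf.flip();
        return buf;
    }

    // Step 1: count leading files whose last entry points below minPhyOffset.
    // The newest file is never deleted; the scan stops at the first file that is still needed.
    static int countExpiredFiles(List<ByteBuffer> files, long minPhyOffset) {
        int count = 0;
        for (int i = 0; i < files.size() - 1; i++) {
            ByteBuffer f = files.get(i);
            long lastPhyOffset = f.getLong(f.limit() - CQ_STORE_UNIT_SIZE);
            if (lastPhyOffset >= minPhyOffset) {
                break;
            }
            count++;
        }
        return count;
    }

    // Step 2: byte position of the first entry pointing at or beyond minPhyOffset,
    // or -1 if every entry in the file is stale (the old minimum is then kept).
    static long minLogicOffset(ByteBuffer firstFile, long fileFromOffset, long minPhyOffset) {
        for (int pos = 0; pos + CQ_STORE_UNIT_SIZE <= firstFile.limit(); pos += CQ_STORE_UNIT_SIZE) {
            if (firstFile.getLong(pos) >= minPhyOffset) {
                return fileFromOffset + pos;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Three files of three entries each; file i starts at byte i * 60.
        List<ByteBuffer> files = List.of(file(0, 100, 200), file(300, 400, 500), file(600, 700, 800));
        long minPhyOffset = 450; // CommitLog data below 450 has been deleted
        int expired = countExpiredFiles(files, minPhyOffset);
        System.out.println(expired); // 1
        long min = minLogicOffset(files.get(expired), expired * 60L, minPhyOffset);
        System.out.println(min + " bytes = entry " + (min / CQ_STORE_UNIT_SIZE)); // 100 bytes = entry 5
    }
}
```

With a minimum CommitLog offset of 450, the first file (last entry 200) is deleted and the second (last entry 500) is kept; its first live entry sits at 60 + 40 = 100 bytes.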

 

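Index file deletion

Index files are deleted on the same principle as ConsumeQueue files: an index file can be removed once its last physical offset is below the CommitLog's minimum physical offset.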

    public void deleteExpiredFile(long offset) {
        Object[] files = null;
        try {
            this.readWriteLock.readLock().lock();
            if (this.indexFileList.isEmpty()) {
                return;
            }

            long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
            if (endPhyOffset < offset) {  // The first index file ends before the minimum offset: stale index data exists
                files = this.indexFileList.toArray();
            }
        } catch (Exception e) {
            log.error("destroy exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        // Walk the index files (except the newest one): a file whose last physical offset is below the current minimum physical offset can be deleted
        if (files != null) {
            List<IndexFile> fileList = new ArrayList<IndexFile>();
            for (int i = 0; i < (files.length - 1); i++) {
                IndexFile f = (IndexFile) files[i];
                if (f.getEndPhyOffset() < offset) {
                    fileList.add(f);
                } else {
                    break;
                }
            }

            this.deleteExpiredFile(fileList);
        }
    }
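The index cleanup follows a common locking pattern: inspect under the read lock, remove under the write lock. IndexCleanupSketch is a hypothetical model in which an index file is reduced to a name and its end physical offset; a real implementation would presumably also destroy each file during the removal step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: decide under the read lock, remove under the write lock.
final class IndexCleanupSketch {
    // An index file reduced to a name and the last CommitLog offset it covers.
    record IndexFile(String name, long endPhyOffset) {}

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final List<IndexFile> files = new ArrayList<>();

    void add(IndexFile f) {
        lock.writeLock().lock();
        try {
            files.add(f);
        } finally {
            lock.writeLock().unlock();
        }
    }

    int size() {
        lock.readLock().lock();
        try {
            return files.size();
        } finally {
            lock.readLock().unlock();
        }
    }

    // Removes leading files that end before offset; the newest file is always kept.
    int deleteExpired(long offset) {
        List<IndexFile> snapshot = null;
        lock.readLock().lock();
        try {
            if (!files.isEmpty() && files.get(0).endPhyOffset() < offset) {
                snapshot = new ArrayList<>(files); // stale data exists: take a snapshot
            }
        } finally {
            lock.readLock().unlock();
        }
        if (snapshot == null) {
            return 0;
        }

        List<IndexFile> expired = new ArrayList<>();
        for (int i = 0; i < snapshot.size() - 1; i++) {
            IndexFile f = snapshot.get(i);
            if (f.endPhyOffset() >= offset) {
                break;
            }
            expired.add(f);
        }

        lock.writeLock().lock();
        try {
            files.removeAll(expired); // a real implementation would also destroy each file here
        } finally {
            lock.writeLock().unlock();
        }
        return expired.size();
    }

    public static void main(String[] args) {
        IndexCleanupSketch index = new IndexCleanupSketch();
        index.add(new IndexFile("i1", 100));
        index.add(new IndexFile("i2", 200));
        index.add(new IndexFile("i3", 300));
        System.out.println(index.deleteExpired(250)); // 2
        System.out.println(index.size());             // 1
    }
}
```

Taking the snapshot under the cheaper read lock keeps index lookups unblocked while the candidates are chosen; only the short removal step needs exclusive access.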

 

 

Broker storage file recovery

 

private void recover(final boolean lastExitOK) {
        // Recover the ConsumeQueues
        this.recoverConsumeQueue();

        if (lastExitOK) {
            // The last shutdown was normal
            this.commitLog.recoverNormally();
        } else {
            // The last shutdown was abnormal
            this.commitLog.recoverAbnormally();
        }

        // Rebuild and save the write progress of every ConsumeQueue
        this.recoverTopicQueueTable();
    }

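The ConsumeQueue recovery logic is simple: starting from the third-to-last file, it walks the entries one by one. An entry whose physical offset is >= 0 and whose message size is > 0 is valid; recovery stops at the first invalid entry, and everything after that point is truncated as dirty data (truncateDirtyFiles).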

 

    public void recover() {
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {

            int index = mappedFiles.size() - 3;
            if (index < 0)
                index = 0;

            int mappedFileSizeLogics = this.mappedFileSize;
            MappedFile mappedFile = mappedFiles.get(index);
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            long processOffset = mappedFile.getFileFromOffset();
            long mappedFileOffset = 0;
            long maxExtAddr = 1;
            while (true) {
                for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    long tagsCode = byteBuffer.getLong();

                    if (offset >= 0 && size > 0) {
                        mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                        this.maxPhysicOffset = offset;
                        if (isExtAddr(tagsCode)) {
                            maxExtAddr = tagsCode;
                        }
                    } else {
                        log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                            + offset + " " + size + " " + tagsCode);
                        break;
                    }
                }

                // The whole ConsumeQueue file was valid
                if (mappedFileOffset == mappedFileSizeLogics) {
                    index++;
                    if (index >= mappedFiles.size()) {

                        log.info("recover last consume queue file over, last maped file "
                            + mappedFile.getFileName());
                        break;
                    } else {
                        // Move on to the next file
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next consume queue file, " + mappedFile.getFileName());
                    }
                } else {
                    log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                        + (processOffset + mappedFileOffset));
                    break;
                }
            }

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

            if (isExtReadEnable()) {
                this.consumeQueueExt.recover();
                log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    }
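The recovery loop above reduces to a scan over fixed-size entries. In ConsumeQueueRecoverSketch (hypothetical), every file is a zero-filled ByteBuffer of fileSize bytes and file i is assumed to start at logical position i * fileSize; the returned value plays the role of processOffset, the point after which truncateDirtyFiles would discard data.

```java
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical sketch of ConsumeQueue recovery; file i is assumed to start at i * fileSize.
final class ConsumeQueueRecoverSketch {
    static final int CQ_STORE_UNIT_SIZE = 20;

    // A zero-filled file of fileSize bytes; unwritten entries read as offset 0, size 0.
    static ByteBuffer file(int fileSize, long... physicalOffsets) {
        ByteBuffer buf = ByteBuffer.allocate(fileSize);
        for (long offset : physicalOffsets) {
            buf.putLong(offset).putInt(100).putLong(0L);
        }
        return buf;
    }

    // Returns processOffset: the logical byte position just after the last valid entry.
    static long recover(List<ByteBuffer> files, int fileSize) {
        if (files.isEmpty()) {
            return 0;
        }
        int index = Math.max(0, files.size() - 3); // start from the third-to-last file
        long fileFromOffset = (long) index * fileSize;
        while (true) {
            ByteBuffer buf = files.get(index);
            int validBytes = 0;
            for (int pos = 0; pos < fileSize; pos += CQ_STORE_UNIT_SIZE) {
                long offset = buf.getLong(pos);
                int size = buf.getInt(pos + 8);
                if (offset >= 0 && size > 0) {
                    validBytes = pos + CQ_STORE_UNIT_SIZE;
                } else {
                    break; // first invalid entry: the valid data ends here
                }
            }
            if (validBytes == fileSize && index + 1 < files.size()) {
                index++; // the whole file was valid: continue with the next one
                fileFromOffset += fileSize;
            } else {
                return fileFromOffset + validBytes;
            }
        }
    }

    public static void main(String[] args) {
        int fileSize = 60; // three entries per file
        List<ByteBuffer> files = List.of(file(fileSize, 0, 100, 200), file(fileSize, 300, 400));
        System.out.println(recover(files, fileSize)); // 100: five valid entries of 20 bytes
    }
}
```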

CommitLog recovery distinguishes between a normal and an abnormal previous shutdown.

After a normal shutdown, all CommitLog data has been flushed, so recovery only needs to start from the third-to-last file, walking every message and verifying its CRC.
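CRC verification comes down to recomputing a checksum over the stored bytes and comparing it with the value written alongside them. This sketch applies java.util.zip.CRC32 to a message body only; CrcCheckSketch is hypothetical, and the real CommitLog message layout and CRC helper are not reproduced.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch: verifies a body against a stored CRC32; not RocketMQ's message format.
final class CrcCheckSketch {
    static int crc32(byte[] body) {
        CRC32 crc = new CRC32();
        crc.update(body, 0, body.length);
        return (int) crc.getValue(); // keep the low 32 bits
    }

    // A message passes recovery only if the recomputed CRC matches the stored one.
    static boolean isValid(byte[] body, int storedCrc) {
        return crc32(body) == storedCrc;
    }

    public static void main(String[] args) {
        byte[] body = "hello rocketmq".getBytes(StandardCharsets.UTF_8);
        int stored = crc32(body);                  // computed when the message was written
        System.out.println(isValid(body, stored)); // true
        body[0] ^= 1;                              // simulate a corrupted byte
        System.out.println(isValid(body, stored)); // false
    }
}
```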

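After an abnormal shutdown, recovery starts from the last file, since that is usually where problems occur, and works backwards: for each file it reads the first message and checks whether its store timestamp is no later than the smallest timestamp recorded in the checkpoint. The first file that passes is where recovery starts. From there it verifies the CRC of every message and dispatches the messages that pass to the ConsumeQueue and Index files.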
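The choice of starting file in both cases can be written as two small functions. RecoveryStartSketch is hypothetical: firstStoreTimestamps stands in for reading the first message of each file, 0 marks a file without a valid first message, and falling back to the first file when nothing matches is an assumption of this sketch.

```java
// Hypothetical sketch of where CommitLog recovery starts; inputs are simplified stand-ins.
final class RecoveryStartSketch {
    // Normal shutdown: all data was flushed, so only the last three files are re-checked.
    static int normalStart(int fileCount) {
        return Math.max(0, fileCount - 3);
    }

    // Abnormal shutdown: walk back from the newest file; firstStoreTimestamps[i] is the store
    // time of file i's first message, or 0 if the file has no valid first message.
    static int abnormalStart(long[] firstStoreTimestamps, long checkpointMinTimestamp) {
        for (int i = firstStoreTimestamps.length - 1; i >= 0; i--) {
            long ts = firstStoreTimestamps[i];
            if (ts != 0 && ts <= checkpointMinTimestamp) {
                return i; // this file begins before the checkpoint time
            }
        }
        return 0; // nothing matched: fall back to the first file (assumption of this sketch)
    }

    public static void main(String[] args) {
        long[] firstTimestamps = {100, 200, 300};
        System.out.println(normalStart(5));                      // 2
        System.out.println(abnormalStart(firstTimestamps, 250)); // 1
    }
}
```

Searching backwards keeps the amount of data to re-validate small: only files written after the checkpoint's time point, plus the one that straddles it, are re-checked and re-dispatched.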

 

Saving the current ConsumeQueue write progress (the logical write offset of each queue)

private void recoverTopicQueueTable() {
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        // Starting offset of the first CommitLog file
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                // Save the ConsumeQueue's write progress
                table.put(key, logic.getMaxOffsetInQueue());
                // Use the CommitLog's minimum physical offset to compute the ConsumeQueue's minimum logical offset, minLogicOffset
                logic.correctMinOffset(minPhyOffset);
            }
        }

        // <String/* topic-queueid */, Long/* logicOffset */>
        this.commitLog.setTopicQueueTable(table);
    }
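The table built here maps "topic-queueId" to the max logical offset of each ConsumeQueue. A minimal sketch, with a hypothetical Queue record standing in for ConsumeQueue and the minimum-offset correction left out:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of recoverTopicQueueTable's table-building step.
final class TopicQueueTableSketch {
    // Stand-in for a ConsumeQueue: topic, queue id and its max logical offset.
    record Queue(String topic, int queueId, long maxOffsetInQueue) {}

    static Map<String, Long> build(List<Queue> queues) {
        Map<String, Long> table = new HashMap<>(1024);
        for (Queue q : queues) {
            // Same key format as above: topic + "-" + queueId
            table.put(q.topic() + "-" + q.queueId(), q.maxOffsetInQueue());
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Long> table = build(List.of(
                new Queue("TopicTest", 0, 42),
                new Queue("TopicTest", 1, 7)));
        System.out.println(table.get("TopicTest-0")); // 42
        System.out.println(table.get("TopicTest-1")); // 7
    }
}
```

The CommitLog then draws on this table when assigning queue offsets to newly appended messages, so after a restart each queue continues from its recovered offset.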

 

