rocketmq的broker恢復commit-log的時候如何恢復consumeQueue、indexfile


如果一個broker正常退出,是會刪除abort文件的。那么啟動broker的時候發現abort文件還存在,那么說明上次是異常終止,會進入到commit-log的recoverAbnormally邏輯里面,因為所有其他的信息都是從commit-log獲取到的,所以追根溯源只能從commit-log開始着手。

    public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
        // recover by the minimum time stamp
        boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
        final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
        if (!mappedFiles.isEmpty()) {
            // Looking beginning to recover from which file
            int index = mappedFiles.size() - 1;
            MappedFile mappedFile = null;
            for (; index >= 0; index--) {
                mappedFile = mappedFiles.get(index);
                if (this.isMappedFileMatchedRecover(mappedFile)) {
                    log.info("recover from this mapped file " + mappedFile.getFileName());
                    break;
                }
            }

 從最新的mapped文件開始找,直到發現一個有效的mappedfile,對這個mappedfile文件執行操作:

        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        while (true) {
                DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                int size = dispatchRequest.getMsgSize();

                if (dispatchRequest.isSuccess()) {
                    // Normal data
                    if (size > 0) {
                        mappedFileOffset += size;

                        if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                            if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                                this.defaultMessageStore.doDispatch(dispatchRequest);
                            }
                        } else {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    }

  checkMessageAndReturnSize的邏輯:

    public DispatchRequest checkMessageAndReturnSize(java.nio.ByteBuffer byteBuffer, final boolean checkCRC,
        final boolean readBody) {
        try {
            // 1 TOTAL SIZE
            int totalSize = byteBuffer.getInt();

  通過不變遍歷這個最新的mappedfile的bytebuffer,把他的每一條消息取出來,並且消息一個字段就是total-size,累計求和就可以拿到這個mappedfile上次掛掉的時候到底寫到哪了。對於其他的mappedfile默認認為都是寫滿的。

 

            processOffset += mappedFileOffset;
            this.mappedFileQueue.setFlushedWhere(processOffset);
            this.mappedFileQueue.setCommittedWhere(processOffset);
            this.mappedFileQueue.truncateDirtyFiles(processOffset);

  

這里已經把commit-log完全恢復了。接下來就是恢復consumequeue、indexfile了。前一節提到生成消息的時候,其實也會同步更新consumeQueue,並且刷盤。

也就是不管是故障恢復還是正常生成消息,都會涉及commit-log、consumeQueue、indexfile的更新、落盤。只不過后面兩個是異步的。

他們都是通過this.defaultMessageStore.doDispatch(dispatchRequest)更新的

 

另外topicQueueTable完全可以通過consumeQueue得到,因為前者是后者子集:

 

    private void recoverTopicQueueTable() {
        HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);
        long minPhyOffset = this.commitLog.getMinOffset();
        for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
            for (ConsumeQueue logic : maps.values()) {
                String key = logic.getTopic() + "-" + logic.getQueueId();
                table.put(key, logic.getMaxOffsetInQueue());
                logic.correctMinOffset(minPhyOffset);
            }
        }

        this.commitLog.setTopicQueueTable(table);
    }

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM