RocketMQ 刷盤機制


RocketMQ 刷盤機制

handleDiskFlush

/**
 * Triggers the disk flush that follows a message append, choosing the flush
 * service according to the configured flush type.
 *
 * <p>Synchronous flush: a {@code GroupCommitRequest} is queued on the
 * {@code GroupCommitService} and the caller blocks until the flush finishes or
 * the configured sync-flush timeout elapses. Asynchronous flush: the relevant
 * background service is merely woken up.
 *
 * @param result           append result; its wrote offset plus wrote bytes give the offset that must be flushed
 * @param putMessageResult result object whose status is set to {@code FLUSH_DISK_TIMEOUT} when the wait fails
 * @param messageExt       the message that was appended
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    final boolean syncFlush =
        FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType();

    // (2) Asynchronous flush: only wake up the matching background service.
    if (!syncFlush) {
        if (this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // Transient store pool in use: wake the commit service.
            // Per the original note, this getter is true only when the pool is enabled,
            // the flush type is ASYNC_FLUSH and the broker is not a slave (verify in MessageStoreConfig).
            commitLogService.wakeup();
        } else {
            // Transient store pool not in effect: wake the flush service directly.
            flushCommitLogService.wakeup();
        }
        return;
    }

    // (1) Synchronous flush is served by GroupCommitService.
    final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;

    // The message does not ask to wait for the store acknowledgement: just nudge the service.
    if (!messageExt.isWaitStoreMsgOK()) {
        service.wakeup();
        return;
    }

    // Ask for everything up to the end of this message to be flushed.
    GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
    service.putRequest(request);

    // Block the calling thread until the flush completes or the sync-flush timeout expires.
    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
    if (flushOK) {
        return;
    }

    // Flush did not complete in time: log it and report the timeout to the caller.
    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
        + " client address: " + messageExt.getBornHostString());
    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
}

①同步刷盤使用GroupCommitService

②異步刷盤:若開啟了transientStorePoolEnable且不是從服務器,則喚醒commitLogService(即CommitRealTimeService);否則直接喚醒flushCommitLogService(即FlushRealTimeService)進行刷盤

GroupCommitService

/**
 * Queues a flush request and wakes the service thread if it has not been notified yet.
 *
 * @param request the group-commit request to enqueue on the write list
 */
public synchronized void putRequest(final GroupCommitRequest request) {
    // The write list has its own lock in addition to the method-level monitor.
    synchronized (this.requestsWrite) {
        this.requestsWrite.add(request);
    }

    // Only the caller that flips the flag from false to true releases the wait point.
    final boolean firstNotifier = hasNotified.compareAndSet(false, true);
    if (!firstNotifier) {
        return;
    }
    waitPoint.countDown(); // notify
}

putRequest 提交刷盤請求

/**
 * Main loop of the group-commit (synchronous flush) service thread.
 *
 * <p>While the service is not stopped it repeatedly waits for work and then
 * calls {@code doCommit()}. After the loop exits it sleeps briefly, swaps the
 * request lists one last time and performs a final {@code doCommit()} so that
 * requests arriving during shutdown are still handled.
 */
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Wait to be woken up; with nothing to do, the wait is bounded by the
            // argument (10 ms according to the original note).
            this.waitForRunning(10);
            // Process the pending flush requests.
            this.doCommit();
        } catch (Exception e) {
            // Keep the service thread alive: log and continue with the next round.
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }

    // Same monitor as the synchronized putRequest method, so no request can be
    // added while the lists are swapped.
    // NOTE(review): swapRequests() is not shown here; presumably it exchanges the
    // write and read request lists - confirm.
    synchronized (this) {
        this.swapRequests();
    }

    // Final pass over whatever was queued before shutdown.
    this.doCommit();

    CommitLog.log.info(this.getServiceName() + " service end");
}
/**
 * Serves every queued group-commit request by flushing the commit log until
 * the flushed offset covers the request, then wakes the waiting producer
 * thread with the outcome.
 *
 * <p>When the read list is empty a flush is still issued, because messages
 * that do not wait for the store acknowledgement only wake the service without
 * queueing a request.
 */
private void doCommit() {
    synchronized (this.requestsRead) {
        if (!this.requestsRead.isEmpty()) {
            for (GroupCommitRequest req : this.requestsRead) {
                // Already flushed past this request? Then no flush is needed at all.
                boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();

                // There may be a message in the next file, so a maximum of
                // two times the flush. The flushed offset is re-checked AFTER each
                // flush so that the result of the last flush is what gets reported;
                // checking only before flushing would report a failure even when
                // the second flush had made the request durable.
                for (int i = 0; i < 2 && !flushOK; i++) {
                    CommitLog.this.mappedFileQueue.flush(0);
                    flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                }

                // Wake up the thread blocked waiting for this flush to complete.
                req.wakeupCustomer(flushOK);
            }

            // Record the store timestamp in the checkpoint when one is available.
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }

            this.requestsRead.clear();
        } else {
            // Because of individual messages is set to not sync flush, it
            // will come to this process
            CommitLog.this.mappedFileQueue.flush(0);
        }
    }
}

這里調用了mappedFileQueue.flush(0)進行刷盤

mappedFileQueue.flush(0)

/**
 * Flushes the mapped file that contains the current flush offset and advances
 * that offset to the position reported by the file.
 *
 * @param flushLeastPages minimum number of dirty pages required before a flush is performed;
 *                        passed through to {@code MappedFile#flush}
 * @return {@code true} when the flush offset did not move (including when no file was found),
 *         {@code false} when the offset advanced
 */
public boolean flush(final int flushLeastPages) {
    // (1) Locate the file that holds the current flush offset.
    final MappedFile target = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
    if (target == null) {
        return true;
    }

    // Capture the timestamp before flushing, as the original does.
    final long timestampBeforeFlush = target.getStoreTimestamp();

    // (2) Core step: delegate the actual flush to the mapped file.
    final int flushedOffsetInFile = target.flush(flushLeastPages);
    final long newFlushedWhere = target.getFileFromOffset() + flushedOffsetInFile;

    final boolean unchanged = newFlushedWhere == this.flushedWhere;
    this.flushedWhere = newFlushedWhere;

    // The store timestamp is only refreshed when the caller passed 0 pages.
    if (flushLeastPages == 0) {
        this.storeTimestamp = timestampBeforeFlush;
    }

    return unchanged;
}

①根據刷盤指針找到對應的文件

②核心邏輯調用 mappedFile#flush刷盤

mappedFile.flush(0)

/**
 * Forces this file's pending data to disk when enough dirty pages have
 * accumulated, and records the new flushed position.
 *
 * @param flushLeastPages minimum number of dirty pages required before flushing;
 *                        per the original notes, 0 forces a flush
 * @return the flushed position after this call
 */
public int flush(final int flushLeastPages) {
    // (1) Nothing to do unless the flush threshold is met.
    if (!this.isAbleToFlush(flushLeastPages)) {
        return this.getFlushedPosition();
    }

    // Could not take a reference on the file: log it and still advance the flushed position.
    if (!this.hold()) {
        log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
        this.flushedPosition.set(getReadPosition());
        return this.getFlushedPosition();
    }

    // Per the original note, getReadPosition() is
    //   writeBuffer == null ? wrotePosition : committedPosition
    // i.e. the write pointer when there is no writeBuffer, otherwise the commit pointer.
    final int readPosition = getReadPosition();

    try {
        //We only append data to fileChannel or mappedByteBuffer, never both.
        final boolean flushViaChannel = writeBuffer != null || this.fileChannel.position() != 0;
        if (flushViaChannel) {
            // Data went through the file channel, so force the channel.
            this.fileChannel.force(false);
        } else {
            // Data was written to the mapped buffer directly, so force the buffer.
            this.mappedByteBuffer.force();
        }
    } catch (Throwable e) {
        log.error("Error occurred when force data to disk.", e);
    }

    // Record the flushed position, then drop the reference taken by hold().
    this.flushedPosition.set(readPosition);
    this.release();

    return this.getFlushedPosition();
}

FlushRealTimeService 異步刷盤

未開啟transientStorePoolEnable時,由handleDiskFlush直接喚醒此服務;開啟時,則由CommitRealTimeService在commit數據之後喚醒此服務

// Main loop of the asynchronous flush service thread: on every round it reads the
// flush settings, waits for the configured interval (or a wakeup) and then flushes
// the commit log. Default values quoted below come from the original notes.
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        // Whether to use a plain sleep instead of an interruptible wait (default false per the original note).
        boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
        // Interval between flush rounds (500 ms per the original note).
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // Minimum number of dirty pages before a flush is performed (4 per the original note).
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        // Interval after which a flush is forced even below the page threshold,
        // to limit data loss (10 s per the original note).
        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // The thorough interval has elapsed since the last forced flush.
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            // A threshold of 0 triggers a forced flush.
            flushPhysicQueueLeastPages = 0;
            // Print progress on every 10th forced flush.
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            // Timed mode sleeps for the full interval.
            if (flushCommitLogTimed) {
                Thread.sleep(interval);
            } else {
                // Otherwise wait up to the interval; the wait can be cut short by a wakeup.
                this.waitForRunning(interval);
            }

            if (printFlushProgress) {
                this.printFlushProgress();
            }

            long begin = System.currentTimeMillis();
            // Flush only when the page threshold is met; the periodic forced flush
            // above sets the threshold to 0.
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            // Record the store timestamp in the checkpoint when one is available.
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            // Log flushes that took longer than 500 ms.
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            // Keep the service thread alive: log, print progress and continue.
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }
    // NOTE(review): the excerpt stops here; the remainder of the method (including
    // its closing brace) is not shown in this article.

CommitRealTimeService

異步將writeBuffer的數據刷到fileChannel

/**
 * Main loop of the commit service thread: periodically commits data from the
 * write buffer to the file channel and wakes the flush service whenever
 * something was committed. On shutdown it retries a forced commit up to
 * {@code RETRY_TIMES_OVER} times.
 */
public void run() {
    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        // Wait interval between rounds (200 ms by default per the original note).
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // Minimum number of dirty pages before data is committed to the file channel (4 per the original note).
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // Interval after which a commit is forced regardless of the page threshold
        // (200 ms per the original note).
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        long begin = System.currentTimeMillis();
        // The thorough interval has elapsed: drop the threshold to 0 to force a commit.
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            commitDataLeastPages = 0;
        }

        try {
            // Core step: commit the data to the file channel.
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            // A false result means some data was committed.
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                flushCommitLogService.wakeup();
            }

            // Log commits that took longer than 500 ms.
            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            // Keep the service thread alive: log and continue with the next round.
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }

    // Shutdown: force commits until one reports no further progress or the retry limit is hit.
    boolean result = false;
    for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
        result = CommitLog.this.mappedFileQueue.commit(0);
        CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
    }
    CommitLog.log.info(this.getServiceName() + " service end");
}

mappedFileQueue.commit

/**
 * Commits the mapped file that contains the current commit offset and
 * advances that offset to the position reported by the file.
 *
 * @param commitLeastPages minimum number of dirty pages required before a commit is performed;
 *                         passed through to {@code MappedFile#commit}
 * @return {@code true} when the commit offset did not move (including when no file was found),
 *         {@code false} when the offset advanced
 */
public boolean commit(final int commitLeastPages) {
    // Locate the file that holds the current commit offset.
    final MappedFile target = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
    if (target == null) {
        return true;
    }

    // Delegate the actual commit to the mapped file.
    final int committedOffsetInFile = target.commit(commitLeastPages);
    final long newCommittedWhere = target.getFileFromOffset() + committedOffsetInFile;

    final boolean unchanged = newCommittedWhere == this.committedWhere;
    this.committedWhere = newCommittedWhere;
    return unchanged;
}

mappedFile.commit

/**
 * Commits pending data from the write buffer to the file channel when enough
 * dirty pages have accumulated, and hands the write buffer back to the pool
 * once the whole file has been committed.
 *
 * @param commitLeastPages minimum number of dirty pages required before committing;
 *                         per the original notes, 0 forces a commit
 * @return the write position when there is no write buffer, otherwise the committed position
 */
public int commit(final int commitLeastPages) {
    // No write buffer means there is nothing to move to the file channel.
    if (writeBuffer == null) {
        //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
        return this.wrotePosition.get();
    }

    // Commit only when the page threshold is met.
    if (this.isAbleToCommit(commitLeastPages)) {
        final boolean held = this.hold();
        if (!held) {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
        } else {
            commit0(commitLeastPages);
            this.release();
        }
    }

    // All dirty data has been committed to FileChannel.
    // The file is full and fully committed, so the write buffer can go back to the pool.
    final boolean fullyCommitted = writeBuffer != null
        && this.transientStorePool != null
        && this.fileSize == this.committedPosition.get();
    if (fullyCommitted) {
        this.transientStorePool.returnBuffer(writeBuffer);
        this.writeBuffer = null;
    }

    // Report the commit pointer.
    return this.committedPosition.get();
}
/**
 * Writes the bytes between the last committed position and the current write
 * position from the write buffer to the file channel, then advances the
 * committed position.
 *
 * <p>The committed position is read once and that single snapshot is used for
 * both the "anything to commit?" check and the start of the copied range, so
 * the two can never disagree.
 *
 * @param commitLeastPages not used by this method; the page threshold is checked by the caller
 */
protected void commit0(final int commitLeastPages) {
    int writePos = this.wrotePosition.get();
    int lastCommittedPosition = this.committedPosition.get();

    if (writePos - lastCommittedPosition > 0) {
        try {
            // Copy the range [lastCommittedPosition, writePos) to the file channel.
            // slice() yields an independent position/limit, leaving writeBuffer's own cursor untouched.
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            // Advance the committed pointer to the write position captured above.
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM