RocketMQ 拉取消息-文件獲取


看完了上一篇的《RocketMQ 拉取消息-通信模塊》,請求進入PullMessageProcessor中,接着

PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request)方法中調用了:

 final GetMessageResult getMessageResult =
                this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);

來從硬盤中獲取消息體。

接着來看看DefaultMessageStore是如何從消息隊列中獲取消息的:

    /**
     * 獲取消息結果
     * 所有的參數都是從requestheader中獲取的。也就是說從consumer client端傳遞過來的。
     * @param group
     * @param topic
     * @param queueId
     * @param offset
     * @param maxMsgNums
     * @param subscriptionData
     * @return
     */
    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums,
                                       final SubscriptionData subscriptionData) {
        if (this.shutdown) {
            log.warn("message store has shutdown, so getMessage is forbidden");
            return null;
        }

        if (!this.runningFlags.isReadable()) {
            log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());
            return null;
        }

        long beginTime = this.getSystemClock().now();

        // 枚舉變量,取消息結果
        GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
        // 當被過濾后,返回下一次開始的Offset
        long nextBeginOffset = offset;
        // 邏輯隊列中的最小Offset
        long minOffset = 0;
        // 邏輯隊列中的最大Offset
        long maxOffset = 0;

        GetMessageResult getResult = new GetMessageResult();


        final long maxOffsetPy = this.commitLog.getMaxOffset();

        //通過topic和queueid查找邏輯隊列對象,相當於字典的目錄,用來指定消息在物理文件commitlog上的位置
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            minOffset = consumeQueue.getMinOffsetInQuque(); // 消費隊列 最小隊列編號
            maxOffset = consumeQueue.getMaxOffsetInQuque(); // 消費隊列 最大隊列編號

            // 判斷 隊列位置(offset)
            if (maxOffset == 0) {   // 消費隊列無消息
                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;
                nextBeginOffset = nextOffsetCorrection(offset, 0);
            } else if (offset < minOffset) {    // 查詢offset 太小!!!這里可一看出offset的意義
                status = GetMessageStatus.OFFSET_TOO_SMALL;
                nextBeginOffset = nextOffsetCorrection(offset, minOffset);
            } else if (offset == maxOffset) {   // 查詢offset 超過 消費隊列 一個位置
                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;
                nextBeginOffset = nextOffsetCorrection(offset, offset);
            } else if (offset > maxOffset) {    // 查詢offset 超過 消費隊列 太多(大於一個位置)
                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;
                if (0 == minOffset) {
                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);
                } else {
                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);
                }
            } else {
                // 獲得 映射Buffer結果(MappedFile)
                SelectMapedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);
                if (bufferConsumeQueue != null) {
                    try {
                        status = GetMessageStatus.NO_MATCHED_MESSAGE;

                        long nextPhyFileStartOffset = Long.MIN_VALUE;   // commitLog下一個文件(MappedFile)對應的開始offset。
                        long maxPhyOffsetPulling = 0;   // 消息物理位置拉取到的最大offset

                        int i = 0;
                        final int MaxFilterMessageCount = 16000;
                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();
                        // 循環獲取 消息位置信息
                        for (; i < bufferConsumeQueue.getSize() && i < MaxFilterMessageCount; i += ConsumeQueue.CQStoreUnitSize) {
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();   // 消息物理位置offset
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();   // 消息長度
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();   // 消息tagsCode
                            // 設置消息物理位置拉取到的最大offset
                            maxPhyOffsetPulling = offsetPy;

                            // 當 offsetPy 小於 nextPhyFileStartOffset 時,意味着對應的 Message 已經移除,所以直接continue,直到可讀取的Message。
                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }

                            // 校驗 commitLog 是否需要硬盤,無法全部放在內存
                            boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                            // 此批消息達到上限了    // 是否已經獲得足夠消息
                            if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),
                                    isInDisk)) {
                                break;
                            }

                            // 消息過濾 // 判斷消息是否符合條件 !!!
                            if (this.messageFilter.isMessageMatched(subscriptionData, tagsCode)) {
                                // 從commitLog獲取對應消息ByteBuffer !!!
                                SelectMapedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);
                                
                   // 最中從對應的MappedFile對象中通過位置獲取到這個數據,下面把它轉換為GetMessageResult對象。
                   if (selectResult != null) {
                                    this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
                                    getResult.addMessage(selectResult);
                                    status = GetMessageStatus.FOUND;
                                    nextPhyFileStartOffset = Long.MIN_VALUE;
                                } else {
                                    // 從commitLog無法讀取到消息,說明該消息對應的文件(MappedFile)已經刪除,計算下一個MappedFile的起始位置
                                    if (getResult.getBufferTotalSize() == 0) {
                                        status = GetMessageStatus.MESSAGE_WAS_REMOVING;
                                    }


                                    nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);
                                }
                            } else {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                if (log.isDebugEnabled()) {
                                    log.debug("message type not matched, client: " + subscriptionData + " server: " + tagsCode);
                                }
                            }
                        }

                        // 統計剩余可拉取消息字節數
                        if (diskFallRecorded) {
                            long fallBehind = maxOffsetPy - maxPhyOffsetPulling;
                            brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);
                        }

                        // 計算下次拉取消息的消息隊列編號 !!!
                        nextBeginOffset = offset + (i / ConsumeQueue.CQStoreUnitSize);

                        // 根據剩余可拉取消息字節數與內存判斷是否建議讀取從節點
                        long diff = maxOffsetPy - maxPhyOffsetPulling;
                        long memory = (long) (StoreUtil.TotalPhysicalMemorySize
                                * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
                        getResult.setSuggestPullingFromSlave(diff > memory);
                    } finally {
                        // 必須釋放資源
                        bufferConsumeQueue.release();
                    }
                } else {
                    status = GetMessageStatus.OFFSET_FOUND_NULL;
                    nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));
                    log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "
                            + maxOffset + ", but access logic queue failed.");
                }
            }
        }
        // 請求的隊列Id沒有
        else {
            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;
            nextBeginOffset = nextOffsetCorrection(offset, 0);
        }
        // 統計
        if (GetMessageStatus.FOUND == status) {
            this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();
        } else {
            this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();
        }
        long eclipseTime = this.getSystemClock().now() - beginTime;
        this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);
        // 設置返回結果
        getResult.setStatus(status);
        getResult.setNextBeginOffset(nextBeginOffset);
        getResult.setMaxOffset(maxOffset);
        getResult.setMinOffset(minOffset);
        return getResult;
    }

 

先來看一下獲取消息隊列的方法:

    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentHashMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentHashMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(//                     topic, //                     queueId, //                     StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), //                     this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), //                     this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

 下面看第二個標黃的方法commitlog.getmessage~:

    public SelectMapedBufferResult getMessage(final long offset, final int size) {
        int mapedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
        MapedFile mapedFile = this.mapedFileQueue.findMapedFileByOffset(offset, (0 == offset ? true : false));
        if (mapedFile != null) {
            int pos = (int) (offset % mapedFileSize);
            SelectMapedBufferResult result = mapedFile.selectMapedBuffer(pos, size);
            return result;
        }

        return null;
    }

標黃的方法findMapedFileByOffset:這個方法在《再說rocketmq消息存儲》中有介紹。可以到頁面上搜關鍵詞找一下。

    public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            this.readWriteLock.readLock().lock();
            MapedFile mapedFile = this.getFirstMapedFile();

            if (mapedFile != null) {
                int index =
                        (int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize));
                if (index < 0 || index >= this.mapedFiles.size()) {
                    logError
                            .warn(
                                    "findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",//
                                    offset,//
                                    index,//
                                    this.mapedFileSize,//
                                    this.mapedFiles.size(),//
                                    UtilAll.currentStackTrace());
                }

                try {
                    return this.mapedFiles.get(index);
                } catch (Exception e) {
                    if (returnFirstOnNotFound) {
                        return mapedFile;
                    }
                }
            }
        } catch (Exception e) {
            log.error("findMapedFileByOffset Exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }

        return null;
    }

 

MappedFile中獲取對應位置的數據。

    public SelectMapedBufferResult selectMapedBuffer(int pos, int size) {

        if ((pos + size) <= this.wrotePostion.get()) {

            if (this.hold()) {
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
                return new SelectMapedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            } else {
                log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: "
                        + this.fileFromOffset);
            }
        }

        else {
            log.warn("selectMapedBuffer request pos invalid, request pos: " + pos + ", size: " + size
                    + ", fileFromOffset: " + this.fileFromOffset);
        }


        return null;
    }

獲取到文件中的消息數據后返回DefaultMessageStore中的方法:

getResult.addMessage(selectResult)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM