RocketMQ源碼 — 五、 主要feature及其實現方式


RocketMQ的主要特點以及實現方式

單機支持1萬以上持久隊列

所有數據單獨存儲到一個CommitLog,完全順序寫,隨機讀

在一個broker上一個DefaultMessageStore管理一個commitLog
順序寫:在commitLog.putMessage里面獲取mapedFile之后進入synchronized塊,開始寫內存,所以當有新的消息需要保存的時候會等待鎖釋放,所以寫消息的時候就是順序的

MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
// 給commitLog上鎖
synchronized (this) {
    // 保存消息
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
}

隨機讀:因為在pull Message的時候根據consumeQueue來讀取消息的,consumeQueue里面記錄了offset,所以讀取mapedFile的時候是按照offset隨機讀取的

對最終用戶展現的是實際只存儲了消息在commitLog的位置信息,並串行刷盤

最終用戶接觸到的是邏輯隊列ComsumeQueue,只存儲了topic、offset等信息
串行刷盤:DefaultMessageStore使用StoreCheckPoint記錄當前刷盤的文件,並只將StoreCheckPoint的mapedFile進行刷盤
PageCache:文件cache是文件數據在內存中的副本,因此文件cache管理與內存管理和文件系統管理相關。文件cache分為兩個層面,Page Cache和Buffer Cache,每一個Page Cache包含多個Buffer Cache。linux中文件Cache的操作分為兩類,一是在文件Cache與應用程序提供的用戶空間buffer拷貝數據(普通的read/wrote操作),二是使用mmap將Cache映射到用戶空間,並沒有拷貝(所以速度更快),用戶空間可以像使用指針一樣(普通訪問文件是使用流)訪問文件

刷盤策略

在commitLog.putMessage中決定刷盤方式,在MessageStoreConfig中配置刷盤的方式

RocketMQ的消息都是持久化的:所有消息保存在commitlog文件夾下的文件中
先寫入系統PageCache:所有commitlog下的文件都是使用的直接內存,采用mmap文件映射的方法,每次接收到消息的時候先把消息寫入直接內存PageCache——即mapedFile
然后刷盤:啟動commitLog的時候會啟動刷盤的線程(FlushCommitLogService)定時刷盤
可以保證內存與磁盤都有一份數據,訪問消息的時候直接從內存中讀取:讀取文件的時候直接從mapedFile取

// 同步刷盤,使用GroupCommitService
if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
	// 是否配置為等待
    if (msg.isWaitStoreMsgOK()) {
        request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
        service.putRequest(request);
		// 超時等待
        boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
        if (!flushOK) {
            log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                    + " client address: " + msg.getBornHostString());
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    } else {
        service.wakeup();
    }
}
// 異步刷盤,FlushRealTimeService
else {
    this.flushCommitLogService.wakeup();
}

消息過濾

  1. 在DefaultMessageStore.getMessage的時候,先根據topic和queueId獲取ConsumeQueue,然后讀取consumeQueue,一次對比consumeQueue的tag的hashCode,如果匹配才去讀取commitLog
  2. 存儲tag的hashCode,定長節省空間
  3. 先讀取consumeQueue,在消息堆積的情況下也能高效過濾消息

長輪詢pull

在PullMessageService的run方法中pull message,在獲取返回結果的回調中再次發起請求(只是添加pullRequest到pullRequestQueue中)——也就是長輪詢

發送消息負載均衡

在DefaultMQProducerImpl send message的時候會調用selectOneMessageQueue(MessageQueue包含了topic,broker,queueID等信息,表明消息發送到哪一個broker的哪一個queue),使用遞增取模的方法決定使用哪一個messageQueue

消費消息的負載均衡

AllocateMessageQueueAveragely.allocate實現了consumer消費的默認負載均衡算法,、

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                       List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
                consumerGroup, //
                currentCID,//
                cidAll);
        return result;
    }

    // 基本原則,每個隊列只能被一個consumer消費
    // 當messageQueue個數小於等於consume的時候,排在前面(在list中的順序)的consumer消費一個queue,index大於messageQueue之后的consumer消費不到queue,也就是為0
    // 當messageQueue個數大於consumer的時候,分兩種情況
    //     當有余數(mod > 0)並且index < mod的時候,當前comsumer可以消費的隊列個數是 mqAll.size() / cidAll.size() + 1
    //     可以整除或者index 大於余數的時候,隊列數為:mqAll.size() / cidAll.size()
    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                    + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

該負載平衡算法:
就是把messageQueue放到一個隊列中,consumer放到一個隊列中,messageQueue依次分配給consumer,
如果不夠分配,則排在后面的consumer就不能消費messageQueue
如果給consumer分配完一輪之后,messageQueue還有多余,那么messageQueue接着分配,consumer隊列從頭開始
示意圖如下:

HA,同步雙寫,異步復制

HAService,RocketMQ的高可用服務
同步雙寫:在commitLog.putMessage中進行同步雙寫,將GroupCommitRequest放進GroupTransferService.requestWrite等待slave主動拉取,master超時等待同步雙寫完成。所以在寫消息的時候是同步等待的,slave從master復制消息的時候是異步的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM