RocketMQ源碼 — 四、 Consumer 接收消息過程


Consumer

consumer pull message

訂閱

在Consumer啟動之前先將自己放到一個本地的集合中,再以后獲取消費者的時候會用到,同時會將自己訂閱的信息告訴broker

接收消息

consumer啟動的時候會啟動兩個service:
RebalanceService:主要實現consumer的負載均衡,但是並不會直接發送獲取消息的請求,而是構造request之后放到PullMessageService中,等待PullMessageService的線程取出執行
PullMessageService:主要負責從broker獲取message,包含一個需要獲取消息的請求隊列(是阻塞的),並不斷依次從隊列中取出請求向broker send Request

執行時序圖太大,截屏截不全,所以放在git上,在這里:RocketMQ.asta

從Broker pullMessage

在PullMessageService中通過netty發送pull消息的請求之后,Broker的remoteServer會收到request,然后在PullMessageProcessor中的processRequest處理,先會解析requestHeader,request中帶了讀取MessageStore的參數:

  • consumerGroup
  • topic
  • queueId
  • queueOffset
  • MaxMsgNums
  • subscriptionData(ConsumerManager中獲取)

processRequest處理流程

  • 判斷Broker當前是否允許接收消息
  • 找到subscriptionGroupConfig,subscriptionGroupTable,如果不存在當前的group則新增一個
  • 是否subscriptionGroupConfig.consumeEnable
  • 獲取從TopicConfigManager.topicConfigTable獲取topicConfig
  • topicConfig是否有讀權限
  • 校驗queueId是否在范圍內
  • ConsumerManager.getConsumerGroupInfo獲取ConsumerGroupInfo
  • consumerGroupInfo.findSubscriptionData查找subscriptionData
  • MessageStore.getMessage讀取消息,從文件中讀取消息,屬於pullMessage的核心方法
  • 判斷brokerRole,master和slave工作模式的哪一種
  • 判斷MessageResult.status
  • 如果responseCode是SUSCESS,判斷是使用heap還是非heap方式傳輸數據
  • 使用netty序列化response返回netty客戶端

負載均衡

// AllocateMessageQueueAveragely
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                       List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
                consumerGroup, //
                currentCID,//
                cidAll);
        return result;
    }

    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                    + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}

// RebalanceImpl
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {
    boolean changed = false;

	// 去掉topic對應的無用MessageQueue(不包含在processQueueTable或者pullExpired)
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                    consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
    	// 如果MessageQueue不在processQueueTable
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            // 如果message還有數據需要讀
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

	// 將pullRequest放在pullRequestQueue中等待去取數據
    this.dispatchPullRequest(pullRequestList);

    return changed;
}

issue

多個consumer讀取同一個consumerQueue的時候怎么記錄每個讀取的進度

每個consumer只要記住自己讀取哪一個隊列,以及offset就可以了

pull消息過程中的關鍵類

DefaultMQPushConsumerImpl

供DefaultMQPushConsumer調用,作為consumer的默認實現:

AllocateMessageQueueAveragely

這個類主要是consumer消費的負載均衡算法

public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
                                       List<String> cidAll) {
    if (currentCID == null || currentCID.length() < 1) {
        throw new IllegalArgumentException("currentCID is empty");
    }
    if (mqAll == null || mqAll.isEmpty()) {
        throw new IllegalArgumentException("mqAll is null or mqAll empty");
    }
    if (cidAll == null || cidAll.isEmpty()) {
        throw new IllegalArgumentException("cidAll is null or cidAll empty");
    }

    List<MessageQueue> result = new ArrayList<MessageQueue>();
    if (!cidAll.contains(currentCID)) {
        log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}", //
                consumerGroup, //
                currentCID,//
                cidAll);
        return result;
    }

    // 基本原則,每個隊列只能被一個consumer消費
    // 當messageQueue個數小於等於consume的時候,排在前面(在list中的順序)的consumer消費一個queue,index大於messageQueue之后的consumer消費不到queue,也就是為0
    // 當messageQueue個數大於consumer的時候,分兩種情況
    //     當有余數(mod > 0)並且index < mod的時候,當前comsumer可以消費的隊列個數是 mqAll.size() / cidAll.size() + 1
    //     可以整除或者index 大於余數的時候,隊列數為:mqAll.size() / cidAll.size()
    int index = cidAll.indexOf(currentCID);
    int mod = mqAll.size() % cidAll.size();
    int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                    + 1 : mqAll.size() / cidAll.size());
    int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    int range = Math.min(averageSize, mqAll.size() - startIndex);
    for (int i = 0; i < range; i++) {
        result.add(mqAll.get((startIndex + i) % mqAll.size()));
    }
    return result;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM