RocketMQ(八)RocketMQ的Consumer負載均衡


一、問題描述

RocketMQ的Consumer是如何做的負載均衡?比如:5個Consumer進程同時消費一個Topic,這個Topic只有4個queue會出現啥情況?反之Consumer數量小於queue的數據是啥情況?

二、源碼剖析

1、RebalancePushImpl

public class RebalancePushImpl extends RebalanceImpl {
    public RebalancePushImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MQClientInstance mQClientFactory, DefaultMQPushConsumerImpl defaultMQPushConsumerImpl) {
        // 可以看到很簡單,調用了父類RebalanceImpl的構造器
        super(consumerGroup, messageModel, allocateMessageQueueStrategy, mQClientFactory);
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    }

2、RebalanceImpl

public abstract class RebalanceImpl {
    // 很簡單,就是初始化一些東西,關鍵在於下面的doRebalance
    public RebalanceImpl(String consumerGroup, MessageModel messageModel,
        AllocateMessageQueueStrategy allocateMessageQueueStrategy,
        MQClientInstance mQClientFactory) {
        this.consumerGroup = consumerGroup;
        this.messageModel = messageModel;
        this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
        this.mQClientFactory = mQClientFactory;
    }
    
    /**
     * 分配消息隊列,命名抄襲spring,doXXX開始真正的業務邏輯
     *
     * @param isOrder:是否是順序消息 true:是;false:不是
     */
    public void doRebalance(final boolean isOrder) {
        // 分配每個topic的消息隊列
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    // 這個是關鍵了
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        // 移除未訂閱的topic對應的消息隊列
        this.truncateMessageQueueNotMyTopic();
    }
}

2.1、rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case CLUSTERING: {
            // 獲取topic對應的隊列和consumer信息,比如mqSet如下
            /**
             * 0 = {MessageQueue@2151} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=3]"
             * 1 = {MessageQueue@2152} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=0]"
             * 2 = {MessageQueue@2153} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=2]"
             * 3 = {MessageQueue@2154} "MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=1]"
             */
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            // 所有的Consumer客戶端cid,比如:172.16.20.246@7832
            List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
            if (mqSet != null && cidAll != null) {
                List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                // 為什么要addAll到list里,因為他要排序
                mqAll.addAll(mqSet);

                // 排序消息隊列和消費者數組,因為是在進行分配隊列,排序后,各Client的順序才能保持一致。
                Collections.sort(mqAll);
                Collections.sort(cidAll);

                // 默認選擇的是org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                // 根據隊列分配策略分配消息隊列
                List<MessageQueue> allocateResult = null;
                try {
                    // 這個才是要介紹的真正C位,strategy.allocate()
                    allocateResult = strategy.allocate(
                        this.consumerGroup,
                        this.mQClientFactory.getClientId(),
                        mqAll,
                        cidAll);
                } catch (Throwable e) {
                    return;
                }
            }
        }
    }
}

3、AllocateMessageQueueAveragely

3.1、allocate

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    private final InternalLogger log = ClientLogger.getLog();
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
        List<String> cidAll) {
        /**
         * 參數校驗的代碼我刪了。
         */
        
        List<MessageQueue> result = new ArrayList<MessageQueue>();
        /**
         * 第幾個Consumer,這也是我們上面為什么要排序的重要原因之一。
         * Collections.sort(mqAll);
         * Collections.sort(cidAll);
         */
        int index = cidAll.indexOf(currentCID);
        // 取模,多少消息隊列無法平均分配 比如mqAll.size()是4,代表4個queue。cidAll.size()是5,代表一個consumer,那么mod就是4
        int mod = mqAll.size() % cidAll.size();
        // 平均分配
        // 4 <= 5 ? 1 : (4 > 0 && 1 < 4 ? 4 / 5 + 1 : 4 / 5)
        int averageSize =
            mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                + 1 : mqAll.size() / cidAll.size());
        // 有余數的情況下,[0, mod) 平分余數,即每consumer多分配一個節點;第index開始,跳過前mod余數。
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // 分配隊列數量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消息隊列。
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}

3.2、解釋

看着這算法凌亂的很,太復雜了!說實話,確實挺復雜,蠻羅嗦的,但是代數法可以得到如下表格:

假設4個queue Consumer有2個 可以整除 Consumer有3個 不可整除 Consumer有5個 無法都分配
queue[0] Consumer[0] Consumer[0] Consumer[0]
queue[1] Consumer[0] Consumer[0] Consumer[1]
queue[2] Consumer[1] Consumer[1] Consumer[2]
queue[3] Consumer[1] Consumer[2] Consumer[3]

所以得出如下真香定律(也是回擊面試官的最佳答案):

  • queue個數大於Consumer個數,且queue個數能整除Consumer個數的話, 那么Consumer會平均分配queue。(比如上面表格的Consumer有2個 可以整除部分)
  • queue個數大於Consumer個數,且queue個數不能整除Consumer個數的話, 那么會有一個Consumer多消費1個queue,其余Consumer平均分配。(比如上面表格的Consumer有3個 不可整除部分)
  • queue個數小於Consumer個數,那么會有Consumer閑置,就是浪費掉了,其余Consumer平均分配到queue上。(比如上面表格的Consumer有5個 無法都分配部分)

4、補充

queue選擇算法也就是負載均衡算法有很多種可選擇:

  • AllocateMessageQueueAveragely:是前面講的默認方式
  • AllocateMessageQueueAveragelyByCircle:每個消費者依次消費一個partition,環狀。
  • AllocateMessageQueueConsistentHash:一致性hash算法
  • AllocateMachineRoomNearby:就近元則,離的近的消費
  • AllocateMessageQueueByConfig:是通過配置的方式

三、何時Rebalance

那就得從Consumer啟動的源碼開始看起,先看Consumer的啟動方法start()

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    private MQClientInstance mQClientFactory;
    
    // 啟動Consumer的入口函數
 public synchronized void start() throws MQClientException {
        this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(
            this.defaultMQPushConsumer, this.rpcHook);
        // 調用MQClientInstance的start方法,追進去看看。
        mQClientFactory.start();
    }
}

看看mQClientFactory.start();都干了什么

public class MQClientInstance {
    private final RebalanceService rebalanceService;
    
    public void start() throws MQClientException {
        synchronized (this) {
            // 調用RebalanceService的start方法,別慌,繼續追進去看看
   this.rebalanceService.start();
        }
    }
}

看看rebalanceService.start();都干了什么,先看下他的父類ServiceThread

/*
 * 首先可以發現他是個線程的任務,實現了Runnable接口
 * 其次發現上步調用的start方法居然就是thread.start(),那就相當於調用了RebalanceService的run方法
 */
public abstract class ServiceThread implements Runnable {
 public void start() {
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }

最后來看看RebalanceService.run()

public class RebalanceService extends ServiceThread {
    /**
     * 等待時間的間隔,毫秒,默認是20s
     */
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));

    @Override
    public void run() {
        while (!this.isStopped()) {
            // 等待20s,然后超時自動釋放鎖執行doRebalance
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    }
}

到這里真相大白了。

當一個consumer出現宕機后,默認最多20s,其它機器將重新消費已宕機的機器消費的queue,同樣當有新的Consumer連接上后,20s內也會完成rebalance使得新的Consumer有機會消費queue里的msg。

等等,好像有問題:新上線一個Consumer要等20s才能負載均衡?這不是搞笑呢嗎?肯定有貓膩。

確實,新啟動Consumer的話會立即喚醒沉睡的線程, 讓他立馬進行this.mqClientFactory.doRebalance();,源碼如下

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    // 啟動Consumer的入口函數
 public synchronized void start() throws MQClientException {        
        // 看到了沒!!!, 見名知意,立即rebalance負載均衡
        this.mQClientFactory.rebalanceImmediately();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM