【Java應用】RocketMQ線上問題:重復消費和線程飆升


問題

  1. RocketMQ重復消費問題
  2. RocketMQ線程過高問題

線上場景

場景一:重復消費

場景:生產有這么一種場景,我們在RocketMQ中對一個topic創建了16個tag,不同總類的信息放到不同的tag中,在消費端每個tag對應三個線程組成group去消費消息。消費服務在線上是集群部署,是使用docker進行部署的。

問題1:tag中的消息發生了穩定的重復性消費。

排查:首先我們發現重復消費的次數和線上集群的台數是一致的,所以這個時候就去查看配置信息,然后發現沒有配置錯誤,在多方試錯的情況下,最后在rocketmq的監控頁面發現ClientId獲取的IP竟然是一樣的。
clientID

這時候閱讀RocketMQ的源碼,我們在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl的start方法中看到下面這行代碼

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

點進去看到getAndCreateMQClientInstance

buildMQClientId

從上面圖中的代碼以及我們看到的RocketMQ的監控圖可以明白一點,rocketmq在docker部署中通過getLocalAddress方法獲取出來的IP是一樣,如果你不設置instanceName和unitName,那么多台機器上面使用的就是一個instance。這樣可能會造成重復消費,那么為什么instanceName一致就會造成重復消費呢?接着往下看

  1. org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

    start方法

    // 一個JVM中的所有消費組、生產者持有同一個MQClientInstance,MQClientInstance只會啟動一次
    mQClientFactory.start();
    
  2. org.apache.rocketmq.client.impl.factory.MQClientInstance

    public void start() throws MQClientException {
    	...
    	this.rebalanceService.start();
    	...
    }
    
  3. org.apache.rocketmq.client.impl.consumer.RebalanceService

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            // 該線程默認20s執行一次rebalance
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }
    
        log.info(this.getServiceName() + " service end");
    }
    
  4. org.apache.rocketmq.client.impl.factory.MQClientInstance

    public void doRebalance() {
        // 遍歷注冊的所有已經注冊的消費者,對消費者執行rebalance
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
    
  5. org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl

    @Override
    public void doRebalance() {
        if (!this.pause) {
            // 每個DefaultMQPushConsumerImpl都持有一個單獨的RebalanceImpl對象
            this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
        }
    }
    
  6. org.apache.rocketmq.client.impl.consumer.RebalanceImpl

    public void doRebalance(final boolean isOrder) {
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            // 遍歷訂閱信息對每個主題的隊列進行重新負載
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }
    
        this.truncateMessageQueueNotMyTopic();
    }
    
  7. org.apache.rocketmq.client.impl.consumer.RebalanceImpl

    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                ...
            }
            case CLUSTERING: {
                // 從主題訂閱信息緩存表中獲取該topic的隊列信息
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // topic分布在多個broker上,但是每個broker都存有所有的消費者信息,因為消費者啟動的時候需要像所有的broker注冊信息
                // 這里獲取的是當前topic下消費者組里所有的消費者客戶端ID
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    
                // 對cidAll和mqAll排序,確保所有消費者結果一致,這樣一個消費隊列就只能被一個消費者分配
                Collections.sort(mqAll);
                Collections.sort(cidAll);
                // 默認為AllocateMessageQueueAveragely
                AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
                allocateResult = strategy.allocate(//
                        this.consumerGroup, //
                        this.mQClientFactory.getClientId(), //
                        mqAll, //
                        cidAll);
            }
        }
    }
    

我們知道RocketMQ不管push還是pull其實底層的實現都是pull,我們看到最后發現他會根據topic和group從broker那里獲取出來所有cunsumer client,如果clientId相同,那么在broker上面只有一個,獲取出來的是一樣,那么拉取的MessageQueue就是一樣的。於是我們就可以給consumer的instanceName設置一個隨機值

場景二:線程劇增

問題2: 設置完隨機值以后確實不重復消費了,但是發現服務器的線程飆升。

排查:jstack下來線上日志,發現里面有很多netty以及rocketmq相關的線程,於是我們再次進到源碼中。這里我就不詳細跟蹤代碼了

start

我們從這里可以看到consumer端起了很多線程,報錯與broker建立鏈接的線程,這里面會級聯產生多個netty相關的線程,然后是定時任務的線程,以及拉取消息的線程和負載均衡的線程。於是我們把instanceName的隨機性與服務綁定,而不是與tag綁定,這樣就可以做到一台服務器以他instance

結論

對於同一個jvm實例我們只需要一個instance實例即可,對於多個jvm我們要區分,不然集群消費會隱式的變為廣播消費

參考

五種隊列分配策略


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM