問題
- RocketMQ重復消費問題
- RocketMQ線程過高問題
線上場景
場景一:重復消費
場景:生產有這么一種場景,我們在RocketMQ中對一個topic創建了16個tag,不同總類的信息放到不同的tag中,在消費端每個tag對應三個線程組成group去消費消息。消費服務在線上是集群部署,是使用docker進行部署的。
問題1:tag中的消息發生了穩定的重復性消費。
排查:首先我們發現重復消費的次數和線上集群的台數是一致的,所以這個時候就去查看配置信息,然后發現沒有配置錯誤,在多方試錯的情況下,最后在rocketmq的監控頁面發現ClientId獲取的IP竟然是一樣的。
這時候閱讀RocketMQ的源碼,我們在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl的start方法中看到下面這行代碼
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
點進去看到
從上面圖中的代碼以及我們看到的RocketMQ的監控圖可以明白一點,rocketmq在docker部署中通過getLocalAddress方法獲取出來的IP是一樣,如果你不設置instanceName和unitName,那么多台機器上面使用的就是一個instance。這樣可能會造成重復消費,那么為什么instanceName一致就會造成重復消費呢?接着往下看
-
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
start方法
// 一個JVM中的所有消費組、生產者持有同一個MQClientInstance,MQClientInstance只會啟動一次 mQClientFactory.start();
-
org.apache.rocketmq.client.impl.factory.MQClientInstance
public void start() throws MQClientException { ... this.rebalanceService.start(); ... }
-
org.apache.rocketmq.client.impl.consumer.RebalanceService
@Override public void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { // 該線程默認20s執行一次rebalance this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end"); }
-
org.apache.rocketmq.client.impl.factory.MQClientInstance
public void doRebalance() { // 遍歷注冊的所有已經注冊的消費者,對消費者執行rebalance for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
-
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl
@Override public void doRebalance() { if (!this.pause) { // 每個DefaultMQPushConsumerImpl都持有一個單獨的RebalanceImpl對象 this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } }
-
org.apache.rocketmq.client.impl.consumer.RebalanceImpl
public void doRebalance(final boolean isOrder) { Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { // 遍歷訂閱信息對每個主題的隊列進行重新負載 for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic, isOrder); } catch (Throwable e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
-
org.apache.rocketmq.client.impl.consumer.RebalanceImpl
private void rebalanceByTopic(final String topic, final boolean isOrder) { switch (messageModel) { case BROADCASTING: { ... } case CLUSTERING: { // 從主題訂閱信息緩存表中獲取該topic的隊列信息 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // topic分布在多個broker上,但是每個broker都存有所有的消費者信息,因為消費者啟動的時候需要像所有的broker注冊信息 // 這里獲取的是當前topic下消費者組里所有的消費者客戶端ID List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // 對cidAll和mqAll排序,確保所有消費者結果一致,這樣一個消費隊列就只能被一個消費者分配 Collections.sort(mqAll); Collections.sort(cidAll); // 默認為AllocateMessageQueueAveragely AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; allocateResult = strategy.allocate(// this.consumerGroup, // this.mQClientFactory.getClientId(), // mqAll, // cidAll); } } }
我們知道RocketMQ不管push還是pull其實底層的實現都是pull,我們看到最后發現他會根據topic和group從broker那里獲取出來所有cunsumer client,如果clientId相同,那么在broker上面只有一個,獲取出來的是一樣,那么拉取的MessageQueue就是一樣的。於是我們就可以給consumer的instanceName設置一個隨機值
場景二:線程劇增
問題2: 設置完隨機值以后確實不重復消費了,但是發現服務器的線程飆升。
排查:jstack下來線上日志,發現里面有很多netty以及rocketmq相關的線程,於是我們再次進到源碼中。這里我就不詳細跟蹤代碼了
我們從這里可以看到consumer端起了很多線程,報錯與broker建立鏈接的線程,這里面會級聯產生多個netty相關的線程,然后是定時任務的線程,以及拉取消息的線程和負載均衡的線程。於是我們把instanceName的隨機性與服務綁定,而不是與tag綁定,這樣就可以做到一台服務器以他instance
結論
對於同一個jvm實例我們只需要一個instance實例即可,對於多個jvm我們要區分,不然集群消費會隱式的變為廣播消費