🏆【Alibaba中間件技術系列】「RocketMQ技術專題」Broker服務端自動創建topic的原理分析和問題要點指南


前提背景

使用RocketMQ進行發消息時,一般我們是必須要指定topic,此外topic必須要提前建立,但是topic的創建(自動或者手動方式)的設置有一個開關autoCreateTopicEnable,此部分主要會在broker節點的配置文件的時候進行設置,運行環境中會使用默認設置autoCreateTopicEnable = true,但是這樣就會導致topic的設置不容易規范管理,所以在生產環境中會在Broker設置參數autoCreateTopicEnable = false。那么如果此參數稍有偏差,或者沒有提前手動創建topic,則會頻繁出現No route info of this topic這個錯誤,那么接下來我們探索一下此問題的出現原因以及系統如何進行創建topic。

No route info of this topic

相信做過RocketMQ項目的小伙伴們,可能對No route info of this topic一點都不陌生,說明的含義起始就是無法解析或者路由這個topic,但是造成的原因有很多種。

沒有配置NameServer服務

Broker啟動時我們沒有配置NameSrv地址,發送程序會報錯:No route info of this topic。但當我們配上NameSrv地址后,再次啟動,可以正常發送消息。

沒有建立autoCreateTopicEnable=true且沒有創建該topic

當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發消息的時候肯定先要獲取關於topic的一些信息,比如有幾個消息隊列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的信息時,就會拋出異常

RocketMQ的客戶端版本與服務端版本不一致

RocketMQ Java客戶端調用No route info of this topic錯誤(原因版本不一致)。此時,即使啟動broker的時候設置autoCreateTopicEnable=true也沒有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0

RocketMQ 4.3.0版本的自動創建(autoCreateTopicEnable),客戶端傳遞使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客戶端傳遞的默認AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。

org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

實際代碼

> 4.4.0版本

<=4.3.0版本

方案1:要不就進行調整client客戶端版本的version

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

方案2:調整自動創建代碼為AUTO_CREATE_TOPIC_KEY

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//設置自動創建topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

Topic之前並未創建過,Broker未配置NameSrv地址,無法發送,而配置NameSrv后則可以正常發送。這中間有2個問題:
1、topic是怎么自動創建的?
2、topic自動創建過程中Broker、NameSrv如何協作配合的?

分析以下如何自動創建topic的源碼流程

RocketMQ基本路由規則

  1. Broker在啟動時向Nameserver注冊存儲在該服務器上的路由信息,並每隔30s向Nameserver發送心跳包,並更新路由信息。
    Nameserver每隔10s掃描路由表,如果檢測到Broker服務宕機,則移除對應的路由信息。

  2. 消息生產者每隔30s會從Nameserver重新拉取Topic的路由信息並更新本地路由表;在消息發送之前,如果本地路由表中不存在對應主題的路由消息時,會主動向Nameserver拉取該主題的消息。

  3. 如果autoCreateTopicEnable設置為true,消息發送者向NameServer查詢主題的路由消息返回空時,會嘗試用一個系統默認的主題名稱(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此時消息發送者得到的路由信息為:

默認Topic的路由信息是如何創建的?

Nameserver?broker?當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發消息的時候肯定先要獲取關於topic的一些信息,比如有幾個消息隊列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的信息時,就會拋出異常

 private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 如果獲取到topic的路由信息,則發送,否則拋異常
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
           ... ...
        }
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        }
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

tryToFindTopicPublishInfo是發送的關鍵,如果獲取到topic的信息,則發送,否則就異常;因此之前No route info of this topic的異常,就是Producer獲取不到Topic的信息,導致發送失敗。

先從topicPublishInfoTable緩存中獲取

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // topicPublishInfoTable是Producer本地緩存的topic信息表
    // Producer啟動后,會添加默認的topic:TBW102
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 未獲取到,從NameSrv獲取該topic的信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    // 獲取到了,則返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        // 沒獲取到,再換種方式從NameSrv獲取
        // 如果再獲取不到,那后續就無法發送了
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
  1. Producer本地topicPublishInfoTable變量中沒有topic的信息,只緩存了TBW102。

  2. 嘗試從NameSrv獲取Topic的信息。獲取失敗,NameSrv中根本沒有Topic,因為這個topic是Producer發送時設置的,沒有同步到NameSrv。

  3. 再換種方式從NameSrv獲取,如果獲取到了,那么可以執行發送流程,如果還是沒有獲取到,就會拋No route info of this topic的異常了。

再從NameServer服務中進行獲取

public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        return updateTopicRouteInfoFromNameServer(topic, false, null);
}
  1. 第1次獲取時,isDefault傳的false,defaultMQProducer傳的null,因此在updateTopicRouteInfoFromNameServer會走else分支,用Topic去獲取

  2. 第2次獲取時,isDefault傳的true,defaultMQProducer也傳值了,因此會走if分支,將入參的topic轉換為默認的TBW102,獲取TBW102的信息

  3. 不管Broker配沒配NameSrv地址,獲取Topic的信息,必失敗

  4. 獲取TBW102信息:

    • 2.1 Broker配置了NameSrv地址,成功
    • 2.2 Broker沒有配置NameSrv地址,失敗

生產者首先向NameServer查詢路由信息,由於是一個不存在的主題,故此時返回的路由信息為空,RocketMQ會使用默認的主題再次尋找,由於開啟了自動創建路由信息,NameServer會向生產者返回默認主題的路由信息。

然后從返回的路由信息中選擇一個隊列(默認輪詢)。消息發送者從Nameserver獲取到默認的Topic的隊列信息后,隊列的個數會改變嗎?

從NameServer中獲取,注意這個isDefault=false,defaultMQProducer=null

溫馨提示:消息發送者在到默認路由信息時,其隊列數量,會選擇DefaultMQProducer#defaultTopicQueueNums與Nameserver返回的的隊列數取最小值,DefaultMQProducer#defaultTopicQueueNums默認值為4,故自動創建的主題,其隊列數量默認為4。

獲取消息對應的topic信息

發請求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader),但是因為沒有任何一個Broker有關於這個topic的信息,所以namesrv就會返回topic不存在,處理請求的代碼在DefaultRequestProcessor的。

case RequestCode.GET_ROUTEINTO_BY_TOPIC:  return this.getRouteInfoByTopic(ctx, request);

也就是回應碼ResponseCode.TOPIC_NOT_EXIST,然后拋出異常 throw new MQClientException(response.getCode(), response.getRemark());被捕獲之后退出返回false。

從NameServer獲取相關的Topic信息數據

updateTopicRouteInfoFromNameServer最終會發給NameSrv一個GET_ROUTEINTO_BY_TOPIC請求

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) {
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            }
                        }
                    } else {
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    }
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }
        return false;
    }

因為if條件不滿足,所以獲取默認的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
}

默認的topic為"TBW102",這個時候如果namesrv中如果還是沒有這個topic的信息的話,就會拋出異常No route info of this topic。
autoCreateTopicEnable=true的作用。

Broker啟動流程自動創建topic

  • 在Broker啟動流程中,會構建TopicConfigManager對象,其構造方法中首先會判斷是否開啟了允許自動創建主題,如果啟用了自動創建主題,則向topicConfigTable中添加默認主題的路由信息。

  • 當Broker啟動時,TopicConfigManager初始化,這里會判斷該標識,創建TBW102topic,並且在后續的心跳中把信息更新到namesrv中,這樣在發消息的時候就不會拋出不存在的異常。

 // MixAll.DEFAULT_TOPIC
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
                String topic = MixAll.DEFAULT_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                this.systemTopicList.add(topic);
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            }

該topicConfigTable中所有的路由信息,會隨着Broker向Nameserver發送心跳包中,Nameserver收到這些信息后,更新對應Topic的路由信息表。

BrokerConfig的defaultTopicQueueNum默認為8。兩台Broker服務器都會運行上面的過程,故最終Nameserver中關於默認主題的路由信息中,會包含兩個Broker分別各8個隊列信息。

TopicConfigManager構造方法

當從namesrv查出Topic相關的信息時,在topicRouteData2TopicPublishInfo設置消息隊列數量 info.getMessageQueueList().add(mq);,調用updateTopicPublishInfo方法更新緩存topicPublishInfoTable

 // Update Pub info
                            {
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) {
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) {
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    }
                                }
                            }

然后if (topicPublishInfo != null && topicPublishInfo.ok()) 這個條件就會符合,那個異常就不會拋出。

當autoCreateTopicEnable=false時

  1. 創建topic的類UpdateTopicSubCommand(),設置相應的信息,最后調用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  2. 發消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor處理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
  3. 同步給其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);

Broker端收到消息后的處理流程

服務端收到消息發送的處理器為:SendMessageProcessor,在處理消息發送時,會調用super.msgCheck方法:

AbstractSendMessageProcessor#msgCheck

在Broker端,首先會使用TopicConfigManager根據topic查詢路由信息,如果Broker端不存在該主題的路由配置(路由信息),此時如果Broker中存在默認主題的路由配置信息,則根據消息發送請求中的隊列數量,在Broker創建新Topic的路由信息。這樣Broker服務端就會存在主題的路由信息。

在Broker端的topic配置管理器中存在的路由信息,一會向Nameserver發送心跳包,匯報到Nameserver,另一方面會有一個定時任務,定時存儲在broker端,具體路徑為${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉后再重啟,並不會丟失路由信息。

TBW102是為何物?

TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動創建該默認topic。

public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;
    // ...
    {
        // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
        if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
            String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            this.systemTopicList.add(topic);
            topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        }
    }
    // ...
}

autoCreateTopicEnable的默認值是true,可以同步外部配置文件,讓Broker啟動時加載,來改變該值。我理解的TBW102的作用是當開啟自動創建topic功能,發送時用了未配置的topic,可以讓該topic繼承默認TBW102的配置,實現消息的發送。

總結分析

  1. client本地首先沒有緩存對應topic的路由信息,然后先去nameserver去查找,nameserver中也沒有此topic的路由信息,然后返回給client。client接收到返回后再向nameserver請求topic為tbw102的路由信息。

  2. 如果有broker設置了autocreateTopic,則broker在啟動的時候會在topicManager中創建對應的topicconfig通過心跳發送給nameserver,namerserver會將其保存。nameserver將之前保存的tbw102的路由信息返回給請求的client。

  3. client拿到了topic為tbw102的路由信息后返回,client根據返回的tbw102路由信息(里面包含所有設置了autocreateTopic為true的broker,默認每個broker會在client本地創建DefaultTopicQueueNums=4個讀寫隊列選擇,假設兩個broker則會有8個隊列讓你選擇)先緩存到本地的topicPublishInfoTable表中,key為此topic ,value為此topicRouteData,輪詢選擇一個隊列進行發送。

根據選擇到的隊列對應的broker發送該topic消息。

broker在接收到此消息后會在msgcheck方法中調用createTopicInSendMessageMethod方法創建topicConfig信息塞進topicConfigTable表中,然后就跟發送已經創建的topic的流程一樣發送消息了。

同時topicConfigTable會通過心跳將新的這個topicConfig信息發送給nameserver。

nameserver接收到后會更新topic的路由信息,如果之前接收到消息的broker沒有全部覆蓋到,因為broker會30S向nameserver發送一次心跳,心跳包里包含topicconfig,覆蓋到的broker會將自動創建好的topicconfig信息發送給nameserver,從而在nameserver那邊接收到后會注冊這個新的topic信息,因為消費者每30S也會到nameserver去更新本地的topicrouteinfo,請求發送到nameserver得到了之前覆蓋到的broker發送的心跳包更新后的最新topic路由信息,那么未被覆蓋的broker就永遠不會加入到這個負載均衡了,就會造成負載均衡達不到預期了,即所有能自動創建topic的broker不能全部都參與進來。

參考資料

https://www.cnblogs.com/dingwpmz/p/11809404.html

https://www.pianshen.com/article/24191855587/

https://www.jianshu.com/p/c8fd57a7f741


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM