前提背景
使用RocketMQ進行發消息時,一般我們是必須要指定topic,此外topic必須要提前建立,但是topic的創建(自動或者手動方式)的設置有一個開關autoCreateTopicEnable,此部分主要會在broker節點的配置文件的時候進行設置,運行環境中會使用默認設置autoCreateTopicEnable = true,但是這樣就會導致topic的設置不容易規范管理,所以在生產環境中會在Broker設置參數autoCreateTopicEnable = false。那么如果此參數稍有偏差,或者沒有提前手動創建topic,則會頻繁出現No route info of this topic這個錯誤,那么接下來我們探索一下此問題的出現原因以及系統如何進行創建topic。
No route info of this topic
相信做過RocketMQ項目的小伙伴們,可能對No route info of this topic一點都不陌生,說明的含義起始就是無法解析或者路由這個topic,但是造成的原因有很多種。
沒有配置NameServer服務
Broker啟動時我們沒有配置NameSrv地址,發送程序會報錯:No route info of this topic。但當我們配上NameSrv地址后,再次啟動,可以正常發送消息。
沒有建立autoCreateTopicEnable=true且沒有創建該topic
當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發消息的時候肯定先要獲取關於topic的一些信息,比如有幾個消息隊列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的信息時,就會拋出異常
RocketMQ的客戶端版本與服務端版本不一致
RocketMQ Java客戶端調用No route info of this topic錯誤(原因版本不一致)。此時,即使啟動broker的時候設置autoCreateTopicEnable=true也沒有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0
RocketMQ 4.3.0版本的自動創建(autoCreateTopicEnable),客戶端傳遞使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客戶端傳遞的默認AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。
org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
實際代碼
> 4.4.0版本
<=4.3.0版本
方案1:要不就進行調整client客戶端版本的version
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
方案2:調整自動創建代碼為AUTO_CREATE_TOPIC_KEY
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//設置自動創建topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
Topic之前並未創建過,Broker未配置NameSrv地址,無法發送,而配置NameSrv后則可以正常發送。這中間有2個問題:
1、topic是怎么自動創建的?
2、topic自動創建過程中Broker、NameSrv如何協作配合的?
分析以下如何自動創建topic的源碼流程
RocketMQ基本路由規則
-
Broker在啟動時向Nameserver注冊存儲在該服務器上的路由信息,並每隔30s向Nameserver發送心跳包,並更新路由信息。
Nameserver每隔10s掃描路由表,如果檢測到Broker服務宕機,則移除對應的路由信息。 -
消息生產者每隔30s會從Nameserver重新拉取Topic的路由信息並更新本地路由表;在消息發送之前,如果本地路由表中不存在對應主題的路由消息時,會主動向Nameserver拉取該主題的消息。
-
如果autoCreateTopicEnable設置為true,消息發送者向NameServer查詢主題的路由消息返回空時,會嘗試用一個系統默認的主題名稱(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此時消息發送者得到的路由信息為:
默認Topic的路由信息是如何創建的?
Nameserver?broker?當autoCreateTopicEnable=false時,DefaultMQProducerImpl.sendDefaultImpl,當發消息的時候肯定先要獲取關於topic的一些信息,比如有幾個消息隊列,是不時有序topic,有這個topic的Broker列表等,當獲取不到正確的信息時,就會拋出異常
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 如果獲取到topic的路由信息,則發送,否則拋異常
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
... ...
}
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty()) {
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
}
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
tryToFindTopicPublishInfo是發送的關鍵,如果獲取到topic的信息,則發送,否則就異常;因此之前No route info of this topic的異常,就是Producer獲取不到Topic的信息,導致發送失敗。
先從topicPublishInfoTable緩存中獲取
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// topicPublishInfoTable是Producer本地緩存的topic信息表
// Producer啟動后,會添加默認的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未獲取到,從NameSrv獲取該topic的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 獲取到了,則返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 沒獲取到,再換種方式從NameSrv獲取
// 如果再獲取不到,那后續就無法發送了
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
-
Producer本地topicPublishInfoTable變量中沒有topic的信息,只緩存了TBW102。
-
嘗試從NameSrv獲取Topic的信息。獲取失敗,NameSrv中根本沒有Topic,因為這個topic是Producer發送時設置的,沒有同步到NameSrv。
-
再換種方式從NameSrv獲取,如果獲取到了,那么可以執行發送流程,如果還是沒有獲取到,就會拋No route info of this topic的異常了。
再從NameServer服務中進行獲取
public boolean updateTopicRouteInfoFromNameServer(final String topic) {
return updateTopicRouteInfoFromNameServer(topic, false, null);
}
-
第1次獲取時,isDefault傳的false,defaultMQProducer傳的null,因此在updateTopicRouteInfoFromNameServer會走else分支,用Topic去獲取
-
第2次獲取時,isDefault傳的true,defaultMQProducer也傳值了,因此會走if分支,將入參的topic轉換為默認的TBW102,獲取TBW102的信息
-
不管Broker配沒配NameSrv地址,獲取Topic的信息,必失敗
-
獲取TBW102信息:
- 2.1 Broker配置了NameSrv地址,成功
- 2.2 Broker沒有配置NameSrv地址,失敗
生產者首先向NameServer查詢路由信息,由於是一個不存在的主題,故此時返回的路由信息為空,RocketMQ會使用默認的主題再次尋找,由於開啟了自動創建路由信息,NameServer會向生產者返回默認主題的路由信息。
然后從返回的路由信息中選擇一個隊列(默認輪詢)。消息發送者從Nameserver獲取到默認的Topic的隊列信息后,隊列的個數會改變嗎?
從NameServer中獲取,注意這個isDefault=false,defaultMQProducer=null
溫馨提示:消息發送者在到默認路由信息時,其隊列數量,會選擇DefaultMQProducer#defaultTopicQueueNums與Nameserver返回的的隊列數取最小值,DefaultMQProducer#defaultTopicQueueNums默認值為4,故自動創建的主題,其隊列數量默認為4。
獲取消息對應的topic信息
發請求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader)
,但是因為沒有任何一個Broker有關於這個topic的信息,所以namesrv就會返回topic不存在,處理請求的代碼在DefaultRequestProcessor的。
case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request);
也就是回應碼ResponseCode.TOPIC_NOT_EXIST,然后拋出異常 throw new MQClientException(response.getCode(), response.getRemark());被捕獲之后退出返回false。
從NameServer獲取相關的Topic信息數據
updateTopicRouteInfoFromNameServer最終會發給NameSrv一個GET_ROUTEINTO_BY_TOPIC請求
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
} catch (Exception e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
因為if條件不滿足,所以獲取默認的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
默認的topic為"TBW102",這個時候如果namesrv中如果還是沒有這個topic的信息的話,就會拋出異常No route info of this topic。
autoCreateTopicEnable=true的作用。
Broker啟動流程自動創建topic
-
在Broker啟動流程中,會構建TopicConfigManager對象,其構造方法中首先會判斷是否開啟了允許自動創建主題,如果啟用了自動創建主題,則向topicConfigTable中添加默認主題的路由信息。
-
當Broker啟動時,TopicConfigManager初始化,這里會判斷該標識,創建TBW102topic,並且在后續的心跳中把信息更新到namesrv中,這樣在發消息的時候就不會拋出不存在的異常。
// MixAll.DEFAULT_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.DEFAULT_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
該topicConfigTable中所有的路由信息,會隨着Broker向Nameserver發送心跳包中,Nameserver收到這些信息后,更新對應Topic的路由信息表。
BrokerConfig的defaultTopicQueueNum默認為8。兩台Broker服務器都會運行上面的過程,故最終Nameserver中關於默認主題的路由信息中,會包含兩個Broker分別各8個隊列信息。
TopicConfigManager構造方法
當從namesrv查出Topic相關的信息時,在topicRouteData2TopicPublishInfo設置消息隊列數量 info.getMessageQueueList().add(mq);,調用updateTopicPublishInfo方法更新緩存topicPublishInfoTable
// Update Pub info
{
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
然后if (topicPublishInfo != null && topicPublishInfo.ok()) 這個條件就會符合,那個異常就不會拋出。
當autoCreateTopicEnable=false時
- 創建topic的類UpdateTopicSubCommand(),設置相應的信息,最后調用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
- 發消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor處理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
- 同步給其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
Broker端收到消息后的處理流程
服務端收到消息發送的處理器為:SendMessageProcessor,在處理消息發送時,會調用super.msgCheck方法:
AbstractSendMessageProcessor#msgCheck
在Broker端,首先會使用TopicConfigManager根據topic查詢路由信息,如果Broker端不存在該主題的路由配置(路由信息),此時如果Broker中存在默認主題的路由配置信息,則根據消息發送請求中的隊列數量,在Broker創建新Topic的路由信息。這樣Broker服務端就會存在主題的路由信息。
在Broker端的topic配置管理器中存在的路由信息,一會向Nameserver發送心跳包,匯報到Nameserver,另一方面會有一個定時任務,定時存儲在broker端,具體路徑為${ROCKET_HOME}/store/config/topics.json中,這樣在Broker關閉后再重啟,並不會丟失路由信息。
TBW102是為何物?
TBW102是Broker啟動時,當autoCreateTopicEnable的配置為true時,會自動創建該默認topic。
public TopicConfigManager(BrokerController brokerController) {
this.brokerController = brokerController;
// ...
{
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
// ...
}
autoCreateTopicEnable的默認值是true,可以同步外部配置文件,讓Broker啟動時加載,來改變該值。我理解的TBW102的作用是當開啟自動創建topic功能,發送時用了未配置的topic,可以讓該topic繼承默認TBW102的配置,實現消息的發送。
總結分析
-
client本地首先沒有緩存對應topic的路由信息,然后先去nameserver去查找,nameserver中也沒有此topic的路由信息,然后返回給client。client接收到返回后再向nameserver請求topic為tbw102的路由信息。
-
如果有broker設置了autocreateTopic,則broker在啟動的時候會在topicManager中創建對應的topicconfig通過心跳發送給nameserver,namerserver會將其保存。nameserver將之前保存的tbw102的路由信息返回給請求的client。
-
client拿到了topic為tbw102的路由信息后返回,client根據返回的tbw102路由信息(里面包含所有設置了autocreateTopic為true的broker,默認每個broker會在client本地創建DefaultTopicQueueNums=4個讀寫隊列選擇,假設兩個broker則會有8個隊列讓你選擇)先緩存到本地的topicPublishInfoTable表中,key為此topic ,value為此topicRouteData,輪詢選擇一個隊列進行發送。
根據選擇到的隊列對應的broker發送該topic消息。
broker在接收到此消息后會在msgcheck方法中調用createTopicInSendMessageMethod方法創建topicConfig信息塞進topicConfigTable表中,然后就跟發送已經創建的topic的流程一樣發送消息了。
同時topicConfigTable會通過心跳將新的這個topicConfig信息發送給nameserver。
nameserver接收到后會更新topic的路由信息,如果之前接收到消息的broker沒有全部覆蓋到,因為broker會30S向nameserver發送一次心跳,心跳包里包含topicconfig,覆蓋到的broker會將自動創建好的topicconfig信息發送給nameserver,從而在nameserver那邊接收到后會注冊這個新的topic信息,因為消費者每30S也會到nameserver去更新本地的topicrouteinfo,請求發送到nameserver得到了之前覆蓋到的broker發送的心跳包更新后的最新topic路由信息,那么未被覆蓋的broker就永遠不會加入到這個負載均衡了,就會造成負載均衡達不到預期了,即所有能自動創建topic的broker不能全部都參與進來。
參考資料
https://www.cnblogs.com/dingwpmz/p/11809404.html