高可用究竟指的是什么?請參考:關於高可用的系統
RocketMQ做了以下的事情來保證系統的高可用
- 多master部署,防止單點故障
- 消息冗余(主從結構),防止消息丟失
- 故障恢復(本篇暫不討論)
那么問題來了:
- 怎么支持多broker的寫?
- 怎么實現消息冗余?
下面分開說明這兩個問題
多master集群
這里強調出master集群,是因為需要多個broker set,而一個broker set只有一個master(見下文的“注意”),所以是master集群
broker有三種角色:ASYNC_MASTER、SYNC_MASTER和SLAVE,這些角色常用的搭配為:
- ASYNC_MASTER、SLAVE:容許丟消息,但是要broker一直可用,master異步傳輸CommitLog到slave
- SYNC_MASTER、SLAVE:不允許丟消息,master同步傳輸CommitLog到slave
- ASYNC_MASTER:如果只是想簡單部署則使用這種方式
master:負責消息的讀寫
slave:只負責讀消息
SYNC_MASTER與ASYNC_MASTER的區別是sync會等待消息傳輸到slave才算消息寫完成,而async不會同步等待,而是異步復制到slave
RocketMQ的架構圖(原圖地址)
注意:在RocketMQ里面有一個概念broker set,一個broker set由一個master和多個slave組成,一個broker set內的每個broker的brokerName相同。
在broker集群中每個master相互之間是獨立,master之間不會有交互,每個master維護自己的CommitLog、自己的ConsumeQueue,但是每一個master都有可能收到同一個topic下的producer發來的消息
為了支持多master集群,需要解決幾個問題:
- namesrv怎么管理broker
- producer發送消息的時候知道發送到哪一個broker(為什么是master)
1. namesrv怎么管理broker
broker啟動的時候會向namesrv注冊自己的信息
// org.apache.rocketmq.broker.BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
// 省略中間代碼...
RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills());
// 省略中間代碼...
}
信息中包括:
clusterName:broker 集群的名字,如:DefaultCluster
brokerAddr:broker的ip:port,如:192.168.0.102:10911
brokerName:注意這個字段,上面介紹過了,一個broker set中的brokerName是相同的,需要在部署的時候配置
brokerId:用來唯一標示一個broker set中的broker,master是0(org.apache.rocketmq.common.MixAll#MASTER_ID),slave是正整數
haServerAddr:haServer的ip:port,如:192.168.0.102:10912
topicConfigWrapper:是比較復雜的數據結構,主要包含了broker上所有的topic信息,如:
{
"dataVersion": {
"counter": 2,
"timestamp": 1514252649572
},
"topicConfigTable": {
"TopicTest": {
"order": false,
"perm": 6,
"readQueueNums": 4,
"topicFilterType": "SINGLE_TAG",
"topicName": "TopicTest",
"topicSysFlag": 0,
"writeQueueNums": 4
},
"%RETRY%please_rename_unique_group_name_4": {
"order": false,
"perm": 6,
"readQueueNums": 1,
"topicFilterType": "SINGLE_TAG",
"topicName": "%RETRY%please_rename_unique_group_name_4",
"topicSysFlag": 0,
"writeQueueNums": 1
}
}
}
上面包含了兩個topic:TopicTest和%RETRY%please_rename_unique_group_name_4,相關字段的含義:
order:是否是順序消息
perm:表明該topic的權限,可讀(4)、可寫(2)、可繼承(1),通過位運算組合
readQueueNums:決定了consume消費的MessageQueue共有幾個
writeQueueNums:決定了producer發送消息的MessageQueue共有幾個
這些信息發送給namesrv之后,namesrv轉化為自己的數據結構,namesrv處理broker注冊的方法是:
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
// 省略中間代碼...
// 這里會判斷只有master才會創建QueueData,因為只有master才包含了讀寫隊列的信息
// slave沒有自己獨立的讀寫隊列信息(salve不會創建自己的queue信息),只是和master的的讀寫隊列信息一致
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
// 這個方法創建了QueueData,QueueData包含broker set下的讀寫隊列的信息
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
// 省略中間代碼...
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
上面涉及到的namesrv的幾個重要數據結構
// 每個cluster下的broker set信息,一個brokerName對應的broker set
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 每個broker set中的broker信息(set中有哪些broker,每個broker的brokerId和brokerAddr)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 每個broker的存活情況
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 每個topic下的queue信息,包括每個broker set中讀寫隊列的個數,consumer消費消息和producer發送消息的路由信息都從這個數據結構中獲取
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
所以,namesrv通過將broker注冊來的信息構造成自己的數據結構:
- 每個cluster有哪些broker set
- 每個broker set包括哪些broker,brokerId和broker的ip:port
- 每個broker的存活情況,根據每次broker上報來的信息,清除可能下線的broker
- 每個topic的消息隊列信息,幾個讀隊列,幾個寫隊列
namesrv匯總所有的broker的這些信息,然后供consumer和producer拉取
2. producer發送消息的時候知道發送到哪一個master
之前我們知道producer發送消息的時候發往哪一個broker是由MessageQueue決定的,所以我們先要搞清楚producer發送消息時候的MessageQueue怎么來的。producer維護了一個topicPublishInfoTable,里面包含了每個topic對應的MessageQueue,所以問題就變成了topicPublishInfoTable怎么構造的。
producer發送消息之前都會獲取topic對應的隊列信息,當topicPublishInfoTable中沒有的時候會從namesrv獲取,獲取的方法如下:
// org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null) {
// 省略中間代碼...
} else {
// 從manesrv獲取topic的路由信息,namesrv從topicQueueTable獲取到該topic對應的所有的QueueData
// 然后將每個brokerName下的BrokerData返回
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
// 省略中間代碼...
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
// 每個broker set下所有的broker地址(ip:port)
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
{
// 將從namesrv獲取到的路由信息轉換為TopicPublishInfo
// 期間會將沒有master的broker set的queue信息去除
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
// 省略中間代碼...
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
到此,producer也知道自己可以向哪些MessageQueue發送消息了,接下來就是producer的負載均衡算法選出其中一個MessageQueue發送消息(org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue,這個暫時不詳表),MessageQueue包含的信息有topic、brokerName、queueId,但是producer發送的時候得知道broker的ip:port信息,而且一個brokerName對應的是一個broker set,並不能確定具體的broker,所以接下來應該找到具體的broker
// org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish
public String findBrokerAddressInPublish(final String brokerName) {
// 上面updateTopicRouteInfoFromNameServer方法將broker set下的broker地址信息保存到brokerAddrTable
// 再次重申:一個broker set下的broker的brokerName相同
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
// 沒有花樣,就是直接返回brokerId時MixAll.MASTER_ID的broker的ip:port信息
// 前面說過master的brokerId就是MixAll.MASTER_ID,所以獲取到的broker是broker set中的master
return map.get(MixAll.MASTER_ID);
}
return null;
}
終於真相大白,producer只會向是master的broker發送消息,也就是一個broker set中brokerId是0的broker。
producer只能發送消息到master,而不能發送到slave,這也說明了master負責讀“寫”,而slave只負責讀(當然,這里只說明了“寫”的部分,關於master 和slave的“讀”下一篇介紹)。
總結
本篇介紹了RocketMQ究竟做了什么來實現作為一個消息隊列中間件的高可用,由於篇幅會偏長,所以分為兩篇文章來說明,下一篇說明文中遺留下的另一個問題——RocketMQ源碼 — 六、 RocketMQ高可用(2)
參考: