RocketMQ源碼 — 六、 RocketMQ高可用(1)


高可用究竟指的是什么?請參考:關於高可用的系統

RocketMQ做了以下的事情來保證系統的高可用

  • 多master部署,防止單點故障
  • 消息冗余(主從結構),防止消息丟失
  • 故障恢復(本篇暫不討論)

那么問題來了:

  • 怎么支持多broker的寫?
  • 怎么實現消息冗余?

下面分開說明這兩個問題

多master集群

這里強調出master集群,是因為需要多個broker set,而一個broker set只有一個master(見下文的“注意”),所以是master集群

broker有三種角色:ASYNC_MASTER、SYNC_MASTER和SLAVE,這些角色常用的搭配為:

  1. ASYNC_MASTER、SLAVE:容許丟消息,但是要broker一直可用,master異步傳輸CommitLog到slave
  2. SYNC_MASTER、SLAVE:不允許丟消息,master同步傳輸CommitLog到slave
  3. ASYNC_MASTER:如果只是想簡單部署則使用這種方式

master:負責消息的讀寫

slave:只負責讀消息

SYNC_MASTER與ASYNC_MASTER的區別是sync會等待消息傳輸到slave才算消息寫完成,而async不會同步等待,而是異步復制到slave

RocketMQ的架構圖(原圖地址

注意:在RocketMQ里面有一個概念broker set,一個broker set由一個master和多個slave組成,一個broker set內的每個broker的brokerName相同。

在broker集群中每個master相互之間是獨立,master之間不會有交互,每個master維護自己的CommitLog、自己的ConsumeQueue,但是每一個master都有可能收到同一個topic下的producer發來的消息

為了支持多master集群,需要解決幾個問題:

  • namesrv怎么管理broker
  • producer發送消息的時候知道發送到哪一個broker(為什么是master)

1. namesrv怎么管理broker

broker啟動的時候會向namesrv注冊自己的信息

// org.apache.rocketmq.broker.BrokerController#registerBrokerAll
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
    TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // 省略中間代碼...
    RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
        this.brokerConfig.getBrokerClusterName(),
        this.getBrokerAddr(),
        this.brokerConfig.getBrokerName(),
        this.brokerConfig.getBrokerId(),
        this.getHAServerAddr(),
        topicConfigWrapper,
        this.filterServerManager.buildNewFilterServerList(),
        oneway,
        this.brokerConfig.getRegisterBrokerTimeoutMills());
    // 省略中間代碼...
}

信息中包括:

clusterName:broker 集群的名字,如:DefaultCluster

brokerAddr:broker的ip:port,如:192.168.0.102:10911

brokerName:注意這個字段,上面介紹過了,一個broker set中的brokerName是相同的,需要在部署的時候配置

brokerId:用來唯一標示一個broker set中的broker,master是0(org.apache.rocketmq.common.MixAll#MASTER_ID),slave是正整數

haServerAddr:haServer的ip:port,如:192.168.0.102:10912

topicConfigWrapper:是比較復雜的數據結構,主要包含了broker上所有的topic信息,如:

{
    "dataVersion": {
        "counter": 2,
        "timestamp": 1514252649572
    },
    "topicConfigTable": {
        "TopicTest": {
            "order": false,
            "perm": 6,
            "readQueueNums": 4,
            "topicFilterType": "SINGLE_TAG",
            "topicName": "TopicTest",
            "topicSysFlag": 0,
            "writeQueueNums": 4
        },
        "%RETRY%please_rename_unique_group_name_4": {
            "order": false,
            "perm": 6,
            "readQueueNums": 1,
            "topicFilterType": "SINGLE_TAG",
            "topicName": "%RETRY%please_rename_unique_group_name_4",
            "topicSysFlag": 0,
            "writeQueueNums": 1
        }
    }
}

上面包含了兩個topic:TopicTest和%RETRY%please_rename_unique_group_name_4,相關字段的含義:

order:是否是順序消息

perm:表明該topic的權限,可讀(4)、可寫(2)、可繼承(1),通過位運算組合

readQueueNums:決定了consume消費的MessageQueue共有幾個

writeQueueNums:決定了producer發送消息的MessageQueue共有幾個

這些信息發送給namesrv之后,namesrv轉化為自己的數據結構,namesrv處理broker注冊的方法是:

// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // 省略中間代碼...
            // 這里會判斷只有master才會創建QueueData,因為只有master才包含了讀寫隊列的信息
            // slave沒有自己獨立的讀寫隊列信息(salve不會創建自己的queue信息),只是和master的的讀寫隊列信息一致
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            // 這個方法創建了QueueData,QueueData包含broker set下的讀寫隊列的信息
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            // 省略中間代碼...
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}

上面涉及到的namesrv的幾個重要數據結構

// 每個cluster下的broker set信息,一個brokerName對應的broker set
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// 每個broker set中的broker信息(set中有哪些broker,每個broker的brokerId和brokerAddr)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// 每個broker的存活情況
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 每個topic下的queue信息,包括每個broker set中讀寫隊列的個數,consumer消費消息和producer發送消息的路由信息都從這個數據結構中獲取
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;

所以,namesrv通過將broker注冊來的信息構造成自己的數據結構:

  • 每個cluster有哪些broker set
  • 每個broker set包括哪些broker,brokerId和broker的ip:port
  • 每個broker的存活情況,根據每次broker上報來的信息,清除可能下線的broker
  • 每個topic的消息隊列信息,幾個讀隊列,幾個寫隊列

namesrv匯總所有的broker的這些信息,然后供consumer和producer拉取

2. producer發送消息的時候知道發送到哪一個master

之前我們知道producer發送消息的時候發往哪一個broker是由MessageQueue決定的,所以我們先要搞清楚producer發送消息時候的MessageQueue怎么來的。producer維護了一個topicPublishInfoTable,里面包含了每個topic對應的MessageQueue,所以問題就變成了topicPublishInfoTable怎么構造的。

producer發送消息之前都會獲取topic對應的隊列信息,當topicPublishInfoTable中沒有的時候會從namesrv獲取,獲取的方法如下:

// org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    try {
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            try {
                TopicRouteData topicRouteData;
                if (isDefault && defaultMQProducer != null) {
                    // 省略中間代碼...
                } else {
                    // 從manesrv獲取topic的路由信息,namesrv從topicQueueTable獲取到該topic對應的所有的QueueData
                    // 然后將每個brokerName下的BrokerData返回
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                }
			    	  // 省略中間代碼...
					  for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                            // 每個broker set下所有的broker地址(ip:port)
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        }
                        // Update Pub info
                        {
                            // 將從namesrv獲取到的路由信息轉換為TopicPublishInfo
                            // 期間會將沒有master的broker set的queue信息去除
                            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
	// 省略中間代碼...
    } catch (InterruptedException e) {
        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    }

    return false;
}

到此,producer也知道自己可以向哪些MessageQueue發送消息了,接下來就是producer的負載均衡算法選出其中一個MessageQueue發送消息(org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#selectOneMessageQueue,這個暫時不詳表),MessageQueue包含的信息有topic、brokerName、queueId,但是producer發送的時候得知道broker的ip:port信息,而且一個brokerName對應的是一個broker set,並不能確定具體的broker,所以接下來應該找到具體的broker

// org.apache.rocketmq.client.impl.factory.MQClientInstance#findBrokerAddressInPublish
public String findBrokerAddressInPublish(final String brokerName) {
    // 上面updateTopicRouteInfoFromNameServer方法將broker set下的broker地址信息保存到brokerAddrTable
    // 再次重申:一個broker set下的broker的brokerName相同
    HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
    if (map != null && !map.isEmpty()) {
        // 沒有花樣,就是直接返回brokerId時MixAll.MASTER_ID的broker的ip:port信息
        // 前面說過master的brokerId就是MixAll.MASTER_ID,所以獲取到的broker是broker set中的master
        return map.get(MixAll.MASTER_ID);
    }

    return null;
}

終於真相大白,producer只會向是master的broker發送消息,也就是一個broker set中brokerId是0的broker。

producer只能發送消息到master,而不能發送到slave,這也說明了master負責讀“寫”,而slave只負責讀(當然,這里只說明了“寫”的部分,關於master 和slave的“讀”下一篇介紹)。

總結

本篇介紹了RocketMQ究竟做了什么來實現作為一個消息隊列中間件的高可用,由於篇幅會偏長,所以分為兩篇文章來說明,下一篇說明文中遺留下的另一個問題——RocketMQ源碼 — 六、 RocketMQ高可用(2)


參考:

關於高可用的系統

RocketMQ Architecture

RocketMQ源碼 — 三、 Producer消息發送過程


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM