RocketMQ 源碼分析之路由中心(NameServer)


你可能沒有看過 RocketMQ 的架構圖,沒關系,一起來學習一下,RocketMQ 架構圖如下:

RocketMQ 架構圖

在 RocketMQ 中,有四個角色:

  • Producer:消息的生產者,每個 MQ 中間件都有。
  • Consumer:消息的消費者,每個 MQ 中間件都有。
  • NameServer:RocketMQ 的路由中心,跟 ZooKeeper 差不多。
  • Broker:消息服務器,RocketMQ 的消息全部存儲在這里。

Producer 發送消息之前,先從 NameServer 中獲取到 Broker 服務器列表,然后根據負載均衡策略選擇一台 Broker 發送,消息消費時也是同樣的道理。可以說 NameServer 是 RocketMQ 的大腦,想要實現路由分發的功能,那么在 NameServer 必然要維護着 Broker 服務器信息,這中間就會涉及到 Broker 服務器服務狀態管理問題,這篇文章就來聊一聊 RocketMQ 是如何做服務狀態管理的。

在聊服務狀態管理之前,先來講一講為何不用 ZooKeeper 來做路由中心?

聽聞早期的 RocketMQ 是使用 ZooKeeper 來做路由中心。我們知道 ZooKeeper 功能比較強大,包括自動 Master 選舉等,強大的同時部署維護就變得復雜了,但是 ZooKeeper 的很多功能 RocketMQ 並不需要,RocketMQ 只需要一個輕量級的元數據服務器就夠了。所以就造了 NameServer 這個輪子。

還有一個原因就是中間件對穩定性要求比較高,使用 ZooKeeper 作為注冊和路由中心的話,就依賴了另一個中間件,提高了系統復雜性和維護成本,而 NameServer 只是 RocketMQ 中的一個模塊,且只有少量代碼,維護起來簡單,穩定性也提高了。

好了,說回服務狀態管理問題,其實這個並不陌生,在微服務領域有大量的中間件都涉及到了這個問題。對於服務狀態管理,一般有兩種解決思路。

第一種思路是主動探測,如圖:

主動探測是由路由方(比如 NameServer)發起的,每一個被路由方(比如 Broker)需要打開一個端口,然后路由方每隔一段時間(比如 30 秒)探測這些端口是否可用,如果可用就認為服務器正常,否則認為服務不可用,就把服務從列表中刪除。

這種方式存在的問題就路由方壓力可能過大,如果被路由方部署的實例較多時,那么每次探測的成本會比較高,探測的時間也比較長,可能會導致路由方可能不能正常工作。

第二種思路是心跳模式,如圖:

心跳模式不在是路由方發起了,改成被路由方每隔一段時間向路由方發送心跳包,路由方記錄被路由方的心跳包,包括服務器IP、上報時間等。每一次上報后,更新對應的信息。路由方啟動一個定時器,定期檢測當前時間和節點,最近續約時間的差值,如果達到一個閾值(比如說90秒),那么認為這個服務節點不可用。

現在大部分需要服務狀態管理的中間件,都采用心跳模式,沒有太多的缺陷,也不會對服務器造成多大的壓力。在 RocketMQ 中 NameServer 與 Broker 的通信也是采用 心跳模式

心跳模式中,有上報心跳、保存心跳信息、定時檢測這個步驟。我們從上報心跳和定時檢測這兩個方面,從源碼的角度,看看 RocketMQ 是如何實現心跳模式的。

先從上報心跳開始,在 RocketMQ 中,默認情況下,Broker 服務器會每間隔 30秒向集群中的所有 NameServer 發送心跳包。源代碼是BrokerController#start(),如下代碼:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
    // brokerConfig.getRegisterNameServerPeriod() 默認是 30 秒
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

其中上報心跳的時間用戶是可以自定義的,但是不會低於 10秒高於 60秒。當然這只是一個定時器,具體發送心跳包的方法是org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll(),代碼如下:

 public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
        // 獲取所有 NameServer 服務器
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
            // 構建 broker 信息
            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);

            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            final byte[] body = requestBody.encode(compressed);
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // 向 NameServer 逐個上報
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
        }

        return registerBrokerResultList;
    }

心跳包發送完之后,就是 NameServer 處理心跳包了,NameServer 會將心跳信息保存起來,保存心跳信息的源代碼我就不貼了,涉及的東西比較多,有興趣的可以查看org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest()#RequestCode.REGISTER_BROKER,一步一步 Debug 就知道保存過程。

來看看最后一個操作定時檢測,NameServer 會開啟一個探測線程,源代碼在org.apache.rocketmq.namesrv.NamesrvController#initialize()下,代碼如下:


// 檢測 broker
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

NameServer 每 10秒會發起一次檢測。具體檢測源代碼是org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#scanNotActiveBroker(),代碼如下:


/**
* 檢測 broker 狀態
*/
public void scanNotActiveBroker() {
   // 遍歷 broker 存活列表
   Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
   while (it.hasNext()) {
       Entry<String, BrokerLiveInfo> next = it.next();
       long last = next.getValue().getLastUpdateTimestamp();
       // 如果最后一次上報時間已經超過兩分鍾,則移出
       if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
           RemotingUtil.closeChannel(next.getValue().getChannel());
           it.remove();
           log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
           this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
       }
   }
}

NameServer 會遍歷 Broker 存活列表,如果最后一次發送心跳包的時間超過 120秒,則認為 Broker 服務器不可用,將 Broker 從各種配置列表中移出。

到此為止,RocketMQ 的心跳模式實現就完成了,上面的源代碼都是一些粗略的,具體的實現細節還是比較繁瑣的,有興趣的可以深入研究源碼,獲取更多詳細信息。

關於RocketMQ 解決服務狀態管理的分享就這些,感謝您的閱讀,希望這篇文章對您的學習或者工作有一點幫助。有收獲的話,也可以幫忙推薦給其他的小伙伴,讓更多的人受益,萬分感謝

最后

目前互聯網上很多大佬都有 RocketMQ 相關文章,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支持。若文中有所錯誤之處,還望提出,謝謝。

歡迎關注公眾號【互聯網平頭哥】。這里有職場感悟、Java 技術,雖然不高大上,但通俗易懂。今天最好的是明天最低的要求,願你我共同進步。

互聯網平頭哥


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM