一、RocketMQ架構簡介
1.1 邏輯部署圖
1.2 核心組件說明
通過上圖可以看到,RocketMQ的核心組件主要包括4個,分別是NameServer、Broker、Producer和Consumer,下面我們先依次簡單說明下這四個核心組件:
NameServer:NameServer充當路由信息的提供者。生產者或消費者能夠通過NameServer查找各Topic相應的Broker IP列表。多個Namesrver實例組成集群,但相互獨立,沒有信息交換。
Broker:消息中轉角色,負責存儲消息、轉發消息。Broker服務器在RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。Broker服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
Producer:負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到Broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
Consumer:負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
除了上面說的三個核心組件外,還有Topic這個概念下面也會多次提到:
Topic:表示一類消息的集合,每個Topic包含若干條消息,每條消息只能屬於一個Topic,是RocketMQ進行消息訂閱的基本單位。一個Topic可以分片在多個Broker集群上,每一個Topic分片包含多個queue,具體結構可以參考下圖:
1.3 設計理念
RocketMQ是基於主題的發布與訂閱模式,核心功能包括消息發送、消息存儲、消息消費,整體設計追求簡單與性能第一,歸納來說主要是下面三種:
-
NameServer取代ZK充當注冊中心,NameServer集群間互不通信,容忍路由信息在集群內分鍾級不一致,更加輕量級;
-
使用內存映射機制實現高效的IO存儲,達到高吞吐量;
-
容忍設計缺陷,通過ACK確保消息至少消費一次,但是如果ACK丟失,可能消息重復消費,這種情況設計上允許,交給使用者自己保證。
本文重點介紹的就是NameServer,我們下面一起來看下NameServer是如何啟動以及如何進行路由管理的。
二、NameServer架構設計
在第一章已經簡單介紹了NameServer取代zk作為一種更輕量級的注冊中心充當路由信息的提供者。那么具體是如何來實現路由信息管理的呢?我們先看下圖:
上面的圖描述了NameServer進行路由注冊、路由剔除和路由發現的核心原理。
路由注冊:Broker服務器在啟動的時候會想NameServer集群中所有的NameServer發送心跳信號進行注冊,並會每隔30秒向nameserver發送心跳,告訴NameServer自己活着。NameServer接收到Broker發送的心跳包之后,會記錄該broker信息,並保存最近一次收到心跳包的時間。
路由剔除:NameServer和每個Broker保持長連接,每隔30秒接收Broker發送的心跳包,同時自身每個10秒掃描BrokerLiveTable,比較上次收到心跳時間和當前時間比較是否大於120秒,如果超過,那么認為Broker不可用,剔除路由表中該Broker相關信息。
路由發現:路由發現不是實時的,路由變化后,NameServer不主動推給客戶端,等待producer定期拉取最新路由信息。這樣的設計方式降低了NameServer實現的復雜性,當路由發生變化時通過在消息發送端的容錯機制來保證消息發送的高可用(這塊內容會在后續介紹producer消息發送時介紹,本文不展開講解)。
高可用:NameServer通過部署多台NameServer服務器來保證自身的高可用,同時多個NameServer服務器之間不進行通信,這樣路由信息發生變化時,各個NameServer服務器之間數據可能不是完全相同的,但是通過發送端的容錯機制保證消息發送的高可用。這個也正是NameServer追求簡單高效的目的所在。
三、 啟動流程
在整理了解了NameServer的架構設計之后,我們先來看下NameServer到底是如何啟動的呢?
我們先來看下代碼入口:org.apache.rocketmq.namesrv.NamesrvStartup#main(String[] args),實際調用的是main0()方法,
代碼如下:
public static NamesrvController main0(String[] args) { try { //創建namesrvController NamesrvController controller = createNamesrvController(args); //初始化並啟動NamesrvController start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
通過main方法啟動NameServer,主要分為兩大步,先創建NamesrvController,然后再初始化並啟動NamesrvController。我們分別展開來分析。
3.1 時序圖
具體展開閱讀代碼之前,我們先通過一個序列圖對整體流程有個了解,如下圖:
3.2 創建NamesrvController
先來看核心代碼,如下:
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { // 設置版本號為當前版本號 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //PackageConflictDetect.detectFastjson(); //構造org.apache.commons.cli.Options,並添加-h -n參數,-h參數是打印幫助信息,-n參數是指定namesrvAddr Options options = ServerUtil.buildCommandlineOptions(new Options()); //初始化commandLine,並在options中添加-c -p參數,-c指定nameserver的配置文件路徑,-p標識打印配置信息 commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } //nameserver配置類,業務參數 final NamesrvConfig namesrvConfig = new NamesrvConfig(); //netty服務器配置類,網絡參數 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //設置nameserver的端口號 nettyServerConfig.setListenPort(9876); //命令帶有-c參數,說明指定配置文件,需要根據配置文件路徑讀取配置文件內容,並將文件中配置信息賦值給NamesrvConfig和NettyServerConfig if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); //反射的方式 MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); //設置配置文件路徑 namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //命令行帶有-p,說明是打印參數的命令,那么就打印出NamesrvConfig和NettyServerConfig的屬性。在啟動NameServer時可以先使用./mqnameserver -c configFile -p打印當前加載的配置屬性 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); //打印參數命令不需要啟動nameserver服務,只需要打印參數即可 System.exit(0); } //解析命令行參數,並加載到namesrvConfig中 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); //檢查ROCKETMQ_HOME,不能為空 if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } //初始化logback日志工廠,rocketmq默認使用logback作為日志輸出 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //創建NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); //將全局Properties的內容復制到NamesrvController.Configuration.allConfigs中 // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); return controller; }
通過上面對每一行代碼的注釋,可以看出來,創建NamesrvController的過程主要分為兩步:
Step1:通過命令行中獲取配置。賦值給NamesrvConfig和NettyServerConfig類。
Step2:根據配置類NamesrvConfig和NettyServerConfig構造一個NamesrvController實例。
可見NamesrvConfig和NettyServerConfig是想當重要的,這兩個類分別是NameServer的業務參數和網絡參數,我們分別看下這兩個類里面有哪些屬性:
NamesrvConfig
NettyServerConfig
注:Apache Commons CLI是開源的命令行解析工具,它可以幫助開發者快速構建啟動命令,並且幫助你組織命令的參數、以及輸出列表等。
3.3 初始化並啟動
創建了NamesrvController實例之后,開始初始化並啟動NameServer。
首先進行初始化,代碼入口是NamesrvController#initialize。
public boolean initialize() { //加載kvConfigPath下kvConfig.json配置文件里的KV配置,然后將這些配置放到KVConfigManager#configTable屬性中 this.kvConfigManager.load(); //根據nettyServerConfig初始化一個netty服務器。 //brokerHousekeepingService是在NamesrvController實例化時構造函數里實例化的,該類負責Broker連接事件的處理,實現了ChannelEventListener,主要用來管理RouteInfoManager的brokerLiveTable this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); //初始化負責處理Netty網絡交互數據的線程池,默認線程數是8個 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注冊Netty服務端業務處理邏輯,如果開啟了clusterTest,那么注冊的請求處理類是ClusterTestRequestProcessor,否則請求處理類是DefaultRequestProcessor this.registerProcessor(); //注冊心跳機制線程池,延遲5秒啟動,每隔10秒遍歷RouteInfoManager#brokerLiveTable這個屬性,用來掃描不存活的broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //注冊打印KV配置線程池,延遲1分鍾啟動、每10分鍾打印出kvConfig配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); //rocketmq可以通過開啟TLS來提高數據傳輸的安全性,如果開啟了,那么需要注冊一個監聽器來重新加載SslContext if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }
上面的代碼是NameServer初始化流程,通過每行代碼的注釋,可以看出來,主要有5步驟操作:
-
Step1:加載KV配置,並寫入到KVConfigManager的configTable屬性中;
-
Step2:初始化netty服務器;
-
Step3:初始化處理netty網絡交互數據的線程池;
-
Step4:注冊心跳機制線程池,啟動5秒后每隔10秒檢測一次Broker的存活情況;
-
Step5:注冊打印KV配置的線程池,啟動1分鍾后,每隔10分鍾打印一次KV配置。
RocketMQ的開發團隊還使用了一個常用的編程技巧,就是使用JVM鈎子函數對NameServer進行優雅停機。這樣在JVM進程關閉前,會先執行shutdown操作。
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } }));
執行start函數,啟動NameServer。代碼比較簡單,就是將第一步中創建的netty server進行啟動。其中remotingServer.start()方法不展開詳細說明了,需要對netty比較熟悉,不是本篇文章重點,有興趣的同學可以自行下載源碼閱讀。
public void start() throws Exception { //啟動netty服務 this.remotingServer.start(); //如果開啟了TLS if (this.fileWatchService != null) { this.fileWatchService.start(); } }
四、路由管理
我們在第二章開篇有了解到NameServer作為一個輕量級的注冊中心,主要是為消息生產者和消費者提供Topic的路由信息,並對這些路由信息和Broker節點進行管理,主要包括路由注冊、路由剔除和路由發現。
本章將會通過源碼的角度來具體分析NameServer是如果進行路由信息管理的。核心代碼主要都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中實現。
4.1 路由元信息
在了解路由信息管理之前,我們首先需要了解NameServer到底存儲了哪些路由元信息,數據結構分別是什么樣的。
查看代碼我們可以看到主要通過5個屬性來維護路由元信息,如下:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
我們依次對這5個屬性進行展開說明。
4.1.1 TopicQueueTable
說明:Topic消息隊列路由信息,消息發送時根據路由表進行負載均衡。
數據結構:HashMap結構,key是Topic名字,value是一個類型是QueueData的隊列集合。在第一章就講過,一個Topic中有多個隊列。QueueData的數據結構如下:
數據結構示例:
topicQueueTable:{
"topic1": [ { "brokerName": "broker-a", "readQueueNums":4, "writeQueueNums":4, "perm":6, "topicSynFlag":0, }, { "brokerName": "broker-b", "readQueueNums":4, "writeQueueNums":4, "perm":6, "topicSynFlag":0, } ] }
4.1.2 BrokerAddrTable
說明:Broker基礎信息,包含BrokerName、所屬集群名稱、主備Broker地址。
數據結構:HashMap結構,key是BrokerName,value是一個類型是BrokerData的對象。BrokerData的數據結構如下(可以結合下面Broker主從結構邏輯圖來理解):
Broker主從結構邏輯圖:
數據結構示例:
brokerAddrTable:{
"broker-a": { "cluster": "c1", "brokerName": "broker-a", "brokerAddrs": { 0: "192.168.1.1:10000", 1: "192.168.1.2:10000" } }, "broker-b": { "cluster": "c1", "brokerName": "broker-b", "brokerAddrs": { 0: "192.168.1.3:10000", 1: "192.168.1.4:10000" } } }
4.1.3 ClusterAddrTable
說明:Broker集群信息,存儲集群中所有Broker名稱。
數據結構:HashMap結構,key是ClusterName,value是存儲BrokerName的Set結構。
數據結構示例:
clusterAddrTable:{
"c1": ["broker-a","broker-b"] }
4.1.4 BrokerLiveTable
說明:Broker狀態信息。NameServer每次收到心跳包時會替換該信息
數據結構:HashMap結構,key是Broker的地址,value是BrokerLiveInfo結構的該Broker信息對象。BrokerLiveInfo的數據結構如下:
數據結構示例:
brokerLiveTable:{
"192.168.1.1:10000": { "lastUpdateTimestamp": 1518270318980, "dataVersion":versionObj1, "channel":channelObj, "haServerAddr":"" }, "192.168.1.2:10000": { "lastUpdateTimestamp": 1518270318980, "dataVersion":versionObj1, "channel":channelObj, "haServerAddr":"192.168.1.1:10000" }, "192.168.1.3:10000": { "lastUpdateTimestamp": 1518270318980, "dataVersion":versionObj1, "channel":channelObj, "haServerAddr":"" }, "192.168.1.4:10000": { "lastUpdateTimestamp": 1518270318980, "dataVersion":versionObj1, "channel":channelObj, "haServerAddr":"192.168.1.3:10000" } }
4.1.5 filterServerTable
說明:Broker上的FilterServer列表,消息過濾服務器列表,后續介紹Consumer時會介紹,consumer拉取數據是通過filterServer拉取,consumer向Broker注冊。
數據結構:HashMap結構,key是Broker地址,value是記錄了filterServer地址的List集合。
4.2 路由注冊
路由注冊是通過Broker和NameServer之間的心跳功能來實現的。主要分為兩步:
Step1:
Broker啟動時向集群中所有NameServer發送心跳語句,每隔30秒(默認30s,時間間隔在10秒到60秒之間)再發一次。
Step2:
NameServer收到心跳包更新topicQueueTable,brokerAddrTable,brokerLiveTable,clusterAddrTable,filterServerTable。
我們分別展開分析這兩步。
4.2.1 Broker發送心跳包
發送心跳包的核心邏輯是在Broker啟動邏輯里,代碼入口是org.apache.rocketmq.broker.BrokerController#start,本篇文章重點關注的是發送心跳包的邏輯實現,只列出發送心跳包的核心代碼,如下:
1)創建了一個線程池注冊Broker,程序啟動10秒后執行,每隔30秒(默認30s,時間間隔在10秒到60秒之間,BrokerConfig.getRegisterNameServerPeriod()的默認值是30秒)執行一次。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
2)封裝Topic配置和版本號之后,進行實際的路由注冊(注:封裝Topic配置不是本篇重點,會在介紹Broker源碼時重點講解)。實際路由注冊是在org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll中實現,核心代碼如下:
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>(); //獲取nameserver地址列表 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { /** *封裝請求包頭start *封裝請求包頭,主要封裝broker相關信息 **/ final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); //封裝requestBody,包括topic和filterServerList相關信息 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); /** *封裝請求包頭end **/ //開啟多線程到每個nameserver進行注冊 final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { //實際進行注冊方法 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { //封裝nameserver返回的信息 registerBrokerResultList.add(result); } log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
從上面代碼來看,也比較簡單,首先需要封裝請求包頭和requestBody,然后開啟多線程到每個NameServer服務器去注冊。
請求包頭類型為RegisterBrokerRequestHeader,主要包括如下字段:
requestBody類型是RegisterBrokerBody,主要包括如下字段:
1)實際的路由注冊是通過registerBroker方法實現,核心代碼如下:
private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { //創建請求指令,需要注意RequestCode.REGISTER_BROKER,nameserver端的網絡處理器會根據requestCode進行相應的業務處理 RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); //基於netty進行網絡傳輸 if (oneway) { //如果是單向調用,沒有返回值,不返回nameserver返回結果 try { this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } //異步調用向nameserver發起注冊,獲取nameserver的返回信息 RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { //獲取返回的reponseHeader RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); //重新封裝返回結果,更新masterAddr和haServerAddr RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if (response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr()); }
borker和NameServer之間通過netty進行網絡傳輸,Broker向NameServer發起注冊時會在請求中添加注冊碼RequestCode.REGISTER_BROKER。這是一種網絡跟蹤方法,RocketMQ的每個請求都會定義一個requestCode,服務端的網絡處理器會根據不同的requestCode進行影響的業務處理。
4.2.2 NameServer處理心跳包
Broker發出路由注冊的心跳包之后,NameServer會根據心跳包中的requestCode進行處理。NameServer的默認網絡處理器是DefaultRequestProcessor,具體代碼如下:
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (ctx != null) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { ...... //,如果是RequestCode.REGISTER_BROKER,進行broker注冊 case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } ...... default: break; } return null; }
判斷requestCode,如果是RequestCode.REGISTER_BROKER,那么確定業務處理邏輯是注冊Broker。根據Broker版本號選擇不同的方法,我們已V3_0_11以上為例,調用registerBrokerWithFilterServer方法進行注冊主要步驟分為三步:
Step1:
解析requestHeader並驗簽(基於crc32),判斷數據是否正確;
Step2:
解析Topic信息;
Step3:
調用RouteInfoManager#registerBroker來進行Broker注冊;
核心注冊邏輯是由RouteInfoManager#registerBroker來實現,核心代碼如下:
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { //加寫鎖,防止並發寫RoutInfoManager中的路由表信息。 this.lock.writeLock().lockInterruptibly(); //根據clusterName從clusterAddrTable中獲取所有broker名字集合 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); //如果沒有獲取到,說明broker所屬集群還沒記錄,那么需要創建,並將brokerName加入到集群的broker集合中 if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; //根據brokerName嘗試從brokerAddrTable中獲取brokerData BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { //如果沒獲取到brokerData,新建BrokerData並放入brokerAddrTable,registerFirst設為true; registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } //更新brokerData中的brokerAddrs Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //考慮到可能出現master掛了,slave變成master的情況,這時候brokerId會變成0,這時候需要把老的brokerAddr給刪除 //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } } //更新brokerAddrs,根據返回的oldAddr判斷是否是第一次注冊的broker String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); //如過Broker是Master,並且Broker的Topic配置信息發生變化或者是首次注冊,需要創建或更新Topic路由元數據,填充topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { //創建或更新Topic路由元數據 this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } //更新BrokerLivelnfo,BrokeLivelnfo是執行路由刪除的重要依據 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } //注冊Broker的filterServer地址列表 if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } //如果此Broker為從節點,則需要查找Broker Master的節點信息,並更新對應masterAddr屬性 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
通過上面的源碼分析,可以分解出一個Broker的注冊主要分7步:
-
Step1:加寫鎖,防止並發寫RoutInfoManager中的路由表信息;
-
Step2:判斷Broker所屬集群是否存在,不存在需要創建,並將Broker名加入到集群Broker集合中;
-
Step3:維護BrokerData;
-
Step4:如過Broker是Master,並且Broker的Topic配置信息發生變化或者是首次注冊,需要創建或更新Topic路由元數據,填充TopicQueueTable;
-
Step5:更新BrokerLivelnfo;
-
Step6:注冊Broker的filterServer地址列表;
-
Step7:如果此Broker為從節點,則需要查找Broker Master的節點信息,並更新對應masterAddr屬性,並返回給Broker端。
4.3 路由剔除
4.3.1 觸發條件
路由剔除的觸發條件主要有兩個:
NameServer每隔10s掃描BrokerLiveTable,連續120s沒收到心跳包,則移除該Broker並關閉socket連接;
Broker正常關閉時觸發路由刪除。
4.3.2 源碼解析
上面描述的觸發點最終刪除路由的邏輯是一樣的,統一在RouteInfoManager#onChannelDestroy
中實現,核心代碼如下:
public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { try { //加讀鎖 this.lock.readLock().lockInterruptibly(); //通過channel從brokerLiveTable中找出對應的Broker地址 Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { //釋放讀鎖 this.lock.readLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } //若該Broker已經從存活的Broker地址列表中被清除,則直接使用remoteAddr if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); } if (brokerAddrFound != null && brokerAddrFound.length() > 0) { try { try { //申請寫鎖 this.lock.writeLock().lockInterruptibly(); //根據brokerAddress,將這個brokerAddress從brokerLiveTable和filterServerTable中移除 this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); //遍歷brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); //根據brokerAddress找到對應的brokerData,並將brokerData中對應的brokerAddress移除 if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } //如果移除后,整個brokerData的brokerAddress空了,那么將整個brokerData移除 if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } } if (brokerNameFound != null && removeBrokerName) { //遍歷clusterAddrTable Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); String clusterName = entry.getKey(); Set<String> brokerNames = entry.getValue(); //根據第三步中獲取的需要移除的brokerName,將對應的brokerName移除了 boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName); //如果移除后,該集合為空,那么將整個集群從clusterAddrTable中移除 if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); it.remove(); } break; } } } if (removeBrokerName) { Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); //遍歷topicQueueTable while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); String topic = entry.getKey(); List<QueueData> queueDataList = entry.getValue(); Iterator<QueueData> itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { QueueData queueData = itQueueData.next(); //根據brokerName,將topic下對應的broker移除掉 if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); } } //如果該topic下只有一個待移除的broker,那么該topic也從table中移除 if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); } } } } finally { //釋放寫鎖 this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } }
路由刪除整體邏輯主要分為6步:
-
Step1:加readlock,通過channel從BrokerLiveTable中找出對應的Broker地址,釋放readlock,若該Broker已經從存活的Broker地址列表中被清除,則直接使用remoteAddr。
-
Step2:申請寫鎖,根據BrokerAddress從BrokerLiveTable、filterServerTable移除。
-
Step3:遍歷BrokerAddrTable,根據BrokerAddress找到對應的brokerData,並將brokerData中對應的brokerAddress移除,如果移除后,整個brokerData的brokerAddress空了,那么將整個brokerData移除。
-
Step4:遍歷clusterAddrTable,根據第三步中獲取的需要移除的BrokerName,將對應的brokerName移除了。如果移除后,該集合為空,那么將整個集群從clusterAddrTable中移除。
-
Step5:遍歷TopicQueueTable,根據BrokerName,將Topic下對應的Broker移除掉,如果該Topic下只有一個待移除的Broker,那么該Topic也從table中移除。
-
Step6:釋放寫鎖。
從上面可以看出,路由剔除的整體邏輯比較簡單,就是單純地針對路由元信息的數據結構進行操作。為了大家能夠更好地理解這塊代碼,建議大家對照4.1中介紹的路由元信息的數據結構來進行代碼走讀。
4.4 路由發現
當路由信息發生變化之后,NameServer不會主動推送給客戶端,而是等待客戶端定期到nameserver主動拉取最新路由信息。這種設計方式降低了NameServer實現的復雜性。
4.4.1 producer主動拉取
producer在啟動后會開啟一系列定時任務,其中有一個任務就是定期從NameServer獲取Topic路由信息。代碼入口是MQClientInstance#start-ScheduledTask(),核心代碼如下:
private void startScheduledTask() { ...... this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { //從nameserver更新最新的topic路由信息 MQClientInstance.this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } } }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS); ...... } /** * 從nameserver獲取topic路由信息 */ public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { ...... //向nameserver發送請求包,requestCode為RequestCode.GET_ROUTEINFO_BY_TOPIC RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader); ...... }
producer和NameServer之間通過netty進行網絡傳輸,producer向NameServer發起的請求中添加注冊碼
RequestCode.GET_ROUTEINFO_BY_TOPIC。
4.4.2 NameServer返回路由信息
NameServer收到producer發送的請求后,會根據請求中的requestCode進行處理。處理requestCode同樣是在默認的網絡處理器DefaultRequestProcessor中進行處理,最終通過RouteInfoManager#pickupTopicRouteData來實現。
TopicRouteData結構
在正式解析源碼前,我們先看下NameServer返回給producer的數據結構。通過代碼可以看到,返回的是一個TopicRouteData對象,具體結構如下:
其中QueueData,BrokerData,filterServerTable在4.1章節介紹路由元信息時有介紹。
源碼分析
在了解了返回給producer的TopicRouteData結構后,我們進入RouteInfoManager#pickupTopicRouteData方法來看下具體如何實現。
public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; Set<String> brokerNameSet = new HashSet<String>(); List<BrokerData> brokerDataList = new LinkedList<BrokerData>(); topicRouteData.setBrokerDatas(brokerDataList); HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>(); topicRouteData.setFilterServerTable(filterServerMap); try { try { //加讀鎖 this.lock.readLock().lockInterruptibly(); //從元數據topicQueueTable中根據topic名字獲取隊列集合 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { //將獲取到的隊列集合寫入topicRouteData的queueDatas中 topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); brokerNameSet.add(qd.getBrokerName()); } //遍歷從QueueData集合中提取的brokerName for (String brokerName : brokerNameSet) { //根據brokerName從brokerAddrTable獲取brokerData BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { //克隆brokerData對象,並寫入到topicRouteData的brokerDatas中 BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; //遍歷brokerAddrs for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { //根據brokerAddr獲取filterServerList,封裝后寫入到topicRouteData的filterServerTable中 List<String> filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { //釋放讀鎖 this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; }
上面代碼封裝了TopicRouteData的queueDatas、BrokerDatas和filterServerTable,還有orderTopicConf字段沒封裝,我們再看下這個字段是在什么時候封裝的,我們向上看RouteInfoManager#pickupTopicRouteData的調用方法DefaultRequestProcessor#getRouteInfoByTopic如下:
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { ...... //這塊代碼就是上面解析的代碼,獲取到topicRouteData對象 TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { //判斷nameserver的orderMessageEnable配置是否打開 if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { //如果配置打開了,根據namespace和topic名字獲取kvConfig配置文件中順序消息配置內容 String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); //封裝orderTopicConf topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } //如果沒有獲取到topic路由,那么reponseCode為TOPIC_NOT_EXIST response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; }
結合這兩個方法,我們可以總結出查找Topic路由主要分為3個步驟:
調用RouteInfoManager#pickupTopicRouteData,從topicQueueTable,brokerAddrTabl,filterServerTable中獲取信息,分別填充queue-Datas、BrokerDatas、filterServerTable。
如果topic為順序消息,那么從KVconfig中獲取關於順序消息先關的配置填充到orderTopicConf中。
如果找不到路由信息,那么返回code為ResponseCode.TOPIC_NOT_EXIST。
五、小結
本篇文章主要是從源碼的角度給大家介紹了RocketMQ的NameServer,包括NameServer的啟動流程、路由注冊、路由剔除和路由發現。我們在了解了NameServer的設計原理之后,也可以回過頭思考下在設計過程中一些值得我們學習的小技巧,在此我拋磚引玉提出兩點:
-
啟動流程注冊JVM鈎子用於優雅停機。這是一個編程技巧,我們在實際開發過程中,如果有使用線程池或者一些常駐線程任務時,可以考慮通過注冊JVM鈎子的方式,在JVM關閉前釋放資源或者完成一些事情來保證優雅停機。
-
更新路由表時需要通過加鎖防止並發操作,這里使用的是鎖粒度較少的讀寫鎖,允許多個消息發送者並發讀,保證消息發送時的高並發,但同一時刻NameServer只處理一個Broker心跳包,多個心跳包請求串行執行,這也是讀寫鎖經典使用場景。