RocketMQ之NameServer學習筆記


org.apache.rocketmq.namesrv.NamesrvController

    NameserController,NameServer的核心控制類。

1.1 NamesrvConfig 

NamesrvConfig,主要指定nameserver的相關配置目錄屬性

1)kvConfigPath(kvConfig.json)

2)mqhome/namesrv/namesrv.properties

3)orderMessageEnable,是否開啟順序消息功能,默認為false

 

1.2 ScheduledExecutorService scheduledExecutorService

    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryIm               pl("NSScheduledThread"));

   NameServer 定時任務執行線程池,一個線程,默認定時執行兩個任務:

   任務1、每隔10s掃描broker,維護當前存活的Broker信息

   任務2、每隔10s打印KVConfig信息。

 

1.3 KVConfigManager 

   讀取或變更NameServer的配置屬性,加載NamesrvConfig中配置的配置文件到內存,此類一個亮點就是使用輕量級的非線程安全容器,再結合讀寫鎖對資源讀寫進行保護。盡最大程度提高線程的並發度。

 

1.4 RouteInfoManager 

    NameServer數據的載體,記錄Broker,Topic等信息。

//NameServer 與 Broker 空閑時長,默認2分鍾,在2分鍾內Nameserver沒有收到Broker的心跳包,則關閉該連接。 
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //讀寫鎖,用來保護非線程安全容器HashMap 
    private final ReadWriteLock lock = new ReentrantReadWriteLock(); //topicQueueTable,主題與隊列關系,記錄一個主題的隊列分布在哪些Broker上,每個Broker上存在該主題的隊列個數 
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //brokerAddrTable,所有Broker信息,使用brokerName當key,BrokerData信息描述每一個broker信息。
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //clusterAddrTable,broker集群信息,每個集群包含哪些Broker 
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //brokerLiveTable,當前存活的Broker,該信息不是實時的,NameServer每10S掃描一次所有的broker,根據心跳包的時間得知broker的狀態,該機制也是導致當一個master Down掉后,消息生產者無法感知,可能繼續向Down掉的Master發送消息,導致失敗
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;        
/** * Broker信息;key為brokerName,value為BrokerData */
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; public class BrokerData implements Comparable<BrokerData> { /** * Cluster名稱 */
    private String cluster; /** * broker名稱 */
    private String brokerName; /** * 0->ip:port */
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs; }
  • 注冊topic信息topicQueueTable
/** * 消息隊列路由信息;key為topic,value為QueueData */
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; public class QueueData implements Comparable<QueueData> { /** * Broker名稱 */
    private String brokerName; /** * 讀隊列個數,默認4個 */
    private int readQueueNums; /** * 寫隊列個數,默認4個 */
    private int writeQueueNums; /** * 隊列權限 */
    private int perm; /** * 配置的,同步復制還是異步復制標記,對應TopicConfig.topicSysFlag * */
    private int topicSynFlag; }
/** * Broker狀態信息,NameServer每次收到心跳包會替換該信息,每隔30秒更新一次 * brokerAddr: ip:port->{} */
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; class BrokerLiveInfo { /** * 存儲上次收到心跳包的時間,每隔30秒更新一次 */
    private long lastUpdateTimestamp; private DataVersion dataVersion; private Channel channel; private String haServerAddr; }

 

 

  • NamesrvStartup.java 啟動入口類,NameServer 啟動默認端口9876
nettyServerConfig.setListenPort(9876)
  • 每10秒鍾掃描一次,移除失效的broker,同時刪除緩存元數據信息
//初始化NameServer
boolean initResult = controller.initialize(); public boolean initialize() { //加載KV配置
    this.kvConfigManager.load(); this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); this.registerProcessor(); //每10秒鍾掃描一次,移除失效的broker
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //每隔10秒鍾打印一次KV配置信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true; }

 

  • 失效時間為2分鍾,即:Broker在2分鍾內未上報心跳會被移除
/** * 失效時間為2分鍾,即:Broker在2分鍾內未上報心跳會被移除 */
public void scanNotActiveBroker() { Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } } private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
2.DefaultRequestProcessor
  • 用於響應客戶端、Broker的請求。主要向NameServer發送心跳包、獲取Cluster、Broker、Topic元數據信息。
  • 調用鏈:
    在NameServer啟動時注冊,NamesrvController.initialize()->registerProcessor()
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { if (log.isDebugEnabled()) { log.debug("receive request, {} {} {}", request.getCode(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), request); } switch (request.getCode()) { case RequestCode.PUT_KV_CONFIG://增加NameServer配置信息;由DefaultMQAdminExt使用
            return this.putKVConfig(ctx, request); case RequestCode.GET_KV_CONFIG://根據NameSpace和key獲取NameServer配置信息;由DefaultMQAdminExt使用
            return this.getKVConfig(ctx, request); case RequestCode.DELETE_KV_CONFIG: //據NameSapce和Key刪除NameServerr配置信息
            return this.deleteKVConfig(ctx, request); case RequestCode.REGISTER_BROKER: //注冊Broker信息;由BrokerOuterAPI.registerBroker使用,在BrokerController啟動時調用
            Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); } case RequestCode.UNREGISTER_BROKER://移除注銷broker信息;由BrokerOuterAPI.unregisterBroker使用,在BrokerController.shutdown時調用
            return this.unregisterBroker(ctx, request); case RequestCode.GET_ROUTEINTO_BY_TOPIC: //獲取Topic路由信息 TopicRouteData
            return this.getRouteInfoByTopic(ctx, request); case RequestCode.GET_BROKER_CLUSTER_INFO://獲取Cluster及Broker信息
            return this.getBrokerClusterInfo(ctx, request); case RequestCode.WIPE_WRITE_PERM_OF_BROKER: //去除該broker上所有topic的寫權限
            return this.wipeWritePermOfBroker(ctx, request); case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER: //獲取所有的Topic列表
            return getAllTopicListFromNameserver(ctx, request); case RequestCode.DELETE_TOPIC_IN_NAMESRV: //從nameServer中刪除topic
            return deleteTopicInNamesrv(ctx, request); case RequestCode.GET_KVLIST_BY_NAMESPACE: //獲取配置信息 configTable
            return this.getKVListByNamespace(ctx, request); case RequestCode.GET_TOPICS_BY_CLUSTER: //獲取該集群下的所有topic list
            return this.getTopicsByCluster(ctx, request); case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS: // 此處意思為:系統會將集群名稱、broker名稱作為默認topic創建。現在獲取這類topic
            return this.getSystemTopicListFromNs(ctx, request); case RequestCode.GET_UNIT_TOPIC_LIST: //暫無使用
            return this.getUnitTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST: //暫無使用
            return this.getHasUnitSubTopicList(ctx, request); case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST://暫無使用
            return this.getHasUnitSubUnUnitTopicList(ctx, request); case RequestCode.UPDATE_NAMESRV_CONFIG: //更新properties請求
            return this.updateConfig(ctx, request); case RequestCode.GET_NAMESRV_CONFIG: //獲取properties內容
            return this.getConfig(ctx, request); default: break; } return null; }

注冊broker信息

  • Broker每隔30秒向所有的NameServer上報Topic注冊信息
  • Broker調用鏈
  BrokerController.start()->this.registerBrokerAll()->this.brokerOuterAPI.registerBrokerAll()
//每隔30秒向所有的NameServer上報Topic注冊信息
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
  • 服務端處理主要包括:注冊集群信息clusterAddrTable、注冊broker信息brokerAddrTable、
    注冊topic信息topicQueueTable、broker心跳包brokerLiveTable
  • NameServer處理鏈
  DefaultRequestProcessor->processRequest->RequestCode.REGISTER_BROKER->this.registerBroker->RouteInfoManager.registerBroker()
public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { //加寫鎖,防止並發修改
            this.lock.writeLock().lockInterruptibly(); //注冊集群信息
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; //注冊broker信息
            BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); //Topic配置變化了;Master Broker第一次注冊或者Topic dataVersion不相同時更新路由信息 //有Topic新增時dataVersion會遞增
            if (null != topicConfigWrapper //                 && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//                     || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); //更新topicQueueTable
 } } } } //更新broker心跳信息
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); //新broker注冊時會有日志輸出
            if (null == prevBrokerLiveInfo) { log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); } //更新filterServer信息
            if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } //Slave設置MasterAddr和HaServerAddr
            if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM