啟動源:
nameServer 啟動是使用 org.apache.rocketmq.namesrv.NamesrvStartup main 方法啟動的,腳手架這里就不多說了。
配置文件的配置信息請看這兩個配置類的字段:
org.apache.rocketmq.remoting.netty.NettyServerConfig
org.apache.rocketmq.common.namesrv.NamesrvConfig
啟動流程:
1.設置腳手架進行初始化
2.設置 NameServer 的 Netty 默認監聽端口為 9876,也可以通過 -c 指定的配置文件進行修改(命令行參數只會寫入 NamesrvConfig,不會寫入 NettyServerConfig)
3.解析命令行參數(如 -c、-p),並據此配置 NamesrvConfig 以及 NettyServerConfig
4.實例化 NamesrvController
5.init NameSrv
6.啟動NameSrv
public static NamesrvController main0(String[] args) { String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); System.out.println(rocketmqHome); System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); try { //PackageConflictDetect.detectFastjson(); //1.腳手架命令 Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } final NamesrvConfig namesrvConfig = new NamesrvConfig(); final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //2.nameService 地址默認是9876 端口 nettyServerConfig.setListenPort(9876); if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); //3. 
-c 命令文件配置信息初始化到NamesrvConfig NettyServerConfig 中 MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, " + file + "%n"); in.close(); } } if (commandLine.hasOption('p')) { MixAll.printObjectProperties(null, namesrvConfig); MixAll.printObjectProperties(null, nettyServerConfig); System.exit(0); } MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig); if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); } LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml"); final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); //4.實例化 NamesrvController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); //5.初始化NameSrv 的信息 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); //6.啟動NameSrv controller.start(); String tip = "The Name Server boot success. 
serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf(tip + "%n"); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
實例化Srv:
/**
 * Creates the controller and constructs the components it owns: the KV config manager, the
 * route-info manager, the broker housekeeping service and the {@code Configuration} holder.
 *
 * @param namesrvConfig name server settings
 * @param nettyServerConfig settings for the Netty server
 */
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
    this.namesrvConfig = namesrvConfig;
    // Netty server configuration.
    this.nettyServerConfig = nettyServerConfig;

    kvConfigManager = new KVConfigManager(this);
    // Broker route-info manager: per the original author's note, it stores the topic
    // information that brokers send to the name server, and is an important component.
    routeInfoManager = new RouteInfoManager();
    // Connection monitoring.
    brokerHousekeepingService = new BrokerHousekeepingService(this);

    configuration = new Configuration(log, namesrvConfig, nettyServerConfig);
    configuration.setStorePathFromConfig(namesrvConfig, "configStorePath");
}
init NameSrv:
public boolean initialize() { //加載 kvConfig.json 文件 this.kvConfigManager.load(); //Netty Server 初始化 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); //注冊nameSrv處理器 this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //kv 打印定時任務 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true; }
啟動NameSrv
public void start() throws Exception { //Netty 服務端bind 9876 開始服務 this.remotingServer.start(); }
從啟動流程可以看到,NameSrv 只啟動了一個 Netty 服務端,沒有客戶端,因此不會主動與 broker、consumer、producer 進行交互。
RouteInfoManager
RouteInfoManager 維護了 broker 中 topic、ip、queue 等信息,是 namesrv 的重要組成部分。Broker 向 namesrv 報告的自身信息全部維護在 RouteInfoManager 中,consumer 和 producer 也都會從這里取到相關的信息。
private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); //topic 對應的queueData 信息 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //brokerName 與Broker信息映射 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //clipperName 與brokerName 映射 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; public RouteInfoManager() { this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); this.brokerAddrTable = new HashMap<String, BrokerData>(128); this.clusterAddrTable = new HashMap<String, Set<String>>(32); this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); this.filterServerTable = new HashMap<String, List<String>>(256); }