啟動源:
RocketMQ Broker 啟動:從 broker 的啟動腳本 mqbroker 可以得知,最終運行的是 BrokerStartup 的 main 方法,腳本收到的參數會原樣傳遞給它。
export ROCKETMQ_HOME
# Run the broker start script. "$@" forwards every argument given to this script
# to BrokerStartup; the quotes keep arguments and paths containing spaces intact.
sh "${ROCKETMQ_HOME}/bin/runbroker.sh" org.apache.rocketmq.broker.BrokerStartup "$@"
關於 Broker 啟動參數,我們不用死記,用的時候可以參考下面這些配置類:
org.apache.rocketmq.common.BrokerConfig -------> broker 配置文件
org.apache.rocketmq.remoting.netty.NettyServerConfig ------> netty 服務端配置文件
org.apache.rocketmq.remoting.netty.NettyClientConfig ------> netty 客戶端配置文件
org.apache.rocketmq.store.config.MessageStoreConfig ------> store 配置文件
Apache 命令腳手架(Commons CLI):
命令腳手架指的是:啟動 broker 的 main 方法時可以附帶一些參數,BrokerStartup 使用 Apache Commons CLI(Options、PosixParser)來注冊並解析這些參數,因此我們可以很方便地查到每個參數的含義;例如可以通過 -c <配置文件路徑> 來指定配置文件並初始化配置。
啟動流程:
/* main 方法啟動 * @param args */ public static void main(String[] args) { //創建brkerController start(createBrokerController(args)); }
1.命令腳手架注冊:
可以得知broker 啟動追加的參數信息
-n: 指定 broker 的 namesrvAddr 地址
-h: 打印幫助信息
-c: 指定配置文件的路徑
-p: 打印所有配置項
-m: 打印重要的配置項
//1.注冊一些 -n -h 命令 Options options = ServerUtil.buildCommandlineOptions(new Options()); //2.注冊 -c -p -m 命令 commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); }
2.設置 nettyServerConfig綁定的端口信息
//設置broker 服務端綁定的端口為10911 nettyServerConfig.setListenPort(10911);
3. 組裝配置文件信息:
//1. -c 后面追加的參數為配置文件路徑 if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { configFile = file; InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); properties2SystemEnv(properties); //2.反射將配置文件broker信息注入BrokerConfig 配置類中 MixAll.properties2Object(properties, brokerConfig); //3.反射將配置文件NettyServer信息注入NettyServerConfig 配置類中 MixAll.properties2Object(properties, nettyServerConfig); //4.反射將配置文件NettyClient信息注入NettyClientConfig 配置類中 MixAll.properties2Object(properties, nettyClientConfig); //5.反射將配置文件store 信息注入MessageStoreConfig 配置類中 MixAll.properties2Object(properties, messageStoreConfig); BrokerPathConfigHelper.setBrokerConfigPath(file); in.close(); } } //處理其他命令,注入到brokerConfig 中,比如我們的-n 命令 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
4.創建BrokerController:
//1.創建 BrokerController instance final BrokerController controller = new BrokerController( brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig); // remember all configs to prevent discard controller.getConfiguration().registerConfig(properties); //2.進行初始化 Broker controller boolean initResult = controller.initialize();
創建實例:
public BrokerController( final BrokerConfig brokerConfig, final NettyServerConfig nettyServerConfig, final NettyClientConfig nettyClientConfig, final MessageStoreConfig messageStoreConfig ) { //broker 配置信息 this.brokerConfig = brokerConfig; //nettyServer配置信息 this.nettyServerConfig = nettyServerConfig; //nettyClient 配置信息 this.nettyClientConfig = nettyClientConfig; //store 配置信息 this.messageStoreConfig = messageStoreConfig; //consumer 偏移量管理器,會讀取store/config/consumerOffset.json json 配置文件,維護了offsetTable Map 結構 this.consumerOffsetManager = new ConsumerOffsetManager(this); //topic 配置管理器,會讀取store/config/topics.json this.topicConfigManager = new TopicConfigManager(this); //拉去消息處理器,用來處理消費端消息拉去,關聯的業務code 為RequestCode.PULL_MESSAGE this.pullMessageProcessor = new PullMessageProcessor(this); // this.pullRequestHoldService = new PullRequestHoldService(this); // this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); // this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); // 消費過濾管理器會讀取store/config/consumerFilter.json this.consumerFilterManager = new ConsumerFilterManager(this); //生產者管理器 this.producerManager = new ProducerManager(); //用於清除不活動的連接,可以看到里面有一些掃描生產者以及消費者不活動連接的方法 this.clientHousekeepingService = new ClientHousekeepingService(this); // this.broker2Client = new Broker2Client(this); this.subscriptionGroupManager = new SubscriptionGroupManager(this); //NettyClient 初始化 this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); // this.filterServerManager = new FilterServerManager(this); // this.slaveSynchronize = new SlaveSynchronize(this); //發送線程池隊列 this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()); //拉取線程池隊列 this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity()); //查詢線程池隊列 this.queryThreadPoolQueue = new 
LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); //broker 狀態管理器 this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); // this.brokerFastFailure = new BrokerFastFailure(this); this.configuration = new Configuration( log, BrokerPathConfigHelper.getBrokerConfigPath(), this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig ); }
初始化 BrokerController:
public boolean initialize() throws CloneNotSupportedException { boolean result = this.topicConfigManager.load(); //加載對應管理器的配置文件 result = result && this.consumerOffsetManager.load(); result = result && this.subscriptionGroupManager.load(); result = result && this.consumerFilterManager.load(); if (result) { try { this.messageStore = new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, this.brokerConfig); this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); //load plugin MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); this.messageStore = MessageStoreFactory.build(context, this.messageStore); this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); } catch (IOException e) { result = false; log.error("Failed to initialize", e); } } //加載CommitLog 文件 result = result && this.messageStore.load(); if (result) { //初始化NettyServer this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); //初始化VIP NettyServer 端口為在109011 -2 this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); //初始化一些線程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_")); this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new 
ThreadFactoryImpl("PullMessageThread_")); this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_")); this.adminBrokerExecutor = Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( "AdminBrokerThread_")); this.clientManageExecutor = new ThreadPoolExecutor( this.brokerConfig.getClientManageThreadPoolNums(), this.brokerConfig.getClientManageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.clientManagerThreadPoolQueue, new ThreadFactoryImpl("ClientManageThread_")); this.consumerManageExecutor = Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( "ConsumerManageThread_")); //注冊消息處理器,針對客戶端發過來的消息code,會有針對的處理器進行處理 this.registerProcessor(); final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); final long period = 1000 * 60 * 60 * 24; //執行定定時任務 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); //定時 保存consumerOffset.json 文件 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); //定時保存 consumerfilter.json 文件 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter 
error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); //打印水印日志 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); // this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); // if (this.brokerConfig.getNamesrvAddr() != null) { this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr()); log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr()); } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); } if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) { if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) { this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress()); this.updateMasterHAServerAddrPeriodically = false; } else { this.updateMasterHAServerAddrPeriodically = true; } this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { 
BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask syncAll slave exception", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } else { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); } } return result; }
5. 啟動BrokerController
public void start() throws Exception { //刷盤,同步,高可用開啟 if (this.messageStore != null) { this.messageStore.start(); } //10911 NettyServer 端口綁定 ,開始服務 if (this.remotingServer != null) { this.remotingServer.start(); } //VIP 10911 -2 NettyServer 端口綁定 ,開始服務 if (this.fastRemotingServer != null) { this.fastRemotingServer.start(); } //初始化組裝NettyClient bootstrap 信息 if (this.brokerOuterAPI != null) { this.brokerOuterAPI.start(); } // if (this.pullRequestHoldService != null) { this.pullRequestHoldService.start(); } //開啟檢測不活動連接的服務,定時任務,每10s運行一次 if (this.clientHousekeepingService != null) { this.clientHousekeepingService.start(); } // if (this.filterServerManager != null) { this.filterServerManager.start(); } //向nameServer 注冊broker topic 信息 this.registerBrokerAll(true, false); //每30s 向 每個nameServer 注冊broker topic 信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS); // if (this.brokerStatsManager != null) { this.brokerStatsManager.start(); } // if (this.brokerFastFailure != null) { this.brokerFastFailure.start(); } }
向nameServer 注冊broker topic 信息:
Broker 啟動時會先向 nameserver 注冊一次,之後通過定時任務(首次延遲 10s,之後每隔 30s)重複向 nameserver 注冊並上報自己的 topic 信息
// Re-register the broker periodically: first run after 10 s, then every 30 s.
final Runnable registerTask = new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            // Log the failure and keep the schedule alive.
            log.error("registerBrokerAll Exception", e);
        }
    }
};
this.scheduledExecutorService.scheduleAtFixedRate(registerTask, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) { //1.得到TopicConfigManager 維護的topicConfigTable map 結構信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } //2.發送注冊消息 RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills()); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } }
/**
 * Sends one REGISTER_BROKER request to a single name server.
 *
 * @param namesrvAddr address of the name server to contact
 * @param clusterName cluster name placed in the request header
 * @param brokerAddr broker address placed in the request header
 * @param brokerName broker name placed in the request header
 * @param brokerId broker id placed in the request header
 * @param haServerAddr HA server address placed in the request header
 * @param topicConfigWrapper topic configuration placed in the request body
 * @param filterServerList filter server list placed in the request body
 * @param oneway when {@code true}, send without waiting for a response
 * @param timeoutMills timeout in milliseconds passed to the remoting client
 * @return the decoded registration result, or {@code null} for a one-way request
 * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
 */
private RegisterBrokerResult registerBroker(
    final String namesrvAddr,
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException,
    RemotingTimeoutException, InterruptedException {
    // 1. Build the request header for RequestCode.REGISTER_BROKER.
    RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
    header.setBrokerAddr(brokerAddr);
    header.setBrokerId(brokerId);
    header.setBrokerName(brokerName);
    header.setClusterName(clusterName);
    header.setHaServerAddr(haServerAddr);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, header);

    // 2. Build the request body from the topic configuration and filter server list.
    RegisterBrokerBody body = new RegisterBrokerBody();
    body.setTopicConfigSerializeWrapper(topicConfigWrapper);
    body.setFilterServerList(filterServerList);
    request.setBody(body.encode());

    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException e) {
            // Ignore
        }
        return null;
    }

    // 3. Send synchronously and decode the response.
    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        RegisterBrokerResponseHeader responseHeader =
            (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
        RegisterBrokerResult result = new RegisterBrokerResult();
        result.setMasterAddr(responseHeader.getMasterAddr());
        result.setHaServerAddr(responseHeader.getHaServerAddr());
        // The KV table is only present when the response carries a body.
        if (response.getBody() != null) {
            result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
        }
        return result;
    }

    // Any other response code is reported to the caller as a broker exception.
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
TopicConfigManager
TopicConfigManager 維護了 broker 上所有 topic 的配置信息(topicConfigTable),broker 向 namesrv 注冊時會將 topicConfigTable 一併上報;
/** Logger obtained under {@code LoggerName.BROKER_LOGGER_NAME}. */
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);

/** Lock timeout in milliseconds, judging by the name; its use is outside this excerpt. */
private static final long LOCK_TIMEOUT_MILLIS = 3000;

/** Lock associated with the topic config table; excluded from serialization. */
private final transient Lock lockTopicConfigTable = new ReentrantLock();

/** Topic name to topic configuration; holds the topic-related information. */
private final ConcurrentMap<String, TopicConfig> topicConfigTable =
    new ConcurrentHashMap<String, TopicConfig>(1024);

/** Version marker kept alongside the topic table. */
private final DataVersion dataVersion = new DataVersion();

/** Names of the system topics. */
private final Set<String> systemTopicList = new HashSet<String>();

/** Owning broker controller; excluded from serialization. */
private transient BrokerController brokerController;