RocketMq Broker 啟動流程


啟動源:  

RocketMq  Broker 啟動 從 rocketmq broker 啟動mqbroker 啟動腳本可以得知,最終運行的是 BrokerStartup  的main 方法,並將腳本參數傳遞。

export ROCKETMQ_HOME
//運行的啟動腳本   $@ 表示附加的所有參數信息傳遞給 BrokerStartup
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@

關於Broker啟動參數,我們不用死記,用的時候可以參考

  org.apache.rocketmq.common.BrokerConfig    -------> broker 配置文件

  org.apache.rocketmq.remoting.netty.NettyServerConfig  ------> netty 服務端配置文件

  org.apache.rocketmq.remoting.netty.NettyClientConfig  ------> netty 客戶端配置文件

  org.apache.rocketmq.store.config.MessageStoreConfig   ------> store 配置文件

apache 命令腳手架:

  命令腳手架即我們啟動broker main 方法時候,會附帶一些參數信息,使用命令腳手架可以讓我們很方便的知道對應的參數信息,我們可以通過 -c 配置文件路徑 ,來初始化配置。

啟動流程:

 /* main 方法啟動
     * @param args
     */
    public static void main(String[] args) {
        //創建brkerController
        start(createBrokerController(args));
    }

1.命令腳手架注冊:

  可以得知broker 啟動追加的參數信息

    -n :  指定broker 的  namesrvAddr 地址

    -h :打印命令

    -c: 指定配置文件的路徑

    -p: 啟動時候日志打印配置信息

    -m:啟動時候日志打印導入的配置信息

 //1.注冊一些 -n -h 命令
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            //2.注冊 -c -p -m 命令
            commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

2.設置 nettyServerConfig綁定的端口信息

 //設置broker 服務端綁定的端口為10911 
            nettyServerConfig.setListenPort(10911);

3.  組裝配置文件信息:

       //1. -c  后面追加的參數為配置文件路徑
            if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    //2.反射將配置文件broker信息注入BrokerConfig 配置類中
                    MixAll.properties2Object(properties, brokerConfig);
                    //3.反射將配置文件NettyServer信息注入NettyServerConfig  配置類中
                    MixAll.properties2Object(properties, nettyServerConfig);
                    //4.反射將配置文件NettyClient信息注入NettyClientConfig 配置類中
                    MixAll.properties2Object(properties, nettyClientConfig);
                    //5.反射將配置文件store 信息注入MessageStoreConfig 配置類中
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }
            //處理其他命令,注入到brokerConfig 中,比如我們的-n 命令

            MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

 4.創建BrokerController:

       //1.創建 BrokerController instance
            final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
            //2.進行初始化 Broker controller
            boolean initResult = controller.initialize();

  創建實例:

public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        //broker 配置信息
        this.brokerConfig = brokerConfig;
        //nettyServer配置信息
        this.nettyServerConfig = nettyServerConfig;
        //nettyClient 配置信息
        this.nettyClientConfig = nettyClientConfig;
        //store 配置信息
        this.messageStoreConfig = messageStoreConfig;
        //consumer 偏移量管理器,會讀取store/config/consumerOffset.json  json 配置文件,維護了offsetTable Map 結構
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        //topic 配置管理器,會讀取store/config/topics.json
        this.topicConfigManager = new TopicConfigManager(this);
        //拉去消息處理器,用來處理消費端消息拉去,關聯的業務code 為RequestCode.PULL_MESSAGE
        this.pullMessageProcessor = new PullMessageProcessor(this);
        //
        this.pullRequestHoldService = new PullRequestHoldService(this);
        //
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
        //
        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        // 消費過濾管理器會讀取store/config/consumerFilter.json
        this.consumerFilterManager = new ConsumerFilterManager(this);
        //生產者管理器
        this.producerManager = new ProducerManager();
        //用於清除不活動的連接,可以看到里面有一些掃描生產者以及消費者不活動連接的方法
        this.clientHousekeepingService = new ClientHousekeepingService(this);
        //
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
        //NettyClient 初始化
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
        //
        this.filterServerManager = new FilterServerManager(this);
        //
        this.slaveSynchronize = new SlaveSynchronize(this);
        //發送線程池隊列
        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
        //拉取線程池隊列
        this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
        //查詢線程池隊列
        this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
        //broker 狀態管理器
        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
        //
        this.brokerFastFailure = new BrokerFastFailure(this);
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }

   初始化brokercotroller

 public boolean initialize() throws CloneNotSupportedException {
        boolean result = this.topicConfigManager.load();
        //加載對應管理器的配置文件
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

        if (result) {
            try {
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
                MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
                this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
            } catch (IOException e) {
                result = false;
                log.error("Failed to initialize", e);
            }
        }
        //加載CommitLog 文件
        result = result && this.messageStore.load();

        if (result) {
            //初始化NettyServer
            this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
            NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
            fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
            //初始化VIP NettyServer 端口為在109011 -2 
            this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
            //初始化一些線程池
            this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getSendMessageThreadPoolNums(),
                this.brokerConfig.getSendMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.sendThreadPoolQueue,
                new ThreadFactoryImpl("SendMessageThread_"));

            this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getPullMessageThreadPoolNums(),
                this.brokerConfig.getPullMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.pullThreadPoolQueue,
                new ThreadFactoryImpl("PullMessageThread_"));

            this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                this.brokerConfig.getQueryMessageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.queryThreadPoolQueue,
                new ThreadFactoryImpl("QueryMessageThread_"));

            this.adminBrokerExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(
                    "AdminBrokerThread_"));

            this.clientManageExecutor = new ThreadPoolExecutor(
                this.brokerConfig.getClientManageThreadPoolNums(),
                this.brokerConfig.getClientManageThreadPoolNums(),
                1000 * 60,
                TimeUnit.MILLISECONDS,
                this.clientManagerThreadPoolQueue,
                new ThreadFactoryImpl("ClientManageThread_"));

            this.consumerManageExecutor =
                Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
                    "ConsumerManageThread_"));
            //注冊消息處理器,針對客戶端發過來的消息code,會有針對的處理器進行處理
            this.registerProcessor();

            final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis();
            final long period = 1000 * 60 * 60 * 24;
            //執行定定時任務
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.getBrokerStats().record();
                    } catch (Throwable e) {
                        log.error("schedule record error.", e);
                    }
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

           //定時 保存consumerOffset.json 文件
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

            //定時保存 consumerfilter.json 文件
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerFilterManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumer filter error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
            //

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.protectBroker();
                    } catch (Throwable e) {
                        log.error("protectBroker error.", e);
                    }
                }
            }, 3, 3, TimeUnit.MINUTES);
            //打印水印日志

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.printWaterMark();
                    } catch (Throwable e) {
                        log.error("printWaterMark error.", e);
                    }
                }
            }, 10, 1, TimeUnit.SECONDS);
            //

            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
                    } catch (Throwable e) {
                        log.error("schedule dispatchBehindBytes error.", e);
                    }
                }
            }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

            //
            if (this.brokerConfig.getNamesrvAddr() != null) {
                this.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());
                log.info("Set user specified name server address: {}", this.brokerConfig.getNamesrvAddr());
            } else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
                        } catch (Throwable e) {
                            log.error("ScheduledTask fetchNameServerAddr exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
            }

            if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
                if (this.messageStoreConfig.getHaMasterAddress() != null && this.messageStoreConfig.getHaMasterAddress().length() >= 6) {
                    this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());
                    this.updateMasterHAServerAddrPeriodically = false;
                } else {
                    this.updateMasterHAServerAddrPeriodically = true;
                }

                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.slaveSynchronize.syncAll();
                        } catch (Throwable e) {
                            log.error("ScheduledTask syncAll slave exception", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            } else {
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            BrokerController.this.printMasterAndSlaveDiff();
                        } catch (Throwable e) {
                            log.error("schedule printMasterAndSlaveDiff error.", e);
                        }
                    }
                }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
            }
        }

        return result;
    }

  5. 啟動BrokerController

 public void start() throws Exception {
        //刷盤,同步,高可用開啟
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        //10911 NettyServer 端口綁定 ,開始服務
        if (this.remotingServer != null) {
            this.remotingServer.start();
        }
        //VIP 10911 -2 NettyServer 端口綁定 ,開始服務
        if (this.fastRemotingServer != null) {
            this.fastRemotingServer.start();
        }
        //初始化組裝NettyClient bootstrap 信息
        if (this.brokerOuterAPI != null) {
            this.brokerOuterAPI.start();
        }
        //

        if (this.pullRequestHoldService != null) {
            this.pullRequestHoldService.start();
        }
        //開啟檢測不活動連接的服務,定時任務,每10s運行一次
        if (this.clientHousekeepingService != null) {
            this.clientHousekeepingService.start();
        }
        //
        if (this.filterServerManager != null) {
            this.filterServerManager.start();
        }
        //向nameServer 注冊broker topic 信息
        this.registerBrokerAll(true, false);
     //每30s 向 每個nameServer 注冊broker topic 信息 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
        //
        if (this.brokerStatsManager != null) {
            this.brokerStatsManager.start();
        }
        //
        if (this.brokerFastFailure != null) {
            this.brokerFastFailure.start();
        }
    }

  向nameServer 注冊broker topic 信息:

  Broker 在每經過30s 都會向nameserver 報告注冊自己的topic 信息

 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);

 

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
        //1.得到TopicConfigManager 維護的topicConfigTable map 結構信息
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
            ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig tmp =
                    new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
                topicConfigTable.put(topicConfig.getTopicName(), tmp);
            }
            topicConfigWrapper.setTopicConfigTable(topicConfigTable);
        }
        //2.發送注冊消息
        RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills());

        if (registerBrokerResult != null) {
            if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
            }
        }
    }

 

private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        //1.注冊broker 消息頭 RequestCode.REGISTER_BROKER
        RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        //2.設置body
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        request.setBody(requestBody.encode());

        if (oneway) {
            try {
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }
        //3.同步發送
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                RegisterBrokerResult result = new RegisterBrokerResult();
                result.setMasterAddr(responseHeader.getMasterAddr());
                result.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return result;
            }
            default:
                break;
        }

        throw new MQBrokerException(response.getCode(), response.getRemark());
    }

 

TopicConfigManager

  topicConfigManager 維護了broker 所有的topic 信息,也是與將 topicConfigTable 定時上報給namesrv ;

 private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private static final long LOCK_TIMEOUT_MILLIS = 3000;
    private transient final Lock lockTopicConfigTable = new ReentrantLock();
    //維護了topic 的相關信息
    private final ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>(1024);
    private final DataVersion dataVersion = new DataVersion();
    private final Set<String> systemTopicList = new HashSet<String>();
    private transient BrokerController brokerController;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM