RocketMQ 創建和刪除 topic,以及 broker 和 nameserver 之間的心跳


命令行主類:org.apache.rocketmq.tools.command.MQAdminStartup

客戶端創建 topic

程序參數:
updateTopic -n localhost:9876 -c DefaultCluster -t topic-zhang

org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand#execute
創建 topic 步驟:
1. 從 nameserver 獲取當前集群的 master broker

2. 向 master 發送請求,創建 topic

3. broker 利用 registerBroker 同步新建的 topic 到 nameserver
broker 發送請求:org.apache.rocketmq.broker.BrokerController#registerIncrementBrokerData

 

客戶端刪除 topic

程序參數:
deleteTopic -n localhost:9876 -c DefaultCluster -t topic-zhang

org.apache.rocketmq.tools.command.topic.DeleteTopicSubCommand#deleteTopic
刪除 topic 步驟:
1. client 從 nameserver 獲取 broker 列表

2. 向 broker 發送請求,刪除 broker 中的 topic
broker 刪除 topic
org.apache.rocketmq.broker.topic.TopicConfigManager#deleteTopicConfig

3. 向 nameserver 發送請求, 刪除 nameserver 中的 topic

 

同時,broker 和 nameserver 之間有定時的 registerBroker,10s 一次:

org.apache.rocketmq.broker.BrokerController#start

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

 org.apache.rocketmq.namesrv.NamesrvController#initialize

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    }
}, 5, 10, TimeUnit.SECONDS);

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM