本地啟動 NameServer 和 Broker | 讀 RocketMQ 源碼前的准備工作


  1. clone 並導入源碼
  2. 本地啟動 NameServer
  3. 本地啟動 Broker
  4. 本地運行生產者與消費者代碼

完成上述步驟之后,RocketMQ的源碼環境就搭建完畢了,之后就可以在本地啟動以及收發消息,調試和分析RocketMQ的源碼了。

clone 並導入源碼

在 github 上選擇對應的的代碼 https://github.com/apache/rocketmq/tree/rocketmq-all-4.7.0,將其 clone 下來,再切出 4.7.0 版本的源碼。Clone 到本地之后,用 IDEA 打開項目。

clone代碼

項目結構

目錄結構

模塊 作用
broker Broker 相關代碼
client Producer、Consumer 客戶端代碼,用於生產消息、消費消息
common 公共代碼
dev 開發相關的信息
distribution 部署相關,比如配置文件
example 例子
filter 過濾器
logappender 日志相關
logging 日志相關
namesvr NameServer
openmessaging 開放消息標准
remoting 遠程網絡通信,基於 netty 實現
srvutil 工具類
store 消息如何在 Broker 中進行存儲相關代碼
style 代碼檢查
test 測試
tools 命令行監控

本地啟動 NameServer

接下來我們要做的是在本地啟動 NameServer,包括兩個步驟:

  1. 在 IDEA 中配置啟動相關的信息,NameServer 的啟動類是org.apache.rocketmq.namesrv.NamesrvStartup
  2. 准備好啟動 NameServer 需要的配置文件和目錄

看上圖:

  1. 配置啟動類的名字 NameServerStartup
  2. 配置主類的路徑 org.apache.rocketmq.namesrv.NamesrvStartup
  3. 工作目錄,也就是當前代碼所在的目錄
  4. 運行目錄 ROCKETMQ_HOME,這個目錄里面放的是運行時需要的配置文件、數據、日志等。你需要創建一個目錄,在里面創建 conflogsstore目錄

接着將源碼中 distrbution 模塊中的 logback_namesvr.xml 文件拷貝到上面的 conf 目錄下,並將這個文件中的${user.home}全部替換為前面配置的運行目錄。

然后運行配置好的啟動類,就會讀取 conf 里的配置文件,並將日志打印在logs目錄里,數據都會寫在store目錄里。看到 IDEA 的打印出下面這樣的信息,就說明 NameServer 啟動成功了。

本地運行 Broker

啟動 Broker 和啟動 NameServer 的過程類似。首先也是配置啟動類:

  1. Broker 的啟動類在 org.apache.rocketmq.broker.BrokerStartup
  2. 不一樣的地方是要設置一個參數 -c你的broker.conf配置文件的路徑,因為程序啟動的時候會讀-c這個參數
  3. 接着還是設置工作目錄和運行目錄,選擇 module 為 rocketmq-broker

接着把distrbution模塊中的 broker.conflogback_broker.xml 文件拷貝到 conf目錄下:

  1. 將 logback_broker.xml 的${user.home}替換為你的 RocketMQ 運行目錄
  2. broker.conf 按照下面的配置方式進行配置
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# nameserver的地址
namesrvAddr=127.0.0.1:9876
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
# 運行目錄的store目錄
storePathRootDir=/Users/shui/Desktop/rocketmq-nameserver/store
# commitLog的存儲路徑
storePathCommitLog=你的store目錄/commitlog
# consume queue文件的存儲路徑
storePathConsumeQueue=你的store目錄/consumequeue
# 消息索引文件的存儲路徑
storePathIndex=你的store目錄/store/index
# checkpoint文件的存儲路徑
storeCheckpoint=你的store目錄/checkpoint
# abort文件的存儲路徑
abortFile=你的store目錄/abort

最后運行主類,看到控制台打印如下信息就表示啟動成功:

此時 rocketmqlogs,里面有一個broker.log,就可以看到Broker的啟動日志了:

本地運行生產者與消費者代碼

在控制台創建一個 topic 名為 TopicTest。如果不知道如何使用 RocketMQ 的控制台,可以看我之前寫這篇文章:https://www.cnblogs.com/shuiyj/p/13200658.html。

接着去修改 example 中給出的生產者和消費者代碼 org.apache.rocketmq.example.quickstart.Consumerorg.apache.rocketmq.example.quickstart.Producer

生產者

改動兩個地方:

  1. 設置 NameServer 地址,讓生產者可以獲取到 Broker 地址
  2. 本來發送 1000 條信息,改少一點發送 3 條,便於消費的時候觀察
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        // 其他代碼不變
      	// 在這里設置 NameServer 地址,保證  Producer 可以從 NameServer 獲取到 Broker 地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        /*
         * Launch the instance.
         */
        producer.start();
	     	
      // 本來是發送 1000 條消息,改成發送 3 條
        for (int i = 0; i < 3; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

看到控制台輸出如下所示的信息,表示消息發送成功了。

SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, offsetMsgId=C0A8010800002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, offsetMsgId=C0A8010800002A9F00000000000000CA, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, offsetMsgId=C0A8010800002A9F0000000000000194, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]

消費者

消息者只改動一個地方,就是設置 NameServer 地址,也是為了獲取到 Broker 的地址。

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 省略其它代碼...

        // 設置 NameServer 地址,保證  Consumer 可以從 NameServer 獲取到 Broker 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

可以看到消費到了 3 條數據,並打印出了消息的相關信息。

00:24:23.571 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=3, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869675, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869676, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F00000000000000CA, commitLogOffset=202, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064336, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AB0001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=0, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869678, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869679, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000194, commitLogOffset=404, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064339, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F13AE0002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [brokerName=broker-a, queueId=2, storeSize=202, queueOffset=0, sysFlag=0, bornTimestamp=1593274869552, bornHost=/192.168.1.8:54010, storeTimestamp=1593274869574, storeHost=/192.168.1.8:10911, msgId=C0A8010800002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1593275064340, UNIQ_KEY=24098A28085A1DB0ECAD4CD655E7AF1548B818B4AAC28B1F132F0000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM