RocketMQ入門、安裝、詳解、外網配置


version 2.0 【更新於 2020.03.20】

  • 本次更新重點,主從同步!!!(很多小伙伴出現主從不同步的問題)
  • 主從同步關注點(詳見配置說明):
    • 1. brokerRole
    • 2. brokerIP2
    • 3. 重啟程序順序
  • 官方壓縮包由tar.gz改為zip
  • bin目錄下多了一個文件夾,從而grep命令參數多了--exclude-dir
  • 注意 nameserver 和 master 以及 slave 的區別
  • 注意重啟方式,不要使用kill -9
  • nameserver集群、broker(單master/單slave;多master/多slave;單master/多slave)

下載&安裝

下載地址http://rocketmq.apache.org/docs/quick-start/

解壓 tar -zxvf rocketmq-all-4.4.0-bin-release.tar.gz

解壓 unzip rocketmq-all-4.7.0-bin-release.zip【version 2.0】 

進入HOME目錄 cd rocketmq-all-4.4.0-bin-release

開啟端口 9876/9876 10909/10912 驗證 netstat -nltp

修改配置

啟動腳本內存配置

cd bin

grep "Xmx" *

grep 'Xmx' * --exclude-dir="*" 【version 2.0】

grep 'MaxDirectMemorySize' * --exclude-dir="*" 【version 2.0】

vim runbroker.sh runserver.sh vim tools.sh

 JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"

改為 JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=1g"

Broker配置 【master】

vim conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
# 0 表示master,大於0 表示slave
brokerId = 0
# 服務節點/注冊中心
namesrvAddr=注冊中心IP:9876
# 多服務節點/注冊中心,使用分號【;】
#namesrvAddr=注冊中心IP:9876;注冊中心IP:9876;注冊中心IP:9876
# Broker服務地址
brokerIP1=當前機器IP
# BrokerHAIP地址,供slave同步消息的地址
brokerIP2=當前機器IP
# 刪除時間【時】此處表示凌晨4點
deleteWhen = 04
# 數據存儲時間【時】 此處表示48小時
fileReservedTime = 48
# SYNC_MASTER(同步雙寫) 、ASYNC_MASTER(異步復制) 、SLAVE
brokerRole = SYNC_MASTER
# SYNC_FLUSH(同步刷盤) 和ASYNC_FLUSH(異步刷盤),寫磁盤
flushDiskType = SYNC_FLUSH
# 是否自動創建topic 線上改為false 測試true
autoCreateTopicEnable=true

 

Broker配置 【slave】

vim conf/broker.conf

brokerClusterName = DefaultCluster
brokerName = broker-a
# 0 表示master,大於0 表示slave
brokerId = 1
# 服務節點/注冊中心
namesrvAddr=注冊中心IP:9876
# 多服務節點/注冊中心,使用分號【;】
#namesrvAddr=注冊中心IP:9876;注冊中心IP:9876;注冊中心IP:9876
# Broker服務地址
brokerIP1=當前機器IP
# BrokerHAIP地址,供slave同步消息的地址
brokerIP2=當前機器IP
# 刪除時間【時】此處表示凌晨4點
deleteWhen = 04
# 數據存儲時間【時】 此處表示48小時
fileReservedTime = 48
# SYNC_MASTER(同步雙寫) 、ASYNC_MASTER(異步復制) 、SLAVE
brokerRole = SLAVE
# SYNC_FLUSH(同步刷盤) 和ASYNC_FLUSH(異步刷盤),寫磁盤
flushDiskType = SYNC_FLUSH
# 是否自動創建topic 線上改為false 測試true
autoCreateTopicEnable=true

 

啟動

  1. 【NamesrvStartup】啟動namesrv nohup bin/mqnamesrv -n 注冊中心IP:9876 > mqnamesrv.log 2>&1 &

  2. 檢查端口監聽是否為0.0.0.0:9876/注冊中心IP:9876 命令 netstat -anpt | grep 9876

  3. 【BrokerStartup】啟動master節點 nohup sh bin/mqbroker -n 注冊中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &

  4. 【BrokerStartup】啟動slave節點 nohup sh bin/mqbroker -n 注冊中心IP:9876 -c conf/broker.conf > broker.log 2>&1 &

  5. 查看是否注冊成功(集群信息) bin/mqadmin clusterList -n 注冊中心IP:9876

停止 不要使用kill -9!!!

  1. 停止 slave bin/mqshutdown broker

  2. 停止 master bin/mqshutdown broker

  3. 停止 namesrv bin/mqshutdown namesrv

常用命令

  • cd rocketmq-all-4.4.0-bin-release/
  • 注冊中心機器上

---集群相關

查詢集群信息 bin/mqadmin clusterList -n localhost:9876

打印Broker配置 bin/mqbroker -m -n localhost:9876

更新Broker配置 bin/mqadmin updateBrokerConfig -c DefaultCluster -k autoCreateTopicEnable -v false -n localhost:9876

查看Broker統計信息 bin/mqadmin brokerstatus –n localhost:9876 –b locahost:10909

 

---訂閱組相關

創建訂閱組 bin/mqadmin updateSubGroup -n localhost:9876 -c ClusterName -g GroupName

列出消費組 bin/mqadmin consumerProgress -n localhost:9876

查看消費組IP bin/mqadmin consumerStatus -g GroupName -n localhost:9876

查看消費組數據堆積 bin/mqadmin consumerProgress -n localhost:9876 -g GroupName

刪除訂閱組 bin/mqadmin deleteSubGroup -n localhost:9876 -c ClusterName -g GroupName

 

---Topic相關

創建Topic bin/mqadmin updateTopic -c ClusterName -n localhost:9876 -t TopicName

Topic列表 bin/mqadmin topicList -n localhost:9876

發送Topic消息測試 bin/mqadmin checkMsgSendRT -n localhost:9876 -t TopicName -s 1024

打印Topic消息 bin/mqadmin printMsg -n localhost:9876 -t TopicName

Topic詳情統計 bin/mqadmin topicstatus -n localhost:9876 -t TopicName

獲取Topic的cluster bin/mqadmin topicClusterList -n localhost:9876 -t TopicName

刪除Topic bin/mqadmin deleteTopic -n localhost:9876 -t TopicName -c ClusterName

查看Topic路由 bin/mqadmin topicRoute -n localhost:9876 -t TopicName

查看Topic狀態 bin/mqadmin topicStatus -n localhost:9876 -t TopicName

根據ID查詢消息 bin/mqadmin queryMsgById -i msgId -n localhost:9876

根據偏移量查詢消息 bin/mqadmin queryMsgByOffset -b BrokerName -i 3 -n localhost:9876 -o 299 -t TopicName

 

broker配置說明

基礎配置

配置 描述 默認值 例子
namesrvAddr nameServer地址,如果nameserver是多台集群的話,用分號分割 namesrvAddr=10.1.219.75:9876;10.1.219.76:9876
brokerClusterName 所屬集群名字。Cluster 的地址,如果集群機器數比較多,可以分成多個Cluster ,每個Cluster 供一個業務群使用 brokerClusterName=rocketmq-cluster
brokerName Broker 的名稱, Master 和Slave 通過使用相同的Broker 名稱來表明相互關系,以說明某個Slave 是哪個Master 的Slave   brokerName=broker-a
brokerId 一個Master Barker 可以有多個Slave, 0 表示Master ,大於0 表示不同的 Slave 的ID   brokerId=0
fileReservedTime 在磁盤上保存消息的時長,單位是小時,自動刪除超時的消息   fileReservedTime=48
deleteWhen 與fileReservedTim巳參數呼應,表明在幾點做消息刪除動作,默認值04 表示凌晨4 點   deleteWhen=04
brokerRole brokerRole 有3 種: SYNCMASTER(同步雙寫) 、ASYNCMASTER(異步復制) 、SLAVE 。關鍵詞SYNC 和ASYNC 表示Master 和Slave 之間同步消息的機制, SYNC 的意思是當Slave 和Master 消息同步完成后,再返回發送成功的狀態   brokerRole=SYNC_MASTER
flushDiskType flushDiskType 表示刷盤策略,分為SYNCFLUSH 和ASYNCFLUSH兩種,分別代表同步刷盤和異步刷盤。同步刷盤情況下,消息真正寫人磁盤后再返回成功狀態;異步刷盤情況下,消息寫人page_cache 后就返回成功狀態   flushDiskType=ASYNC_FLUSH
listenPort Broker 監聽的端口號,如果一台機器上啟動了多個Broker , 則要設置不同的端口號,避免沖突   listenPort=10911
storePathRootDir 存儲消息以及一些配置信息的根目錄   storePathRootDir=/app/custom/data/rocketmq/store-a

進階配置

配置 描述 默認值 例子
autoCreateTopicEnable 是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉   autoCreateTopicEnable=true
defaultTopicQueueNums 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數。   defaultTopicQueueNums=4
autoCreateSubscriptionGroup 是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉   autoCreateTopicEnable=true
mapedFileSizeCommitLog commitLog每個文件的大小,默認1G 1G mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue ConsumeQueue每個文件默認存30W條,根據業務情況調整   mapedFileSizeConsumeQueue=300000

存儲配置

配置 描述 默認值 例子
storePathRootDir 存儲消息以及一些配置信息的根目錄   storePathRootDir=/app/custom/data/rocketmq/store-a
storePathCommitLog commitLog 存儲路徑   storePathCommitLog=/data/rocketmq/store/commitlog
storePathConsumeQueue 消費隊列存儲路徑存儲路徑   storePathConsumeQueue=/data/rocketmq/store/consumequeue
storePathIndex 消息索引存儲路徑   storePathIndex=/data/rocketmq/store/index
storeCheckpoint checkpoint 文件存儲路徑   storeCheckpoint=/data/rocketmq/store/checkpoint
abortFile abort 文件存儲路徑   abortFile=/data/rocketmq/store/abort

JAVA示例

  1. pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
    </dependencies>

 

  2. Producer 生產者

public class Producer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
                DefaultMQProducer("GroupName");
        // Specify name server addresses.
        producer.setNamesrvAddr("IP:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicName" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

 

  3. Consumer 消費者

public class Consumer {
    public static void main(String[] args) {
        String topicName = "TopicName";
        DefaultMQPushConsumer consumer =
                new DefaultMQPushConsumer("GroupName");
        consumer.setNamesrvAddr("IP:9876");
        try {
            consumer.subscribe(topicName, "*");
            /**
             * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>
             * 如果非第一次啟動,那么按照上次消費的位置繼續消費
             */
            consumer.setConsumeFromWhere(
                    ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            /**
             * 如果是順序消息,這邊的監聽就要使用MessageListenerOrderly監聽
             * 並且,返回結果也要使用ConsumeOrderlyStatus
             */
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    //設置自動提交,如果不設置自動提交就算返回SUCCESS,消費者關閉重啟 還是會重復消費的
                    context.setAutoCommit(true);
                    try {
                        for (MessageExt msg : msgs) {
                            String recString = null;
                            try {
                                recString = new String(msg.getBody(), "UTF-8");
                            } catch (UnsupportedEncodingException e) {
                                e.printStackTrace();
                            }
                            System.out.println(recString);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //如果出現異常,消費失敗,掛起消費隊列一會會,稍后繼續消費
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                    //消費成功
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

  4. 打包成jar的插件[可選]

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!--jdk 版本-->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!--全限定名-->
                                    <mainClass>com.package.Consumer</mainClass>
                                </transformer>
                            </transformers>
                            <artifactSet> </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

 

 個人微信,有什么建議、意見或補充,歡迎及時溝通!!!(添加時注明“博客園”,謝謝)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM