RocketMQ4.5.1環境搭建及示例


一、Windows環境搭建RocketMQ

1. 下載RocketMQ Binary壓縮包,並解壓縮,我的安裝目錄為E:\programs\rocketmq\rocketmq-all-4.5.1

2. 配置環境變量:ROCKETMQ_HOME,其值為RocektMQ的安裝目錄

3. 啟動NameServer

在命令行中進入RocketMQ安裝目錄下的bin目錄,執行start mqnamesrv.cmd,執行完成后會彈出一個新窗口,不要關閉該窗口

4. 啟動Broker

在命令行中進入RocketMQ安裝目錄下的bin目錄,執行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,執行完成后會彈出一個新窗口,不要關閉該窗口

首次執行中遇到了如下問題:錯誤: 找不到或無法加載主類 Files\Java\jdk1.8.0_144\lib\dt.jar;C:\Program

看錯誤提示應該是CLASSPATH變量中有空格,導致讀取失敗

可以通過修改bin目錄下的runbroker.cmd文件,找到其中的倒數第二行: set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%",在%CLASSPATH%兩邊加雙引號,引起來,這樣就可以識別CLASSPATH變量中的空格。再次執行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,成功。

 

二、ubuntu環境搭建RocketMQ
1. 下載RocketMQ的源碼,並解壓縮

unzip rocketmq-all-4.5.1-source-release.zip

2. 在解壓縮的目錄中,使用maven構建項目,構建完成后,進入rocketmq目錄

cd rocketmq-all-4.4.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq

3. 視情況修改runbroker.sh和runserver.sh文件中JVM參數,默認參數設置的內存比較大,個人電腦運行不起來,需要調小

runbroker.sh中

原配置:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"

修改為:JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m"

runserver.sh中

原配置:JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

修改為:JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

 

4. 啟動nameserver:

在構建完成的rocketmq目錄下,執行如下語句,啟動nameserver

nohup sh bin/mqnamesrv &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

5. 啟動broker

啟動broker語句和查看日志語句

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 

6. 關閉broker和server語句

sh bin/mqshutdown broker

sh bin/mqshutdown namesrv

 

三、示例

 生產者

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        //創建一個消息生產者,並設置一個消息生產者組
        DefaultMQProducer producer = new DefaultMQProducer("zs_producer_group");
        //指定NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //初始化Producer,在整個應用生命周期中只需要初始化一次
        producer.start();
        for(int i = 0;i<100;i++) {
            //創建一個消息對象,指定其主題、標簽和消息內容
            Message msg = new Message(
                    "topic_example_java"  /*消息主題名*/,
                    "TagA"    /*消息標簽*/,
                    ("Hello Java demo RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息內容*/
                    );
            //發送消息並返回結果
            SendResult sendResult = producer.send(msg);
            
            System.out.printf("%s%n", sendResult);
        }
        //一旦生產者實例不再被使用,則將其關閉,包括清理資源、關閉網絡連接等
        producer.shutdown();    
    }
}

消費者

import java.util.Date;
import java.util.List;
import java.io.UnsupportedEncodingException;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //創建一個消息消費者,並設置一個消息消費者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zs_consumer_group");
        //指定NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        //設置Consumer第一次啟動時是從隊列頭部還是隊列尾部開始消費的
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱指定Topic下的所有消息
        consumer.subscribe("topic_example_java","*");
        //注冊消息監聽器
        consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) ->{
            //默認list里只有一條消息,可以通過設置參數來批量接收消息
            if(list!= null) {
                for(MessageExt ext: list) {
                    try {
                        System.out.println(new Date()+ new String(ext.getBody(),"UTF-8"));
                    }catch(UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        //消息者對象在使用之前必須要調用start方法初始化
        consumer.start();
        System.out.println("消息消費者已啟動");
    }
}

運行結果:

消費者

生產者

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM