一、Windows環境搭建RocketMQ
1. 下載RocketMQ Binary壓縮包,並解壓縮,我的安裝目錄為E:\programs\rocketmq\rocketmq-all-4.5.1
2. 配置環境變量:ROCKETMQ_HOME,其值為RocektMQ的安裝目錄
3. 啟動NameServer
在命令行中進入RocketMQ安裝目錄下的bin目錄,執行start mqnamesrv.cmd,執行完成后會彈出一個新窗口,不要關閉該窗口
4. 啟動Broker
在命令行中進入RocketMQ安裝目錄下的bin目錄,執行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,執行完成后會彈出一個新窗口,不要關閉該窗口
首次執行中遇到了如下問題:錯誤: 找不到或無法加載主類 Files\Java\jdk1.8.0_144\lib\dt.jar;C:\Program
看錯誤提示應該是CLASSPATH變量中有空格,導致讀取失敗
可以通過修改bin目錄下的runbroker.cmd文件,找到其中的倒數第二行: set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%",在%CLASSPATH%兩邊加雙引號,引起來,這樣就可以識別CLASSPATH變量中的空格。再次執行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,成功。
二、ubuntu環境搭建RocketMQ
1. 下載RocketMQ的源碼,並解壓縮
unzip rocketmq-all-4.5.1-source-release.zip
2. 在解壓縮的目錄中,使用maven構建項目,構建完成后,進入rocketmq目錄
cd rocketmq-all-4.4.0/ mvn -Prelease-all -DskipTests clean install -U cd distribution/target/apache-rocketmq
3. 視情況修改runbroker.sh和runserver.sh文件中JVM參數,默認參數設置的內存比較大,個人電腦運行不起來,需要調小
runbroker.sh中
原配置:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
修改為:JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m"
runserver.sh中
原配置:JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改為:JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
4. 啟動nameserver:
在構建完成的rocketmq目錄下,執行如下語句,啟動nameserver
nohup sh bin/mqnamesrv &
查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
5. 啟動broker
啟動broker語句和查看日志語句
nohup sh bin/mqbroker -n localhost:9876 & tail -f ~/logs/rocketmqlogs/broker.log
6. 關閉broker和server語句
sh bin/mqshutdown broker sh bin/mqshutdown namesrv
三、示例
生產者
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws Exception { //創建一個消息生產者,並設置一個消息生產者組 DefaultMQProducer producer = new DefaultMQProducer("zs_producer_group"); //指定NameServer地址 producer.setNamesrvAddr("localhost:9876"); //初始化Producer,在整個應用生命周期中只需要初始化一次 producer.start(); for(int i = 0;i<100;i++) { //創建一個消息對象,指定其主題、標簽和消息內容 Message msg = new Message( "topic_example_java" /*消息主題名*/, "TagA" /*消息標簽*/, ("Hello Java demo RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /*消息內容*/ ); //發送消息並返回結果 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //一旦生產者實例不再被使用,則將其關閉,包括清理資源、關閉網絡連接等 producer.shutdown(); } }
消費者
import java.util.Date; import java.util.List; import java.io.UnsupportedEncodingException; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws Exception { //創建一個消息消費者,並設置一個消息消費者組 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("zs_consumer_group"); //指定NameServer地址 consumer.setNamesrvAddr("localhost:9876"); //設置Consumer第一次啟動時是從隊列頭部還是隊列尾部開始消費的 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //訂閱指定Topic下的所有消息 consumer.subscribe("topic_example_java","*"); //注冊消息監聽器 consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) ->{ //默認list里只有一條消息,可以通過設置參數來批量接收消息 if(list!= null) { for(MessageExt ext: list) { try { System.out.println(new Date()+ new String(ext.getBody(),"UTF-8")); }catch(UnsupportedEncodingException e) { e.printStackTrace(); } } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //消息者對象在使用之前必須要調用start方法初始化 consumer.start(); System.out.println("消息消費者已啟動"); } }
運行結果:
消費者
生產者