rocketmq 官網網址:http://rocketmq.apache.org/docs/quick-start/
准備
linux 服務器
操作系統 CentOS
1.下載zip 到linux系統上(下載二進制包,不要下載資源包)
隨便下載一個鏡像倉庫下載:rocketmq-all-4.7.1-bin-release.zip
2.開始安裝
2.1 rocketmq 是基於JVM運行的,所以要有java 環境 java -version 查看,沒有則需要安裝

2.2 用unzip rocketmq-all-4.7.1-bin-release.zip 解壓壓縮包
2.3 重命名 rename rocketmq-all-4.7.1-bin-release/ rocketmq rocketmq-all-4.7.1-bin-release/
3.啟動
3.1 修改日志位置:rocketmq 默認的日志位置再${user.home} linux 對應的位置在 /root/home 文件下,修改日志位置到rocketmq 下
cd /data/middleware/rocketmq/conf 打開 logback_namesrv.xml
將${user.home} 修改為 /data/middleware/rocketmq
在/data/middleware/rocketmq 創建logs 文件夾

3.1 找到bin目錄下 /data/middleware/rocketmq/bin
sh mqnamesrv 啟動server

啟用為后台運行,並輸入運行日志到namesrv.log 中
nohup sh mqnamesrv > /data/middleware/rocketmq/logs/rocketmqlogs/namesrv.log 2>&1 &

3.2 修改broker 的日志文件地址和啟動broker

啟動報錯,因為mq需要的內存空間不足,需要重新分配內存空間
查看mqbroker 腳本發現最終執行的是runbroker.sh .在其中看到JAVA_OPT 的配置,修改默認配置

紅框中配置的堆棧空間已經大於服務器剩余內存2G,所以設置為1G

再次啟動
nohup sh mqbroker -n localhost:9876 > /data/middleware/rocketmq/logs/rocketmqlogs/broker.log 2>&1 &

4.測試是否可運行
4.1 暴露服務地址 export NAMESRV_ADDR=localhost:9876
4.2 啟動消費者:

4.3 啟動生產者:打開一個新控制台,生產者開始投遞消息

4.4 消費者開始消費

5.本地連接遠程 xxx.xxx.xxx.xxx 表示公網ip
5.1 本地也需要安裝rocketmq ,修改rocketmq 下conf/broker.xml 增加如下配置
brokerIP1=xxx.xxx.xxx.xxx
5.2 服務器安全組開放9876 端口
5.3 windows 本地啟動:
start .\mqnamesrv
start .\mqbroker -n xxx.xxx.xxx.xxx:9876
5.4 測試代碼
消費者
package com.example.demo.rocketmq; import java.util.List; /** * * @description: 消費者 * * @author: coderxiao * * @create: 2020-09-23 20:13 **/ public class RocketConsumer { public static void main(String[] args) { DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("xy"); consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876"); try { consumer.subscribe("TopicTest","*"); } catch (MQClientException e) { e.printStackTrace(); } consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("業務數據"+new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); try { consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } } }
生產者
public class RocketProducer { @Test public void test1() { DefaultMQProducer producer=new DefaultMQProducer("xy"); producer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876"); producer.setCreateTopicKey("TopicTest"); try { producer.start(); for (int i=0;i<10;i++){ try { Message message=new Message("TopicTest","tagA", ("who are you"+i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult=producer.send(message); System.out.println("發送結果:"+sendResult); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }catch (Exception e){e.printStackTrace();} } } catch (MQClientException e) { e.printStackTrace(); }finally { producer.shutdown(); } } }
解決:loseChannel: close the connection to remote address[] result: true 這個問題
本地連接遠程消費
