rocketmq linux 安裝教程 以及本地連接遠程


rocketmq 官網網址:http://rocketmq.apache.org/docs/quick-start/

准備

  linux 服務器

  操作系統 CentOS

  

1.下載zip 到linux系統上(下載二進制包,不要下載資源包)

  隨便下載一個鏡像倉庫下載:rocketmq-all-4.7.1-bin-release.zip

2.開始安裝

  2.1 rocketmq 是基於JVM運行的,所以要有java 環境  java -version 查看,沒有則需要安裝

   

 

   2.2 用unzip rocketmq-all-4.7.1-bin-release.zip 解壓壓縮包

  2.3  重命名  rename rocketmq-all-4.7.1-bin-release/  rocketmq rocketmq-all-4.7.1-bin-release/

3.啟動

  3.1 修改日志位置:rocketmq 默認的日志位置再${user.home}  linux 對應的位置在 /root/home 文件下,修改日志位置到rocketmq 下

    cd /data/middleware/rocketmq/conf  打開 logback_namesrv.xml 

    將${user.home} 修改為 /data/middleware/rocketmq

    在/data/middleware/rocketmq 創建logs 文件夾

  

 

 

 

 3.1 找到bin目錄下 /data/middleware/rocketmq/bin 

    sh mqnamesrv 啟動server 

 

 

  啟用為后台運行,並輸入運行日志到namesrv.log 中

  nohup sh mqnamesrv  > /data/middleware/rocketmq/logs/rocketmqlogs/namesrv.log 2>&1 &

 

 3.2 修改broker 的日志文件地址和啟動broker

  

 

 

   啟動報錯,因為mq需要的內存空間不足,需要重新分配內存空間

  查看mqbroker 腳本發現最終執行的是runbroker.sh .在其中看到JAVA_OPT 的配置,修改默認配置

  

 

 紅框中配置的堆棧空間已經大於服務器剩余內存2G,所以設置為1G

 

 再次啟動

  nohup sh  mqbroker -n localhost:9876 > /data/middleware/rocketmq/logs/rocketmqlogs/broker.log 2>&1 &

 

 4.測試是否可運行

  4.1 暴露服務地址  export NAMESRV_ADDR=localhost:9876

  4.2 啟動消費者:

  

 

  4.3 啟動生產者:打開一個新控制台,生產者開始投遞消息

  

 

   4.4 消費者開始消費

  

 

 5.本地連接遠程  xxx.xxx.xxx.xxx 表示公網ip

 

  5.1 本地也需要安裝rocketmq ,修改rocketmq 下conf/broker.xml 增加如下配置

    brokerIP1=xxx.xxx.xxx.xxx 

  5.2 服務器安全組開放9876 端口

  5.3 windows 本地啟動:

     start .\mqnamesrv

    start .\mqbroker -n xxx.xxx.xxx.xxx:9876

  5.4 測試代碼

    消費者

  

package com.example.demo.rocketmq;
import java.util.List;

/**
*
* @description: 消費者
*
* @author: coderxiao
*
* @create: 2020-09-23 20:13
**/
public class RocketConsumer {

    public static void main(String[] args) {
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("xy");
        consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
        try {
            consumer.subscribe("TopicTest","*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("業務數據"+new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

}

 

  生產者

  

public class RocketProducer {

    @Test
    public  void test1() {
        DefaultMQProducer producer=new DefaultMQProducer("xy");
        producer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
        producer.setCreateTopicKey("TopicTest");
        try {
            producer.start();
            for (int i=0;i<10;i++){
                try {
                    Message message=new Message("TopicTest","tagA", ("who are you"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult=producer.send(message);
                    System.out.println("發送結果:"+sendResult);
                } catch (RemotingException e) {
                    e.printStackTrace();
                } catch (MQBrokerException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch (Exception e){e.printStackTrace();}
            }

        } catch (MQClientException e) {
            e.printStackTrace();
        }finally {
            producer.shutdown();
        }
    }
}

解決:loseChannel: close the connection to remote address[] result: true 這個問題

本地連接遠程消費

 

  

    

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM