RocketMQ 4.5.1 單機環境搭建以及生產消費測試


為了學習和方便測試,總是要啟動一個單機版的。下載 http://rocketmq.apache.org/dowloading/releases/

1. 要先配置環境變量

ROCKETMQ_HOME

E:\rocketmq-all-4.5.1-bin-release

2. 然后進入bin目錄啟動NameServer

3. 啟動Broker

啟動

E:\rocketmq-all-4.5.1-bin-release\bin>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

可能會出現一個錯誤: 找不到或無法加載主類 Files\Java\jdk1.8.0_161\lib;C:\Program

 

解決方法:(打開bin目錄的runserver.cmd

修改成

重新啟動,成功

4. 弄個管控台方便查看

https://github.com/apache/rocketmq-externals

下載好進入 rocketmq-console 目錄打包

mvn clean package -Dmaven.test.skip=true

進入target目錄,啟動 (最后的參數的nameserver的地址,也就是我本機地址)

E:\rocketmq-externals-master\rocketmq-console\target>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876

最后訪問 http://localhost:8080 即可

5. 簡單測試

引入依賴

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

一個簡單的生產者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class Test {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 設置生產者組名
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // 設置NameServer地址
        producer.setNamesrvAddr("10.204.241.15:9876");
        // 啟動
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 創建一條消息,包含topic、tag以及消息內容
            Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 發送結果
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // 不用的時候關閉
        producer.shutdown();
    }

}

查看管控台

查看詳細

下面一個簡單的消費者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Test2 {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 設置生產者組名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group");
        // 設置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        // 訂閱的主題
        consumer.subscribe("MyTopic", "*");
        // 注冊消息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 啟動
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

控制台輸出

不要關閉消費者,查看管控台

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM