rocketMQ 本地環境搭建


  最近在自己本地搭建rocketMQ,過程中遇到一些問題,現在總結一下,便於以后查看.

  首先打開rocket官網:http://rocketmq.apache.org, 點擊latest realease,有source版本和binary版本供下載,點擊quikstart可以看到source版本的構建步驟:

      source版本需要先安裝了以下軟件:

  1. 建議使用64位操作系統,Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 4g +免費磁盤用於Broker服務器

  

  我系統環境是OS 10.13.6,下載的是binary版本,省去了構建成為二進制文件的步驟.

將rocketmq-all-4.4.0-bin-release.zip文件解壓后得到rocketmq-all-4.4.0-bin-release文件夾,

配置環境變量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夾父路徑,

例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.

再將ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATH

 

打開teiminal,輸入sh mqnamesrv啟動名稱服務器,如下圖:

可以看到Java HotSpot(TM) 64-Bit Server VM warning“警告信息,不要怕,

它是說CMS垃圾收集器已經過時,在未來的JVM版本中會被其他收集器替代,可以不用管他.

namesrv已經正常啟動了

同樣,再開一個terminal,輸入sh mqbroker 啟動broker.

如果在啟動過程中遇到:

ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!

 

不要慌,這是說你的JAVA_HOME環境變量設置有問題,請檢查環境變量.

如果檢查后發現環境變量沒問題,但還是報以上錯誤,這可能是系統問題吧,

記得OS系統有好幾個文件可以配置環境變量,最常見的是用戶目錄下的.bash_profile,對當前用戶生效,還有一個全局的,在etc目錄下的profile文件

可以再確認一下,如果兩個文件都配置了依然報錯,那只好用最戳的方式解決:

打開mq的安裝目錄,找到bin文件夾下的runserver.sh和runbroker.sh,

在文件中主動加入你的JAVA_HOME,如下圖:

一切順利解決,至此namesrv和broker都已經成功啟動,下面來寫幾個Javademo

首先加入maven依賴包:

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>

使用GRADLE構建的話加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'

使用RocketMQ有以下3種發送消息的方式,分別適用不同場景:

1.同步發送消息:可靠的同步傳輸用於廣泛的場景,如重要的通知消息,短信通知,短信營銷系統等。

2.異步發送消息:異步傳輸通常用於響應時間敏感的業務場景。

3.以單向模式發送消息:單向傳輸用於需要中等可靠性的情況,例如日志收集。

我們選第一種發送方式寫一個demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}

通過循環發送100條消息,運行代碼,結果拋出如下錯誤:

該錯誤表示找不到這個topic的路由,關於這個錯誤,網上也有一些對策,無外乎以下幾個原因:

1.broker沒有連上namesrv

2.producer沒有連上namesrv

3.namesrv沒有創建並維護該topic信息

4.netty包版本沖突

5.防火牆問題

前面3條可以通過查找日志確定問題,

  1).broker.log里面如果查到register broker to name server localhost:9876 OK 這樣的信息表示broker已經連上namesrv了.

如果沒有,那么你可以重新啟動broker: sh mqbroker -n localhost:9876

  2).namesrv.log里面如果能夠查到 new topic registered, TopicTest QueueData這樣的信息,表示你的topic也已經被成功創建.

如果沒有,那么你可以通過mq的admin工具主動生成該topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest

  3).producer有沒有連上namesrv,只需檢查一下代碼中namesrv的值對不對就行了

  4).至於4和5那就需要你自己去跟蹤源碼發現問題了.

 

現在再來運行生產者代碼,成功發送100條消息:

 

 消費者也來寫個demo:

package com.example.demo.common.mq.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
public static void main(String[] args) throws MQClientException {

// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");

// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

});

//Launch the consumer instance.
consumer.start();

System.out.printf("Consumer Started.%n");
}
}

 成功消費100條消息:

 

 另外,關於rocketMQ的一些基本概念請參考另一篇: https://www.cnblogs.com/EX-JINDAWEI001/p/11428655.html

 下次來看看本地偽集群怎么搭建: https://www.cnblogs.com/EX-JINDAWEI001/p/11433979.html

 

 

 




 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM