最近在自己本地搭建rocketMQ,過程中遇到一些問題,現在總結一下,便於以后查看.
首先打開rocket官網:http://rocketmq.apache.org, 點擊latest realease,有source版本和binary版本供下載,點擊quikstart可以看到source版本的構建步驟:
source版本需要先安裝了以下軟件:
- 建議使用64位操作系統,Linux / Unix / Mac;
- 64位JDK 1.8+;
- Maven 3.2.x;
- Git;
- 4g +免費磁盤用於Broker服務器
我系統環境是OS 10.13.6,下載的是binary版本,省去了構建成為二進制文件的步驟.
將rocketmq-all-4.4.0-bin-release.zip文件解壓后得到rocketmq-all-4.4.0-bin-release文件夾,
配置環境變量ROCKETMQ_HOME=xxx/rocketmq-all-4.4.0-bin-release,其中xxx表示你的文件夾父路徑,
例如/Users/david/Downloads/packages/rocketmq-all-4.4.0-bin-release.
再將ROCKETMQ_HOME加入到PATH中如 $ROCKETMQ_HOME/bin:$PATH
打開teiminal,輸入sh mqnamesrv啟動名稱服務器,如下圖:
可以看到”Java HotSpot(TM) 64-Bit Server VM warning“警告信息,不要怕,
它是說CMS垃圾收集器已經過時,在未來的JVM版本中會被其他收集器替代,可以不用管他.
namesrv已經正常啟動了
同樣,再開一個terminal,輸入sh mqbroker 啟動broker.
如果在啟動過程中遇到:
ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
不要慌,這是說你的JAVA_HOME環境變量設置有問題,請檢查環境變量.
如果檢查后發現環境變量沒問題,但還是報以上錯誤,這可能是系統問題吧,
記得OS系統有好幾個文件可以配置環境變量,最常見的是用戶目錄下的.bash_profile,對當前用戶生效,還有一個全局的,在etc目錄下的profile文件
可以再確認一下,如果兩個文件都配置了依然報錯,那只好用最戳的方式解決:
打開mq的安裝目錄,找到bin文件夾下的runserver.sh和runbroker.sh,
在文件中主動加入你的JAVA_HOME,如下圖:
一切順利解決,至此namesrv和broker都已經成功啟動,下面來寫幾個Javademo
首先加入maven依賴包:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
使用GRADLE構建的話加入:
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
使用RocketMQ有以下3種發送消息的方式,分別適用不同場景:
1.同步發送消息:可靠的同步傳輸用於廣泛的場景,如重要的通知消息,短信通知,短信營銷系統等。
2.異步發送消息:異步傳輸通常用於響應時間敏感的業務場景。
3.以單向模式發送消息:單向傳輸用於需要中等可靠性的情況,例如日志收集。
我們選第一種發送方式寫一個demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
通過循環發送100條消息,運行代碼,結果拋出如下錯誤:
該錯誤表示找不到這個topic的路由,關於這個錯誤,網上也有一些對策,無外乎以下幾個原因:
1.broker沒有連上namesrv
2.producer沒有連上namesrv
3.namesrv沒有創建並維護該topic信息
4.netty包版本沖突
5.防火牆問題
前面3條可以通過查找日志確定問題,
1).broker.log里面如果查到register broker to name server localhost:9876 OK 這樣的信息表示broker已經連上namesrv了.
如果沒有,那么你可以重新啟動broker: sh mqbroker -n localhost:9876
2).namesrv.log里面如果能夠查到 new topic registered, TopicTest QueueData這樣的信息,表示你的topic也已經被成功創建.
如果沒有,那么你可以通過mq的admin工具主動生成該topic: sh mqadmin updateTopic -b localhost:10911 -n localhost:9876 -t TopicTest
3).producer有沒有連上namesrv,只需檢查一下代碼中namesrv的值對不對就行了
4).至於4和5那就需要你自己去跟蹤源碼發現問題了.
現在再來運行生產者代碼,成功發送100條消息:
消費者也來寫個demo:
package com.example.demo.common.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
成功消費100條消息:
另外,關於rocketMQ的一些基本概念請參考另一篇: https://www.cnblogs.com/EX-JINDAWEI001/p/11428655.html
下次來看看本地偽集群怎么搭建: https://www.cnblogs.com/EX-JINDAWEI001/p/11433979.html