本博客以當前RocketMQ最新版介紹:v4.4.0
環境要求
- 64位JDK 1.8+;
- Maven 3.2.x; // 源碼編譯時需要用到
二進制文件安裝
- 下載二進制文件:http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip
- 二進制文件放到任意目錄(由於是純凈的ubuntu鏡像,docker環境,因此我放在/root/xxx目錄下)
- 解壓zip包,並重命名文件名
> unzip rocketmq-all-4.4.0-bin-release.zip && mv rocketmq-all-4.4.0-bin-release rocketmq
- 啟動server
> cd /root/rocketmq
> nohup sh bin/mqnamesrv & // 第一次安裝時,可執行sh bin/mqnamesrv觀察是否能夠啟動
> tailf -f ~/logs/rocketmqlogs/namesrv.log
// 觀察到以下日志時,server已啟動成功
2019-09-07 18:06:13 INFO main - The Name Server boot success. serializeType=JSON
- 啟動broker
> nohup sh bin/mqbroker -n localhost:9876
> tailf -f ~/logs/rocketmqlogs/broker.log
// 觀察到以下日志時,server已啟動成功
2019-09-07 20:40:06 INFO main - The broker[0daf9bd41237, 172.17.0.2:10911] boot success. serializeType=JSON and name server is 172.17.0.2:9876
注:broker啟動如果過一會直接退出,無任何日志或報錯的話,檢查一下機子的內存是否充足。RocketMQ的broker默認內存為8g。
修改文件:/root/rocketmq/bin/runbroker.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
- 測試生產者和消費者
// 在測試之前,我們需要先設置環境變量:export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
Producer的源碼
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 默認消費組
DefaultMQProducer producer = new DefaultMQProducer("default");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 消息發送
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
// 關閉生產者
producer.shutdown();
}
}
Consumer的源碼
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 指定消費組
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default");
// 設置消費偏移點
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 訂閱topic,以及tag
consumer.subscribe("TopicTest", "*");
// 注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 收到數據后,返回ack確認
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
- 關閉server和broker
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK