1. 下載文件
wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
rocketmq-console-ng-2.0.0.jar
上傳文件 解壓 創建文件夾
#存儲RocketMQ數據文件目錄 mkdir /usr/local/rocketmq/store #存儲RocketMQ消息信息 mkdir /usr/local/rocketmq/store/commitlog #存儲消息的隊列數據 mkdir /usr/local/rocketmq/store/consumequeue #存儲消息的索引數據 mkdir /usr/local/rocketmq/store/index #存儲RocketMQ日志目錄 mkdir /usr/local/rocketmq/logs
修改broker.conf文件
/usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/conf/broker.conf# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 所屬集群名字 brokerClusterName = rocketmq-cluster # broker名字 brokerName = broker-a # 0 表示 master, > 0 表示slave brokerId = 0 # nameServer地址.多個分號分割,修改為公網ip namesrvAddr=127.0.0.1:9876 # 在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=16 # 是否允許 broker 自動創建topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true # 是否允許 broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true # broker 對外服務的監聽端口 listenPort=10911 brokerIP1=127.0.0.1 # 刪除文件時間點, 默認為凌晨 4點 deleteWhen=04 # 文件保留時間, 默認72小時 fileReservedTime=72 # commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 # ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 # destroyMapedFileIntervalForcibly=120000 # redeleteHangedFileInterval=120000 # 檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 # 存儲路徑 storePathRootDir=/usr/local/rocketmq/store # commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog # 消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue # 消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index # checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint # abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort # 限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 # 
Broker 的角色 # - ASYNC_MASTER 異步復制Master # - SYNC_MASTER 同步雙寫Master # - SLAVE brokerRole=ASYNC_MASTER # 刷盤方式 # - ASYNC_FLUSH 異步刷盤 # - SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false # 發消息線程池數量 #sendMessageThreadPoolNums=128 # 拉消息線程池數量 #pullMessageThreadPoolNums=128
配置jvm參數
runserver.sh
[root@iZexci5fo076ghZ bin]# pwd
/usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin
[root@iZexci5fo076ghZ bin]# vi runserver.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
runbroker.sh
#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
3. 啟動nameserver
[root@iZexci5fo076ghZ bin]# pwd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin [root@iZexci5fo076ghZ bin]# sh mqnamesrv &
4. 啟動broker
[root@iZexci5fo076ghZ rocketmq-all-4.7.1-bin-release]# pwd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release [root@iZexci5fo076ghZ rocketmq-all-4.7.1-bin-release]# sh bin/mqbroker -n localhost:9876 -c conf/broker.conf autoCreateTopicEnable=true &
5. 啟動rocketmq console
java -jar rocketmq-console-ng-2.0.0.jar --server.port=8847 --rocketmq.config.namesrvAddr=localhost:9876 --rocketmq.config.isVIPChannel=false --rocketmq.config.dataPath=/usr/local/rocketmq/store &
6. 阿里雲開放8847 端口和9876 端口
7. 訪問rocketmq console 頁面
http://127.0.0.1:8847/#/
8. 關閉broker 和 nameserver
[root@iZexci5fo076ghZ bin]# jps 14896 NamesrvStartup 1762 jar 32227 nacos-server.jar 21764 Jps 16551 jar 19566 sentinel-dashboard-1.7.2.jar [root@iZexci5fo076ghZ bin]# pwd /usr/local/rocketmq/rocketmq-all-4.7.1-bin-release/bin [root@iZexci5fo076ghZ bin]# sh ./mqshutdown namesrv [root@iZexci5fo076ghZ bin]# sh ./mqshutdown broker
java 客戶端接入
引入maven依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
配置application.yml
rocketmq:
  name-server: 47.93.243.221:9876
  producer:
    send-message-timeout: 300000
    group: rocketmq-group
消息生產者
import java.nio.charset.StandardCharsets;
import lombok.SneakyThrows;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * Demo REST endpoints under {@code /mq} that publish RocketMQ messages in
 * synchronous, asynchronous and one-way mode.
 *
 * <p>The sync, async and one-way endpoints each send the same payload twice:
 * once through the producer obtained from {@link RocketMQTemplate#getProducer()}
 * and once through the template's own convenience method.
 */
@RestController
@RequestMapping("/mq")
public class MqController {

    /** Text payload shared by the sync, async and one-way demo endpoints. */
    private static final String DEMO_PAYLOAD = "水水水水水水水水水水水水";

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a message synchronously and prints the returned {@link SendResult}
     * for both the producer call and the template call.
     */
    @SneakyThrows
    @GetMapping("/sync")
    public void sync() {
        Message message = newMessage("sync-topic", DEMO_PAYLOAD);
        SendResult sendResult = rocketMQTemplate.getProducer().send(message);
        System.out.println(sendResult);

        // A synchronous send hands back a result that can be inspected by the caller.
        sendResult = rocketMQTemplate.syncSend("sync-topic", DEMO_PAYLOAD);
        System.out.println(sendResult);
    }

    /**
     * Sends a message asynchronously; the outcome is delivered to a callback
     * that prints the failure message when the send fails.
     */
    @SneakyThrows
    @GetMapping("/async")
    public void async() {
        Message message = newMessage("async-topic", DEMO_PAYLOAD);
        rocketMQTemplate.getProducer().send(message, printFailureCallback());
        rocketMQTemplate.asyncSend("async-topic", DEMO_PAYLOAD, printFailureCallback());
    }

    /**
     * Sends a one-way message: no result is returned and no callback is registered.
     */
    @SneakyThrows
    @GetMapping("/sendOneWay")
    public void sendOneWay() {
        Message message = newMessage("oneWay-topic", DEMO_PAYLOAD);
        rocketMQTemplate.getProducer().sendOneway(message);
        rocketMQTemplate.sendOneWay("oneWay-topic", DEMO_PAYLOAD);
    }

    /**
     * Sends a one-way test message to {@code test-topic}.
     */
    @GetMapping("/test")
    public void test() {
        // Pass the text itself as the payload. The previous code wrapped it in a
        // topic-less RocketMQ Message and handed that object to the template's
        // sendOneWay(destination, payload), so the payload was the Message object
        // rather than the text.
        rocketMQTemplate.sendOneWay("test-topic", "test消息");
    }

    /**
     * Builds a RocketMQ message for the given topic with the payload encoded as UTF-8,
     * so the bytes on the wire do not depend on the JVM's default charset.
     *
     * @param topic   destination topic
     * @param payload text to encode into the message body
     * @return the populated message
     */
    private static Message newMessage(String topic, String payload) {
        Message message = new Message();
        message.setTopic(topic);
        message.setBody(payload.getBytes(StandardCharsets.UTF_8));
        return message;
    }

    /**
     * Creates the callback shared by both asynchronous sends.
     *
     * @return a callback that ignores success and prints the failure message
     */
    private static SendCallback printFailureCallback() {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // Success needs no handling in this demo.
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println(throwable.getMessage());
            }
        };
    }
}
消息消費者
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 異步消息 */ @Service @RocketMQMessageListener(consumerGroup = "my-consumer_asyn-topic", topic = "asyn-topic") public class AsynConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); System.out.println(new String(body)); } }
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 單向消息 */ @Service @RocketMQMessageListener(consumerGroup = "my-consumer_oneWay-topic", topic = "oneWay-topic") public class OneWayConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); System.out.println(new String(body)); } }
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 同步消費 */ @Service @RocketMQMessageListener(consumerGroup = "my-consumer_sync-topic", topic = "sync-topic") public class SyncConsumer implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); System.out.println(new String(body)); } }