# RocketMQ4.4入門進階+實戰
## 第一章 RocketMQ
### 1.1 MQ介紹
MQ全稱Message Queue,消息隊列(MQ)是一種應用程序對應用程序的通信方式,應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們,消息傳遞指的是程序之間通過在消息中發送數據進行通信。對立誒的使用除去了接收和發送應用程序同時執行的要求。
### 1.2 主流MQ對比
目前市面上成熟主流的MQ有Kafka、RocketMQ、RabbitMQ,我們這里對每款MQ做一個簡單介紹。
Kafka
```tex
Apache下的一個子項目,使用scala實現的一個高性能分布式Publish/Subscribe消息系統。
1、快速持久化,通過磁盤順序讀寫與零拷貝機制,可以在0(1)的系統開銷下進行消息持久化;
2、高吞吐,在一台普通的服務器上即可以達到10w/s的吞吐量;
3、高堆積,支持topic下消費者較長時間離線,消息堆積量大;
4、完全的分布式系統,Broker、Producer、Consumer都原生支持分布式、依賴zookeeper自動實現負載均衡;
5、支持Hadoop數據並行加載,對應像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。
```
RocketMQ
```tex
RocketMQ的前身是Metaq,當Metag3.0發布時,產品名稱改為RocketMQ,RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
1、能夠保證嚴格的消息順序
2、提供豐富的消息拉取模式
3、高效的訂閱者水平擴展能力
4、實時的消息訂閱機制
5、支持事務消息
6、億級消息堆積能力
```
RabbitMQ
```tcl
使用Erlang編寫的一個開源消息隊列,本身支持此很多的協議:AMQP、XMPP、SMTP、STOMP,也正是如此,使的它變得非常重量級,更適合於企業級的開發。
```
特性對比
### 1.3 RocktMQ環境要求
RocketMQ對環境有要求,如下:
```tex
64Bit OS, Linux/Unix/Mac is recommended;
64Bit JDK 1.8+;
Maven 3.2.x;
Git;
4g+ free disk for Broker server
```
### 1.4 RocketMQ下載
RocketMQ目前已經捐獻給了Apache,官方網址為http://rocketmq.apache.org/目前最新版本已經是4.6.1版本, 我們可以點擊Latest releaseV4.6.1進入下載頁。


### 1.5 單節點RocketMQ安裝
#### 1.5.1 環境准備
我們先准備一台centos虛擬機,ip:192.168.211.143,在hosts文件中配置地址與IP的映射關系。
| IP | hostname | mastername |
| --------------- | -------------------- | ---------------- |
| 192.168.211.143 | Rocketmq-nameserver1 | Rocketmq-master1 |
修改/etc/hosts文件,加入如下映射關系
```tex
192.168.211.143 rocketmq-nameserver1
192.168.211.143 rocketmq-master1
```
#### 1.5.2 安裝配置
我們可以把安裝文件上傳到虛擬機上,並解壓安裝
解壓文件存放到/usr/local/server/mq目錄下
```shell
unzip rocketmq-all-4.6.1-bin-release.zip -d /usr/local/server/mq
```
更新解壓后的文件名
```shell
mv rocketmq-all-4.6.1-bin-release rocketmq
```
創建RocketMQ存儲文件的目錄,執行如下命令
```shell
[root@localhost rocketmq]# mkdir logs
[root@localhost rocketmq]# mkdir store
[root@localhost rocketmq]# cd store
[root@localhost store]# mkdir commitlog
[root@localhost store]# mkdir consumerqueue
[root@localhost store]# mkdir index
```
文件夾說明
```shell
logs:存儲RocketMQ日志記錄
store:存儲RocketMQ數據文件目錄
commitlog:存儲RocketMQ消息信息
consumerqueue、index:存儲消息的索引數據
```
conf目錄配置文件說明
```shell
2m-2s-async:2主2從異步
2m-2s-sync :2主2從同步
2m-noslave :2主沒有從
```
我們這里先配置簡單節點,可以修改2m-2s-async的配置實現。
進入2m-2s-async目錄,修改第一個配置文件broker-a.properties
```shell
vi broker-a.properties
```
將如下配置覆蓋掉broker-a.properties所有配置
```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=0
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```
進入conf目錄,替換所有xml中的${user.name},保證日志路徑正確
```shell
sed -i 's#${user.home}#/usr/local/server/mq/rocketmq#g' *.xml
```
注意:sed -i 在這里起一個批量替換的作用
```shell
sed -i 's#原字符串#新字符#g' 替換的文件
```
RocketMQ對內存要求比較高,最少1G,如果內存太少,會影響RocketMQ的運行效率和執行性能,我們需要修改bin目錄小的runbroker.sh和runserver.sh文件
runbroker.sh
```shell
改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g Xmn4g"
改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g Xmn1g"
```
runserver.sh
```shell
改前:
JAVA_OPT="${JAVA_OPT} -server Xms4g Xmx4g Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改后:
JAVA_OPT="${JAVA_OPT} -server Xms1g Xmx1g Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```
先啟動namesrv
```shell
nohup sh mqnamesrv &
```
再啟動broker
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
```
輸入jps查看進程
```shell
[root@localhost rocketmqlogs]# jps
3225 NamesrvStartup
3290 BrokerStartup
3454 Jps
```
#### 1.5.3 RocketMQ控制台安裝
我們這里直接將RocketMQ控制代碼的源碼放到項目中運行,后面課程結束后我們直接運行jar包即可。RocketMQ的控制台由一些RocketMQ愛好者提供。
下載地址:https://github.com/apache/rocketmq-externals/tree/master
### 1.6 RocketMQ架構介紹

上面是RocketMQ的部署結構圖,RocketMQ 網絡部署特點:
```shell
1、Name Server 是一個幾乎無狀態節點,可集群部署,節點乀間無任何信息同步。
2、Broker部署相對復雜,Broker分為 Master與Slave,一個 Master可以對應多個Slave,但是一個Slave 只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave,Master也可以部署多個,每個Broker與Name Server 集群中的所有節點建立長連接,定時注冊 Topic信息到所有Name Server。
3、Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
4、Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的 Master、Slave 建立長連接,且定時向Master、Slave 發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
```
## 第二章 RocketMQ快速入門
### 2.1 消息生產和消費介紹
使用RocketMQ可以發送普通消息、順序消息、事務消息,順序消息能實現有序消費,事務消息可以解決分布式事務實現數據最終一致性。
RocketMQ有2中常見的消費模式,分別是DefaultMQPushConsumer和DefaultMQPullConsumer模式,這2種模式字面理解一個是推送消息,一個是拉取消息,這里有個誤區,其實無論是Push還是Pull,其本質都是拉取消息,只是實現機制不一樣。
DefaultMQPushConsumer其實並不是broker主動向consumer推送消息,而是consumer向broker發出請求,保持了一種長連接,broker會每5秒檢測一次是否有消息,如果有消息,則將消息推送給consumer。使用DefaultMQPushConsumer實現消費,broker會主動記錄消息消費的偏移量。
DefaultMQPullConsumer是消費方主動去broker拉取數據,一般會在本地使用定時任務實現,使用它獲得消息狀態方便、負載均衡性能可控,但消息的及時性差,而且需要手動記錄消息消費的偏移量信息,所以在實際應用場景中推薦使用Push方式。
RocketMQ發送的消息默認會存儲到4個隊列中去,當然創建幾個隊列存儲數據,可以自己定義。

RocketMQ作為MQ消息中間件,ack機制必不可少,在RocketMQ中常見的應答狀態如下:
```java
LocalTransactionState:主要針對事務消息的應答狀態
public enum LocalTransactionState {
// 消息提交
COMMIT_MESSAGE,
// 消息回滾
ROLLBACK_MESSAGE,
// 未知狀態,一般用於處理超時等現象
UNKNOWN;
}
```
```java
ConsumeConcurrentlyStatus:主要針對消息消費的應答狀態
public enum ConsumeConcurrentlyStatus {
// 消息消費成功
CONSUME_SUCCESS,
// 消息重試,一般消息消費失敗后,RocketMQ為了保障數據的可靠性,具有重試機制
RECONSUME_LATER;
}
```
重發時間是:(broker.log中有)
```xml
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
```
### 2.2 RocketMQ普通消息生產者
#### 2.2.1 工程創建
我們實現一個最基本的消息發送,先創建一個springboot工程,工程名叫rocketmq-demo1
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.leo</groupId>
<artifactId>study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>study</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<rocketmq.version>4.6.1</rocketmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- rocket mq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
```
#### 2.2.2 消息發送
消息發送有這么幾個步驟:
```java
// 1、創建DefaultMQProducer
// 2、設置Namesrv地址
// 3、開啟DefaultMQProducer
// 4、創建消息Message
// 5、發送消息
// 6、關閉DefaultMQProducer
```
我們創建一個Producer類,按照上面步驟實現消息發送,代碼如下:
```java
public class Producer {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
// 2、設置Namesrv地址
producer.setNamesrvAddr("");
// 3、開啟DefaultMQProducer
producer.start();
// 4、創建消息Message
Message message = new Message("Topic_Demo", "Tags", "Keys_1", "Hello RocketMQ".getBytes());
// 5、發送消息
SendResult result = producer.send(message);
System.out.println(result);
// 6、關閉DefaultMQProducer
producer.shutdown();
}
}
```
### 2.3 RocketMQ普通消息消費者
#### 2.3.1消息消費
消費者消費消息有這么幾個步驟:
```java
// 1、創建DefaultMQPushConsumer
// 2、設置namesrv地址
// 3、設置消息拉取最大數
// 4、設置subscribe,這里是要讀取訂單主題信息
// 5、創建消息監聽MessageListener
// 6、獲取消息信息
// 7、返回消息讀取狀態
```
創建Consumer類,按照上面步驟實現消息消費,代碼如下:
```java
public class Consumer {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
// 2、設置namesrv地址
consumer.setNamesrvAddr("");
// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);
// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Demo",
"*");
// 5、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 6、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 7、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 開啟Consumer
consumer.start();
}
}
```
### 2.4 RocketMQ順序消息
消息有序指的是可以按照消息的發送順序來消費。RocketMQ可以嚴格的保證消息順序,但這個順序,不是全局順序,只是分區(queue)順序,要全局順序只能一個分區。
如何保證順序
```java
在MQ模型中,順序需要由3個階段去保證:
1、消息被發送時保證順序
2、消息被存儲時保持和發送的順序一致
3、消息被消費時保持和存儲的順序一致
```
發送時保持順序意味着對於有順序要求的消息,用戶應該在同一線程中采用同步的方式發送。存儲保持和順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。
創建一個OrderProducer類,按照步驟實現消息發送,代碼如下:
```java
public class OrderProducer {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");
// 2、設置Namesrv地址
producer.setNamesrvAddr("");
// 3、開啟DefaultMQProducer
producer.start();
// 5、發送消息
// 第一個參數:發送的消息信息
// 第二個參數:選擇指定的消息隊列對象(會將所有的消息隊列傳進來)
// 第三個參數:指定對應的隊列下表
// 連續發送5條消息
for (int i = 0; i < 5; i++) {
// 4、創建消息Message
// topic:主題,tags: 標簽,主要用於消息過濾,keys:消息的唯一值,body:消息體
Message message = new Message(
"Topic_Order_Demo",
"Tags", "Keys_" + i,
("Hello RocketMQ_" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult result = producer.send(
message,
(List<MessageQueue> list, Message msg, Object arg) -> {
// 獲取隊列的下標
Integer index = (Integer) arg;
return list.get(index);
},
1
);
System.out.println(result);
}
// 6、關閉DefaultMQProducer
producer.shutdown();
}
}
```
創建一個OrderConsumer類,按照步驟實現消息消費,代碼如下:
```java
public class OrderConsumer {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");
// 2、設置namesrv地址
consumer.setNamesrvAddr("");
// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);
// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Order_Demo",
"*");
// 5、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 6、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Order Consumer消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
// 開啟Consumer
consumer.start();
}
}
```
### 2.5 RocketMQ事務消息
在RocketMQ4.3.0版本后,開放了事務消息這一特性,對於分布式事務而言,最常說的還是兩階段提交協議。
#### 2.5.1 RocketMQ事務消息流程
RocketMQ的事務消息,主要是通過消息的異步處理,可以保證本地事務和消息發送同時成功執行或失敗,從而保證數據的最終一致性,這里我們先看看一條事務消息從誕生到結束的整個時間線流程:

#### 2.5.2 事務消息生產者
我們創建一個事務消息生產者TransactionProducer,事務消息發送消息對象是TransactionMQProducer,為了實現本地事務操作和回查,我們需要創建一個監聽器,監聽器需要實現TransactionListener接口,實現步驟如下:
```java
// 1、創建TransactionMQProducer
// 2、設置Namesrv地址
// 3、指定消息監聽對象,用於執行本地事務和消息回查
// 4、指定線程池
// 5、開啟TransactionMQProducer
// 6、創建消息Message
// 7、發送事務消息
// 8、關閉TransactionMQProducer
```
TransactionProducer代碼如下:
```java
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 1、創建TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("demo_producer_transaction_group");
// 2、設置Namesrv地址
producer.setNamesrvAddr("");
// 3、指定消息監聽對象,用於執行本地事務和消息回查
producer.setTransactionListener(new TransactionListenerImpl());
// 4、指定線程池
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
);
producer.setExecutorService(executorService);
// 5、開啟TransactionMQProducer
producer.start();
// 6、創建消息Message
// topic:主題,tags: 標簽,主要用於消息過濾,keys:消息的唯一值,body:消息體
Message message = new Message(
"Topic_Transaction_Demo",
"Tags",
"Keys_T",
"Hello Transaction RocketMQ Message".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 7、發送事務消息
// 第一個參數:發送的消息信息
// 第二個參數:選擇指定的消息隊列對象(會將所有的消息隊列傳進來)
TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
System.out.println(result);
// 8、關閉TransactionMQProducer
producer.shutdown();
}
}
```
監聽器代碼如下:
```java
public class TransactionListenerImpl implements TransactionListener {
/**
* 存儲對應事務的狀態信息,key:事務ID,value:當前事務執行的狀態
*/
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
/**
* 執行本地事務
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
// 事務ID
String transactionId = message.getTransactionId();
// 0:執行中,狀態未知,1:本地事務執行成功,2:本地事務執行失敗
localTrans.put(transactionId, 0);
// 業務執行,處理本地事務
System.out.println("Hello-----Transaction");
try {
System.out.println("正在執行本地事務");
Thread.sleep(2000);
System.out.println("本地事務執行成功");
localTrans.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
localTrans.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 消息回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
// 獲取事務ID
String transactionId = messageExt.getTransactionId();
// 獲取對應事務ID的執行狀態
Integer status = localTrans.get(transactionId);
System.out.println("消息回查--TransactionId:" + transactionId + ", 狀態:" + status);
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
```
#### 2.5.3 事務消息消費者
事務消息的消費者和普通消費者一樣,代碼如下:
```java
public class TransactionConsumer {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_transaction_group");
// 2、設置namesrv地址
consumer.setNamesrvAddr("");
// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);
// 設置消息順序
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Transaction_Demo",
"*");
// 5、創建消息監聽MessageListener
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt msg: list) {
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic: " + topic + ", tags: " + tags + ", keys: " + keys + ", body: " + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 開啟Consumer
consumer.start();
}
}
```
#### 2.5.4 RocketMQ實現分布式事務流程
MQ事務消息解決分布式事務問題,但第三方MQ支持事務消息的中間件不多,比如RocketMQ,它支持事務消息的方式也是類似於二階段提交,但是市面上一些主流的MQ都是不支持事務消息的,比如RabbitMQ和Kafka都不支持。
以阿里的RocketMQ中間件為例,其思路大致為:
第一階段Prepared消息,會拿到消息的地址。
第二階段執行本地事務。
第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。
也就是說在業務方法內要向消息隊列提交兩次請求,一發送消息和一次確認消息。如果確認消息發送失敗了,RocketMQ會定期掃描消息集群中的事務消息,這時候發現了Prepared消息,它會向消息發送者確認,所以生產方需要實現一個check接口,RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

### 2.6 消息廣播/批量發送
上面發送消息,我們測試的時候,可以發現消息只有一個消費者能收到,如果我們想實現消息廣播,讓每個消費者都能收到消息也是可以實現的。而且上面發消息的時候,每次都是發送單挑Message對象,能否批量發送呢?答案是可以的。
#### 2.6.1 消息生產者
創建消息生產者BroadcastingProducer,代碼如下:
```java
public class BroadcastingProducer {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_broadcasting_group");
// 2、設置Namesrv地址
producer.setNamesrvAddr("");
// 3、開啟DefaultMQProducer
producer.start();
// 4、創建消息Message
// topic:主題,tags: 標簽,主要用於消息過濾,keys:消息的唯一值,body:消息體
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message(
"Topic_broadcasting_Demo",
"Tags",
"Keys_" + i,
("Hello RocketMQ_" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
messages.add(message);
}
// 5、發送消息
SendResult result = producer.send(messages);
System.out.println(result);
// 6、關閉DefaultMQProducer
producer.shutdown();
}
}
```
#### 2.6.2 消息消費者
BroadcastingConsumerA代碼如下:
```java
public class BroadcastingConsumerA {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_broadcasting_group");
// 2、設置namesrv地址
consumer.setNamesrvAddr("");
// 3、默認是集群模式,改成廣播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 4、、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);
// 5、、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_broadcasting_Demo",
"*");
// 6、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 7、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer_A消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 8、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 開啟Consumer
consumer.start();
}
}
```
BroadcastingConsumerB代碼如下:
```java
public class BroadcastingConsumerB {
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_broadcasting_group");
// 2、設置namesrv地址
consumer.setNamesrvAddr("");
// 3、默認是集群模式,改成廣播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 4、、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);
// 5、、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_broadcasting_Demo",
"*");
// 6、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 7、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer_B消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 8、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 開啟Consumer
consumer.start();
}
}
```
##第三章 RocketMQ集群
### 3.1 RocketMQ集群模式
#### 3.1.1 單個Master
這是一種風險比較大的方式(無集群),因為一旦Broker重啟或者宕機期間,將會導致整個服務不可用,因此是不建議線上環境去使用的。
#### 3.1.2 多個Master
一個集群全部都是Master,沒有Slave,它的有點和缺點如下:
優點:配置簡單,單個Master宕機或者重啟維護對應用沒有什么影響,在磁盤配置為RAID10時,即使機器宕機不可恢復的情況下,消息也不會丟失(異步刷盤會丟失少量消息,同步刷盤則是一條都不會丟失),性能最高。
缺點:當單個Broker宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息的實時性會收到影響。
#### 3.1.3 多Master多Slave模式-異步復制
每個Master配置一個Slave,有多對的Master-Slave,HA采用的是異步復制方式,主備有短暫的消息延遲,毫秒級別的(Master收到消息之后立刻向應用返回成功標識,同時向Slave寫入消息),優缺點如下:
優點:即使是磁盤損壞了,消息丟失的非常少,且消息實時性不受影響,因為Master宕機之后,消費者仍可以從Slave消費,此過程對應用透明,不需要人工干預,性能同多個Master模式幾乎一樣。
缺點:Master宕機,磁盤損壞的情況下,會丟失少量的消息。
#### 3.1.4 多Master多Slave模式-同步雙寫
每個Master配置一個Slave,有多對的Master-Slave,HA采用的是同步雙寫模式,主備都寫成功,才會向應用返回成功。
優點:數據和服務都無單點,Master宕機的情況下,消息無延遲,服務可用性與數據可用性都非常高。
缺點:性能比異步復制模式略低,大約低10%左右,發送單個Master的RT會略高,目前主機宕機后,Slave不同自動切換為主機,后續會支持自動切換功能。
### 3.2 RocketMQ主從搭建
#### 3.2.1 環境准備
我們先准備2台centos虛擬機,ip:192.168.211.142/192.168.211.143,在hosts文件中配置地址與IP的映射關系。
| IP | hostname | mastername |
| :-------------: | :------------------: | :---------------------: |
| 192.168.211.142 | rocketmq-nameserver2 | rocketmq-master1-slave1 |
| 192.168.211.143 | ocketmq-nameserver1 | rocketmq-master1 |
修改2台機器的/etc/hosts文件,加入如下映射關系:
```shell
192.168.211.143 rocketmq-nameserver1
192.168.211.143 rocketmq-master1
192.168.211.142 rocketmq-nameserver2
192.168.211.142 rocketmq-master1-slave1
```
#### 3.2.2 安裝配置(slave)
由於之前已經安裝了一台RocketMQ,所以我們只需要安裝從節點(192.168.211.142)即可。我們可以把安裝文件上傳到虛擬機上,並解壓安裝。
從192.168.211.143目錄下將RocketMQ拷貝到192.168.211.142服務器上,如下代碼:
```shell
scp rocketmq-all-4.6.1-bin-release.zip 192.168.211.142:/usr/local/server/sofeware
```
解壓文件存放到/usr/local/server/mq目錄下:
```shell
unzip rocketmq-all-4.6.1-bin-release.zip -d /usr/local/server/mq
```
更改j解壓后的文件名:
```shell
mv rocketmq-all-4.6.1-bin-release rocketmq
```
創建RocketMQ存儲文件的目錄,執行如下命令:
```shell
[root@localhost rocketmq]# mkdir logs
[root@localhost rocketmq]# mkdir store
[root@localhost rocketmq]# cd store
[root@localhost store]# mkdir commitlog
[root@localhost store]# mkdir consumerqueue
[root@localhost store]# mkdir index
```
進入conf目錄,替換所有xml中的${user.home},保證日志路徑正確
```shell
sed -i 's#${user.home}#/usr/local/server/mq/rocketmq#g' *.xml
```
注意:sed -i 在這里起一個批量替換的作用
```shell
sed -i 's#原字符串#新字符#g' 替換的文件
```
RocketMQ對內存要求比較高,最少1G,如果內存太少,會影響RocketMQ的運行效率和執行性能,我們需要修改bin目錄小的runbroker.sh和runserver.sh文件
runbroker.sh
```shell
改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g Xmn4g"
改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g Xmn1g"
```
runserver.sh
```shell
改前:
JAVA_OPT="${JAVA_OPT} -server Xms4g Xmx4g Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改后:
JAVA_OPT="${JAVA_OPT} -server Xms1g Xmx1g Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```
#### 3.2.3 主從配置
在192.168.211.143的/usr/local/server/mq/rocketmq/conf/2m-2s-async/目錄下,將broker-a-s.properties文件修改成以下:
```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=1
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SLAVE
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```
broker-a-s.properties文件和broker-a.properties文件區別並不大,主要改了brokerId=1和brokerRole=SLAVE
在192.168.211.143的/usr/local/server/mq/rocketmq/conf/2m-2s-async/目錄下將配置文件拷貝到192.168.211.142服務器上,代碼如下:
```shell
scp broker-a.properties broker-a-s.properties 192.168.211.142:/usr/local/server/mq/rocketmq/conf/2m-2s-async/
```
停止192.168.211.143服務
```shell
sh mqshutdown broker
sh mqshutdown namesrv
```
啟動192.168.211.143的namesrv
```shell
nohup sh mqnamesrv &
```
啟動192.168.211.142的namesrv
```shell
nohup sh mqnamesrv &
```
啟動192.168.211.143的broker
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
```
啟動192.168.211.143的broker
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-async/broker-a-s.properties > /dev/null 2>&1 &
```
#### 3.2.4 控制台配置
修改rocketmq-console-ng控制台項目的application.properties配置文件,添加上主從地址:
```shell
rocketmq.config.namesrvAddr=192.168.211.143:9876;192.168.211.142:9876
```
運行控制台項目,並打開控制台的集群,可以看到集群信息。
#### 3.2.5 主從模式故障演練
主從模式,即使Master宕機后,消費者仍然可以從Slave消費,但不能接收新的消息,我們通過程序來演示一次該流程。
##### 3.2.5.1 master宕機收消息演示
創建MasterSlaveProducer,實現消息發送,代碼如下
```java
public class MasterSlaveProducer {
/**
* 指定namesrv地址
*/
private static String NAMESRV_ADDRESS = "192.168.211.143:9876;192.168.211.142:9876";
public static void main(String[] args) throws Exception {
// 創建一個DefaultMQProducer, 指定消息發送組
DefaultMQProducer producer = new DefaultMQProducer("Test_Quick_Producer_Name");
// 指定namesrv地址
producer.setNamesrvAddr(NAMESRV_ADDRESS);
// 啟動producer
producer.start();
// 創建消息
Message message = new Message(
"Test_Quick_Topic",
"TagA",
"KeyA",
"hello_rocketmq master-slave".getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 發送消息
SendResult result = producer.send(message);
System.out.println(result);
// 關閉produccer
producer.shutdown();
}
}
```
執行消息發送后,可以打開控制台觀察集群信息
停掉192.168.211.143的broker節點
```shell
./mqshutdown broker
```
創建MasterSlaveConsumer,實現消息消費,代碼如下:
```java
public class MasterSlaveConsumer {
/**
* 指定namesrv地址
*/
private static String NAMESRV_ADDRESS = "192.168.211.143:9876;192.168.211.142:9876";
public static void main(String[] args) throws Exception {
// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
// 2、設置namesrv地址
consumer.setNamesrvAddr("");
// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);
// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Demo",
"*");
// 5、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 6、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 7、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 開啟Consumer
consumer.start();
}
}
```
運行程序,發現消費者可以從slave中消費消息
##### 3.2.5.2 master宕機發消息演示
關掉192.169.211.143的broker后,再次嘗試發送消息,會報如下錯誤:
```shell
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException:No route info of this topic, Test_Quick_Topic
...
```
### 3.3 RocketMQ集群搭建-雙主雙從
我們這里搭建一個雙主雙從的集群,采用同步復制異步刷盤方式進行集群,在工作中,我們也推薦這么做,我們先把環境准備一下。
#### 3.3.1 准備工作
准備4台機器,如下:
| ip | hostname | mastername |
| --------------- | -------------------- | ---------------------- |
| 192.168.211.141 | rocketmq-nameserver1 | rocketmq-master1 |
| 192.168.211.142 | rocketmq-nameserver2 | rocketmq-master2 |
| 192.168.211.143 | rocketmq-nameserver3 | rocketmq-master1-slave |
| 192.168.211.144 | ocketmq-nameserver4 | rocketmq-master2-slave |
#### 3.3.2 RocketMQ安裝
在每台機器上安裝RocketMQ,安裝過程和上面的**單節點RocketMQ安裝流程**基本類似。
#### 3.3.3 修改映射路徑
修改每台機器的/etc/hosts文件,添加如下映射路徑
```shell
192.168.211.141 rocketmq-nameserver1
192.168.211.142 rocketmq-nameserver2
192.168.211.143 rocketmq-nameserver3
192.168.211.144 rocketmq-nameserver4
192.168.211.141 rocketmq-master1
192.168.211.142 rocketmq-master2
192.168.211.143 rocketmq-master1-slave
192.168.211.144 rocketmq-master2-slave
```
#### 3.3.4 RocketMQ節點配置
4台機器一起解壓rocketmq壓縮文件,最后將解壓文件存放在/usr/local/server/mq/rockermq/目錄下。
分別在4台機器的rocketmq目錄下執行如下操作:
```shell
mkdir logs
mkdir store
cd store/
mkdir commitlog
mkdir consumequeue
mkdir index
```
在4台機器的conf目錄下執行:
```shell
sed -i 's#${user.home}#/usr/local/server/mq/rocketmq#g' *.xml
```
進入4台機器的bin目錄,修改runbroker.sh和runserver.sh
runbroker.sh
```shell
改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g Xmn4g"
改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g Xmn1g"
```
runserver.sh
```shell
改前:
JAVA_OPT="${JAVA_OPT} -server Xms4g Xmx4g Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改后:
JAVA_OPT="${JAVA_OPT} -server Xms1g Xmx1g Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```
進入141集群rocketmq/conf/2m-2s-sync目錄下配置對應的配置文件,修改broker-a.properties配置文件,如下:
```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=0
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```
修改broker-b.properties文件,如下:
```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示Master,>0表示slave
brokerId=0
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```
修改修改broker-a-s.properties文件,如下:
```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=1
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SLAVE
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```
修改修改broker-b-s.properties文件,如下:
```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示Master,>0表示slave
brokerId=1
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SLAVE
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```
將這4個文件分別復制到142、143、144服務器對應的目錄下(rocketmq/conf/2m-2s-sync)
#### 3.3.5 RocketMQ集群啟動測試
bin目錄下,
先在每台服務器啟動namesrv
```shell
nohup sh mqnamesrv &
```
再啟動broker
141服務器執行如下啟動命令:
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-a.properties > /dev/null 2>&1 &
```
142服務器執行如下啟動命令:
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-b.properties > /dev/null 2>&1 &
```
143服務器執行如下啟動命令:
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-a-s.properties > /dev/null 2>&1 &
```
144服務器執行如下啟動命令:
```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-b-s.properties > /dev/null 2>&1 &
```
輸入jps查看進程
```shell
[root@localhost bin]# jps
43761 NamesrvStartup
43803 BrokerStartup
44093 jps
[root@localhost bin]#
```
啟動后,需改控制台的namesrv地址如下:
```properties
rocketmq.config.namesrvAddr=192.168.211.141:9876;192.168.211.142:9876;192.168.211.143:9876;192.168.211.144:9876
```
使用md查看:https://www.mdeditor.com/