RocketMQ4.4入門進階+實戰


# RocketMQ4.4入門進階+實戰

## 第一章 RocketMQ

### 1.1 MQ介紹

MQ全稱Message Queue,消息隊列(MQ)是一種應用程序對應用程序的通信方式,應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們,消息傳遞指的是程序之間通過在消息中發送數據進行通信。對立誒的使用除去了接收和發送應用程序同時執行的要求。

### 1.2 主流MQ對比

目前市面上成熟主流的MQ有Kafka、RocketMQ、RabbitMQ,我們這里對每款MQ做一個簡單介紹。

Kafka

```tex
Apache下的一個子項目,使用scala實現的一個高性能分布式Publish/Subscribe消息系統。
1、快速持久化,通過磁盤順序讀寫與零拷貝機制,可以在0(1)的系統開銷下進行消息持久化;
2、高吞吐,在一台普通的服務器上即可以達到10w/s的吞吐量;
3、高堆積,支持topic下消費者較長時間離線,消息堆積量大;
4、完全的分布式系統,Broker、Producer、Consumer都原生支持分布式、依賴zookeeper自動實現負載均衡;
5、支持Hadoop數據並行加載,對應像Hadoop的一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。
```

RocketMQ

```tex
RocketMQ的前身是Metaq,當Metag3.0發布時,產品名稱改為RocketMQ,RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
1、能夠保證嚴格的消息順序
2、提供豐富的消息拉取模式
3、高效的訂閱者水平擴展能力
4、實時的消息訂閱機制
5、支持事務消息
6、億級消息堆積能力
```

RabbitMQ

```tcl
使用Erlang編寫的一個開源消息隊列,本身支持此很多的協議:AMQP、XMPP、SMTP、STOMP,也正是如此,使的它變得非常重量級,更適合於企業級的開發。
```

特性對比

### 1.3 RocktMQ環境要求

RocketMQ對環境有要求,如下:

```tex
64Bit OS, Linux/Unix/Mac is recommended;
64Bit JDK 1.8+;
Maven 3.2.x;
Git;
4g+ free disk for Broker server
```

### 1.4 RocketMQ下載

RocketMQ目前已經捐獻給了Apache,官方網址為http://rocketmq.apache.org/目前最新版本已經是4.6.1版本, 我們可以點擊Latest releaseV4.6.1進入下載頁。

![image-20200313101434315](/Users/huojunlin/Library/Application Support/typora-user-images/image-20200313101434315.png)

![image-20200313101629175](/Users/huojunlin/Library/Application Support/typora-user-images/image-20200313101629175.png)

### 1.5 單節點RocketMQ安裝

#### 1.5.1 環境准備

我們先准備一台centos虛擬機,ip:192.168.211.143,在hosts文件中配置地址與IP的映射關系。

| IP | hostname | mastername |
| --------------- | -------------------- | ---------------- |
| 192.168.211.143 | Rocketmq-nameserver1 | Rocketmq-master1 |

修改/etc/hosts文件,加入如下映射關系

```tex
192.168.211.143 rocketmq-nameserver1
192.168.211.143 rocketmq-master1
```

#### 1.5.2 安裝配置

我們可以把安裝文件上傳到虛擬機上,並解壓安裝

解壓文件存放到/usr/local/server/mq目錄下

```shell
unzip rocketmq-all-4.6.1-bin-release.zip -d /usr/local/server/mq
```

更新解壓后的文件名

```shell
mv rocketmq-all-4.6.1-bin-release rocketmq
```

創建RocketMQ存儲文件的目錄,執行如下命令

```shell
[root@localhost rocketmq]# mkdir logs
[root@localhost rocketmq]# mkdir store
[root@localhost rocketmq]# cd store
[root@localhost store]# mkdir commitlog
[root@localhost store]# mkdir consumerqueue
[root@localhost store]# mkdir index
```

文件夾說明

```shell
logs:存儲RocketMQ日志記錄
store:存儲RocketMQ數據文件目錄
commitlog:存儲RocketMQ消息信息
consumerqueue、index:存儲消息的索引數據
```

conf目錄配置文件說明

```shell
2m-2s-async:2主2從異步
2m-2s-sync :2主2從同步
2m-noslave :2主沒有從
```

我們這里先配置簡單節點,可以修改2m-2s-async的配置實現。

進入2m-2s-async目錄,修改第一個配置文件broker-a.properties

```shell
vi broker-a.properties
```

將如下配置覆蓋掉broker-a.properties所有配置

```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=0
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=ASYNC_MASTER
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```

進入conf目錄,替換所有xml中的${user.name},保證日志路徑正確

```shell
sed -i 's#${user.home}#/usr/local/server/mq/rocketmq#g' *.xml
```

注意:sed -i 在這里起一個批量替換的作用

```shell
sed -i 's#原字符串#新字符#g' 替換的文件
```

RocketMQ對內存要求比較高,最少1G,如果內存太少,會影響RocketMQ的運行效率和執行性能,我們需要修改bin目錄小的runbroker.sh和runserver.sh文件

runbroker.sh

```shell
改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g Xmn4g"
改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g Xmn1g"
```

runserver.sh

```shell
改前:
JAVA_OPT="${JAVA_OPT} -server Xms4g Xmx4g Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改后:
JAVA_OPT="${JAVA_OPT} -server Xms1g Xmx1g Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

先啟動namesrv

```shell
nohup sh mqnamesrv &
```

再啟動broker

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
```

輸入jps查看進程

```shell
[root@localhost rocketmqlogs]# jps
3225 NamesrvStartup
3290 BrokerStartup
3454 Jps
```

#### 1.5.3 RocketMQ控制台安裝

我們這里直接將RocketMQ控制代碼的源碼放到項目中運行,后面課程結束后我們直接運行jar包即可。RocketMQ的控制台由一些RocketMQ愛好者提供。

下載地址:https://github.com/apache/rocketmq-externals/tree/master

### 1.6 RocketMQ架構介紹

![image-20200313142941806](/Users/huojunlin/Library/Application Support/typora-user-images/image-20200313142941806.png)

上面是RocketMQ的部署結構圖,RocketMQ 網絡部署特點:

```shell
1、Name Server 是一個幾乎無狀態節點,可集群部署,節點乀間無任何信息同步。
2、Broker部署相對復雜,Broker分為 Master與Slave,一個 Master可以對應多個Slave,但是一個Slave 只能對應一個Master,Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave,Master也可以部署多個,每個Broker與Name Server 集群中的所有節點建立長連接,定時注冊 Topic信息到所有Name Server。
3、Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
4、Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,並向提供Topic服務的 Master、Slave 建立長連接,且定時向Master、Slave 發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。
```

## 第二章 RocketMQ快速入門

### 2.1 消息生產和消費介紹

使用RocketMQ可以發送普通消息、順序消息、事務消息,順序消息能實現有序消費,事務消息可以解決分布式事務實現數據最終一致性。

RocketMQ有2中常見的消費模式,分別是DefaultMQPushConsumer和DefaultMQPullConsumer模式,這2種模式字面理解一個是推送消息,一個是拉取消息,這里有個誤區,其實無論是Push還是Pull,其本質都是拉取消息,只是實現機制不一樣。

DefaultMQPushConsumer其實並不是broker主動向consumer推送消息,而是consumer向broker發出請求,保持了一種長連接,broker會每5秒檢測一次是否有消息,如果有消息,則將消息推送給consumer。使用DefaultMQPushConsumer實現消費,broker會主動記錄消息消費的偏移量。

DefaultMQPullConsumer是消費方主動去broker拉取數據,一般會在本地使用定時任務實現,使用它獲得消息狀態方便、負載均衡性能可控,但消息的及時性差,而且需要手動記錄消息消費的偏移量信息,所以在實際應用場景中推薦使用Push方式。

RocketMQ發送的消息默認會存儲到4個隊列中去,當然創建幾個隊列存儲數據,可以自己定義。

![image-20200313161014296](/Users/huojunlin/Library/Application Support/typora-user-images/image-20200313161014296.png)

RocketMQ作為MQ消息中間件,ack機制必不可少,在RocketMQ中常見的應答狀態如下:

```java
LocalTransactionState:主要針對事務消息的應答狀態

public enum LocalTransactionState {
// 消息提交
COMMIT_MESSAGE,
// 消息回滾
ROLLBACK_MESSAGE,
// 未知狀態,一般用於處理超時等現象
UNKNOWN;
}
```

```java
ConsumeConcurrentlyStatus:主要針對消息消費的應答狀態

public enum ConsumeConcurrentlyStatus {
// 消息消費成功
CONSUME_SUCCESS,
// 消息重試,一般消息消費失敗后,RocketMQ為了保障數據的可靠性,具有重試機制
RECONSUME_LATER;
}
```

重發時間是:(broker.log中有)

```xml
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
```

### 2.2 RocketMQ普通消息生產者

#### 2.2.1 工程創建

我們實現一個最基本的消息發送,先創建一個springboot工程,工程名叫rocketmq-demo1

pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.leo</groupId>
<artifactId>study</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>study</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
<rocketmq.version>4.6.1</rocketmq.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- rocket mq -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

```

#### 2.2.2 消息發送

消息發送有這么幾個步驟:

```java
// 1、創建DefaultMQProducer
// 2、設置Namesrv地址
// 3、開啟DefaultMQProducer
// 4、創建消息Message
// 5、發送消息
// 6、關閉DefaultMQProducer
```

我們創建一個Producer類,按照上面步驟實現消息發送,代碼如下:

```java
public class Producer {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");

// 2、設置Namesrv地址
producer.setNamesrvAddr("");

// 3、開啟DefaultMQProducer
producer.start();

// 4、創建消息Message
Message message = new Message("Topic_Demo", "Tags", "Keys_1", "Hello RocketMQ".getBytes());

// 5、發送消息
SendResult result = producer.send(message);
System.out.println(result);

// 6、關閉DefaultMQProducer
producer.shutdown();
}
}
```

### 2.3 RocketMQ普通消息消費者

#### 2.3.1消息消費

消費者消費消息有這么幾個步驟:

```java
// 1、創建DefaultMQPushConsumer
// 2、設置namesrv地址
// 3、設置消息拉取最大數
// 4、設置subscribe,這里是要讀取訂單主題信息
// 5、創建消息監聽MessageListener
// 6、獲取消息信息
// 7、返回消息讀取狀態
```

創建Consumer類,按照上面步驟實現消息消費,代碼如下:

```java
public class Consumer {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");

// 2、設置namesrv地址
consumer.setNamesrvAddr("");

// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);

// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Demo",
"*");

// 5、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 6、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}

// 7、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 開啟Consumer
consumer.start();
}
}
```

### 2.4 RocketMQ順序消息

消息有序指的是可以按照消息的發送順序來消費。RocketMQ可以嚴格的保證消息順序,但這個順序,不是全局順序,只是分區(queue)順序,要全局順序只能一個分區。

如何保證順序

```java
在MQ模型中,順序需要由3個階段去保證:
1、消息被發送時保證順序
2、消息被存儲時保持和發送的順序一致
3、消息被消費時保持和存儲的順序一致
```

發送時保持順序意味着對於有順序要求的消息,用戶應該在同一線程中采用同步的方式發送。存儲保持和順序一致則要求在同一線程中被發送出來的消息A和B,存儲時在空間上A一定在B之前。而消費保持和存儲一致則要求消息A、B到達Consumer之后必須按照先A后B的順序被處理。

創建一個OrderProducer類,按照步驟實現消息發送,代碼如下:

```java
public class OrderProducer {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_order_group");

// 2、設置Namesrv地址
producer.setNamesrvAddr("");

// 3、開啟DefaultMQProducer
producer.start();

// 5、發送消息
// 第一個參數:發送的消息信息
// 第二個參數:選擇指定的消息隊列對象(會將所有的消息隊列傳進來)
// 第三個參數:指定對應的隊列下表

// 連續發送5條消息
for (int i = 0; i < 5; i++) {

// 4、創建消息Message
// topic:主題,tags: 標簽,主要用於消息過濾,keys:消息的唯一值,body:消息體
Message message = new Message(
"Topic_Order_Demo",
"Tags", "Keys_" + i,
("Hello RocketMQ_" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);


SendResult result = producer.send(
message,
(List<MessageQueue> list, Message msg, Object arg) -> {
// 獲取隊列的下標
Integer index = (Integer) arg;
return list.get(index);
},
1
);
System.out.println(result);
}

// 6、關閉DefaultMQProducer
producer.shutdown();
}
}
```

創建一個OrderConsumer類,按照步驟實現消息消費,代碼如下:

```java
public class OrderConsumer {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_order_group");

// 2、設置namesrv地址
consumer.setNamesrvAddr("");

// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);

// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Order_Demo",
"*");

// 5、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerOrderly) (list, consumeOrderlyContext) -> {

// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 6、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Order Consumer消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}

return ConsumeOrderlyStatus.SUCCESS;
});

// 開啟Consumer
consumer.start();
}
}
```

### 2.5 RocketMQ事務消息

在RocketMQ4.3.0版本后,開放了事務消息這一特性,對於分布式事務而言,最常說的還是兩階段提交協議。

#### 2.5.1 RocketMQ事務消息流程

RocketMQ的事務消息,主要是通過消息的異步處理,可以保證本地事務和消息發送同時成功執行或失敗,從而保證數據的最終一致性,這里我們先看看一條事務消息從誕生到結束的整個時間線流程:

![image-20200314162145933](/Users/huojunlin/Library/Application Support/typora-user-images/image-20200314162145933.png)

#### 2.5.2 事務消息生產者

我們創建一個事務消息生產者TransactionProducer,事務消息發送消息對象是TransactionMQProducer,為了實現本地事務操作和回查,我們需要創建一個監聽器,監聽器需要實現TransactionListener接口,實現步驟如下:

```java
// 1、創建TransactionMQProducer
// 2、設置Namesrv地址
// 3、指定消息監聽對象,用於執行本地事務和消息回查
// 4、指定線程池
// 5、開啟TransactionMQProducer
// 6、創建消息Message
// 7、發送事務消息
// 8、關閉TransactionMQProducer
```

TransactionProducer代碼如下:

```java
public class TransactionProducer {

public static void main(String[] args) throws Exception {

// 1、創建TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("demo_producer_transaction_group");

// 2、設置Namesrv地址
producer.setNamesrvAddr("");

// 3、指定消息監聽對象,用於執行本地事務和消息回查
producer.setTransactionListener(new TransactionListenerImpl());

// 4、指定線程池
ExecutorService executorService = new ThreadPoolExecutor(
2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
);
producer.setExecutorService(executorService);

// 5、開啟TransactionMQProducer
producer.start();

// 6、創建消息Message
// topic:主題,tags: 標簽,主要用於消息過濾,keys:消息的唯一值,body:消息體
Message message = new Message(
"Topic_Transaction_Demo",
"Tags",
"Keys_T",
"Hello Transaction RocketMQ Message".getBytes(RemotingHelper.DEFAULT_CHARSET)
);

// 7、發送事務消息
// 第一個參數:發送的消息信息
// 第二個參數:選擇指定的消息隊列對象(會將所有的消息隊列傳進來)

TransactionSendResult result = producer.sendMessageInTransaction(message, "hello-transaction");
System.out.println(result);

// 8、關閉TransactionMQProducer
producer.shutdown();
}
}
```

監聽器代碼如下:

```java
public class TransactionListenerImpl implements TransactionListener {

/**
* 存儲對應事務的狀態信息,key:事務ID,value:當前事務執行的狀態
*/
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

/**
* 執行本地事務
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {

// 事務ID
String transactionId = message.getTransactionId();

// 0:執行中,狀態未知,1:本地事務執行成功,2:本地事務執行失敗
localTrans.put(transactionId, 0);

// 業務執行,處理本地事務
System.out.println("Hello-----Transaction");

try {

System.out.println("正在執行本地事務");
Thread.sleep(2000);
System.out.println("本地事務執行成功");

localTrans.put(transactionId, 1);
} catch (InterruptedException e) {
e.printStackTrace();
localTrans.put(transactionId, 2);
return LocalTransactionState.ROLLBACK_MESSAGE;
}

return LocalTransactionState.COMMIT_MESSAGE;
}

/**
* 消息回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

// 獲取事務ID
String transactionId = messageExt.getTransactionId();

// 獲取對應事務ID的執行狀態
Integer status = localTrans.get(transactionId);

System.out.println("消息回查--TransactionId:" + transactionId + ", 狀態:" + status);

switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}

return LocalTransactionState.UNKNOW;
}
}
```

#### 2.5.3 事務消息消費者

事務消息的消費者和普通消費者一樣,代碼如下:

```java
public class TransactionConsumer {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_transaction_group");

// 2、設置namesrv地址
consumer.setNamesrvAddr("");

// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);

// 設置消息順序
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Transaction_Demo",
"*");

// 5、創建消息監聽MessageListener
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
for (MessageExt msg: list) {
try {
String topic = msg.getTopic();
String tags = msg.getTags();
String keys = msg.getKeys();
String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic: " + topic + ", tags: " + tags + ", keys: " + keys + ", body: " + body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 開啟Consumer
consumer.start();
}
}
```

#### 2.5.4 RocketMQ實現分布式事務流程

MQ事務消息解決分布式事務問題,但第三方MQ支持事務消息的中間件不多,比如RocketMQ,它支持事務消息的方式也是類似於二階段提交,但是市面上一些主流的MQ都是不支持事務消息的,比如RabbitMQ和Kafka都不支持。

以阿里的RocketMQ中間件為例,其思路大致為:

第一階段Prepared消息,會拿到消息的地址。

第二階段執行本地事務。

第三階段通過第一階段拿到的地址去訪問消息,並修改狀態。

也就是說在業務方法內要向消息隊列提交兩次請求,一發送消息和一次確認消息。如果確認消息發送失敗了,RocketMQ會定期掃描消息集群中的事務消息,這時候發現了Prepared消息,它會向消息發送者確認,所以生產方需要實現一個check接口,RocketMQ會根據發送端設置的策略來決定是回滾還是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。

![image-20200315115557057](/Users/huojunlin/Library/Application Support/typora-user-images/image-20200315115557057.png)

### 2.6 消息廣播/批量發送

上面發送消息,我們測試的時候,可以發現消息只有一個消費者能收到,如果我們想實現消息廣播,讓每個消費者都能收到消息也是可以實現的。而且上面發消息的時候,每次都是發送單挑Message對象,能否批量發送呢?答案是可以的。

#### 2.6.1 消息生產者

創建消息生產者BroadcastingProducer,代碼如下:

```java
public class BroadcastingProducer {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQProducer
DefaultMQProducer producer = new DefaultMQProducer("demo_producer_broadcasting_group");

// 2、設置Namesrv地址
producer.setNamesrvAddr("");

// 3、開啟DefaultMQProducer
producer.start();

// 4、創建消息Message
// topic:主題,tags: 標簽,主要用於消息過濾,keys:消息的唯一值,body:消息體
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Message message = new Message(
"Topic_broadcasting_Demo",
"Tags",
"Keys_" + i,
("Hello RocketMQ_" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
messages.add(message);
}


// 5、發送消息
SendResult result = producer.send(messages);
System.out.println(result);

// 6、關閉DefaultMQProducer
producer.shutdown();
}
}
```

#### 2.6.2 消息消費者

BroadcastingConsumerA代碼如下:

```java
public class BroadcastingConsumerA {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_broadcasting_group");

// 2、設置namesrv地址
consumer.setNamesrvAddr("");

// 3、默認是集群模式,改成廣播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

// 4、、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);

// 5、、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_broadcasting_Demo",
"*");

// 6、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 7、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer_A消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}

// 8、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 開啟Consumer
consumer.start();
}
}
```

BroadcastingConsumerB代碼如下:

```java
public class BroadcastingConsumerB {

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_broadcasting_group");

// 2、設置namesrv地址
consumer.setNamesrvAddr("");

// 3、默認是集群模式,改成廣播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

// 4、、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);

// 5、、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_broadcasting_Demo",
"*");

// 6、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 7、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer_B消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}

// 8、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 開啟Consumer
consumer.start();
}
}
```

##第三章 RocketMQ集群

### 3.1 RocketMQ集群模式

#### 3.1.1 單個Master

這是一種風險比較大的方式(無集群),因為一旦Broker重啟或者宕機期間,將會導致整個服務不可用,因此是不建議線上環境去使用的。

#### 3.1.2 多個Master

一個集群全部都是Master,沒有Slave,它的有點和缺點如下:

優點:配置簡單,單個Master宕機或者重啟維護對應用沒有什么影響,在磁盤配置為RAID10時,即使機器宕機不可恢復的情況下,消息也不會丟失(異步刷盤會丟失少量消息,同步刷盤則是一條都不會丟失),性能最高。

缺點:當單個Broker宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息的實時性會收到影響。

#### 3.1.3 多Master多Slave模式-異步復制

每個Master配置一個Slave,有多對的Master-Slave,HA采用的是異步復制方式,主備有短暫的消息延遲,毫秒級別的(Master收到消息之后立刻向應用返回成功標識,同時向Slave寫入消息),優缺點如下:

優點:即使是磁盤損壞了,消息丟失的非常少,且消息實時性不受影響,因為Master宕機之后,消費者仍可以從Slave消費,此過程對應用透明,不需要人工干預,性能同多個Master模式幾乎一樣。

缺點:Master宕機,磁盤損壞的情況下,會丟失少量的消息。

#### 3.1.4 多Master多Slave模式-同步雙寫

每個Master配置一個Slave,有多對的Master-Slave,HA采用的是同步雙寫模式,主備都寫成功,才會向應用返回成功。

優點:數據和服務都無單點,Master宕機的情況下,消息無延遲,服務可用性與數據可用性都非常高。

缺點:性能比異步復制模式略低,大約低10%左右,發送單個Master的RT會略高,目前主機宕機后,Slave不同自動切換為主機,后續會支持自動切換功能。

### 3.2 RocketMQ主從搭建

#### 3.2.1 環境准備

我們先准備2台centos虛擬機,ip:192.168.211.142/192.168.211.143,在hosts文件中配置地址與IP的映射關系。

| IP | hostname | mastername |
| :-------------: | :------------------: | :---------------------: |
| 192.168.211.142 | rocketmq-nameserver2 | rocketmq-master1-slave1 |
| 192.168.211.143 | ocketmq-nameserver1 | rocketmq-master1 |

修改2台機器的/etc/hosts文件,加入如下映射關系:

```shell
192.168.211.143 rocketmq-nameserver1
192.168.211.143 rocketmq-master1
192.168.211.142 rocketmq-nameserver2
192.168.211.142 rocketmq-master1-slave1
```

#### 3.2.2 安裝配置(slave)

由於之前已經安裝了一台RocketMQ,所以我們只需要安裝從節點(192.168.211.142)即可。我們可以把安裝文件上傳到虛擬機上,並解壓安裝。

從192.168.211.143目錄下將RocketMQ拷貝到192.168.211.142服務器上,如下代碼:

```shell
scp rocketmq-all-4.6.1-bin-release.zip 192.168.211.142:/usr/local/server/sofeware
```

解壓文件存放到/usr/local/server/mq目錄下:

```shell
unzip rocketmq-all-4.6.1-bin-release.zip -d /usr/local/server/mq
```

更改j解壓后的文件名:

```shell
mv rocketmq-all-4.6.1-bin-release rocketmq
```

創建RocketMQ存儲文件的目錄,執行如下命令:

```shell
[root@localhost rocketmq]# mkdir logs
[root@localhost rocketmq]# mkdir store
[root@localhost rocketmq]# cd store
[root@localhost store]# mkdir commitlog
[root@localhost store]# mkdir consumerqueue
[root@localhost store]# mkdir index
```

進入conf目錄,替換所有xml中的${user.home},保證日志路徑正確

```shell
sed -i 's#${user.home}#/usr/local/server/mq/rocketmq#g' *.xml
```

注意:sed -i 在這里起一個批量替換的作用

```shell
sed -i 's#原字符串#新字符#g' 替換的文件
```

RocketMQ對內存要求比較高,最少1G,如果內存太少,會影響RocketMQ的運行效率和執行性能,我們需要修改bin目錄小的runbroker.sh和runserver.sh文件

runbroker.sh

```shell
改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g Xmn4g"
改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g Xmn1g"
```

runserver.sh

```shell
改前:
JAVA_OPT="${JAVA_OPT} -server Xms4g Xmx4g Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改后:
JAVA_OPT="${JAVA_OPT} -server Xms1g Xmx1g Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

#### 3.2.3 主從配置

在192.168.211.143的/usr/local/server/mq/rocketmq/conf/2m-2s-async/目錄下,將broker-a-s.properties文件修改成以下:

```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=1
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SLAVE
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```

broker-a-s.properties文件和broker-a.properties文件區別並不大,主要改了brokerId=1和brokerRole=SLAVE

在192.168.211.143的/usr/local/server/mq/rocketmq/conf/2m-2s-async/目錄下將配置文件拷貝到192.168.211.142服務器上,代碼如下:

```shell
scp broker-a.properties broker-a-s.properties 192.168.211.142:/usr/local/server/mq/rocketmq/conf/2m-2s-async/
```

停止192.168.211.143服務

```shell
sh mqshutdown broker
sh mqshutdown namesrv
```

啟動192.168.211.143的namesrv

```shell
nohup sh mqnamesrv &
```

啟動192.168.211.142的namesrv

```shell
nohup sh mqnamesrv &
```

啟動192.168.211.143的broker

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
```

啟動192.168.211.143的broker

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-async/broker-a-s.properties > /dev/null 2>&1 &
```

#### 3.2.4 控制台配置

修改rocketmq-console-ng控制台項目的application.properties配置文件,添加上主從地址:

```shell
rocketmq.config.namesrvAddr=192.168.211.143:9876;192.168.211.142:9876
```

運行控制台項目,並打開控制台的集群,可以看到集群信息。

#### 3.2.5 主從模式故障演練

主從模式,即使Master宕機后,消費者仍然可以從Slave消費,但不能接收新的消息,我們通過程序來演示一次該流程。

##### 3.2.5.1 master宕機收消息演示

創建MasterSlaveProducer,實現消息發送,代碼如下

```java
public class MasterSlaveProducer {

/**
* 指定namesrv地址
*/
private static String NAMESRV_ADDRESS = "192.168.211.143:9876;192.168.211.142:9876";

public static void main(String[] args) throws Exception {

// 創建一個DefaultMQProducer, 指定消息發送組
DefaultMQProducer producer = new DefaultMQProducer("Test_Quick_Producer_Name");

// 指定namesrv地址
producer.setNamesrvAddr(NAMESRV_ADDRESS);

// 啟動producer
producer.start();

// 創建消息
Message message = new Message(
"Test_Quick_Topic",
"TagA",
"KeyA",
"hello_rocketmq master-slave".getBytes(RemotingHelper.DEFAULT_CHARSET)
);

// 發送消息
SendResult result = producer.send(message);
System.out.println(result);

// 關閉produccer
producer.shutdown();
}
}
```

執行消息發送后,可以打開控制台觀察集群信息

停掉192.168.211.143的broker節點

```shell
./mqshutdown broker
```

創建MasterSlaveConsumer,實現消息消費,代碼如下:

```java
public class MasterSlaveConsumer {

/**
* 指定namesrv地址
*/
private static String NAMESRV_ADDRESS = "192.168.211.143:9876;192.168.211.142:9876";

public static void main(String[] args) throws Exception {

// 1、創建DefaultMQPushConsumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");

// 2、設置namesrv地址
consumer.setNamesrvAddr("");

// 3、設置消息拉取最大數
consumer.setConsumeMessageBatchMaxSize(2);

// 4、設置subscribe,這里是要讀取訂單主題信息
// topic:指定消費的主題,subExpression:過濾規則
consumer.subscribe("Topic_Demo",
"*");

// 5、創建消息監聽MessageListener
consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
// 迭代消息
for (MessageExt msg : list) {
// 獲取主題
String topic = msg.getTopic();
// 獲取標簽
String tags = msg.getTags();
// 獲取信息
try {
// 6、獲取消息信息
String result = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("Consumer消費信息----topic: " + topic + ", tags: " + tags + ", result: " + result);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
// 消息重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}

}

// 7、返回消息讀取狀態
// 消息消費成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 開啟Consumer
consumer.start();
}
}
```

運行程序,發現消費者可以從slave中消費消息

##### 3.2.5.2 master宕機發消息演示

關掉192.169.211.143的broker后,再次嘗試發送消息,會報如下錯誤:

```shell
Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException:No route info of this topic, Test_Quick_Topic
...
```

### 3.3 RocketMQ集群搭建-雙主雙從

我們這里搭建一個雙主雙從的集群,采用同步復制異步刷盤方式進行集群,在工作中,我們也推薦這么做,我們先把環境准備一下。

#### 3.3.1 准備工作

准備4台機器,如下:

| ip | hostname | mastername |
| --------------- | -------------------- | ---------------------- |
| 192.168.211.141 | rocketmq-nameserver1 | rocketmq-master1 |
| 192.168.211.142 | rocketmq-nameserver2 | rocketmq-master2 |
| 192.168.211.143 | rocketmq-nameserver3 | rocketmq-master1-slave |
| 192.168.211.144 | ocketmq-nameserver4 | rocketmq-master2-slave |

#### 3.3.2 RocketMQ安裝

在每台機器上安裝RocketMQ,安裝過程和上面的**單節點RocketMQ安裝流程**基本類似。

#### 3.3.3 修改映射路徑

修改每台機器的/etc/hosts文件,添加如下映射路徑

```shell
192.168.211.141 rocketmq-nameserver1
192.168.211.142 rocketmq-nameserver2
192.168.211.143 rocketmq-nameserver3
192.168.211.144 rocketmq-nameserver4
192.168.211.141 rocketmq-master1
192.168.211.142 rocketmq-master2
192.168.211.143 rocketmq-master1-slave
192.168.211.144 rocketmq-master2-slave
```

#### 3.3.4 RocketMQ節點配置

4台機器一起解壓rocketmq壓縮文件,最后將解壓文件存放在/usr/local/server/mq/rockermq/目錄下。

分別在4台機器的rocketmq目錄下執行如下操作:

```shell
mkdir logs
mkdir store
cd store/
mkdir commitlog
mkdir consumequeue
mkdir index
```

在4台機器的conf目錄下執行:

```shell
sed -i 's#${user.home}#/usr/local/server/mq/rocketmq#g' *.xml
```

進入4台機器的bin目錄,修改runbroker.sh和runserver.sh

runbroker.sh

```shell
改前:
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g Xmn4g"
改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g Xmn1g"
```

runserver.sh

```shell
改前:
JAVA_OPT="${JAVA_OPT} -server Xms4g Xmx4g Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
改后:
JAVA_OPT="${JAVA_OPT} -server Xms1g Xmx1g Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
```

進入141集群rocketmq/conf/2m-2s-sync目錄下配置對應的配置文件,修改broker-a.properties配置文件,如下:

```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=0
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```

修改broker-b.properties文件,如下:

```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示Master,>0表示slave
brokerId=0
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SYNC_MASTER
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```

修改修改broker-a-s.properties文件,如下:

```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-a
#0 表示Master,>0表示slave
brokerId=1
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SLAVE
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```

修改修改broker-b-s.properties文件,如下:

```properties
#所屬集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置文件填寫的不一樣
brokerName=broker-b
#0 表示Master,>0表示slave
brokerId=1
#nameServer地址,分號分隔
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876;rocketmq-nameserver3:9876;rocketmq-nameserver4:9876
#在發送消息時,自動創建服務器不存在的Topic,默認創建的隊列數
defaultTopicQueueNums=4
#是否允許Broker自動創建Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許Broker自動創建訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker對外服務的監聽端口
listenPort=10911
#刪除文件時間點,默認是凌晨4點
deleteWhen=04
#文件保留時間,默認48小時
fileReservedTime=120
#commitLog每個文件的大小默認1G
mapedFileSizeCommitLog=1073741824
#ConsumerQueue每個文件默認存30W條,根據業務情況調整
mapedFileSizeConsumerQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理文件磁盤空間
diskMaxUsedSpaceRatio=88
#存儲路徑
storePathRootDir=/usr/local/server/mq/rocketmq/store
#commitLog存儲路徑
storePathCommitLog=/usr/local/server/mq/rocketmq/store/commitlog
#消息隊列存儲路徑
storePathConsumerQueue=/usr/local/server/mq/rocketmq/store/consumerqueue
#消息索引存儲路徑
storePathIndex=/usr/local/server/mq/rocketmq/store/index
#checkpoint文件存儲路徑
storeCheckpoint=/usr/local/server/mq/rocketmq/store/checkpoint
#abort文件存儲路徑
abortFile=/usr/local/server/mq/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumerQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumerQueueThorougnInterval=60000
#Broker的角色
# - ASYNC_MASTER 異步復制Master
# - SYNC_MASTER 同步雙寫Master
# - SLAVE
brokerRole=SLAVE
#刷盤方式
# - ASYNC_FLUSH 異步刷盤
# - SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發消息線程池數量
#sendMessageThreadPoolNums=128
#拉消息線程池數量
#pullMessageThreadPoolNums=128
```

將這4個文件分別復制到142、143、144服務器對應的目錄下(rocketmq/conf/2m-2s-sync)

#### 3.3.5 RocketMQ集群啟動測試

bin目錄下,

先在每台服務器啟動namesrv

```shell
nohup sh mqnamesrv &
```

再啟動broker

141服務器執行如下啟動命令:

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-a.properties > /dev/null 2>&1 &
```

142服務器執行如下啟動命令:

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-b.properties > /dev/null 2>&1 &
```

143服務器執行如下啟動命令:

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-a-s.properties > /dev/null 2>&1 &
```

144服務器執行如下啟動命令:

```shell
nohup sh mqbroker -c /usr/local/server/mq/rocketmq/conf/2m-2s-sync/broker-b-s.properties > /dev/null 2>&1 &
```

輸入jps查看進程

```shell
[root@localhost bin]# jps
43761 NamesrvStartup
43803 BrokerStartup
44093 jps
[root@localhost bin]#
```

啟動后,需改控制台的namesrv地址如下:

```properties
rocketmq.config.namesrvAddr=192.168.211.141:9876;192.168.211.142:9876;192.168.211.143:9876;192.168.211.144:9876
```

使用md查看:https://www.mdeditor.com/

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM