概述
**本人博客網站 **IT小神 www.itxiaoshen.com
定義
Apache RocketMQ官網地址 https://rocketmq.apache.org/ Latest release v4.9.2
Apache RocketMQ GitHub源碼地址 https://github.com/apache/rocketmq
Apache RocketMQ™是一個分布式消息傳遞和流媒體平台、統一的消息傳遞引擎,輕量級的數據處理平台;具有低延遲、高性能和可靠性、萬億級容量和靈活的可伸縮性。
今天我們又來學習一個Apache頂級項目Apache RocketMQ,RocketMQ由國人阿里團隊采用Java語言開發和開源的,曾獲得2016、2018中國最受歡迎的開源軟件獎。RocketMQ憑借其強大的存儲能力和強大的消息索引能力,以及各種類型消息和消息的特性脫穎而出。Apache RocketMQ官網地址及其GitHub都提供非常詳細中文學習文檔如Apache RocketMQ開發者指南等,學習起來可謂是非常之流暢、酸爽、so easy!讓我們通過官網和及其GitHub來深入學習這個與時俱進非常優秀互聯網主流的消息中間件。
為何需要Apache RocketMQ?
阿里早期是基於ActiveMQ 5的分布式消息傳遞中間件,隨着隊列和虛擬主題的增加ActiveMQ IO模塊達到了瓶頸,當時也研討過Kafka但當時的Kafka不能滿足阿里的要求(特別是在低延遲和高可靠性方面),因此阿里決定自行研發一個消息中間件,從傳統的發布/訂閱場景到高容量的實時零損失容忍度事務系統,這就是RocketMQ誕生的原因。
RocketMQ vs. ActiveMQ vs. Kafka
下表展示了RocketMQ、ActiveMQ和Kafka(根據awesome-java的Apache最流行的消息傳遞解決方案)之間的比較。根據個人經驗,如果不是大數據場景下如大數據日志采集等場景外建議優先使用RocketMQ,性能和功能都有保障,當然需要用於雲原生領域還有Apache Pulsar雲原生分布式消息和流平台,這個在前面的文章也有較少。
安裝部署
安裝說明
the latest release is 4.9.2
二進制下載地址 https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
源碼下載地址 https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-source-release.zip
Apache RocketMQ部署方式有單Master模式、多Master模式、多Master多slave模式、Dledger的集群部署模式等,官網也提供額外的CLI Admin Tool和運維工具mqadmin。在二進制包下conf目錄提供了兩主兩從異步方式、兩主兩從同步方式、兩主無從、Dledger集群的配置模板。
網絡部署特點
- NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
- Broker部署相對復雜,Broker分為Master與Slave,Master提供RW訪問,而Slave只接受讀訪問;一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器才會參與消息的讀負載。
- Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
- Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,並向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master還是Slave拉取。
配置推薦
在部署RocketMQ集群時,推薦的配置如下所示:
部署方式說明
-
單Master模式
- 這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。
-
多Master模式
- 一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
- 優點:配置簡單,單個Master宕機或重啟維護對應用無影響,在磁盤配置為RAID10時,即使機器宕機不可恢復情況下,由於RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條不丟),性能最高;
- 缺點:單台機器宕機期間,這台機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到影響。
- 一個集群無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
-
多Master多Slave模式-異步復制
- 每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
- 優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,同時Master宕機后,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工干預,性能同多Master模式幾乎一樣;
- 缺點:Master宕機,磁盤損壞情況下會丟失少量消息。
- 每個Master配置一個Slave,有多對Master-Slave,HA采用異步復制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
-
多Master多Slave模式-同步雙寫
- 每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
- 優點:數據與服務都無單點故障,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高;
- 缺點:性能比異步復制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點宕機后,備機不能自動切換為主機。
- 每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即只有主備都寫成功,才向應用返回成功,這種模式的優缺點如下:
單Master部署
單Master模式部署非常簡單,這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用。不建議線上環境使用,可以用於本地測試。先啟動NameServer后啟動Broker。
#linux部署,解壓下載zip進入二級制加壓的根目錄
unzip rocketmq-all-4.9.2-bin-release.zip
cd rocketmq-4.9.2
#啟動NameServer
nohup sh bin/mqnamesrv &
#查看NameServer運行日志
tail -f ~/logs/rocketmqlogs/namesrv.log
#啟動Broker
nohup sh bin/mqbroker -n localhost:9876 &
#查看Broker運行日志
tail -f ~/logs/rocketmqlogs/broker.log
#關閉Broker
sh bin/mqshutdown broker
#關閉NameServer
sh bin/mqshutdown namesrv
Dledger集群部署
多主多從模式有模板配置,根據不同配置拉起Broker即可,但是從上面我們知道在多主多從模式下是不支持自動容災切換功能,因此還不具備完全的高可用,我們這里使用Dledger集群部署實現自動容災切換;之前我們在ZooKeeper章節也了解到分布式一致性算法,其實Dledger也是依賴Raft算法實現選舉的功能。Dledger一個基於java庫用於構建高可用性、高耐用性、強一致性的提交,它可以作為分布式存儲系統的持久化層,如消息傳遞、流、kv、db等。Dledger是已被證明可以應用於生產級別的產品。
NameServer需要先於Broker啟動,且如果在生產環境使用,為了保證高可用,建議一般規模的集群啟動3個NameServer。我們本次准備3台服務器192.168.50.95(n0)、192.168.50.156(n1)、192.168.50.196(n2)。
cd rocketmq-4.9.2
#3台服務器啟動Name Server
nohup sh bin/mqnamesrv &
#驗證Name Server 是否啟動成功
tail -f ~/logs/rocketmqlogs/namesrv.log
在conf\dledger參考broker-n0.conf數據創建文件名為broker.conf數據內容如下,其他兩台和這個數據一樣,只需要修改dLegerSelfId為n1和n2即可。
vi conf/dledger/broker.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876
storePathRootDir=/home/commons/rocketmq-4.9.2/rmqstore/node00
storePathCommitLog=/home/commons/rocketmq-4.9.2/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.50.95:40911;n1-192.168.50.156:40911;n2-192.168.50.196:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
#可以3台分別先創建配置文件路徑,非必要
mkdir /home/commons/rocketmq-4.9.2/rmqstore/node00
mkdir /home/commons/rocketmq-4.9.2/rmqstore/node00/commitlog
#3台分別啟動broker
nohup sh bin/mqbroker -c conf/dledger/broker.conf &
#查看Broker運行日志
tail -f ~/logs/rocketmqlogs/broker.log
通過 mqadmin 運維命令查看集群狀態,可指定任意一台Name Server
sh bin/mqadmin clusterList -n 192.168.50.95:9876
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-PcYuzQb9-1638544959716)(http://www.itxiaoshen.com:3001/assets/1638501706653H0QPMbWB.png)]
BID 為 0 的表示 Master,其余都是 Follower,從當前看192.168.50.156為Master,我們進行容災切換測試,停掉192.168.50.156上的Broker進程,等待約 10s 左右,用 clusterList 命令再次查看集群,就會發現 Leader 切換到另一個節點192.168.50.196上
再次啟動192.168.50.156上的broker重新再加入集群並作為集群的Follower
簡單收發消息測試
#192.168.50.95上執行測試工具的生產者發送消息
export NAMESRV_ADDR="192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876"
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-HFLt4ZFo-1638544959726)(http://www.itxiaoshen.com:3001/assets/1638502366597EjsQW4e8.png)]
#192.168.50.95上執行測試工具的消費者接收消息
export NAMESRV_ADDR="192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876"
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Java示例
常用消息樣例說明
-
簡單消息(三種方式發送消息)
- 可靠同步,使用的比較廣泛,比如:重要的消息通知,短信通知。
- 可靠異步,通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
- 單向傳輸,用在不特別關心發送結果的場景,例如日志發送。
-
順序消息
- RocketMQ使用FIFO順序提供有序消息,RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
- 比如用訂單場景,一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
-
廣播消息
- 向一個主題的所有訂閱者發送消息。
-
延遲消息
- 延遲消息與普通消息的不同之處在於它們將在稍后提供的時間內被傳遞,比如電商里提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
-
批量消息
- 批量發送消息可以提高發送小消息的性能。
- 約束:同一批的消息應該有:相同的主題,相同的waitStoreMsgOK,不支持延遲。
-
過濾消息
- 在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。
- 在RocketMQ定義的語法下可以使用SQL表達式篩選消息,SQL特性可以通過發送消息時的屬性來進行計算。
- 只有使用push模式的消費者才能用使用SQL92標准的sql語句。
-
Logappender日志
- RocketMQ日志提供log4j、log4j2和logback日志框架作為業務應用
-
OpenMessaging
- 旨在建立消息和流處理規范,以為金融、電子商務、物聯網和大數據領域提供通用框架及工業級指導方案。在分布式異構環境中,設計原則是面向雲、簡單、靈活和獨立於語言。符合這些規范將幫助企業方便的開發跨平台和操作系統的異構消息傳遞應用程序。提供了openmessaging-api 0.3.0-alpha的部分實現。
-
事務消息
-
可以將其視為兩階段提交消息實現,以確保分布式系統中的最終一致性。事務性消息確保本地事務的執行和消息的發送能夠被原子地執行。
-
限制約束
- 事務消息不支持延時消息和批量消息。
- 為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的
transactionCheckMax
參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax
) 則 Broker 將丟棄此消息,並在默認情況下同時打印錯誤日志。用戶可以通過重寫AbstractTransactionalMessageCheckListener
類來修改這個行為。 - 事務消息將在 Broker 配置文件中的參數 transactionTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於
transactionTimeout
參數。 - 事務性消息可能不止一次被檢查或消費。
- 提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。
- 事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。
-
事務性消息有三種狀態:
(1) TransactionStatus。CommitTransaction:提交事務,它意味着允許使用者使用此消息。
(2) TransactionStatus。rollback transaction:回滾事務,它意味着消息將被刪除並且不允許使用。
(3) TransactionStatus。未知:中間狀態,這意味着MQ需要進行回查以確定狀態。
-
簡單消息示例代碼
pom加入maven依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
可靠同步生產者實現代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("default_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("DefaultTopic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
可靠異步生產者實現代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("default_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("DefaultTopic",
"TagA",
"OrderID888888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
單向傳輸生產者實現代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("default_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("DefaultTopic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
}
消費者實現代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_group");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("DefaultTopic", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
可靠同步生產者發送消息
消費者消費消息
其他消息示例可以參考官網的樣例使用即可
面試題
說說RocketMQ架構和組成?
從Apache RocketMQ官網架構圖看可知道其由四個大部分組成,分別為名稱服務器集群、Broker集群、生產者集群和消費者集群;它們中的每一個都可以水平擴展而不存在單一的故障點。
-
NameServer Cluster(命名服務器集群):名稱服務器提供輕量級的服務發現和路由。每個Name Server記錄完整的路由信息,提供相應的讀寫服務,支持快速的存儲擴展。我們知道Kafka是依賴ZooKeeper來實現服務發現和路由的。
- Broker管理,NameServer接受來自Broker集群的注冊,並提供心跳機制來檢查代理是否處於活動狀態。
- 路由管理,每個NameServer將保存關於代理集群的全部路由信息和用於客戶端查詢的隊列信息。
- Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。
- NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一台NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息。
- RocketMQ客戶端(生產者/消費者)將從NameServer查詢隊列路由信息,客戶端可以通過多種方式找到NameServer的地址,下面列出幾種
- 編程方式,如producer.setNamesrvAddr("ip:port")。
- Java選項,使用rocketmq.namesrv.addr。
- 環境變量使用NAMESRV_ADDR。
- HTTP Endpoint。
-
Broker Cluster(代理集群):Broker是作為RocketMQ最核心消息Server,Broker通過提供輕量級的TOPIC和QUEUE機制來負責消息存儲。它們支持Push和Pull模型,包含容錯機制(2副本或3副本),並提供強大的填充峰值和按原始時間順序累積數千億條消息的能力。此外,Broker提供災難恢復、豐富的度量統計信息和警報機制,這些都是傳統消息中間件系統所缺乏的;代理服務器負責消息存儲和傳遞、消息查詢、HA保證等,Broker服務器有幾個重要的子模塊:
- 遠程模塊,Broker的入口,處理來自客戶機的請求。
- 客戶端管理器,管理客戶端(生產者/消費者)並維護消費者的主題訂閱。
- 存儲服務,提供簡單的api在物理磁盤中存儲或查詢消息。
- HA服務,在主Broker和從Broker之間提供數據同步功能。
- 索引服務,根據指定的鍵為消息構建索引,並提供快速的消息查詢。
-
Producer Cluster(生產者集群):生產者支持分布式部署;分布式生產者通過多種負載均衡模式向Broker集群發送消息;發送過程支持快速失敗和低延遲。
-
Consumer Cluster(消費者集群):消費者也支持Push和Pull模型中的分布式部署;它還支持集群使用和消息廣播;它提供了實時消息訂閱機制,可以滿足大多數用戶的需求。
說說RocketMQ核心概念?
Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中,ConsumerGroup 由多個Consumer 實例構成。
-
消息模型
- Clustering:集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
- Broadcasting:廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
-
生產者組:同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一致。如果發送的是事務消息且原始生產者在發送之后崩潰,則Broker服務器會聯系同一生產者組的其他生產者實例以提交或回溯消費。
- Producer(生產者):負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。
-
消費者組:同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費(Clustering)和廣播消費(Broadcasting)。
- Consumer (消費者):負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
- Pull:主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。拉取型消費者主動從broker中拉取消息消費,只要拉取到消息,就會啟動消費過程,稱為主動型消費。
- Push:Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。推送型消費者就是要注冊消息的監聽器,監聽器是要用戶自行實現的。當消息達到broker服務器后,會觸發監聽器拉取消息,然后啟動消費過程。但是從實際上看還是從broker中拉取消息,稱為被動消費型。
- push:消費端慢的話導致消費端緩沖區溢出。
- pull:考慮拉的頻率,可能導致很多無效請求的RPC開銷影響整體網絡性能。
- Consumer (消費者):負責消費消息,一般是后台系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、並將其提供給應用程序。從用戶應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
-
Broker Server :消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息並存儲、同時為消費者的拉取請求作准備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
-
TOPIC:主題,表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬於一個主題,是RocketMQ進行消息訂閱的基本單位。生產者在其中傳遞消息,消費者在其中提取消息。一個Topic可能有0個、一個或多個生產者向它發送消息;從消費者的角度來看一個主題可以由零個、一個或多個消費者群體訂閱。類似地,一個消費者組可以訂閱一個或多個主題,只要該組的實例保持訂閱一致。
- message queue:消息隊列,一個Topic可以划分成多個消息隊列。Topic只是個邏輯上的概念,消息隊列是消息的物理管理單位,當發送消息的時候,Broker會輪詢包含該Topic的所有消息隊列,然后將消息發出去。有了消息隊列,可以使得消息的存儲可以分布式集群化,具有了水平的擴展能力。
- message:消息系統所傳輸信息的物理載體,生產和消費數據的最小單位,每條消息必須屬於一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。
- message order:當使用DefaultMQPushConsumer時,可以決定有序或並發地使用消息.
- Orderly:有序地使用消息意味着對於每個消息隊列,消息的使用順序與生產者發送消息的順序相同。如果您正在處理全局順序是強制性的場景,請確保您使用的Topic只有一個消息隊列;消費者通過同一個消息隊列( Topic 分區,稱作 Message Queue) 收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。如果指定了有序消費,則消息消費的最大並發性是消費組訂閱的消息隊列的數量。
- Concurrently:當並發地使用消息時,消息使用的最大並發性僅受為每個客戶端指定的線程池的限制;在此模式下不再保證消息順序。
- 嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
- message order:當使用DefaultMQPushConsumer時,可以決定有序或並發地使用消息.
- tag:為消息設置的標志,用於同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設置不同標簽。標簽能夠有效地保持代碼的清晰度和連貫性,並優化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
- offset:是指消息隊列中的offset,可以認為就是下標,消息隊列可看做數組。offset是java long型,64位,理論上100年不會溢出,所以可以認為消息隊列是一個長度無限的數據結構。
- RocketMQ支持按照下面兩種維度(“按照Message Id查詢消息”、“按照Message Key查詢消息”)進行消息查詢。
RocketMQ集群的工作流程?
- 啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當於一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
- 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
- Producer發送消息,啟動時先跟NameServer集群中的其中一台建立長連接,並從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
- Consumer跟Producer類似,跟其中一台NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
RocketMQ消息存儲設計?
RocketMQ的設計理念很大程度借鑒了kafka,RocketMQ消息存儲是整個系統的核心,直接決定着吞吐性能和高可用性;RocketMQ存儲消息是直接操作文件,借助java NIO的力量,使得I/O性能十分高。當消息來的時候,順序寫入CommitLog。為了Consumer消費消息的時候,能夠方便的根據topic查詢消息,在CommitLog的基礎上衍生出了ConsumerQueue文件,存放了某topic的消息在CommitLog中的偏移位置。此外為了支持根據消息key查詢消息,RocketMQ的強大的支持消息索引的特性靠的就是indexFile索引文件。
- CommitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G, 文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當第一個文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。
- CommitLog文件的最大的一個特點就是消息順序寫入日志文件,當文件滿了,寫入下一個文件;隨機讀寫,關於commitLog的文件的落盤有兩種,一種是同步刷盤,一種是異步刷盤,可通過 flushDiskType 進行配置。
- CommitLog除了消息本身,它記錄了消息的方方面面的信息,通過一條CommitLog可以還原出很多東西。例如消息是何時、由哪個producer發送的,被發送到了哪個消息隊列,屬於哪個topic,有哪些屬性等等。RokcetMQ存儲的消息其實存儲的就是這個CommitLog記錄;可以將CommitLog記錄等同於消息,而CommitLog指存儲消息的文件。
- CommitLog類屬性很多,但是最重要的是mappedFileQueue屬性。消息最終存儲在CommitLog里,實際上CommitLog是一個邏輯上的概念。真正的文件是一個個MappedFile,然后組成了mappedFileQueue。一個MappedFile最多能存放1G的CommitLog,這個大小在MessageStoreConfi類里面定義了的。
- MappedFile 中WriteBuffer使用的是堆外內存,MappedByteBuffer是直接將文件映射到內存中,兩者的使用是互斥的。如果啟用了臨時緩沖池(默認不啟用),那么就會使用WriteBuffer寫CommitLog,否則就是MappedByteBuffer寫CommitLog。
- ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由於RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長設計,每一個條目共20個字節,分別為8字節的commitlog物理偏移量、4字節的消息長度、8字節tag hashcode,單個文件由30W個條目組成,可以像數組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;
- IndexFile:IndexFile(索引文件)提供了一種可以通過key或時間區間來查詢消息的方法。Index文件的存儲位置是:\(HOME \store\index\){fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故RocketMQ的索引文件其底層實現為hash索引。
在上面的RocketMQ的消息存儲整體架構圖中可以看出,RocketMQ采用的是混合型的存儲結構,即為Broker單個實例下所有的隊列共用一個日志數據文件(即為CommitLog)來存儲。RocketMQ的混合型存儲結構(多個Topic的消息實體內容都存儲於一個CommitLog中)針對Producer和Consumer分別采用了數據和索引部分相分離的存儲結構,Producer發送消息至Broker端,然后Broker端使用同步或者異步的方式對消息刷盤持久化,保存至CommitLog中。只要消息被刷盤持久化至磁盤文件CommitLog中,那么Producer發送的消息就不會丟失。正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息后,可以等下一次消息拉取,同時服務端也支持長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,只要這段時間內有新消息到達,將直接返回給消費端。這里,RocketMQ的具體做法是,使用Broker端的后台服務線程—ReputMessageService不停地分發請求並異步構建ConsumeQueue(邏輯消費隊列)和IndexFile(索引文件)數據。
說說RocketMQ存儲底層實現?
- MappedByteBuffer
- RocketMQ主要通過MappedByteBuffer對文件進行讀寫操作。其中,利用了NIO中的FileChannel模型將磁盤上的物理文件直接映射到用戶態的內存地址中(這種Mmap的方式減少了傳統IO將磁盤文件數據在操作系統內核地址空間的緩沖區和用戶應用程序地址空間的緩沖區之間來回進行拷貝的性能開銷),將對文件的操作轉化為直接對內存地址進行操作,從而極大地提高了文件的讀寫效率(正因為需要使用內存映射機制,故RocketMQ的文件存儲都使用定長結構來存儲,方便一次將整個文件映射至內存)。
- PageCache
- 是OS對文件的緩存,用於加速對文件的讀寫。一般來說,程序對文件進行順序讀寫的速度幾乎接近於內存的讀寫速度,主要原因就是由於OS使用PageCache機制對讀寫訪問操作進行了性能優化,將一部分的內存用作PageCache。對於數據的寫入,OS會先寫入至Cache內,隨后通過異步的方式由pdflush內核線程將Cache內的數據刷盤至物理磁盤上。對於數據的讀取,如果一次讀取文件時出現未命中PageCache的情況,OS從物理磁盤上訪問讀取文件的同時,會順序對其他相鄰塊的數據文件進行預讀取。
- 在RocketMQ中,ConsumeQueue邏輯消費隊列存儲的數據較少,並且是順序讀取,在page cache機制的預讀取作用下,Consume Queue文件的讀性能幾乎接近讀內存,即使在有消息堆積情況下也不會影響性能。而對於CommitLog消息存儲的日志數據文件來說,讀取消息內容時候會產生較多的隨機訪問讀取,嚴重影響性能。如果選擇合適的系統IO調度算法,比如設置調度算法為“Deadline”(此時塊存儲采用SSD的話),隨機讀的性能也會有所提升。
說說RocketMQ文件存儲模型層次結構?
- RocketMQ業務處理器層
- Broker端對消息進行讀取和寫入的業務邏輯入口,這一層主要包含了業務邏輯相關處理操作(根據解析RemotingCommand中的RequestCode來區分具體的業務操作類型,進而執行不同的業務處理流程),比如前置的檢查和校驗步驟、構造MessageExtBrokerInner對象、decode反序列化、構造Response返回對象等。
- RocketMQ數據存儲組件層
- 該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息數據文件的訪問入口,通過該類的“putMessage()”和“getMessage()”方法完成對CommitLog消息存儲的日志數據文件進行讀寫操作(具體的讀寫訪問操作還是依賴下一層中CommitLog對象模型提供的方法);另外,在該組件初始化時候,還會啟動很多存儲相關的后台服務線程,包括AllocateMappedFileService(MappedFile預分配服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker主從同步高可用服務線程)、StoreStatsService(消息存儲統計服務線程)、IndexService(索引文件服務線程)等。
- RocketMQ存儲邏輯對象層
- 該層主要包含了RocketMQ數據文件存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引數據文件提供訪問服務,ConsumerQueue為邏輯消息隊列提供訪問服務,CommitLog則為消息存儲的日志數據文件提供訪問服務。這三個模型類也是構成了RocketMQ存儲層的整體結構(對於這三個模型類的深入分析將放在后續篇幅中)。
- 封裝的文件內存映射層
- RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成數據文件的讀寫。其中,采用MappedByteBuffer這種內存映射磁盤文件的方式完成對大文件的讀寫,在RocketMQ中將該類封裝成MappedFile類。這里限制的問題在上面已經講過;對於每類大文件(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的文件(單個IndexFile文件大小約為400M、單個ConsumerQueue文件大小約5.72M、單個CommitLog文件大小為1G),其中每個分隔文件的文件名為前面所有文件的字節大小數+1,即為文件的起始偏移量,從而實現了整個大文件的串聯。這里,每一種類的單個文件均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/隨機讀、內存數據刷盤、內存清理等和文件相關的服務)。
- 磁盤存儲層
- 主要指的是部署RocketMQ服務器所用的磁盤。這里,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(如IOPS、吞吐量和訪問時延等指標)對順序寫/隨機讀操作帶來的影響。
如何保證 RocketMQ 不丟失消息?
一條消息從生產到被消費,將會經歷生產階段、存儲階段、消費階段三個階段。
-
生產階段,Producer 新建消息,然后通過網絡將消息投遞給 MQ Broker。
-
生產者(Producer) 通過網絡發送消息給 Broker,當 Broker 收到之后,將會返回確認響應信息給 Producer;所以生產者只要接收到返回的確認響應,就代表消息在生產階段未丟失。
-
返回消息方式可以是同步也可以是異步,但不管是同步還是異步的方式,都會碰到網絡問題導致發送失敗的情況。針對這種情況,我們可以設置合理的重試次數,當出現網絡問題,可以自動重試。
// 同步發送消息重試次數,默認為 2 mqProducer.setRetryTimesWhenSendFailed(3); // 異步發送消息重試次數,默認為 2 mqProducer.setRetryTimesWhenSendAsyncFailed(3);
-
-
存儲階段,消息將會存儲在 Broker 端磁盤中。
-
默認情況下,消息只要到了 Broker 端,將會優先保存到內存中,然后立刻返回確認響應給生產者。隨后 Broker 定期批量的將一組消息從內存異步刷入磁盤。這種方式減少 I/O 次數,可以取得更好的性能,但是如果發生機器掉電,異常宕機等情況,消息還未及時刷入磁盤,就會出現丟失消息的情況。
-
若想保證 Broker 端不丟消息,保證消息的可靠性,我們需要將消息保存機制修改為同步刷盤方式,即消息存儲磁盤成功,才會返回響應。若 Broker 未在同步刷盤時間內(默認為 5s)完成刷盤,將會返回
SendStatus.FLUSH_DISK_TIMEOUT
狀態給生產者。 -
集群部署:為了保證可用性,Broker 通常采用一主(master)多從(slave)部署方式。為了保證消息不丟失,消息還需要復制到 slave 節點。默認方式下,消息寫入 master 成功,就可以返回確認響應給生產者,接着消息將會異步復制到 slave 節點。此時若 master 突然宕機且不可恢復,那么還未復制到 slave 的消息將會丟失。為了進一步提高消息的可靠性,我們可以采用同步的復制方式,master 節點將會同步等待 slave節點復制完成,才會返回確認響應。提高消息的高可靠性,但是會降低性能,生產實踐中需要綜合選擇。
## master 節點配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER ## slave 節點配置 brokerRole=slave flushDiskType = SYNC_FLUSH
-
-
消費階段, Consumer 將會從 Broker 拉取消息。
-
消費者從 broker 拉取消息,然后執行相應的業務邏輯。一旦執行成功,將會返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
狀態給 Broker。如果 Broker 未收到消費確認響應或收到其他狀態,消費者下次還會再次拉取到該條消息,進行重試。這樣的方式有效避免了消費者消費過程發生異常,或者消息在網絡傳輸中丟失的情況。
-
說說RocketMQ同步異步復制和刷盤?
- 復制
- 為了確保成功發布的消息不會丟失,RocketMQ提供了同步和異步兩種復制模式獲得更強的持久性和更高的可用性。
- 同步Broker要等到提交日志被復制到從服務器后才進行確認。
- 相反,異步Broker在主服務器上處理消息后立即返回。
- 刷盤
- 同步刷盤:在消息達到Broker的內存之后,必須刷到commitLog日志文件中才算成功,然后返回Producer數據已經發送成功。
- 異步刷盤:異步刷盤是指消息達到Broker內存后就返回Producer數據已經發送成功,會喚醒一個線程去將數據持久化到CommitLog日志文件中。
優缺點分析:同步刷盤保證了消息不丟失,但是響應時間相對異步刷盤要多出10%左右,適用於對消息可靠性要求比較高的場景。異步刷盤的吞吐量比較高,RT小,但是如果broker斷電了內存中的部分數據會丟失,適用於對吞吐量要求比較高的場景。
說說RocketMQ負載均衡?
RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端發送消息時候的負載均衡和Consumer端訂閱消息的負載均衡。
nameServer保存着Topic的路由信息,路由記錄了broker集群節點的通訊地址,broker的名稱以及讀寫隊列數量等信息。寫隊列writeQueue表示生產者可以寫入的隊列數,如果不做配置默認為4,也就是queueId是0,1,2,3.broker收到消息后根據queueId生成消息隊列,生產者負載均衡的過程的實質就是選擇broker集群和queueId的過程。讀隊列readQueue表示broker中可以供消費者讀取信息的隊列個數,默認也是4個,也就是queueId也是0,1,2,3。消費者拿到路由信息后會選擇queueId,從對應的broker中讀取數據消費
- Producer的負載均衡
- Producer端在發送消息的時候,會先根據Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶端在默認方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息。具體的容錯策略均在MQFaultStrategy這個類中定義。這里有一個sendLatencyFaultEnable開關變量,如果開啟,在隨機遞增取模的基礎上,再過濾掉not available的Broker代理。所謂的"latencyFaultTolerance",是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關閉,采用隨機遞增取模的方式選擇一個隊列(MessageQueue)來發送消息,latencyFaultTolerance機制是實現消息發送高可用的核心關鍵所在。簡單的說選擇的標准:盡量不選剛剛選過的broker,盡量不選發送上條消息延遲過高或沒有響應的broker,也就是找到一個可用的
- Consumer的負載均衡
- 將MessageQueue中的消息隊列分配到消費者組里的具體消費者;Consumer在啟動的時候會實例化RebalanceImpl,這個類負責消費端的負載均衡。在Consumer實例的啟動流程中的啟動MQClientInstance實例部分,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執行一次)。通過查看源碼可以發現,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法,該方法是實現Consumer端負載均衡的核心
- 負載均衡算法
- 平均分配算法
- 環形算法
- 指定機房算法
- 就近機房算法
- 一致性哈希算法
- 手動配置算法
RocketMQ如何保證順序消息?
- 在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。順序消費不能是並發的。
- 怎么保證消息發到同一個queue里?RocketMQ給我們提供了MessageQueueSelector接口,可以重寫里面的接口,實現自己的算法,比如判斷i%2==0,那就發送消息到queue1否則發送到queue2。
RocketMQ如何實現消息去重?
- 這個得依賴於消息的冪等性原則:就是用戶對於同一種操作發起的多次請求的結果是一樣的,不會因為操作了多次就產生不一樣的結果。只要保持冪等性,不管來多少條消息,最后處理結果都一樣,需要Consumer端自行實現。
- 在RocketMQ去重的方案:因為每個消息都有一個MessageId, 保證每個消息都有一個唯一鍵,可以是數據庫的主鍵或者唯一約束,也可以是Redis緩存中的鍵,當消費一條消息前,先檢查數據庫或緩存中是否存在這個唯一鍵,如果存在就不再處理這條消息,如果消費成功,要保證這個唯一鍵插入到去重表中。
說說RocketMQ分布式事務消息?
半消息:是指暫時還不能被Consumer消費的消息,Producer成功發送到broker端的消息,但是此消息被標記為“暫不可投遞”狀態,只有等Producer端執行完本地事務后經過二次確認了之后,Consumer才能消費此條消息。主要分為正常事務消息的發送及提交、事務消息的補償流程兩大塊。RocketMQ事務消息依賴半消息,二次確認以及消息回查機制。
- 1、Producer向broker發送半消息
- 2、Producer端收到響應,消息發送成功,此時消息是半消息,標記為“不可投遞”狀態,Consumer消費不了。
- 3、Producer端執行本地事務。
- 4、正常情況本地事務執行完成,Producer向Broker發送Commit/Rollback,如果是Commit,Broker端將半消息標記為正常消息,Consumer可以消費,如果是Rollback,Broker丟棄此消息。
- 5、異常情況,Broker端遲遲等不到二次確認。在一定時間后,會查詢所有的半消息,然后到Producer端查詢半消息的執行情況。
- 6、Producer端查詢本地事務的狀態
- 7、根據事務的狀態提交commit/rollback到broker端。(5,6,7是消息回查)
簡單歸納RocketMQ高性能原因?
-
網絡模型,RocketMQ 使用 Netty 框架實現高性能的網絡傳輸,也遵循了Reactor多線程模型,同時又在這之上做了一些擴展和優化。而Netty高性能我們在前一篇文章也以學習過,這里就不重復說了。
-
順序寫、隨機讀、零拷貝。
-
多主多從,創建topic時,多個message queue可以在多個broker上,master提供讀寫,從broker可以分擔讀消息的壓力。
-
同步復制和異步復制。
-
同步刷盤和異步刷盤(PageCache)。
-
同步和異步發送消息。
-
業務線程池隔離,RocketMQ 對 Broker 的線程池進行了精細的隔離。使得消息的生產、消費、客戶端心跳、客戶端注冊等請求不會互相干擾。
-
並行消費和批量消費。
最后:去哪兒網開源的QMQ消息中間件也可以好好的研究,功能非常齊全,消息中間件的應用是比較簡單的,更多應該思考和理解主流開源中間件Kafka、RocketMQ、QMQ、Palsar等的設計思想。