常用MQ介紹及對比--《MQ詳解及四大MQ比較》
RocketMQ環境搭建--《RocketMQ之三:RocketMQ集群環境搭建》
RocketMQ物理部署結構
RocketMQ的消息存儲--《RocketMQ之六:RocketMQ消息存儲》
RocketMQ各角色基本數據結構
RocketMQ生產者發送消息過程
RocketMQ消費者
RocketMQ Broker
RocketMQ優化
一、RocketMQ介紹
RocketMQ 是阿里巴巴開源的分布式消息中間件。支持事務消息、順序消息、批量消息、定時消息、消息回溯等。它里面有幾個區別於標准消息中件間的概念,如Group、Topic、Queue等。系統組成則由Producer、Consumer、Broker、NameServer等。
RocketMQ 特點
- 是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式等特點
- Producer、Consumer、隊列都可以分布式
- Producer 向一些隊列輪流發送消息,隊列集合稱為 Topic,Consumer 如果做廣播消費,則一個 Consumer 實例消費這個 Topic 對應的所有隊列,如果做集群消費,則多個 Consumer 實例平均消費這個 Topic 對應的隊列集合
- 能夠保證嚴格的消息順序
- 支持拉(pull)和推(push)兩種消息模式
- 高效的訂閱者水平擴展能力
- 實時的消息訂閱機制
- 億級消息堆積能力
- 支持多種消息協議,如 JMS、OpenMessaging 等
- 較少的依賴
1.1、RocketMQ的核心概念
消息隊列 RocketMQ 在任何一個環境都是可擴展的,生產者必須是一個集群,消息服務器必須是一個集群,消費者也同樣。集群級別的高可用,是消息隊列 RocketMQ 跟其他的消息服務器的主要區別,消息生產者發送一條消息到消息服務器,消息服務器會隨機的選擇一個消費者,只要這個消費者消費成功就認為是成功了。
注意:文中所提及的消息隊列 RocketMQ 的服務端或者服務器包含 Name Server、Broker 等。服務端不等同於 Broker。
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。Broker 在實際部署過程中對應一台服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲於不同的 Broker。Message Queue 用於存儲消息的物理地址,每個Topic中的消息地址存儲於多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。
圖中所涉及到的概念如下所述:
- Name Server: 名稱服務充當路由消息的提供者。是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。在消息隊列 RocketMQ 中提供命名服務,更新和發現 Broker 服務。
NameServer
即名稱服務,兩個功能:- 接收
broker
的請求,注冊broker
的路由信息 - 接收
client(producer/consumer)
的請求,根據某個topic
獲取其到broker
的路由信息NameServer
沒有狀態,可以橫向擴展。每個broker
在啟動的時候會到NameServer
注冊;Producer
在發送消息前會根據topic
到NameServer
獲取路由(到broker
)信息;Consumer
也會定時獲取topic
路由信息。
- 接收
- Broker:消息中轉角色,負責存儲消息,轉發消息。可以理解為消息隊列服務器,提供了消息的接收、存儲、拉取和轉發服務。
broker
是RocketMQ的核心,它不不能掛的,所以需要保證broker
的高可用。
broker分為 Master Broker 和 Slave Broker,一個 Master Broker 可以對應多個 Slave Broker,但是一個 Slave Broker 只能對應一個 Master Broker。
Master與Slave的對應關系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。
每個Broker與Name Server集群中的所有節點建立長連接,定時注冊Topic信息到所有Name Server。Broker 啟動后需要完成一次將自己注冊至 Name Server 的操作;隨后每隔 30s 定期向 Name Server 上報 Topic 路由信息。
- 生產者:與 Name Server 集群中的其中一個節點(隨機)建立長鏈接(Keep-alive),定期從 Name Server 讀取 Topic 路由信息,並向提供 Topic 服務的 Master Broker 建立長鏈接,且定時向 Master Broker 發送心跳。
- 消費者:與 Name Server 集群中的其中一個節點(隨機)建立長連接,定期從 Name Server 拉取 Topic 路由信息,並向提供 Topic 服務的 Master Broker、Slave Broker 建立長連接,且定時向 Master Broker、Slave Broker 發送心跳。Consumer 既可以從 Master Broker 訂閱消息,也可以從 Slave Broker 訂閱消息,訂閱規則由 Broker 配置決定。
另外,Broker中還存在一些非常重要的名詞需要說明:
- 2.1、Topic、Queue、tags
RocketMQ的Topic/Queue和JMS中的Topic/Queue概念有一定的差異,JMS中所有消費者都會消費一個Topic消息的副本,而Queue中消息只會被一個消費者消費;但到了RocketMQ中Topic只代表普通的消息隊列,而Queue是組成Topic的更小單元。
topic
:
表示消息的第一級類型,比如一個電商系統的消息可以分為:交易消息、物流消息...... 一條消息必須有一個Topic。
- Queue:主題被划分為一個或多個子主題,稱為“message queues”。一個
topic
下,我們可以設置多個queue(消息隊列)
。當我們發送消息時,需要要指定該消息的topic
。RocketMQ會輪詢該topic
下的所有隊列,將消息發送出去。
定義:Queue是Topic在一個Broker上的分片,在分片基礎上再等分為若干份(可指定份數)后的其中一份,是負載均衡過程中資源分配的基本單元。
集群消費模式下一個消費者只消費該Topic中部分Queue中的消息,當一個消費者開啟廣播模式時則會消費該Topic下所有Queue中的消息。
先看一張有關Topic和Queue的關系圖:
- Tags
Tags是Topic下的次級消息類型/二級類型(注:Tags也支持TagA || TagB
這樣的表達式),可以在同一個Topic下基於Tags進行消息過濾。Tags的過濾需要經過兩次比對,首先會在Broker端通過Tag hashcode進行一次比對過濾,匹配成功傳到consumer端后再對具體Tags進行比對,以防止Tag hashcode重復的情況。比如交易消息又可以分為:交易創建消息,交易完成消息..... 一條消息可以沒有Tag
。RocketMQ提供2級消息分類,方便大家靈活控制。標簽,換句話說,為用戶提供了額外的靈活性。有了標簽,來自同一個業務模塊的不同目的的消息可能具有相同的主題和不同的標簽。標簽將有助於保持您的代碼干凈和連貫,並且標簽還可以為RocketMQ提供的查詢系統提供幫助。
Queue中具體的存儲單元結構如下圖,最后面的8個Byte存儲Tag信息。

- 3.1、Producer 與 Producer Group
Producer
表示消息隊列的生產者。消息隊列的本質就是實現了publish-subscribe模式,生產者生產消息,消費者消費消息。所以這里的Producer
就是用來生產和發送消息的,一般指業務系統。RocketMQ提供了發送:普通消息(同步、異步和單向(one-way)消息)、定時消息、延時消息、事務消息。見1.2 消息類型章節
Producer Group
是一類Producer
的集合名稱,這類Producer
通常發送一類消息,且發送邏輯一致。相同角色的生產者被分組在一起。同一生產者組的另一個生產者實例可能被broker聯系,以提交或回滾事務,以防原始生產者在交易后崩潰。
警告:考慮提供的生產者在發送消息時足夠強大,每個生產者組只允許一個實例,以避免對生產者實例進行不必要的初始化。
- 4.1、Consumer 與 Consumer Group
Consumer:消息消費者,一般由業務后台系統異步的消費消息。
Push Consumer:
Consumer 的一種,應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立刻回調 Listener 接口方法。Pull Consumer:
Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制。
Consumer Group:Consumer Group
是一類Consumer
的集合名稱,這類Consumer
通常消費一類消息,且消費邏輯一致(使用相同 Group ID 的訂閱者屬於同一個集群。同一個集群下的訂閱者消費邏輯必須完全一致(包括 Tag 的使用),這些訂閱者在邏輯上可以認為是一個消費節點)。消費者群體是 一個偉大的概念,它實現了負載平衡和容錯的目標,在信息消費方面,是非常容易的。
警告:消費者群體的消費者實例必須訂閱完全相同的主題。
1.2、組件的關系
1.2.1、Broker,Producer和Consumer
如果不考慮負載均衡和高可用,最簡單的Broker,Producer和Consumer之間的關系如下圖所示:

1.2.2、Topic,Topic分片和Queue
Queue是RocketMQ中的另一個重要概念。在對該概念進行分析介紹前,我們先來看上面的這張圖:

從本質上來說,RocketMQ中的Queue是數據分片的產物。為了更好地理解Queue的定義,我們還需要引入一個新的概念:Topic分片。在分布式數據庫和分布式緩存領域,分片概念已經有了清晰的定義。同理,對於RocketMQ,一個Topic可以分布在各個Broker上,我們可以把一個Topic分布在一個Broker上的子集定義為一個Topic分片。對應上圖,TopicA有3個Topic分片,分布在Broker1,Broker2和Broker3上,TopicB有2個Topic分片,分布在Broker1和Broker2上,TopicC有2個Topic分片,分布在Broker2和Broker3上。
將Topic分片再切分為若干等分,其中的一份就是一個Queue。每個Topic分片等分的Queue的數量可以不同,由用戶在創建Topic時指定。
queue數量指定方式:
1、代碼指定:producer.setDefaultTopicQueueNums(8);
2、配置文件指定
同時設置broker服務器的配置文件broker.properties:defaultTopicQueueNums=16
3、rocket-console控制台指定
我們知道,數據分片的主要目的是突破單點的資源(網絡帶寬,CPU,內存或文件存儲)限制從而實現水平擴展。RocketMQ 在進行Topic分片以后,已經達到水平擴展的目的了,為什么還需要進一步切分為Queue呢?
解答這個問題還需要從負載均衡說起。以消息消費為例,借用Rocket MQ官方文檔中的Consumer負載均衡示意圖來說明:

如圖所示,TOPIC_A在一個Broker上的Topic分片有5個Queue,一個Consumer Group內有2個Consumer按照集群消費的方式消費消息,按照平均分配策略進行負載均衡得到的結果是:第一個 Consumer 消費3個Queue,第二個Consumer 消費2個Queue。如果增加Consumer,每個Consumer分配到的Queue會相應減少。Rocket MQ的負載均衡策略規定:Consumer數量應該小於等於Queue數量,如果Consumer超過Queue數量,那么多余的Consumer 將不能消費消息。
在一個Consumer Group內,Queue和Consumer之間的對應關系是一對多的關系:一個Queue最多只能分配給一個Consumer,一個Cosumer可以分配得到多個Queue。這樣的分配規則,每個Queue只有一個消費者,可以避免消費過程中的多線程處理和資源鎖定,有效提高各Consumer消費的並行度和處理效率。
由此,我們可以給出Queue的定義:
Queue是Topic在一個Broker上的分片等分為指定份數后的其中一份,是負載均衡過程中資源分配的基本單元。
二、RocketMQ發布訂閱大體流程
a、producer生產者連接nameserver,產生數據放入不同的topic;
b、對於RocketMQ,一個Topic可以分布在各個Broker上,我們可以把一個Topic分布在一個Broker上的子集定義為一個Topic分片;
c、將Topic分片再切分為若干等分,其中的一份就是一個Queue。每個Topic分片等分的Queue的數量可以不同,由用戶在創建Topic時指定。
d、consumer消費者連接nameserver,根據broker分配的Queue來消費數據。
三、消息的種類
3.1、按照發送的特點分:
- 同步消息
- 異步消息
- 單向消息
1)同步消息(可靠同步發送):同步發送是指消息發送方發出數據后,會阻塞直到MQ服務方發回響應消息。應用場景:此種方式應用場景非常廣泛,例如重要通知郵件、報名短信通知、營銷短信系統等。
關鍵代碼:SendResult sendResult = producer.send(msg);
2)異步消息(可靠異步發送):異步發送是指發送方發出數據后,不等接收方發回響應,接着發送下個數據包的通訊方式。MQ 的異步發送,需要用戶實現異步發送回調接口(SendCallback),在執行消息的異步發送時,應用不需要等待服務器響應即可直接返回,通過回調接口接收服務器響應, 並對服務器的響應結果進行處理。應用場景:異步發送一般用於鏈路耗時較長,對 RT 響應時間較為敏感的業務場景,例如用戶視頻上傳后通知啟動轉碼服務,轉碼完成后通知推送轉碼結果等。
關鍵代碼:producer.sendAsync(msg, new SendCallback() {//...});
3)單向(one-way)消息:單向(Oneway)發送特點為只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。應用場景:適用於某些耗時非常短,但對可靠性要求並不高的場景,例如日志收集。
單向只發送,不等待返回,所以速度最快,一般在微秒級,但可能丟失
關鍵代碼:
producer.sendOneway(msg);
3.2、按照使用功能特點分:
- 普通消息(訂閱)--見《RocketMQ之二:分布式開放消息系統RocketMQ的原理與實踐(消息的順序問題、重復問題、可靠消息/事務消息)》
- 順序消息--見《RocketMQ之二:分布式開放消息系統RocketMQ的原理與實踐(消息的順序問題、重復問題、可靠消息/事務消息)》
- 廣播消息
- 延時消息
- 批量消息
- 事務消息
4)定時消息
// 定時消息,單位毫秒(ms),在指定時間戳(當前時間之后)進行投遞,例如 2016-03-07 16:21:00 投遞。如果被設置成當前時間戳之前的某個時刻,消息將立刻投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 發送消息,只要不拋異常就是成功
SendResult sendResult = producer.send(msg);
5)延時消息
Message sendMsg = new Message(topic, tags, message.getBytes()); sendMsg.setDelayTimeLevel(delayLevel); // 默認3秒超時 SendResult sendResult = rocketMQProducer.send(sendMsg);
6)事務消息
RocketMQ提供類似X/Open XA的分布式事務功能來確保業務發送方和MQ消息的最終一致性,其本質是通過半消息(prepare消息和commit消息)的方式把分布式事務放在MQ端來處理。
其中:
1,發送方向消息隊列 RocketMQ 服務端發送消息。
2,服務端將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。
3,發送方開始執行本地事務邏輯。
4,發送方根據本地事務執行結果向服務端提交二次確認(Commit 或是 Rollback),服務端收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;服務端收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
補償流程:
5,在斷網或者是應用重啟的特殊情況下,上述步驟 4 提交的二次確認最終未到達服務端,經過固定時間后服務端將對該消息發起消息回查。
6,發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
7,發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟 4 對半消息進行操作。
RocketMQ的半消息機制的注意事項是
1,根據第六步可以看出他要求發送方提供業務回查接口。
2,不能保證發送方的消息冪等,在ack沒有返回的情況下,可能存在重復消息
3,消費方要做冪等處理。
更多介紹見《RocketMQ之二:分布式開放消息系統RocketMQ的原理與實踐(消息的順序問題、重復問題、可靠消息/事務消息)》單獨對事務消息說明。
四、消息發布和訂閱
在RocketMQ中,producer發布消息,consumer訂閱消息。消息的收發模型如下圖:
4.1、producer端,所有消息發布原理圖
producer完全無狀態,可以集群部署。
4.2、consumer端,consumer有兩種消息的獲取模式
- 一種是Push模式,即MQServer主動向消費端推送;
- 另外一種是Pull模式,即消費端在需要時,主動到MQServer拉取。
但在具體實現時,Push和Pull模式都是采用消費端主動拉取的方式。
消費端的Push模式是通過長輪詢的模式來實現的,就如同下圖:

Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求后,如果有消息就立即返回數據,Consumer端收到返回的消息后,再回調消費者設置的Listener方法。如果broker在收到Pull請求時,消息隊列里沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。
當然,Consumer端是通過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>
中的PullRequest
發送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest
時,如果發現沒有消息,就會把PullRequest
扔到ConcurrentHashMap中緩存起來。
broker在啟動時,會啟動一個線程不停的從ConcurrentHashMap取出PullRequest
檢查,直到有數據返回。
4.3、consumer端,consumer有兩種消息消費模式
基本概念
消息隊列 RocketMQ 是基於發布/訂閱模型的消息系統。消息的訂閱方訂閱關注的 Topic,以獲取並消費消息。由於訂閱方應用一般是分布式系統,以集群方式部署有多台機器。因此消息隊列 RocketMQ 約定以下概念。
集群:使用相同 Group ID 的訂閱者屬於同一個集群。同一個集群下的訂閱者消費邏輯必須完全一致(包括 Tag 的使用),這些訂閱者在邏輯上可以認為是一個消費節點。
集群消費:當使用集群消費模式時,消息隊列 RocketMQ 認為任意一條消息只需要被集群內的任意一個消費者處理即可。
一個Consumer Group
中的Consumer
實例平均分攤消費消息。例如某個Topic
有 9 條消息,其中一個Consumer Group
有 3 個實例(可能是 3 個進程,或者 3 台機器),那么每個實例只消費其中的 3 條消息。
廣播消費:當使用廣播消費模式時,消息隊列 RocketMQ 會將每條消息推送給集群內所有注冊過的客戶端,保證消息至少被每台機器消費一次。
一條消息被多個Consumer
消費,即使這些Consumer
屬於同一個Consumer Group
,消息也會被Consumer Group
中的每個Consumer
都消費一次。在廣播消費中的Consumer Group
概念可以認為在消息划分方面無意義。
場景對比
集群消費模式:
適用場景&注意事項
- 消費端集群化部署,每條消息只需要被處理一次。
- 由於消費進度在服務端維護,可靠性更高。
- 集群消費模式下,每一條消息都只會被分發到一台機器上處理。如果需要被集群下的每一台機器都處理,請使用廣播模式。
- 集群消費模式下,不保證每一次失敗重投的消息路由到同一台機器上,因此處理消息時不應該做任何確定性假設。
廣播消費模式:
適用場景&注意事項
- 廣播消費模式下不支持順序消息。
- 廣播消費模式下不支持重置消費位點。
- 每條消息都需要被相同邏輯的多台機器處理。
- 消費進度在客戶端維護,出現重復的概率稍大於集群模式。
- 廣播模式下,消息隊列 RocketMQ 保證每條消息至少被每台客戶端消費一次,但是並不會對消費失敗的消息進行失敗重投,因此業務方需要關注消費失敗的情況。
- 廣播模式下,客戶端每一次重啟都會從最新消息消費。客戶端在被停止期間發送至服務端的消息將會被自動跳過,請謹慎選擇。
- 廣播模式下,每條消息都會被大量的客戶端重復處理,因此推薦盡可能使用集群模式。
- 目前僅 Java 客戶端支持廣播模式。
- 廣播模式下服務端不維護消費進度,所以消息隊列 RocketMQ 控制台不支持消息堆積查詢、消息堆積報警和訂閱關系查詢功能。
使用集群模式模擬廣播:
如果業務需要使用廣播模式,也可以創建多個 Group ID,用於訂閱同一個 Topic。
適用場景&注意事項
- 每條消息都需要被多台機器處理,每台機器的邏輯可以相同也可以不一樣。
- 消費進度在服務端維護,可靠性高於廣播模式。
- 對於一個 Group ID 來說,可以部署一個消費端實例,也可以部署多個消費端實例。 當部署多個消費端實例時,實例之間又組成了集群模式(共同分擔消費消息)。 假設 Group ID 1 部署了三個消費者實例 C1、C2、C3,那么這三個實例將共同分擔服務器發送給 Group ID 1 的消息。 同時,實例之間訂閱關系必須保持一致。
五、負載均衡
生產端負載均衡,看下圖:
producer發送消息負載均衡圖
首先分析一下RocketMQ的客戶端發送消息的源碼:
在整個應用生命周期內,生產者需要調用一次start方法來初始化,初始化主要完成的任務有:
如果沒有指定namesrv地址,將會自動尋址
啟動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向所有broker發送心跳...
啟動負載均衡的服務
初始化完成后,開始發送消息,發送消息的主要代碼如下:
代碼中需要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲取路由信息並更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息,如果沒有獲取到,則會自己去namesrv獲取路由信息。selectOneMessageQueue方法通過輪詢的方式,返回一個隊列,以達到負載均衡的目的。
如果Producer發送消息失敗,會自動重試,重試的策略:
-
重試次數 < retryTimesWhenSendFailed(可配置)
-
總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
-
同時滿足上面兩個條件后,Producer會選擇另外一個隊列發送消息
消費端的負載均衡,先看下圖:
Producer
向一些隊列輪流發送消息,隊列集合稱為Topic
,Consumer
如果做廣播消費,則一個consumer
實例消費這個Topic
對應的所有隊列;如果做集群消費,則多個Consumer
實例平均消費這個Topic
對應的隊列集合。
上圖的集群模式里,每個consumer消費部分消息,這里的負載均衡是怎樣的呢?
消費端會通過RebalanceService線程,20秒鍾做一次基於topic下的所有隊列負載:
- 遍歷Consumer下的所有topic,然后根據topic訂閱所有的消息
- 獲取同一topic和Consumer Group下的所有Consumer
- 然后根據具體的分配策略來分配消費隊列,分配的策略包含:平均分配、消費端配置等
如同上圖所示:如果有 3 個隊列,2 個 consumer,那么第一個 Consumer 消費 2 個隊列,第二 consumer 消費 1 個隊列。這里采用的就是平均分配策略,它類似於我們的分頁,TOPIC下面的所有queue就是記錄,Consumer的個數就相當於總的頁數,那么每頁有多少條記錄,就類似於某個Consumer會消費哪些隊列。
通過這樣的策略來達到大體上的平均消費,這樣的設計也可以很方面的水平擴展Consumer來提高消費能力。
六、總結
1、異步復制和同步雙寫總結
2、集群方式對比
3、高可用演練場景