簡介
用官方的話來說,RcoketMQ 是一款低延遲、高可靠、可伸縮、易於使用的消息中間件,具有以下特性(ps:對於這些特性描述,大家簡單過一眼就即可,深入學習之后自然就明白了):
- 支持發布/訂閱(Pub/Sub)和點對點(P2P)消息模型
- 在一個隊列中可靠的先進先出(FIFO)和嚴格的順序傳遞
- 支持拉(pull)和推(push)兩種消息模式
- 單一隊列百萬消息的堆積能力
- 支持多種消息協議,如 JMS、MQTT 等
- 分布式高可用的部署架構,滿足至少一次消息傳遞語義
- 提供 docker 鏡像用於隔離測試和雲集群部署
- 提供配置、指標和監控等功能豐富的 Dashboard
專業術語
-
Producer
也就是常說的生產者,生產者的作用就是將消息發送到 MQ,生產者本身既可以產生消息,如讀取文本信息,將讀取的文本信息發送到 MQ。也可以對外提供接口,由外部應用來調用接口,生產者將收到的請求體內容發送到 MQ。擁有相同 Producer Group 的生產者稱為一個生產者集群。 -
Producer Group
生產者組,簡單來說發送同一類消息的多個生產者就是一個生產者組。 -
Consumer
也就是常說的消費者,接收 MQ 消息的應用程序就是一個消費者。擁有相同 Consumer Group 的消費者稱為一個消費者集群。 -
Consumer Group
消費者組,和生產者類似,消費同一類消息的多個消費者組成一個消費者組。 -
Topic
主題是對消息的邏輯分類,比如說有訂單類相關的消息,也有庫存類相關的消息,那么就需要進行分類,一個是訂單 Topic 專門用來存放訂單相關的消息,一個是庫存 Topic 專門用來存放庫存相關的消息。 -
Tag
標簽可以被認為是對主題的進一步細化,可以理解為二級分類,一般在相同業務模塊中通過引入標簽來標記不同用途,同時消費者也可以根據不同的標簽進行消息的過濾。 -
Broker
Broker 是 RocketMQ 系統的主要角色,就是前面一直說的 MQ。Broker 接收來自生產者的消息,儲存以及為消費者拉取消息的請求做好准備。 -
Name Server
Name Server 提供輕量級的服務發現和路由信息,每個 NameServer 記錄完整的路由信息,提供等效的讀寫服務,並支持快速存儲擴展。
NameService
- 可以理解為簡化的zk,起到一個注冊中心的作用
- 區別與ZK是他沒有監聽的概念,而是通過心跳包來維持自己與Broker之間的關系
- NameService集群之間的每個節點互相之間沒有通信,是無狀態的
- NameService的壓力不會太大,主要是維護Topic-Broker之間的映射關系
- 但若是broker中的topic信息量太大,broker向nameService注冊信息的時候會導致傳輸時間過長超時,NameService會誤判認為Broker下線
Broker
- 每台broker節點與所有的nameService保持長連接及心跳,並定時將Topic信息注冊到nameService中
- 每個topic默認創建4個隊列,相同的隊列中保證順序消費
- Broker同樣分為master和salve,相同的BrokerName,不同的BrokerId,一個master對應多個salve,一個salve只對應一個master
- Broker上存存topic信息,topic由多個隊列組成,隊列會均勻分布到所有的broker上
- Producer在發送消息時,會盡量平均分布到隊列中,這樣保證最終所有的消息在broker上是平均分配的
Producer
- producer與隨機的一個nameService節點建立長連接,定期從nameSerive中拉取topic-broker的映射信息
- 與提供topic的broker master建立一個長連接,producer每隔30秒向broker 發送一個心跳,broker每隔10秒掃描一下存活的鏈接
- Producer發送消息支持三種模式
- 同步
- 異步
- 單向
Comsumer
- comsumer同樣采用集群部署,支持pull、push兩種消費模式
- comsumer可分為廣播消息消費和集群消費
邏輯架構
由這張圖可以看到有四個集群,分別是 Name Server 集群、Broker 集群、Producer 集群和 Consumer 集群。
簡單說明一下圖中箭頭含義,從 Broker 開始,Broker Master1 和 Broker Slave1 是主從結構,它們之間會進行數據同步,即 Date Sync。同時每個 Broker 與 Name Server 集群中的所有節點建立長連接,定時注冊 Topic 信息到所有 Name Server 中。
生產者與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 獲取 Topic 路由信息,並向提供 Topic 服務的 Broker Master 建立長連接,且定時向 Broker Master 發送心跳。
消費者也是與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 獲取 Topic 路由信息。但是消費者與生產者不同,生產者只能將消息發送到 Broker master,消費者則可以同時和提供 Topic 服務的 Broker Master 和 Broker Slave 建立長連接,既可以從 Broker Master 訂閱消息,也可以從 Broker Slave 訂閱消息。

這三者是RocketMq中最最基本的概念。Producer是消息的生產者。Consumer是消息的消費者。消息通過Topic進行傳遞。Topic存放的是消息的邏輯地址。
具體來說是Producer將消息發往具體的Topic。Consumer訂閱Topic,主動拉取或被動接受消息。
實際上,Topic還需要拆封出更多概念
這張圖里有兩個生產者,ProducerA和ProducerB。定義了兩個Topic-TopicA和TopicB。ProducerA會發送兩種消息。
所以這里的知識點是一個Producer可以發中Topic。
TopicA有3個MessageQueue,MessageQueue記錄的是消息的物理存儲地址(在consumelog里的位置),分布在兩個broker上。Broker是一個集群部署架構上的概念,可以理解為對應的物理機器。最右邊是ConsumerGroup,每一組下又有多個Consumer,實際上也就是啟動的用來消費的JVM。一個Consumer可以訂閱多個不同的Topic。這里我有話要說,雖然從代碼層面上支持這種訂閱。但是強烈不建議一個Consumer訂閱多個不同的Topic。推薦用法是一組ConsumerGroup只訂閱一種Topic。
另外多組ConsumerGroup之間,對於同一個Topic是廣播訂閱的。(翻譯一下就是說:Topic的一條消息會廣播給所有訂閱的ConsumerGroup,就是每個ConsumerGroup都會收到),但是在一個ConsumerGroup內部給個Consumer是負載消費消息的,(翻譯一下就是:一條消息在一個group內只會被一個Consumer消費)
存儲模型
下面看看Rocketmq的消息實際是怎么存儲的?
左邊的是CommitLog。這個是真正存儲消息的地方。可以看出RocketMQ所有生產者的消息都是往這一個地方存的。
右邊是ConsumeQueue。這是一個邏輯隊列。和上文中Topic下的messageQueue是一一對應的。消費者是直接和ConsumeQueue打交道。ConsumeQueue記錄了消費位點,這個消費位點關聯了commitlog的位置。所以即使ConsumeQueue出問題,只要commitlog還在,消息就沒丟,可以恢復出來。還可以通過修改消費位點來重放或跳過一些消息。
部署模型
在部署RocketMQ時,會部署兩種角色。NameServer和Broker。NameServer主要做路由服務。生產者發送消息時,首先向NameServer拿到Topic的路由信息,即這個Topic在哪些Broker上有。Consumer也是一樣,需要知道消費隊列的路由情況。當然不是每次收發消息都去NameServer查詢一遍,簡單的說只有第一次初始化,和以后發送或這首出現問題時需要查詢一下。
Broker一般我們會部署主備兩個節點。
RocketMq沒有選舉,broker的角色是在部署時就人工確定好的。如果主掛了,備不會自動切換為主。
對於一個2主2備的集群來說,如果掛了一個主,是沒有問題的。只要另一個主上你之前也創建了Topic,那么發送的消息流量會導流到存活的主節點上,業務代碼端是無影響的。
示例代碼
生產者public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明並初始化一個producer //需要一個producer group名字作為構造方法的參數,這里為producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,但是也可以通過環境變量的方式設置,不一定非得寫死在代碼里 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //調用start()方法啟動一個producer實例 producer.start(); //發送10條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //調用producer的send()方法發送消息 //這里調用的是同步的方式,所以會有返回結果 SendResult sendResult = producer.send(msg); //打印返回結果,可以看到消息發送的狀態以及一些相關信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發送完消息之后,調用shutdown()方法關閉producer producer.shutdown(); } }
消費者
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //聲明並初始化一個consumer //需要一個consumer group名字作為構造方法的參數,這里為consumer1 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1"); //同樣也要設置NameServer地址 consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //這里設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTest", "*"); //設置一個Listener,主要進行消息的邏輯處理 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,需要稍后重新消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //調用start()方法啟動consumer consumer.start(); System.out.println("Consumer Started."); } }
RocketMQ的消息類型
普通消息 / 有序消息 / 延時消息
普通消息
普通消息也叫做無序消息,簡單來說就是沒有順序的消息,producer 只管發送消息,consumer 只管接收消息,至於消息和消息之間的順序並沒有保證,可能先發送的消息先消費,也可能先發送的消息后消費。
舉個簡單例子,producer 依次發送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通消息。
因為不需要保證消息的順序,所以消息可以大規模並發地發送和消費,吞吐量很高,適合大部分場景。
代碼示例:
- 生產者
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明並初始化一個producer //需要一個producer group名字作為構造方法的參數,這里為concurrent_producer DefaultMQProducer producer = new DefaultMQProducer("concurrent_producer"); //設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,但是也可以通過環境變量的方式設置,不一定非得寫死在代碼里 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //調用start()方法啟動一個producer實例 producer.start(); //發送10條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTestConcurrent",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //調用producer的send()方法發送消息 //這里調用的是同步的方式,所以會有返回結果,同時默認發送的也是普通消息 SendResult sendResult = producer.send(msg); //打印返回結果,可以看到消息發送的狀態以及一些相關信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發送完消息之后,調用shutdown()方法關閉producer producer.shutdown(); } }
- 消費者
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { //聲明並初始化一個consumer //需要一個consumer group名字作為構造方法的參數,這里為concurrent_consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("concurrent_consumer"); //同樣也要設置NameServer地址 consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //這里設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag,*代表全部的Tag consumer.subscribe("TopicTestConcurrent", "*"); //設置一個Listener,主要進行消息的邏輯處理 //注意這里使用的是MessageListenerConcurrently這個接口 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消費狀態 //CONSUME_SUCCESS 消費成功 //RECONSUME_LATER 消費失敗,需要稍后重新消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //調用start()方法啟動consumer consumer.start(); System.out.println("Consumer Started."); } }
有序消息
有序消息就是按照一定的先后順序的消息類型。
舉個例子來說,producer 依次發送 order id 為 1、2、3 的消息到 broker,consumer 接到的消息順序也就是 1、2、3 ,而不會出現普通消息那樣的 2、1、3 等情況。
那么有序消息是如何保證的呢?我們都知道消息首先由 producer 到 broker,再從 broker 到 consumer,分這兩步走。那么要保證消息的有序,勢必這兩步都是要保證有序的,即要保證消息是按有序發送到 broker,broker 也是有序將消息投遞給 consumer,兩個條件必須同時滿足,缺一不可。
進一步還可以將有序消息分成
- 全局有序消息
- 局部有序消息
之前我們講過,topic 只是消息的邏輯分類,內部實現其實是由 queue 組成。當 producer 把消息發送到某個 topic 時,默認是會消息發送到具體的 queue 上。

舉個例子,producer 發送 order id 為 1、2、3、4 的四條消息到 topicA 上,假設 topicA 的 queue 數為 3 個(queue0、queue1、queue2),那么消息的分布可能就是這種情況,id 為 1 的在 queue0,id 為 2 的在 queue1,id 為 3 的在 queue2,id 為 4 的在 queue0。同樣的,consumer 消費時也是按 queue 去消費,這時候就可能出現先消費 1、4,再消費 2、3,和我們的預期不符。那么我們如何實現 1、2、3、4 的消費順序呢?道理其實很簡單,只需要把訂單 topic 的 queue 數改為 1,如此一來,只要 producer 按照 1、2、3、4 的順序去發送消息,那么 consumer 自然也就按照 1、2、3、4 的順序去消費,這就是全局有序消息。
由於一個 topic 只有一個 queue ,即使我們有多個 producer 實例和 consumer 實例也很難提高消息吞吐量。就好比過獨木橋,大家只能一個挨着一個過去,效率低下。
那么有沒有吞吐量和有序之間折中的方案呢?其實是有的,就是局部有序消息。

我們知道訂單消息可以再細分為訂單創建、訂單付款、訂單完成等消息,這些消息都有相同的 order id。同時,也只有按照訂單創建、訂單付款、訂單完成的順序去消費才符合業務邏輯。但是不同 order id 的消息是可以並行的,不會影響到業務。這時候就常見做法就是將 order id 進行處理,將 order id 相同的消息發送到 topicB 的同一個 queue,假設我們 topicB 有 2 個 queue,那么我們可以簡單的對 id 取余,奇數的發往 queue0,偶數的發往 queue1,消費者按照 queue 去消費時,就能保證 queue0 里面的消息有序消費,queue1 里面的消息有序消費。
由於一個 topic 可以有多個 queue,所以在性能比全局有序高得多。假設 queue 數是 n,理論上性能就是全局有序的 n 倍,當然 consumer 也要跟着增加才行。在實際情況中,這種局部有序消息是會比全局有序消息用的更多。
示例代碼:
- 生產者
public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { // 聲明並初始化一個producer // 需要一個producer group名字作為構造方法的參數,這里為ordered_producer DefaultMQProducer orderedProducer = new DefaultMQProducer("ordered_producer"); // 設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔 //NameServer的地址必須有,但是也可以通過環境變量的方式設置,不一定非得寫死在代碼里 orderedProducer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); // 調用start()方法啟動一個producer實例 orderedProducer.start(); // 自定義一個tag數組 String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; // 發送10條消息到Topic為TopicTestOrdered,tag為tags數組按順序取值, // key值為“KEY”拼接上i的值,消息內容為“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { int orderId = i % 10; Message msg = new Message("TopicTestOrdered", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = orderedProducer.send(msg, new MessageQueueSelector() { // 選擇發送消息的隊列 @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // arg的值其實就是orderId Integer id = (Integer) arg; // mqs是隊列集合,也就是topic所對應的所有隊列 int index = id % mqs.size(); // 這里根據前面的id對隊列集合大小求余來返回所對應的隊列 return mqs.get(index); } }, orderId); System.out.println(sendResult); } orderedProducer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
至於是要實現全局有序,還是局部有序,在此示例代碼中,就取決於 TopicTestOrdered 這個 Topic 的隊列數了。
- 消費者
public class Consumer { public static void main(String[] args) throws MQClientException { //聲明並初始化一個consumer //需要一個consumer group名字作為構造方法的參數,這里為concurrent_consumer DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer"); //同樣也要設置NameServer地址 consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //這里設置的是一個consumer的消費策略 //CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息 //CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍 //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //設置consumer所訂閱的Topic和Tag consumer.subscribe("TopicTestOrdered", "TagA || TagC || TagD"); //設置一個Listener,主要進行消息的邏輯處理 //注意這里使用的是MessageListenerOrderly這個接口 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); //返回消費狀態 //SUCCESS 消費成功 //SUSPEND_CURRENT_QUEUE_A_MOMENT 消費失敗,暫停當前隊列的消費 return ConsumeOrderlyStatus.SUCCESS; } }); //調用start()方法啟動consumer consumer.start(); System.out.println("Consumer Started."); } }
延時消息
延時消息,簡單來說就是當 producer 將消息發送到 broker 后,會延時一定時間后才投遞給 consumer 進行消費。
RcoketMQ的延時等級為:1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h。level=0,表示不延時。level=1,表示 1 級延時,對應延時 1s。level=2 表示 2 級延時,對應5s,以此類推。
這種消息一般適用於消息生產和消費之間有時間窗口要求的場景。比如說我們網購時,下單之后是有一個支付時間,超過這個時間未支付,系統就應該自動關閉該筆訂單。那么在訂單創建的時候就會就需要發送一條延時消息(延時15分鍾)后投遞給 consumer,consumer 接收消息后再對訂單的支付狀態進行判斷是否關閉訂單。
設置延時非常簡單,只需要在Message設置對應的延時級別即可:
Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); // 這里設置需要延時的等級即可 msg.setDelayTimeLevel(3); SendResult sendResult = producer.send(msg);
RocketMQ的消息發送方式
- 同步發送
- 異步發送
- 單向發送
同步發送

簡單來說,同步發送就是指 producer 發送消息后,會在接收到 broker 響應后才繼續發下一條消息的通信方式。
由於這種同步發送的方式確保了消息的可靠性,同時也能及時得到消息發送的結果,故而適合一些發送比較重要的消息場景,比如說重要的通知郵件、營銷短信等等。在實際應用中,這種同步發送的方式還是用得比較多的。
異步發送
接着就是異步發送,異步發送是指 producer 發出一條消息后,不需要等待 broker 響應,就接着發送下一條消息的通信方式。需要注意的是,不等待 broker 響應,並不意味着 broker 不響應,而是通過回調接口來接收 broker 的響應。所以要記住一點,異步發送同樣可以對消息的響應結果進行處理。
由於異步發送不需要等待 broker 的響應,故在一些比較注重 RT(響應時間)的場景就會比較適用。比如,在一些視頻上傳的場景,我們知道視頻上傳之后需要進行轉碼,如果使用同步發送的方式來通知啟動轉碼服務,那么就需要等待轉碼完成才能發回轉碼結果的響應,由於轉碼時間往往較長,很容易造成響應超時。此時,如果使用的是異步發送通知轉碼服務,那么就可以等轉碼完成后,再通過回調接口來接收轉碼結果的響應了。
單向發送

單向發送,見名知意,就是一種單方向通信方式,也就是說 producer 只負責發送消息,不等待 broker 發回響應結果,而且也沒有回調函數觸發,這也就意味着 producer 只發送請求不等待響應結果。
由於單向發送只是簡單地發送消息,不需要等待響應,也沒有回調接口觸發,故發送消息所耗費的時間非常短,同時也意味着消息不可靠。所以這種單向發送比較適用於那些耗時要求非常短,但對可靠性要求並不高的場景,比如說日志收集。
下面通過一張表格,簡單總結一下同步發送、異步發送和單向發送的特點。

可以看到,從發送 TPS 來看,由於單向發送不需要等待響應也沒有回調接口觸發,發送速度非常快,一般都是微秒級的,在消息體大小一樣的情況下,其發送 TPS 最大。而同步發送,需要等待響應結果的返回,受網絡狀況的影響較大,故發送 TPS 就比較小。異步發送不等待響應結果,發送消息時幾乎不受網絡的影響,故相比同步發送來說,其發送 TPS 要大得多。
關於可靠性,大家需要牢記前面提過的,異步發送並不意味着消息不可靠,異步發送也是會接收到響應結果,也能對響應結果進行處理。即使發送失敗,也可以通過一些補償手段進行消息重發。和同步發送比起來,異步發送的發送 TPS 更大,更適合那些調用鏈路較長的一些場景。在實際使用中,同步發送和異步發送都是較為常用的兩種方式,大家要視具體業務場景進行合理地選擇。
pull和push消費模式
- pull是主動型消費,即能從服務器拉取到數據就開始消費
- 首先通過打算消費的topic拿到MessageQueue中的集合消息,然后遍歷拿取,並記錄下次取消息時的offset位
- push是被動型消費,多了一個注冊消費監聽器,本質還是從服務器拉取數據,但是要等到消費監聽器被觸發以后,才會進行消費
- push方式中,注冊MessageListener監聽器,取到消息后,喚醒MessageListener中的consumerMessage()來消費
RocketMQ的消費模式
- 集群消費
- 廣播消費
- ( 使用集群消費模擬廣播消費 )
首先明確一點,RocketMQ 是基於發布訂閱模型的消息中間件。所謂的發布訂閱就是說,consumer 訂閱了 broker 上的某個 topic,當 producer 發布消息到 broker 上的該 topic 時,consumer 就能收到該條消息。
之前我們講過 consumer group 的概念,即消費同一類消息的多個 consumer 實例組成一個消費者組,也可以稱為一個 consumer 集群,這些 consumer 實例使用同一個 group name。需要注意一點,除了使用同一個 group name,訂閱的 tag 也必須是一樣的,只有符合這兩個條件的 consumer 實例才能組成 consumer 集群。

當 consumer 使用集群消費時,每條消息只會被 consumer 集群內的任意一個 consumer 實例消費一次。舉個例子,當一個 consumer 集群內有 3 個consumer 實例(假設為consumer 1、consumer 2、consumer 3)時,一條消息投遞過來,只會被consumer 1、consumer 2、consumer 3中的一個消費。
同時記住一點,使用集群消費的時候,consumer 的消費進度是存儲在 broker 上,consumer 自身是不存儲消費進度的。消息進度存儲在 broker 上的好處在於,當你 consumer 集群是擴大或者縮小時,由於消費進度統一在broker上,消息重復的概率會被大大降低了。
注意:在集群消費模式下,並不能保證每一次消息失敗重投都投遞到同一個 consumer 實例。

當 consumer 使用廣播消費時,每條消息都會被 consumer 集群內所有的 consumer 實例消費一次,也就是說每條消息至少被每一個 consumer 實例消費一次。舉個例子,當一個 consumer 集群內有 3 個 consumer 實例(假設為 consumer 1、consumer 2、consumer 3)時,一條消息投遞過來,會被 consumer 1、consumer 2、consumer 3都消費一次。
與集群消費不同的是,consumer 的消費進度是存儲在各個 consumer 實例上,這就容易造成消息重復。還有很重要的一點,對於廣播消費來說,是不會進行消費失敗重投的,所以在 consumer 端消費邏輯處理時,需要額外關注消費失敗的情況。
雖然廣播消費能保證集群內每個 consumer 實例都能消費消息,但是消費進度的維護、不具備消息重投的機制大大影響了實際的使用。因此,在實際使用中,更推薦使用集群消費,因為集群消費不僅擁有消費進度存儲的可靠性,還具有消息重投的機制。而且,我們通過集群消費也可以達到廣播消費的效果。

如果業務上確實需要使用廣播消費,那么我們可以通過創建多個 consumer 實例,每個 consumer 實例屬於不同的 consumer group,但是它們都訂閱同一個 topic。舉個例子,我們創建 3 個 consumer 實例,consumer 1(屬於consumer group 1)、consumer 2(屬於 consumer group 2)、consumer 3(屬於consumer group 3),它們都訂閱了 topic A ,那么當 producer 發送一條消息到 topic A 上時,由於 3 個consumer 屬於不同的 consumer group,所以 3 個consumer都能收到消息,也就達到了廣播消費的效果了。 除此之外,每個 consumer 實例的消費邏輯可以一樣也可以不一樣,每個consumer group還可以根據需要增加 consumer 實例,比起廣播消費來說更加靈活。
說到消息過濾,就不得不說到 tag。沒錯,就是我們之前在專業術語中提到過的 tag。也稱為消息標簽,用來標記 Topic 下的不同用途的消息。
在 RocketMQ 中消費者是可以按照 Tag 對消息進行過濾。舉個電商交易場景的例子,用戶下完訂單之后,在后台會產生一系列的消息,比如說訂單消息、支付消息和物流消息。假設這些消息都發送到 Topic 為 Trade 中,同時用 tag 為 order 來標記訂單消息,用 tag 為 pay 來標記支付消息,用 tag 為 logistics 來標記物流消息。需要支付消息的支付系統(相當於一個 consumer)訂閱 Trade 中 tag 為 pay 的消息,此時,broker 則只會把 tag 為 pay 的消息投遞給支付系統。而如果是一個實時計算系統,它可能需要接收所有和交易相關的消息,那么只要它訂閱 Trade 中 tag 為 order、pay、logistics 的消息,broker 就會把帶有這些 tag 的消息投遞給實時計算系統。
對於消息分類,我們可以選擇創建多個 Topic 來區分,也可以選擇在同一個 Topic 下創建多個 tag 來區分。這兩種方式都是可行的,但是一般情況下,不同的 Topic 之間的消息是沒有什么必然聯系的,使用 tag 來區分同一個 Topic 下相互關聯的消息則更加合適一些。
講完了消息過濾,我們接着講講什么是訂閱關系一致性呢?其實在講 RocketMQ 消費模式的時候提到過,除了使用同一個 group name,訂閱的 tag 也必須是一樣的,只有符合這兩個條件的 consumer 實例才能組成 consumer 集群。這里所說的其實就是訂閱關系一致性。在 RocketMQ 中,訂閱關系由 Topic和 Tag 組成,因此要保證訂閱關系一致性,就必須同時保證這兩點:
-
訂閱的 Topic 必須一致
-
訂閱的 Topic 中的 tag 必須一致
保證訂閱關系一致性是非常重要的,一旦訂閱關系不一致,消息消費的邏輯就會混亂,甚至導致消息丟失,這對於大部分業務場景來說都是不允許的,甚至是致命的。在實際使用中,切記同一個消費者集群內的所有消費者實例務必要保證訂閱關系的一致性。

圖 1
備注:圖中 “*” 代表訂閱該Topic下所有的 tag。
我們用具體的例子來解釋一下,如圖 1 所示,消費者集群中有 3 個 consumer 實例,分別為 C1、C2、C3,各自訂閱的 topic 和 tag 各不相同。首先 C1 和 C2 都訂閱 TopicA,滿足了訂閱關系一致性的第一點,但是 C1 訂閱的是 TopicA 的 Tag1,而 C2 訂閱的是 TopicA 的 Tag2,不滿足訂閱關系一致性的第二點,所以 C1、C2 不滿足訂閱關系一致性。而 C3 訂閱的 Topic 和 Tag 都與 C1 和 C2不一樣,同樣也不滿足訂閱關系一致性。

圖 2
備注:圖中 “||” 用來連接不用的 tag,表示與的意思。
在圖 2 中,消費者集群中有 3 個 consumer 實例,分別為 C1、C2、C3,都是訂閱 TopicA 下的 Tag1 和 Tag2,滿足了訂閱關系一致性的兩點要求,所以滿足了訂閱關系一致性。

圖 3
如圖 3 所示,一個 consumer 也可以訂閱多個 Topic,同時也必須保證該 consumer 集群里的多個消費者實例的訂閱關系一致性,才不會造成不必要的麻煩。
在實際使用中,消息過濾可以幫助我們只消費我們所需要的消息,這是在broker端就幫我們處理好的,大大減少了在 consumer 端的消息過濾處理,一方面減少了代碼量,另一方面更減少了不必要消息的網絡傳輸消耗。
訂閱消息一致性則保證了同一個消費者集群中 consumer 實例的正常運行,避免消息邏輯的混亂和消息的丟失。所以在實際使用中,在 producer 端要做好消息的分類,便於 consumer 可以使用 tag 進行消息的准確訂閱,而在 consumer 端,則要保證訂閱關系一致性。
首先明確之前說過的,消息重試只針對集群消費模式,廣播消費沒有消息重試的特性,消費失敗之后,只會繼續消費下一條消息。這也是為什么我們一再強調,推薦大家使用集群消費模式,其消息重試的特性能給開發者帶來極大的方便。
那么什么是消息重試呢?簡單來說,就是當消費者消費消息失敗后,broker 會重新投遞該消息,直到消費成功。在 RocketMQ 中,當消費者使用集群消費模式時,消費者接收到消息並進行相應的邏輯處理之后,最后都要返回一個狀態值給 broker。這樣 broker 才知道是否消費成功,需不需要重新投遞消息。也就是說,我們可以通過設置返回的狀態值來告訴 broker 是否重新投遞消息。
到這里,可能大家會有一個疑問,那如果這條消息本身就是一條臟數據,就算你消費 100 次也不會消費成功,難道還是一直去重試嘛?其實 RocketMQ 並不會無限制地重試下去,默認每條消息最多重試 16 次,而每次重試的間隔時間如下表所示:
那么如果消息重試 16 次之后還是消費失敗怎么辦呢?那么消息就不會再投遞給消費者,而是將消息放到相對應的死信隊列中。這時候我們就需要對死信隊列的消息做一些人工補償處理,因為這些消息可能本身就有問題,也有可能和消費邏輯調用的服務有關等,所以需要人工判斷之后再進行處理。
到這里不知道大家有沒有一個疑問,那就是什么樣的情況才叫消費失敗呢?可以分為 3 種情況:
-
返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
-
返回 null
-
拋出異常
前兩種情況都比較好理解,就是前面說過的設置狀態值,也就是說,只需要消費者返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 或者 null,就相當於告訴 broker 說,這條消息我消費失敗了,你給我重新投遞一次。而對於拋出異常這種情況,只要在你處理消費邏輯的地方拋出了異常,那么 broker 也重新投遞這條消息。注意一點,如果是被捕獲的異常,則不會進行消息重試。
消息冪等
首先什么是消費冪等呢?簡單來說就是對於一條消息的處理結果,不管這條消息被處理多少次,最終的結果都一樣。比如說,你收到一條消息是要更新一個商品的價格為 6.8 元,那么當這條消息執行 1 次,還是執行 100 次,最終在數據庫里的該商品價格就是 6.8 元,這就是所謂的冪等。 那么為什么消費需要冪等呢?因為在實際使用中,尤其在網絡不穩定的情況下,RocketMQ 的消息有可能會出現重復,包括兩種情況:
-
發送時消息重復;
-
投遞時消息重復;
第一種情況是生產者發送消息的場景,消息已成功發送到 broker ,但是此時可能發生網絡閃斷或者生產者宕機了,導致 broker 發回的響應失敗。這時候生產者由於沒有收到響應,認為消息發送失敗,於是嘗試再次發送消息給 broker。這樣一來,broker 就會再收到一條一摸一樣內容的消息,最終造成了消費者也收到兩條內容一摸一樣的消息。
第二種情況是消費者消費消息的場景,消息已投遞到消費者並完成消費邏輯處理,當消費者給 broker 反饋消費狀態時可能發生網絡閃斷。broker 收不到消費者的消費狀態,為了保證至少消費一次的語義,broker 將在網絡恢復后再次嘗試投遞之前已經被處理過的消息,最終造成消費者收到兩條內容一摸一樣的消息。
當然對於一些允許消息重復的場景,大可以不必關心消費冪等。但是對於那些不允許消息重復的業務場景來說,處理建議就是通過業務上的唯一標識來作為冪等處理的依據。
消息重試,保證了消費消息的容錯性,即使消費失敗,也不需要開發者自己去編寫代碼來做補償,大大提高了開發效率,同時也是 RocketMQ 相較於其他 MQ 的一個非常好的特性。而消費冪等主要是針對那些不允許消息重復的場景,應該說大部分 MQ 都需要冪等處理,這屬於代碼邏輯或者說業務上的需要,最好的處理方式就是前面所說的根據業務上唯一標識來作為冪等處理的依據。
消息的重復消費問題及措施
出現消息的重復消費的原因是因為我們的rocketmq支持失敗重試的機制,一些極端情況下,例如消費超時,或者mq沒有收到消費端的ACK確認碼,將消息發給其他消費者而出現的重復問題
- 針對普通場景,建立一個消息表。對於每條消息,創建唯一的標識,這樣避免相同的消息出現重復消費
- 針對並發較高的場景,可以通過redis來代替消息表
- 甚至可以考慮布隆過濾器,但是布隆過濾器存在一定的誤報風險,當誤報時,會認為該條消息已存在(實際不存在),導致正常消息無法被消費
Rocketmq刷盤策略
所有消息都是持久化的,先寫入pagecache區,再寫入磁盤,保證磁盤和內存均有一份數據,讀取時讀取內存數據
使用哪種刷盤方式可以調整broker配置文件中的
flushType = SYNC_FLUSH or ASYNC_FLUSH
- 同步刷盤
- 消息存儲磁盤后才會返回成功
- 當消息存入pagecache區域時,立即通知刷盤線程,完成刷盤工作后,返回成功
- 同步刷盤更穩定,但是吞吐較低,適用於要求消息可靠性更高的場景
- 異步刷盤
- 消息存入pagecache區,即返回成功,當內存區域數據達到一定容量時,統一寫入磁盤
- 異步刷盤高吞吐,寫操作返回快
- 意外情況下斷電,會導致pagecache區域尚未刷入磁盤的部分數據丟失,但是吞吐性更高
Rocketmq復制策略
當broker以集群形式分布,需要進行消息的主從同步時,會使用到復制策略
同步復制
- master和salve均寫入成功后,返回成功
- master和salve數據同步,不易丟失,但是吞吐相對較低
異步復制
- master數據寫入成功后,立即返回成功
- master莫名其妙宕機后,可能會出現master和salve的數據不一致的情況,吞吐性能更高
建議推薦方式:異步刷盤+同步復制
RocketMq消息丟失場景及解決方案
- 生產者將消息發送給mq途中,因出現網絡抖動,導致消息丟失
- 消息存儲在pagecache區,且尚未觸發異步刷盤,而出現斷電一類,導致數據丟失。或是存入磁盤后,磁盤損壞導致數據丟失
- Consumer從mq中拿取數據,尚未完成消費,就通知mq消費完畢,然后消費者宕機,導致消息丟失
解決方案
場景一:
- 基於生產者的分布式事務來解決
- 若是消息推送mq過程中丟失,則執行回滾操作
- 生產者發送完消息以后,mq即使接收到響應成功后,暫時消費者也不會消費的(此時處於半消息狀態)
- 生產者會執行自己的鏈路,若是執行完畢且成功,會再次通知mq將消息commit(二次確認機制),否則進行rollback操作
場景二:
將異步刷盤改為同步刷盤,同時對於broker進行集群化部署,進行主從復制策略
場景三:
- mq會在消費端注冊一個監聽,當consumer拿去到消息消費時,只有消費成功后,才會發送一個COMSUME_SUCCESS的狀態,mq會知道消費成功(類似與一個ACK的確認機制)
- 當節點掛掉時,rocketmq長時間收不到響應(監聽也沒了),就會進行故障轉移,將消息發給其他消費者處理