功能特性:


應用場景:
消息隊列 MQ 可應用於如下幾個場景:
分布式事務
在傳統的事務處理中,多個系統之間的交互耦合到一個事務中,響應時間長,影響系統可用性。引入分布式事務消息,交易系統和消息隊列之間,組成一個事務處理,能保證分布式系統之間數據的最終一致。;下游業務系統(購物車、積分、其他)相互隔離,並行處理。


實時計算
通過消息隊列(MQ),將源端不停產生的數據實時流入到計算引擎,實現實時計算。可采用如下計算引擎:Spark / Storm / EMR / ARMS / BeamRunner。


物聯網應用
物聯網設備通過微消息隊列(LMQ)連接雲端,雙向通信,數據傳輸;設備數據通過消息隊列(MQ)連接計算引擎,分析數據或者源數據實時高效寫入到 HiTSDB / HiStore / ODPS 等。


大規模緩存同步
在商業大促活動中,如“雙11”大促,各個分會場會有琳琅滿目的商品,每件商品的價格都會實時變化;同時,大量並發訪問商品數據庫,會場頁面響應時間長。集中式緩存,帶寬成瓶頸,無法滿足對商品價格的訪問需求。
消息隊列(MQ)能夠通過大規模緩存同步,減少頁面響應時間;針對分會場的多緩存設計,滿足客戶對商品價格的訪問需求。


名詞解釋
本文主要對 MQ 涉及的專有名詞及術語進行定義和解析,方便您更好地理解相關概念並使用 MQ。
Message Queue
消息隊列,阿里雲商用的專業消息中間件,是企業級互聯網架構的核心產品,提供基於高可用分布式集群技術搭建的消息發布訂閱、軌跡查詢、資源統計、定時(延時)、監控報警等一系列消息雲服務。
Message
消息,消息隊列中信息傳遞的載體。
Message ID
消息的全局唯一標識,由 MQ 系統自動生成,唯一標識某條消息。
Message Key
消息的業務標識,由消息生產者(Producer)設置,唯一標識某個業務邏輯。
Topic
消息主題,一級消息類型,通過 Topic 對消息進行分類。
Tag
消息標簽,二級消息類型,用來進一步區分某個 Topic 下的消息分類。
Producer
消息生產者,也稱為消息發布者,負責生產並發送消息。
Producer ID
一類 Producer 的標識,這類 Producer 通常生產並發送一類消息,且發送邏輯一致。
Producer 實例
Producer 的一個對象實例,不同的 Producer 實例可以運行在不同進程內或者不同機器上。Producer 實例線程安全,可在同一進程內多線程之間共享。
Consumer
消息消費者,也稱為消息訂閱者,負責接收並消費消息。
Consumer ID
一類 Consumer 的標識,這類 Consumer 通常接收並消費一類消息,且消費邏輯一致。
Consumer 實例
Consumer 的一個對象實例,不同的 Consumer 實例可以運行在不同進程內或者不同機器上。一個 Consumer 實例內配置線程池消費消息。
集群消費
一個 Consumer ID 所標識的所有 Consumer 平均分攤消費消息。例如某個 Topic 有 9 條消息,一個 Consumer ID 有 3 個 Consumer 實例,那么在集群消費模式下每個實例平均分攤,只消費其中的 3 條消息。
廣播消費
一個 Consumer ID 所標識的所有 Consumer 都會各自消費某條消息一次。例如某個 Topic 有 9 條消息,一個 Consumer ID 有 3 個 Consumer 實例,那么在廣播消費模式下每個實例都會各自消費 9 條消息。
定時消息
Producer 將消息發送到 MQ 服務端,但並不期望這條消息立馬投遞,而是推遲到在當前時間點之后的某一個時間投遞到 Consumer 進行消費,該消息即定時消息。
延時消息
Producer 將消息發送到 MQ 服務端,但並不期望這條消息立馬投遞,而是延遲一定時間后才投遞到 Consumer 進行消費,該消息即延時消息。
事務消息
MQ 提供類似 X/Open XA 的分布事務功能,通過 MQ 事務消息能達到分布式事務的最終一致。
順序消息
MQ 提供的一種按照順序進行發布和消費的消息類型, 分為全局順序消息和分區順序消息。
順序發布
對於指定的一個 Topic,客戶端將按照一定的先后順序進行發送消息。
順序消費
對於指定的一個 Topic,按照一定的先后順序進行接收消息,即先發送的消息一定會先被客戶端接收到。
全局順序消息
對於指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發布和消費。
分區順序消息
對於指定的一個 Topic,所有消息根據 sharding key 進行區塊分區。同一個分區內的消息按照嚴格的 FIFO 順序進行發布和消費。Sharding key 是順序消息中用來區分不同分區的關鍵字段,和普通消息的 key 是完全不同的概念。
消息堆積
Producer 已經將消息發送到 MQ 服務端,但由於 Consumer 消費能力有限,未能在短時間內將所有消息正確消費掉,此時在 MQ 服務端保存着未被消費的消息,該狀態即消息堆積。
消息過濾
訂閱者可以根據消息標簽(Tag)對消息進行過濾,確保訂閱者最終只接收被過濾后的消息類型。消息過濾在 MQ 服務端完成。
消息軌跡
在一條消息從發布者發出到訂閱者消費處理過程中,由各個相關節點的時間、地點等數據匯聚而成的完整鏈路信息。通過消息軌跡,用戶能清晰定位消息從發布者發出,經由 MQ 服務端,投遞給消息訂閱者的完整鏈路,方便定位排查問題。
重置消費位點
以時間軸為坐標,在消息持久化存儲的時間范圍內(默認3天),重新設置消息訂閱者對其訂閱 Topic 的消費進度,設置完成后訂閱者將接收設定時間點之后由消息發布者發送到 MQ 服務端的消息。
集群消費和廣播消費:
本文檔主要介紹 MQ 集群消費和廣播消費的基本概念,適用場景以及注意事項。
基本概念
MQ 是基於發布訂閱模型的消息系統。在 MQ 消息系統中消息的訂閱方訂閱關注的 Topic,以獲取並消費消息。由於訂閱方應用一般是分布式系統,以集群方式部署有多台機器。因此 MQ 約定以下概念。
集群:MQ 約定使用相同 Consumer ID 的訂閱者屬於同一個集群,同一個集群下的訂閱者消費邏輯必須完全一致(包括 Tag 的使用),這些訂閱者在邏輯上可以認為是一個消費節點。
集群消費:當使用集群消費模式時,MQ 認為任意一條消息只需要被集群內的任意一個消費者處理即可。
廣播消費:當使用廣播消費模式時,MQ 會將每條消息推送給集群內所有注冊過的客戶端,保證消息至少被每台機器消費一次。
場景對比
集群消費模式:


適用場景&注意事項
- 消費端集群化部署,每條消息只需要被處理一次。
- 由於消費進度在服務端維護,可靠性更高。
- 集群消費模式下,每一條消息都只會被分發到一台機器上處理,如果需要被集群下的每一台機器都處理,請使用廣播模式。
- 集群消費模式下,不保證消息的每一次失敗重投等邏輯都能路由到同一台機器上,因此處理消息時不應該做任何確定性假設。
廣播消費模式:


適用場景&注意事項
- 順序消息暫不支持廣播消費模式。
- 每條消息都需要被相同邏輯的多台機器處理。
- 消費進度在客戶端維護,出現重復的概率稍大於集群模式。
- 廣播模式下,MQ 保證每條消息至少被每台客戶端消費一次,但是並不會對消費失敗的消息進行失敗重投,因此業務方需要關注消費失敗的情況。
- 廣播模式下,第一次啟動時默認從最新消息消費,客戶端的消費進度是被持久化在客戶端本地的隱藏文件中,因此不建議刪除該隱藏文件,否則會丟失部分消息。
- 廣播模式下,每條消息都會被大量的客戶端重復處理,因此推薦盡可能使用集群模式。
- 目前僅 Java 客戶端支持廣播模式。
- 廣播模式下服務端不維護消費進度,所以 MQ 控制台不支持消息堆積查詢和堆積報警功能。
使用集群模式模擬廣播:
如果業務需要使用廣播模式,也可以創建多個 Consumer ID,用於訂閱同一個 Topic。


適用場景&注意事項
- 每條消息都需要被多台機器處理,每台機器的邏輯可以相同也可以不一樣。
- 消費進度在服務端維護,可靠性高於廣播模式。
- 一個雲賬戶所能創建的 Consumer ID 數量是有限制的,具體可以咨詢售后技術支持。
- 對於一個 Consumer ID 來說,可以部署一個消費端實例,也可以部署多個消費端實例。當部署多個消費端實例時,實例之間又組成了集群模式(共同分擔消費消息)。假設 Consumer ID1 部署了三個消費者實例 C1,C2,C3,那么這三個實例將共同分擔服務器發送給 Consumer ID1 的消息。同時,實例之間訂閱關系必須保持一致。
消息過濾:
本文描述 MQ 消費者如何根據 Tag 在 MQ 服務端完成消息過濾。
Tag,即消息標簽、消息類型,用來區分某個 MQ 的 Topic 下的消息分類。MQ 允許消費者按照 Tag 對消息進行過濾,確保消費者最終只消費到他關心的消息類型。
以下圖電商交易場景為例,從客戶下單到收到商品這一過程會生產一系列消息,比如訂單創建消息(order)、支付消息(pay)、物流消息(logistics)。這些消息會發送到 Topic 為 Trade_Topic 的隊列中,被各個不同的系統所接收,比如支付系統、物流系統、交易成功率分析系統、實時計算系統等。其中,物流系統只需接收物流類型的消息(logistics),而實時計算系統需要接收所有和交易相關(order、pay、logistics)的消息。


說明:針對消息歸類,您可以選擇創建多個 Topic, 或者在同一個 Topic 下創建多個 Tag。但通常情況下,不同的 Topic 之間的消息沒有必然的聯系,而 Tag 則用來區分同一個 Topic 下相互關聯的消息,比如全集和子集的關系,流程先后的關系。
參考示例
發送消息
發送消息時,每條消息必須指明消息類型 Tag:
Message msg = new Message("MQ_TOPIC","TagA","Hello MQ".getBytes());
消費方式-1
消費者如需訂閱某 Topic 下所有類型的消息,Tag 用符號 * 表示:
consumer.subscribe("MQ_TOPIC", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
消費方式-2
消費者如需訂閱某 Topic 下某一種類型的消息,請明確標明 Tag:
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
消費方式-3
消費者如需訂閱某 Topic 下多種類型的消息,請在多個 Tag 之間用 || 分隔:
consumer.subscribe("MQ_TOPIC", "TagA||TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
消費方式-4(錯誤示例)
同一個消費者多次訂閱某 Topic 下的不同 Tag,后者會覆蓋前者:
//如下錯誤代碼中,consumer 只能接收到 MQ_TOPIC 下 TagB 的消息,而不能接收 TagA 的消息。
consumer.subscribe("MQ_TOPIC", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("MQ_TOPIC", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; }
訂閱關系一致:
MQ 里的一個 Consumer ID 代表一個 Consumer 實例群組。對於大多數分布式應用來說,一個 Consumer ID 下通常會掛載多個 Consumer 實例。訂閱關系一致指的是同一個 Consumer ID 下所有 Consumer 實例的處理邏輯必須完全一致。一旦訂閱關系不一致,消息消費的邏輯就會混亂,甚至導致消息丟失。
由於 MQ 的訂閱關系主要由 Topic+Tag 共同組成,因此,保持訂閱關系一致意味着同一個 Consumer ID 下所有的實例需在以下兩方面均保持一致:
- 訂閱的 Topic 必須一致;
- 訂閱的 Topic 中的 Tag 必須一致。




如上圖所示,一個 Consumer ID 也可以訂閱多個 Topic,但是這個 Consumer ID 里的多個消費者實例的訂閱關系一定要保持一致。
下文給出了訂閱關系不一致的錯誤代碼示例。
【例一】以下例子中,同一個 Consumer ID 下的兩個實例訂閱的 Topic 不一致。
Consumer 實例 1-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer 實例1-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_1"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_B ", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
【例二】以下例子中,同一個 Consumer ID 下訂閱 Topic 的 Tag 不一致。Consumer 實例2-1 訂閱了 TagA,而 Consumer 實例2-2 未指定 Tag。
Consumer 實例2-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer 實例2-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_2"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A ", "*", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; }
【例三】此例中,錯誤的原因有倆個:
- 同一個 Consumer ID 下訂閱 Topic 個數不一致。
- 同一個 Consumer ID 下訂閱 Topic 的 Tag 不一致。
Consumer 實例3-1:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, "CID_jodie_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A", "TagA", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } }); consumer.subscribe("jodie_test_B", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });
Consumer 實例3-2:
Properties properties = new Properties(); properties.put(PropertyKeyConst.ConsumerId, " CID_jodie_test_3"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("jodie_test_A ", "TagB", new MessageListener() { public Action consume(Message message, ConsumeContext context) { System.out.println(message.getMsgID()); return Action.CommitMessage; } });