什么是 MQ?
Message Queue(MQ),消息隊列中間件。很多人都說:MQ 通過將消息的發送和接收分離來實現應用程序的異步和解偶,這個給人的直覺是——MQ 是異步的,用來解耦的,但是這個只是 MQ 的效果而不是目的。MQ 真正的目的是為了通訊,屏蔽底層復雜的通訊協議,定義了一套應用層的、更加簡單的通訊協議。一個分布式系統中兩個模塊之間通訊要么是 HTTP,要么是自己開發的 TCP,但是這兩種協議其實都是原始的協議。HTTP 協議很難實現兩端通訊——模塊 A 可以調用 B,B 也可以主動調用 A,如果要做到這個兩端都要背上 WebServer,而且還不支持長連接(HTTP 2.0 的庫根本找不到)。TCP 就更加原始了,粘包、心跳、私有的協議,想一想頭皮就發麻。MQ 所要做的就是在這些協議之上構建一個簡單的“協議”——生產者/消費者模型。MQ 帶給我的“協議”不是具體的通訊協議,而是更高層次通訊模型。它定義了兩個對象——發送數據的叫生產者;接收數據的叫消費者, 提供一個 SDK 讓我們可以定義自己的生產者和消費者實現消息通訊而無視底層通訊協議
消息隊列的流派
-
有 Broker 的MQ
生產者/消費者模式一定伴隨着者觀察者模式。
這個流派通常有一台服務器作為 Broker,所有的消息都通過它中轉。生產者把消息發送給它就結束自己的任務了,Broker 則把消息主動推送給消費者(或者消費者主動輪詢)
-
重 Topic
kafka、JMS(ActiveMQ)就屬於這個流派,生產者會發送 key 和數據到 Broker,由 Broker 比較 key 之后決定給哪個消費者。這種模式是我們最常見的模式,是我們對 MQ 最多的印象。在這種模式下一個 topic 往往是一個比較大的概念,甚至一個系統中就可能只有一個 topic,topic 某種意義上就是 queue,生產者發送 key 相當於說:“hi,把數據放到 key 的隊列中”
kafka 會丟數據。
雖然架構一樣但是 kafka 的性能要比 jms 的性能不知道高到多少倍,所以基本這種類型的 MQ 只有 kafka 一種備選方案。如果你需要一條暴力的數據流(在乎性能而非靈活性)那么 kafka 是最好的選擇
-
-
輕 Topic
這種的代表是 RabbitMQ(或者說是 AMQP)。生產者發送 key 和數據,消費者定義訂閱的隊列,Broker 收到數據之后會通過一定的邏輯計算出 key 對應的隊列,然后把數據交給隊列
AMQP 中有四種 exchange
-
Direct exchange:key 就等於 queue
-
Fanout exchange:無視 key,給所有的 queue 都來一份
-
Topic exchange:key 可以用“寬字符”模糊匹配 queue
-
Headers exchange:無視 key,通過查看消息的頭部元數據來決定發給那個 queue(AMQP 頭部元數據非常豐富而且可以自定義)
-
無 Broker 的 MQ
無 Broker 的 MQ 的代表是 ZeroMQ。該作者非常睿智,他非常敏銳的意識到——MQ 是更高級的 Socket,它是解決通訊問題的。所以 ZeroMQ 被設計成了一個“庫”而不是一個中間件,這種實現也可以達到——沒有 Broker 的目的
節點之間通訊的消息都是發送到彼此的隊列中,每個節點都既是生產者又是消費者。ZeroMQ 做的事情就是封裝出一套類似於 Socket 的 API 可以完成發送數據,讀取數據
ZeroMQ 其實就是一個跨語言的、重量級的 Actor 模型郵箱庫。你可以把自己的程序想象成一個 Actor,ZeroMQ 就是提供郵箱功能的庫;ZeroMQ 可以實現同一台機器的 RPC 通訊也可以實現不同機器的 TCP、UDP 通訊,如果你需要一個強大的、靈活、野蠻的通訊能力,別猶豫 ZeroMQ
為什么要用 MQ?
-
異步處理
比如用戶注冊后需要發送短信和郵件,新手會怎么寫那?新手會把所有業務寫成串行化,注冊成功后發郵件,發短信。缺點是會把接口的時間拖的很慢。
如何解決和優化那?
發送郵件和短信完全可以異步處理,注冊成功,發個消息消息中間件后直接返回,讓中間件去異步的發郵件和短信。
-
應用解藕
比如在分布式系統中,我們有訂單服務,庫存服務等等。我們訂單服務下完單后通過 rpc 調用庫存系統。假如你的庫存系統掛了,你的訂單是不是就失敗了?
解決辦法還是加一個消息中間件。使用發布訂閱模式。
系統的耦合性越高,容錯就越低。比如訂單系統要去調用支付系統,庫存系統和物流系統,如果任何一個系統高異常都會影響用戶體驗。
加入 MQ 后,比如物流系統出現異常,可以將物流消息暫時存在 MQ 中,不影響用戶正常下單,等物流系統恢復后繼續執行(分布式事務的概念)。
-
流量削峰
應用系統如果遇到系統請求流量的瞬間猛漲,有可能會將系統壓垮,有了消息隊列可以將大量請求緩存起來,分散到很長一段時間處理,這樣可以大大提高系統的穩定性和用戶體驗。
如果業務系統正常時段的 QPS 是 1000,流量最高峰是 10000,請求超過某個閥值后對流量進行削峰處理。
-
MQ 的優點和缺點
優點:解藕,削峰,數據分發。
缺點:
- 系統可用性降低
系統引入的外部依賴越多,系統穩定性越差。一旦 MQ 宕機,就會對業務造成影響。 - 系統復雜度提高
- 一致性問題
A 系統處理完業務,通過 MQ 給 B,C,D 三個系統發消息數據,如果 B,C 處理成功,D 系統處理失敗。如何保證消息數據處理的一致性?
Push 和 Pull 模型
- Push 模型,即當 Producer 發出的消息到達后,服務端馬上將這條消息投遞給 Consumer。
- Pull 模型,指的是服務端接收到這條消息后什么也不做,只是等着 Consumer 主動到自己這里來讀,即 Consumer 這里有一個 “拉取” 的動作。
RocketMQ 的角色介紹
- Producer:消息發送者。
- Consumer:消息接收者。
- Broker:暫存和傳輸消息,像郵局。
- NameServer:管理 Broker,命名服務。
- Topic:區分消息的種類,一個發送者可以發送消息給一個或者多個多個 Topic,一個消息的接受者可以訂閱一個或者多個 Topic 消息。
- Message Queue:相當於是 Topic 的分區,用於並行發送和接收消息。
持久化
- 消息生成者發送消息
- MQ 收到消息,將消息進行持久化,在存儲中新增一條記錄
- 返回 Ack 給生產者
- MQ push 消息給對應的消費者,然后等待消費者返回 Ack
- 如果消息消費者在指定時間內返回 Ack,那么 MQ 會認為消息消費成功,在存儲中刪除消息,即執行第 6 步。如果 MQ 在指定時間內沒有收到 Ack,則認為消息消費失敗,會嘗試重新 push 消息,重復執行 4,5,6 步驟。
- MQ 刪除消息
存儲介質
- 關系型數據庫(ActiveMQ,數據庫的 IO 讀寫性能往往會出現瓶頸)。
- 文件系統(RocketMQ/Kafka/RabbitMQ 均采用的是消息刷盤至所部屬虛擬機/物理機的文件系統來做初始化,刷盤可以采用同步/異步兩種模式)。
- RocketMQ 寫文件時采用順序寫,最高 600M/s,隨機寫只有 100KB/s。RocketMQ 采用了零拷貝技術,提高了消息存盤和網絡發送的速度。
消息存儲結構
RocketMQ 消息的存儲是由 ConsumerQueue 和 CommitLog 配合完成的,消息真正的物理存儲文件是 CommitLog,ConsumerQueue 是消息的邏輯隊列,類似數據庫的索引文件,存儲指向物理存儲的地址。每個 Topic 下的每個 MessageQueue 都有一個對應的 ConsumerQueue 文件。
刷盤機制
RocketMQ 為了提高性能,會盡可能的保證磁盤的順序寫,消息在通過 Producer 寫入 RocketMQ 的時候,有兩種寫磁盤方式,分布式同步刷盤和異步刷盤。
-
同步刷盤
在返回寫成功狀態時,消息立即被寫入磁盤。具體流程是,消息寫入內存的 PAGECACHE 后,立刻通知刷盤線程刷盤,然后等刷盤完成,刷盤線程執行完成后換行等待的線程,返回消息寫成功的狀態。
-
異步刷盤
在返回寫成功狀態時,消息可能只是被寫入了內存的 PAGECACHE,寫操作的返回快,吞吐量達,當內存里的消息積累到一定程度時,統一出發寫磁盤動作,快速寫入。
高可用機制
RocketMQ 分布式集群是通過 Master 和 Slave 的配合達到高可用性的。
Master 角色的 Broker 支持讀和寫,Slave 角色的 Broker 僅支持讀,也就是 Producer 只能和 Master 角色的 Borker 連接寫入消息。Consumer 可以連接 Master 角色的 broker,也可以連接 Slave 角色的 Broker 來讀取消息。
消費端的高可用,默認先從 Master 讀,當 Master 角色出現故障后,Consumer 仍然可以從 Slave 讀取消息,達到了消費端的高可用。
雙主從機制保證了發送消息的高可用。
消息主從復制
-
同步復制
同步復制方式是等 Master 和 Slave 均寫成功后才反饋給客戶端寫成功狀態。
同步復制方式下,如果 Master 出故障,Slave 上有全部的備份數據,容易恢復,但是同步復制會增大數據寫入延遲,降低系統吞吐量。
-
異步復制
異步復制方式只要 Master 寫入成功即可反饋給客戶端寫入成功狀態。
在異步復制方式下,系統擁有較低的延遲和較高的吞吐量,但是如果 Master 出了故障,有些數據因為沒有被寫入 Slave,又可能會丟失。
-
一般把刷盤配置成異步的,主從之間同步配置成同步的,這樣即使有一台機器出故障,仍然能保障數據不丟失,是個不錯的選擇。
負載均衡
Producer 端,每個實力在發送消息的時候,默認會輪訓所有 message queue 發送,以達到讓消息平均落在不同的 queue 上,而由於 queue 可以散落在不同的 broker,所以消息就發送到不同的 broker 下。
Consumer 負載均衡
-
集群模式
在集群消費模式下,每條消息只需要投遞到訂閱這個 topic 的 Cosnuemr Group 下的一個實例即可。RocketMQ 采用主動拉取並消費消息,在拉取的時候需要明確指定拉取哪一條 message queue。
消息重試
-
順序消息的重試
對於順序消息,當消費者消費消息失敗后,消息隊列 RocketMQ 會自動不斷進行消息重試,這時,應用會出現消息消費被阻塞的情況。因此,在使用順序消息時,無比保證應用能夠及時監控並處理消費失敗的情況,避免阻塞現象的發生。
-
無序消息的重試
對於無序消息(普通,定時,延時,事務消息),當消費消費消息失敗時,您可以通過設置返回狀態達到消息重試的結果。
死信隊列
當一條消息消費失敗,消息隊列 RocketMQ 會自動進行消息重試,達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列 RocketMQ 不會立刻將消息丟棄,而是發送到死信隊列中。
消費冪等
消息隊列 RocketMQ 消費者在接收到消息以后,有必要根據業務上的唯一 Key 對消息做冪等處理的必要性。互聯網應用中,尤其是網絡不穩定情況下,消息隊列 RocketMQ 的消息可能會出現重復:
- 發送消息重復
- 投遞消息重復
- 負載均衡時消息重復
因為 Message ID 又可能出現沖突的情況,所以真正的安全的冪等處理,不建議以 Message ID 作為處理依據,最好方式是以業務唯一標識作為冪等的關鍵依據,而業務的唯一標示可以通過消息 Key 進行設置。