消息中間件
消息中間件的作用
- 解耦:消息中間件在服務之間插入了一個隱含的、基於數據的接口層。兩邊的服務處理過程都要實現這一接口,這允許我們獨立的擴展或修改兩邊的處理過程,只要確保他們遵守相同的規范約束即可
- 冗余(存儲):消息中間件可以將數據持久化直到完全被處理
- 擴展性:因為消息中間件解耦了應用的處理過程,所以提高消息入隊和處理的效率都是很容易的,只要另外增加處理過程即可,不需要修改代碼和調節參數
- 削峰:在訪問量驟增的情況下,服務仍然需要可用。但以此為標准設計程序又無疑是巨大的浪費。使用消息中間件可以使關鍵組件能夠給支撐突然訪問壓力,不會因為突發的超負荷請求而完全崩潰
- 可恢復性、順序保證(一定程度)、緩沖、異步通訊
Kafka
Kafka 和 RabbitMq 的異同
項目中同時用到了 Kafka 和 RabbitMq,因此需要作比較
Kafka 是一個分布式的消息流處理平台,擁有着極致的吞吐量,支持消息重新消費。非常適合應用在大數據領域。因此我們項目中的收數服務應用了 Kafka 來處理每天 10E 級的數據。
RabbitMq 則勝在擁有更靈活的交換器與隊列的匹配規則(基於 topic 交換器 + # * 匹配關鍵字),還有 TTL+死信隊列 實現的延時隊列,及簡單易上手的可靠性保障,因此在吞吐沒有達到每秒幾十萬的而必須用 kafka 時,RabbitMq 是個很好的選擇
RabbitMQ
基礎
相關概念介紹
- 生產者:投遞消息的一方,生產者創建消息,然后發布到 mq 中。消息一般分為兩個部分:消息體和標簽,消息體也可稱為 payload,實際應用中,消息體一般是一個帶有業務邏輯結構的數據。消息的標簽用來表述這條消息,比如一個交換器名稱和一個路由鍵
- 消費者:接收消息的一方,當消費者消費一條消息時,只是消費消息的消息體,消息路由的過程中,消息的標簽會被丟棄,存入到隊列中的只有消息體
- 隊列:RabbitMq 消息最終存儲在隊列中,多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤。RabbitMq不支持隊列層面的廣播消費,Kafka 可以通過消費者組實現
- 交換器、路由鍵、綁定:生產者將消息發送到交換器,交換器根據 RoutingKey 和 BindingKey 將消息路由到隊列,如果不能則返回給消費者或丟棄該消息
交換器類型
- fanout: 將消息路由到所有綁定的隊列中
- direct:將消息路由到 routingKey 與 bindingKey 完全匹配的隊列中
- topic:將消息按照 bindingKey 規則匹配 routingKey ,到指定的隊列
- routingKey 和 bindingKey 都是以 . 號分割多個單詞的字符串
- BindingKey 可以使用 * 用來匹配一個單詞。# 用來匹配 0 個或多個單詞
- headers:不依賴路由鍵的匹配,而是使用一個鍵值對匹配,幾乎不怎么用
隊列的排他性
排他隊列僅對首次聲明他的連接可見,並在連接斷開時自動刪除。該連接下所有的信道(Channel) 都可以使用它
虛擬主機的作用
多租戶場景,對外部而言各個虛擬主機是完全獨立的,A主機的交換器不能綁定B主機的隊列,權限也是隔離的
Qos
消費者消費速度限制,須配合手動 ack 一起使用,此時當 mq 檢測到某個 channel 未 ack 的消息達到閾值后,就不會推送消息到該 cahnnel
延時隊列
可以結合死信隊列 + 隊列過期時間,模擬延時隊列
如何保證消息不丟失
- 首先是消息本身:要確保寫了正確的交換機名、路由鍵名,同時可以設置 mandetory = true 使得消息沒找到合適的隊列時可以返回給生產者(這樣需要編碼時添加 ReturnListener 的編碼,不方便)。或者添加一個備份交換器,此交換器類型建議設置為 fanout,以保證消息可以被正確路由到隊列
- 消費者服務:設置 autoAck = false,程序處理完畢后手動 Ack
- Mq:開啟交換器、隊列、消息的持久化。同時為了避免消息在落盤之前因為 Mq 宕機而丟失,采用鏡像隊列機制(高可用 mq)可以最大程度規避此問題
- 生產者服務:為了確保消息成功發送到 mq ,一般有兩種方法
- 事物:開啟當前 channel 的事物機制,每發送一條消息 commit 一下,如果捕捉到異常則 rollback 同時重新發送該消息。該方法實現簡單,但會嚴重降低吞吐量(相比下一種降低 10 倍)
- 發送方確認機制:開啟當前 channel 的發送方確認機制之后,生產者發送消息之后,可以立刻或者批量或者異步等待 mq 響應 ack 或者 nack 消息,在收到 nack 消息后做重發操作,其中立即處理和事物機制吞吐差不多,批量會面臨一個消息 nack 這一批消息都重新發送的窘境,異步確認是最為推薦的機制
擴展
Federation 聯邦交換器
假設一種場景:業務A的 clientA 位於北京,需要往位於廣州的 exchangeA 發送一條消息,那網絡延遲對吞吐量的影響是不容小覷的,如果設置了事務或者開啟了消息發送確認,就更慢了
此時可以通過 Federation 插件解決,在 broker3 中為交換器 exchageA 與 broker1 建立一個單向的 Federation 連接,此時 F 會在 broker1 中創建一個同名交換器 exchangeA,同時創建一個內部交換器 exchageA -> broker3 B,還有一個隊列 f:eA -> b3 B。並與交換器 eA -> b3 B 綁定,F 插件會在隊列: f:eA -> b3 B 與 broker3 中的 exchangeA 建立 AMQP 連接來實時的消費隊列中的消息。
這樣部署在北京的生產者可以直接向 exchageA 發送消息,可以低延遲的收到回復。而后消息通過 F link 轉發到 broker3 的 exchangeA 中,由消費者進行消費
Federation Queue 聯邦隊列
聯邦隊列可以在多個 Broker 節點(或者集群)之間為單個隊列提供負載均衡的能力。一個聯邦隊列可以連接一個或多個上游隊列(upstream queue),並從上游隊列中獲取消息以滿足本地消費者的消費需求
如圖所示:隊列 queue1 和 queue2 原本在 broker2 中,由於某種需求將其配置為 fedarated queue 並將 broker1 設置為 upstream queue 。此時 federation 插件會在 broker1 上創建同名的隊列 queue1 和 queue2,當有消費者 clientA 連接 broker2 並消費 queue1 (queue2) 時,若隊列中有消息則會直接消費,如果隊列中沒有消息,那么它會通過 Federation 從 broker1 中的 queue1(queue2) 中拉取消息,然后存儲到本地。最后被 clientA 消費
同時 Federate Queue 支持雙向聯邦,一條消息可以在隊列中被轉發多次,以達到消息最終被轉發到某一個消費力更強的 broker 中從而被消費
Shovel 鏟子
與 Federation 具備的數據轉發功能類似,Shovel 能夠持續可靠的從一個 Broker 中的 queue 將消息轉發到當前或另一個 Broker 中的 exchange 中(與 F 不同的是它將消息由隊列轉發至交換機,而 F 類似於在 B1 創建了一個代理,B1 一開始什么都不需要有)
其原理是通過消費隊列中的數據同時將數據發送給交換器來實現數據轉發, Shovel 同時也支持源數據為交換器或者目標數據為隊列。實際上兩者都是通過補足虛擬的隊列或者交換器實現的
案例:消息堆積的治理
消息堆積嚴重時,可以選擇清空隊列,或者添加空消費者丟棄部分消息。但對於重要的數據而言,此舉不可行
另一種方案是增加下游的消費能力,但是這種優化代碼的方案在緊急時刻缺失“遠水解不了近渴”
那么合理的優化方案是(一備一):
- 創建一個額外的隊列 queue2,通過 shovel 與原隊列 queue1 綁定,當 queue1 中的消息達到閾值 A 時,通過 shovel 將消息轉發到 queue2,
- 當 queue1 中的消息減少到閾值 B 時,停止 shovel 轉發
- 當 queue1 中的消息減少到閾值 C 時,將 queue2 的消息又轉發到 queue1 中
- 當 queue1 中的消息增加到閾值 B 時,停止 shove 轉發。這樣 3 4 循環以逐步將多余消息消費
如果需要一備多的場景,可以使用鏡像隊列或 Federation
原理
存儲機制
不管是持久化還是非持久化的消息都可以被寫入到硬盤。持久化的消息在到達隊列時就被寫入到磁盤,並且如果可以,持久化的消息也在內存中保存一份備份,當內存吃緊的時候從內存中清除。非持久化的消息一般只保存在內存中,當內存吃緊的時候會被換入磁盤中,以節省內存空間。這兩種消息的落盤處理都在 RabbitMq 的“持久層”完成
“持久層”實際上是一個邏輯概念,實際包含兩個部分:隊列索引 rabbit_queue_index 和消息存儲 rabbit_msg_store
rabbit_queue_index 負責維護隊列中落盤的消息,包括消息的存儲地點、是否已交付給消費者、是否已 Ack,每個隊列與之對應的 rabbit_queue_index
rabbit_msg_store 以鍵值對的形式存儲消息,它被所有隊列共享,在每個節點有且只有一個
消息(包括消息體、屬性和 headers)可以存儲在兩者中的任意一個。一般通過 queue_index_embed_msg_below 配置一個大小閾值,較小的消息存儲在 rabbit_queue_index 中,較大的消息存儲在 rabbit_msg_store 中
隊列的結構
通常隊列由 rabbit_amqqueue_process 和 backing_queue 這兩部分組成,rabbit_amqqueue_process 負責協議的相關消息處理,backing_queue 是消息存儲的具體形式和引擎,消息入隊列之后,不是固定不變的,它會隨着系統的負載不斷的流動,有以下四種狀態
- alpha:消息內容和消息索引都存儲在內存中
- beta:消息索引存儲在內存中,消息內容存儲在磁盤中
- gamma:消息內容存儲在磁盤中,消息索引存儲在內存和磁盤中
- delta:消息內容和消息索引都存儲在磁盤中
對於持久化的消息,消息內容和消息索引必須先保存在磁盤上,才會處於上述狀態的一種,而 gamma 狀態的消息是只有持久化的消息才會有的狀態。對於 durable 為 true 的消息,在開啟 publish confirm 機制后,只有到了 gamma 狀態才會確認消息已被接收
如圖所示:Q1 和 Q4 僅存儲 alpha 狀態的消息,Q2 和 Q3 存儲 beta 和 gamma 狀態的消息,Detla 存儲 detla 狀態的消息,當消費者消費消息時,會先從 Q4 從獲取,如果成功則返回,如果 Q4 為空則按照一定的規則從上面的隊列中轉移消息到 Q4 后獲取
通常負載正常時,對於不需要保證消息可靠不丟失的情況,極有可能消息只處於 alpha 狀態。對於需要持久化的消息,只有當消息處於 gamma 狀態時才會確認消息已接收。
惰性隊列
惰性隊列會盡可能的將消息存儲在硬盤之中,而在消費者消費到相應的消息才會加載到內存中。惰性隊列會將接收到的消息直接存儲到文件系統中,而不管消息是持久化的還是非持久化的,這樣可以減少內存的損耗
流控
RabbitMq 的流控鏈如上圖所示
- rabbit_reader:connection 的處理進程,負責接收、解析 AMQP 協議數據包等
- rabbit_channel:Channel 的處理進程,負責處理 AMQP 協議中的各種方法,進行路由解析等
- rabbit_amqqueue_process:隊列的處理進程,負責實現隊列的所有邏輯
- rabbit_msg_store:負責實現隊列的持久化
當 connection 處於 flow 狀態,而 connection 沒有一個 channel 處於 flow 狀態,說明 channel 出現了性能瓶頸,一般是因為處理大量較小的非持久化消息時出現
當 connection 處於 flow 狀態,並且若干個 channel 處於 flow 狀態,但是沒有任何一個對應的隊列處於 flow 狀態。說明一個或多個隊列出現了性能瓶頸,這可能是將消息存入隊列時 CPU 占用過高,或者將消息持久化到磁盤時 I/O 過高,這種情況一般會在處理大量較小的持久化消息時出現
當 connection、channel、若干隊列都是 flow 狀態時,意味着在消息持久化時出現了性能瓶頸,這種情況一般在發送大量的較大持久化消息時最容易出現
打破隊列的瓶頸
向一個隊列中推送消息時,往往會在 rabbit_amqqueue_process(即隊列進程中)產生性能瓶頸。那如何破局,提高 rabbit 的性能呢
如圖所示,因為 rabbit_amqqueue_process 是隊列獨享的,而在代碼層面實現多個隊列會增加業務的復雜度,因此可以通過封裝拆分隊列的邏輯來解決
鏡像隊列
如果 RabbitMq 只有一個 Broker 節點,那么該節點的失效將會導致整體服務的暫時不可用,並且有可能導致消息的丟失。可以將消息設置為持久化,並且將消息所屬的隊列 durable 屬性設置為 true,但這仍無法避免緩存導致的問題,因為消息在發送之后到存盤之前有一個短暫的時間窗。通過 publish confirm 機制可以保證消息落盤后確認(前文有提到,broker 會在消息進入 gamma 階段也即消息體存盤、消息索引磁盤和內存都有的時候,通知生產者消息發送成功),盡管如此,我們仍不希望 Broker 單點導致的服務不可用問題
鏡像隊列機制可以將隊列鏡像到集群中的其他 broker 上,如果集群中的一個 broker 失效了,隊列能自動的切換到鏡像中的另外一個節點保證服務的可用性,每一個鏡像隊列都包含一個主節點 master,和若干個從節點 slave,相應的結構圖如下
slave 會准確按照 master 的執行命令的順序進行動作,如果 master 宕機,"資歷最老"(加入時間最長)的 slave 會提升成 master,發送到鏡像隊列的消息會同時發送給 master 和 slave(圖中實線),除發送消息外的所有動作只會和 master 打交道,然后由 master 同步給 slave(圖中虛線)。同步采用的是一種稱為組播 GM(Guaranteed Multicast) 的方式,GM模塊的實現是一種可靠的組播通訊協議,該協議能保證組播消息的原子性,即保證組中活着的節點要么都收到消息要么都收不到,它的實現大致如圖上所示,所有節點形成一個循環鏈表,master 發出的消息最終會再次收到,以此確認組中所有節點都收到。
可能有人會覺得,消費者都是從 master 讀取消息的,broker 之間是不是沒有得到有效的負載均衡?其實不然,負載均衡是對整個 broker 而言,對整個機器而言的,而消費者消費的是隊列,只要確保隊列的 master 節點均勻的散落在不同的 broker 上,即可確保很大程度的負載均衡
RabbitMq 的鏡像隊列機制同時支持事物和 publisher confirm 兩種機制,在事物機制中,只有當前事物在所有節點中都執行之后,才會返回 OK,同樣的在 publisher confirm 機制,只有當所有鏡像都接收該消息並處於 gamma 狀態時,才會通知生產者