存儲機制
RabbitMQ消息有兩種類型:持久化消息和非持久化消息。
這兩種消息都會被寫入磁盤。
持久化消息在到達隊列時寫入磁盤,同時會內存中保存一份備份,當內存吃緊時,消息從內存中清除。這會提高一定的性能。
非持久化消息一般只存於內存中,當內存吃緊時會被換入磁盤,以節省內存空間。
RabbitMQ存儲層包含兩個部分:隊列索引和消息存儲,如下圖
隊列索引:rabbit_queue_index(下文簡稱index)
index維護隊列的落盤消息的信息,如存儲地點、是否已被交付給消費者、是否已被消費者ack等。每個隊列都有相對應的index。
index使用順序的段文件來存儲,后綴為.idx,文件名從0開始累加,每個段文件中包含固定的segment_entry_count條記錄,默認值是16384。每個index從磁盤中讀取消息的時候,至少要在內存中維護一個段文件,所以設置queue_index_embed_msgs_below值得時候要格外謹慎,一點點增大也可能會引起內存爆炸式增長。
消息存儲:rabbit_msg_store(下文簡稱store)
store以鍵值的形式存儲消息,所有隊列共享同一個store,每個節點有且只有一個。從技術層面上說,store還可分為msg_store_persistent和msg_store_transient,前者負責持久化消息的持久化,重啟后消息不會丟失;后者負責非持久化消息的持久化,重啟后消息會丟失。通常情況下,兩者習慣性的被當作一個整體。
store使用文件來存儲,后綴為.rdq,經過store處理的所有消息都會以追加的方式寫入到該文件中,當該文件的大小超過指定的限制(file_size_limit)后,將會關閉該文件並創建一個新的文件以供新的消息寫入。文件名從0開始進行累加。在進行消息的存儲時,RabbitMQ會在ETS(Erlang Term Storage)表中記錄消息在文件中的位置映射和文件的相關信息。
消息(包括消息頭、消息體、屬性)可以直接存儲在index中,也可以存儲在store中。最佳的方式是較小的消息存在index中,而較大的消息存在store中。這個消息大小的界定可以通過queue_index_embed_msgs_below來配置,默認值為4096B。當一個消息小於設定的大小閾值時,就可以存儲在index中,這樣性能上可以得到優化(可理解為數據庫的覆蓋索引和回表)。
讀取消息時,先根據消息的ID(msg_id)找到對應存儲的文件,如果文件存在並且未被鎖住,則直接打開文件,從指定位置讀取消息內容。如果文件不存在或者被鎖住了,則發送請求由store進行處理。
刪除消息時,只是從ETS表刪除指定消息的相關信息,同時更新消息對應的存儲文件和相關信息。在執行消息刪除操作時,並不立即對文件中的消息進行刪除,也就是說消息依然在文件中,僅僅是標記為垃圾數據而已。
當一個文件中都是垃圾數據時可以將這個文件刪除。當檢測到前后兩個文件中的有效數據可以合並成一個文件,並且所有的垃圾數據的大小和所有文件(至少有3個文件存在的情況下)的數據大小的比值超過設置的閾值garbage_fraction(默認值0.5)時,才會觸發垃圾回收,將這兩個文件合並,執行合並的兩個文件一定是邏輯上相鄰的兩個文件。合並邏輯:
-
鎖定這兩個文件
-
先整理前面的文件的有效數據,再整理后面的文件的有效數據
-
將后面文件的有效數據寫入到前面的文件中
-
更新消息在ETS表中的記錄
-
刪除后面文件
隊列結構
通常隊列由rabbit_amqqueue_process和backing_queue這兩部分組成
rabbit_amqqueue_process負責協議相關的消息處理,即接收生產者發布的消息、向消費者交付消息、處理消息的確認(包括生產端的confirm和消費端的ack)等
backing_queue是消息存儲的具體形式和引擎,並向rabbit_amqqueue_process提供相關的接口以供調用。
如果消息投遞的目的隊列是空的,並且有消費者訂閱了這個隊列,那么該消息會直接發送給消費者,不會經過隊列這一步。當消息無法直接投遞給消費者時,需要暫時將消息存入隊列,以便重新投遞。
RabbitMQ的隊列消息有4種狀態:
-
alpha:消息索引和消息內容都存內存,最耗內存,很少消耗CPU
-
beta:消息索引存內存,消息內存存磁盤
-
gama:消息索引內存和磁盤都有,消息內容存磁盤
-
delta:消息索引和內容都存磁盤,基本不消耗內存,消耗更多CPU和I/O操作
消息存入隊列后,不是固定不變的,它會隨着系統的負載在隊列中不斷流動,消息的狀態會不斷發送變化。
持久化的消息,索引和內容都必須先保存在磁盤上,才會處於上述狀態中的一種,gama狀態只有持久化消息才會有的狀態。
在運行時,RabbitMQ會根據消息傳遞的速度定期計算一個當前內存中能夠保存的最大消息數量(target_ram_count),如果alpha狀態的消息數量大於此值,則會引起消息的狀態轉換,多余的消息可能會轉換到beta、gama或者delta狀態。區分這4種狀態的主要作用是滿足不同的內存和CPU需求。
對於普通沒有設置優先級和鏡像的隊列來說,backing_queue的默認實現是rabbit_variable_queue,其內部通過5個子隊列Q1、Q2、delta、Q3、Q4來體現消息的各個狀態。
消費者獲取消息也會引起消息的狀態轉換。
當消費者獲取消息時,首先會從Q4中獲取消息,如果獲取成功則返回。如果Q4為空,則嘗試從Q3中獲取消息,系統首先會判斷Q3是否為空,如果為空則返回隊列為空,即此時隊列中無消息。如果Q3不為空,則取出Q3中的消息,進而再判斷此時Q3和Delta中的長度,如果都為空,則可以認為 Q2、Delta、 Q3、Q4 全部為空,此時將Q1中的消息直接轉移至Q4,下次直接從 Q4 中獲取消息。如果Q3為空,Delta不為空,則將Delta的消息轉移至Q3中,下次可以直接從Q3中獲取消息。在將消息從Delta轉移到Q3的過程中,是按照索引分段讀取的,首先讀取某一段,然后判斷讀取的消息的個數與Delta中消息的個數是否相等,如果相等,則可以判定此時Delta中己無消息,則直接將Q2和剛讀取到的消息一並放入到Q3中,如果不相等,僅將此次讀取到的消息轉移到Q3。
這里就有兩處疑問,第一個疑問是:為什么Q3為空則可以認定整個隊列為空?試想一下,如果Q3為空,Delta不為空,那么在Q3取出最后一條消息的時候,Delta 上的消息就會被轉移到Q3這樣與 Q3 為空矛盾;如果Delta 為空且Q2不為空,則在Q3取出最后一條消息時會將Q2的消息並入到Q3中,這樣也與Q3為空矛盾;在Q3取出最后一條消息之后,如果Q2、Delta、Q3都為空,且Q1不為空時,則Q1的消息會被轉移到Q4,這與Q4為空矛盾。其實這一番論述也解釋了另一個問題:為什么Q3和Delta都為空時,則可以認為 Q2、Delta、Q3、Q4全部為空?
通常在負載正常時,如果消費速度大於生產速度,對於不需要保證可靠不丟失的消息來說,極有可能只會處於alpha狀態。對於持久化消息,它一定會進入gamma狀態,在開啟publisher confirm機制時,只有到了gamma 狀態時才會確認該消息己被接收,若消息消費速度足夠快、內存也充足,這些消息也不會繼續走到下一個狀態。
在系統負載較高時,消息若不能很快被消費掉,這些消息就會進入到很深的隊列中去,這樣會增加處理每個消息的平均開銷。因為要花更多的時間和資源處理“堆積”的消息,如此用來處理新流入的消息的能力就會降低,使得后流入的消息又被積壓到很深的隊列中,繼續增大處理每個消息的平均開銷,繼而情況變得越來越惡化,使得系統的處理能力大大降低。 應對這一問題一般有3種措施:
-
增加prefetch_count的值,即一次發送多條消息給消費者,加快消息被消費的速度。
-
采用multiple ack,降低處理 ack 帶來的開銷
-
流量控制
集群原理
RabbitMQ分布式部署有3種方式:集群、Federation和Shovel
這三種方式並不是互斥的,可以根據需求選擇相互組合來達到目的,后兩者都是以插件的形式進行設計,復雜性相對高
集群
部署RabbitMQ的機器稱為節點(broker)。broker有2種類型節點:磁盤節點和內存節點。顧名思義,磁盤節點的broker把元數據存儲在磁盤中,內存節點把元數據存儲在內存中,很明顯,磁盤節點的broker在重啟后元數據可以通過讀取磁盤進行重建,保證了元數據不丟失,內存節點的broker可以獲得更高的性能,但在重啟后元數據就都丟了。元數據包含以下內容:
-
queue元數據:queue名稱、屬性
-
exchange:exchange名稱、屬性(注意此處是exchange本身)
-
binding元數據:exchange和queue之間、exchange和exchange之間的綁定關系
-
vhost元數據:vhost內部的命名空間、安全屬性數據等
隊列所在的節點稱為宿主節點。
隊列創建時,只會在宿主節點創建隊列的進程,宿主節點包含完整的隊列信息,包括元數據、狀態、內容等等。因此,只有隊列的宿主節點才能知道隊列的所有信息。
隊列創建后,集群只會同步隊列和交換器的元數據到集群中的其他節點,並不會同步隊列本身,因此非宿主節點就只知道隊列的元數據和指向該隊列宿主節點的指針。
這樣的設計,保證了不論從哪個broker中均可以消費所有隊列的數據,並分擔了負載,因此,增加broker可以線性提高服務的性能和吞吐量。
但該方案也有顯著的缺陷,那就是不能保證消息不會丟失。當集群中某一節點崩潰時,崩潰節點所在的隊列進程和關聯的綁定都會消失,附加在那些隊列上的消費者也會丟失其訂閱信息,匹配該隊列的新消息也會丟失。
崩潰節點重啟后,需要從磁盤節點中同步元數據信息,並重建隊列,所以,集群中要求必須至少有一個broker為磁盤節點,以保證集群的可用性。
為何集群不將隊列內容和狀態復制到所有節點上呢?有2個原因:
-
如果包含了完整隊列,那么所有節點將會是同樣的數據拷貝,也就是所有節點均互為鏡像,無法拓寬負載,延展性不高
-
每次的同步都會讓消息同步到其他節點上並落盤,引發大量的網絡IO和磁盤IO,無法提升性能。(此處可以引申思考一下kafka中replica的分配方式)
如果磁盤節點崩潰了,集群依然可以繼續路由消息(因為其他節點元數據在還存在),但無法做以下操作:
-
創建隊列、交換器、綁定
-
添加用戶
-
更改權限
-
添加、刪除集群節點
也就是說,唯一的磁盤節點崩潰后,為了保證可用性,禁用了和元數據相關的添加、修改和刪除操作。所以建立集群的時候,建議保證有2個以上的磁盤節點。
集群添加或者刪除節點時,會把變更通知到至少一個磁盤節點。在內存節點重啟后,會先從磁盤節點中同步元數據,內存節點中唯一存儲到磁盤的是磁盤節點的地址。
RabbitMQ采用鏡像隊列的方式保證隊列的可靠性。鏡像隊列就是將隊列鏡像到集群中的其他節點,如果集群中一個節點失效了,隊列就能自動的切換到鏡像中的節點,一個鏡像隊列中包含有1個主節點master和若干個從節點slave。其主從節點包含如下幾個特點:
-
消息的讀寫都是在master上進行,並不是讀寫分離
-
master接收命令后會向salve進行組播,salve會命令執行順序執行
-
master失效,根據節點加入的時間,最老的slave會被提升為master
-
互為鏡像的是隊列,並非節點,集群中可以不同節點可以互為鏡像隊列,也就是說隊列的master可以分布在不同的節點上
與Kafka的差異
RabbitMQ第一版發布於2007年,其構思的本質的原因是AMQP的出現。AMQP出現之前各家的MQ產品百花齊放,但也因此導致整合非常困難,沒有形成統一的消息總線,在AMQP神兵天降之后RabbitMQ就開啟了它的職業生涯。
Kafka起源於LinkedIn,開源於2011年,目標是為了幫助處理持續數據流而設計的組件,在嘗試了消息系統、數據聚合、ETL工具等方式后,均無法滿足其需求,因此Kafka就誕生了。
RabbitMQ遵循了AMQP協議,擁有比較完善的消息交換模型:
-
支持生產者消費者模式和發布訂閱模式
-
支持消息的ack機制:顯式ack和自動ack
-
支持多租戶
-
支持權限配置
-
支持死信隊列
-
支持消息超時機制
-
...
Kafka作為一個分布式流式組件:
-
支持生產者消費者模式和發布訂閱模式
-
支持流回放
-
支持分批次寫入
-
支持分片
-
支持副本策略
-
支持數據保存策略
-
...
以上特性可以分別看出兩者使用場景上的差別,下面補充一下兩者針對消息存儲和消費的差異性。
存儲方式
RabbitMQ將消息存儲於隊列(Queue)中,消費者確認收到消息后(返回ack)才會移除消息。
Kafka將消息存於主題(Topic)中,並分片分散在各個節點,提高了並行率。其存儲策略可配置時間策略(如消息最長保存7天)或者空間策略(如消息最多保存10G)。
有序性
RabbitMQ中隊列是先進先出隊列,因此保證了消息的有序性。
Kafka如果分片配置為1,則消息也保證了有序性,但卻降低了吞吐率;如果分片配置為多份,則只能保證每個分片里的數據是有序的,無法保證整個分片是有序的。
消息ACK機制
RabbitMQ如果沒有配置自動ack,消息或者隊列也沒有設置TTL,則MQ將會一直等待消費者顯式響應ack后才會將消息移除,否則消息將一直存着,這種情況下,如果出現消費者崩潰或者消費速率低於生產速率等情況,會導致消息堆積占用內存,時間一長將蔓延影響到生產者生產數據。
Kafka不支持消息的ack,但提供了消息偏移量offset,消費者根據offset獲取消息,且支持消息回放(重復消費)。
集群
RabbitMQ有多種集群方式分為兩大類:非鏡像集群和鏡像集群。在可靠性上,RabbitMQ使用集群+帶有負載均衡的軟硬件(如HAProxy)組件實現。在彈性拓展上,非鏡像集群,拓展節點可線性提高性能,但由於並非所有節點都存儲隊列本身,因此如果某一個節點故障了,該節點的數據將會丟失。鏡像集群,隊列數據將同步到其他節點,保證了可用性,但同時也增加了網絡和磁盤的負載,損失了性能。Kafka
Kafka在設計時支持分片和副本策略,該架構保存消息會消息分散到在不同的節點,在擁有可靠性的同時也有較好的拓展能力,也因此,但因依賴了ZooKeeper,且需要保證節點數為奇數個。