了解一些 RabbitMQ 的實現原理也是很有必要的,它可以讓你在遇到問題時能透過現象看本質。
比如一個隊列的內部存儲其實是由5個子隊列來流轉運作的,隊列中的消息可以有4種不同的狀態等,通過這些可以明白在使用 RabbitMQ 時盡量不要有過多的消息堆積,不然會影響整體服務的性能。
存儲機制
RabbitMQ存儲層包含兩個部分:隊列索引和消息存儲。
RabbitMQ消息有兩種類型:持久化消息和非持久化消息,這兩種消息都會被寫入磁盤。
持久化消息在到達隊列時寫入磁盤,同時會內存中保存一份備份,當內存吃緊時,消息從內存中清除。這會提高一定的性能。非持久化消息一般只存於內存中,當內存吃緊時會被換入磁盤,以節省內存空間。
隊列索引:rabbit_queue_index(下文簡稱index)
index維護隊列的落盤消息的信息,如存儲地點、是否已被交付給消費者、是否已被消費者ack等。每個隊列都有相對應的index。
index使用順序的段文件來存儲,后綴為.idx,文件名從0開始累加,每個段文件中包含固定的segment_entry_count條記錄,默認值是16384。每個index從磁盤中讀取消息的時候,至少要在內存中維護一個段文件,所以設置queue_index_embed_msgs_below值得時候要格外謹慎,一點點增大也可能會引起內存爆炸式增長。
消息存儲:rabbit_msg_store(下文簡稱store)
store以鍵值的形式存儲消息,所有隊列共享同一個store,每個節點有且只有一個。
從技術層面上說,store還可分為msg_store_persistent和msg_store_transient,前者負責持久化消息的持久化,重啟后消息不會丟失;后者負責非持久化消息的持久化,重啟后消息會丟失。通常情況下,兩者習慣性的被當作一個整體。
store使用文件來存儲,后綴為.rdq,經過store處理的所有消息都會以追加的方式寫入到該文件中,當該文件的大小超過指定的限制(file_size_limit)后,將會關閉該文件並創建一個新的文件以供新的消息寫入。文件名從0開始進行累加。在進行消息的存儲時,RabbitMQ會在ETS(Erlang Term Storage)表中記錄消息在文件中的位置映射和文件的相關信息。
消息(包括消息頭、消息體、屬性)可以直接存儲在index中,也可以存儲在store中。最佳的方式是較小的消息存在index中,而較大的消息存在store中。這個消息大小的界定可以通過queue_index_embed_msgs_below來配置,默認值為4096B。當一個消息小於設定的大小閾值時,就可以存儲在index中,這樣性能上可以得到優化(可理解為數據庫的覆蓋索引和回表)。
讀取消息時,先根據消息的ID(msg_id)找到對應存儲的文件,如果文件存在並且未被鎖住,則直接打開文件,從指定位置讀取消息內容。如果文件不存在或者被鎖住了,則發送請求由store進行處理。
刪除消息時,只是從ETS表刪除指定消息的相關信息,同時更新消息對應的存儲文件和相關信息。在執行消息刪除操作時,並不立即對文件中的消息進行刪除,也就是說消息依然在文件中,僅僅是標記為垃圾數據而已。當一個文件中都是垃圾數據時可以將這個文件刪除。當檢測到前后兩個文件中的有效數據可以合並成一個文件,並且所有的垃圾數據的大小和所有文件(至少有3個文件存在的情況下)的數據大小的比值超過設置的閾值garbage_fraction(默認值0.5)時,才會觸發垃圾回收,將這兩個文件合並,執行合並的兩個文件一定是邏輯上相鄰的兩個文件。
隊列結構
通常隊列由rabbit_amqqueue_process和backing_queue這兩部分組成,rabbit_amqqueue_process負責協議相關的消息處理,即接收生產者發布的消息、向消費者交付消息、處理消息的確認(包括生產端的confirm和消費端的ack)等。backing_queue是消息存儲的具體形式和引擎,並向rabbit_amqqueue_process提供相關的接口以供調用。
如果消息投遞的目的隊列是空的,並且有消費者訂閱了這個隊列,那么該消息會直接發送給消費者,不會經過隊列這一步。當消息無法直接投遞給消費者時,需要暫時將消息存入隊列,以便重新投遞。
RabbitMQ的隊列消息有4種狀態:
-
alpha:消息索引和消息內容都存內存,最耗內存,很少消耗CPU
-
beta:消息索引存內存,消息內存存磁盤
-
gama:消息索引內存和磁盤都有,消息內容存磁盤
-
delta:消息索引和內容都存磁盤,基本不消耗內存,消耗更多CPU和I/O操作
消息存入隊列后,不是固定不變的,它會隨着系統的負載在隊列中不斷流動,消息的狀態會不斷發送變化。持久化的消息,索引和內容都必須先保存在磁盤上,才會處於上述狀態中的一種,gama狀態只有持久化消息才會有的狀態。
在運行時,RabbitMQ會根據消息傳遞的速度定期計算一個當前內存中能夠保存的最大消息數量(target_ram_count),如果alpha狀態的消息數量大於此值,則會引起消息的狀態轉換,多余的消息可能會轉換到beta、gama或者delta狀態。區分這4種狀態的主要作用是滿足不同的內存和CPU需求。
對於普通沒有設置優先級和鏡像的隊列來說,backing_queue的默認實現是rabbit_variable_queue,其內部通過5個子隊列Q1、Q2、delta、Q3、Q4來體現消息的各個狀態。
消費者獲取消息也會引起消息的狀態轉換。當消費者獲取消息時,首先會從Q4中獲取消息,如果獲取成功則返回。如果Q4為空,則嘗試從Q3中獲取消息,系統首先會判斷Q3是否為空,如果為空則返回隊列為空,即此時隊列中無消息。如果Q3不為空,則取出Q3中的消息,進而再判斷此時Q3和Delta中的長度,如果都為空,則可以認為 Q2、Delta、 Q3、Q4 全部為空,此時將Q1中的消息直接轉移至Q4,下次直接從 Q4 中獲取消息。如果Q3為空,Delta不為空,則將Delta的消息轉移至Q3中,下次可以直接從Q3中獲取消息。在將消息從Delta轉移到Q3的過程中,是按照索引分段讀取的,首先讀取某一段,然后判斷讀取的消息的個數與Delta中消息的個數是否相等,如果相等,則可以判定此時Delta中己無消息,則直接將Q2和剛讀取到的消息一並放入到Q3中,如果不相等,僅將此次讀取到的消息轉移到Q3。
這里就有兩處疑問,第一個疑問是:為什么Q3為空則可以認定整個隊列為空?試想一下,如果Q3為空,Delta不為空,那么在Q3取出最后一條消息的時候,Delta 上的消息就會被轉移到Q3這樣與 Q3 為空矛盾;如果Delta 為空且Q2不為空,則在Q3取出最后一條消息時會將Q2的消息並入到Q3中,這樣也與Q3為空矛盾;在Q3取出最后一條消息之后,如果Q2、Delta、Q3都為空,且Q1不為空時,則Q1的消息會被轉移到Q4,這與Q4為空矛盾。其實這一番論述也解釋了另一個問題:為什么Q3和Delta都為空時,則可以認為 Q2、Delta、Q3、Q4全部為空?
通常在負載正常時,如果消費速度大於生產速度,對於不需要保證可靠不丟失的消息來說,極有可能只會處於alpha狀態。對於持久化消息,它一定會進入gamma狀態,在開啟publisher confirm機制時,只有到了gamma 狀態時才會確認該消息己被接收,若消息消費速度足夠快、內存也充足,這些消息也不會繼續走到下一個狀態。
在系統負載較高時,消息若不能很快被消費掉,這些消息就會進入到很深的隊列中去,這樣會增加處理每個消息的平均開銷。因為要花更多的時間和資源處理“堆積”的消息,如此用來處理新流入的消息的能力就會降低,使得后流入的消息又被積壓到很深的隊列中,繼續增大處理每個消息的平均開銷,繼而情況變得越來越惡化,使得系統的處理能力大大降低。
要避免流控機制觸發,服務端默認配置是當內存使用達到40%,磁盤空閑空間小於50M,即啟動內存報警,磁盤報警;報警后服務端觸發流控(flowcontrol)機制。
一般地,當發布端發送消息速度快於訂閱端消費消息的速度時,隊列中堆積了大量的消息,導致報警,就會觸發流控機制。
觸發流控機制后,RabbitMQ服務端接收發布來的消息會變慢,使得進入隊列的消息減少;
與此同時RabbitMQ服務端的消息推送也會受到極大的影響,測試發現,服務端推送消息的頻率會大幅下降,等待下一次推送的時間,有時等1分鍾,有時5分鍾,甚至30分鍾。
一旦觸發流控,將導致RabbitMQ服務端性能惡化,推送消息也會變得非常緩慢;
因此要做好數據設計,使得發送速率和接收速率保持平衡,而不至於引起服務器堆積大量消息,進而引發流控。通過增加服務器集群節點,增加消費者,來避免流控發生,治標不治本,而且成本高。
應對這一問題一般有3種措施:
-
增加prefetch_count的值,即一次發送多條消息給消費者,加快消息被消費的速度。
-
采用multiple ack,降低處理 ack 帶來的開銷
-
流量控制
參考
【1】RabbitMQ存儲模型
【2】RabbitMQ存儲和隊列結構