消息存儲:
message主要存儲在RAM和disk里面。
所有隊列中的消息都以append的方式寫到一個文件中,當這個文件的大小超過指定的限制大小后,關閉這個文件再創建一個新的文件供消息的寫入。文件名(*.rdq)從0開始然后依次累加。
在進行消息的存儲時,rabbitmq會在ets表中記錄消息在文件中的映射,以及文件的相關信息。消息讀取時,根據消息ID找到該消息所存儲的文件,在文件中的偏移量,然后打開文件進行讀取。
rabbitmq在啟動時會創建msg_store_persistent,msg_store_transient兩個進程,一個用於持久消息的存儲,一個用於內存不夠時,將存儲在內存中的非持久化數據轉存到磁盤中。所有隊列的消息的寫入和刪除最終都由這兩個進程負責處理,而消息的讀取則可能是隊列本身直接打開文件進行讀取,也可能是發送請求由msg_store_persisteng/msg_store_transient進程進行處理。
Ets數據結構:
-record(msg_location,{ msg_id, //消息ID ref_count, //引用計數 file, //消息存儲的文件名 offset, //消息在文件中的偏移量 total_size //消息的大小 }).
GC過程
消息的刪除只是從flying_ets表刪除指定消息的相關信息,同時更新消息對應存儲的文件的相關信息、更新文件有效數據大小。當垃圾數據超過一定比例后(默認比例為40%),rabbitmq觸發垃圾回收。垃圾回收會先找到符合要求的兩個文件(根據#file_summary{}中left,right找邏輯上相鄰的兩個文件,並且兩個文件的有效數據可在一個文件中存儲),然后鎖定這兩個文件,並先對左邊文件的有效數據進行整理,再將右邊文件的有效數據寫入到左邊文件,同時更新消息的相關信息(存儲的文件,文件中的偏移量)、文件的相關信息(文件的有效數據,左邊文件,右邊文件),最后將右邊的文件刪除。
消息隊列結構:
通常隊列由兩部分組成:一部分是AMQQueue,負責AMQP協議相關的消息處理,即接收生產者發布的消息、向消費者投遞消息、處理消息confirm、acknowledge等等;另一部分是BackingQueue,它提供了相關的接口供AMQQueue調用,完成消息的存儲以及可能的持久化工作等。
基本經歷RAM->DISK->RAM這樣的過程。這樣設計的好處是:當隊列負載很高的情況下,能夠通過將一部分消息由磁盤保存來節省內存空間,當負載降低的時候,這部分消息又漸漸回到內存,被消費者獲取,使得整個隊列具有很好的彈性。
RabbitMQ高可用時采用鏡像隊列:
集群原理
1. 基本概念
1) 鏡像隊列(Mirrored Queue)
RabbitMQ集群的隊列(Queue)在默認的情況下只存在單一節點(node)上。我們也可以把隊列配置成同時存在在多個節點上,也就是說隊列可以被鏡像到多個節點上。發布(publish)到鏡像隊列上的消息(message)會被復制(replicated)到所有的節點上。一個鏡像隊列包含一個主(master)和多個從(slave)。
2) 非同步的Slave(unsynchronised slave)
在rabbitmq中同步(synchronised)是用來描述master和slave之間的數據狀態是否一致的。如果slave包含master中的所有message,則這個slave是synchronised,如果這個slave並沒有包含master中所有的message,則這個slave是unsynchronised。
3) 在什么情況下會出現unsynchronisedslave?
當一個新slave加入到一個鏡像隊列時,這時這個新slave是空的,而master中這時可能包含之前接收到的消息。假設這時master包含了N條消息,這是第N+1條消息被添加到這個鏡像隊列中,這個新slave會從這個第N+1條消息開始接收。此時這個slave就是unsynchronised slave。隨着前5條消息從鏡像隊列中被消費掉(consumed), 這個slave變成了synchronised。
slave 重新加入(rejoin)到鏡像隊列時,也會出現非同步的情況。一個slave要重新加入鏡像隊列之前,slave可能已經接收了一些消息,要重新加入鏡像隊列,就要清空自己之前已經接收的所有消息,好像自己是第一次加入隊列一樣。(slave在很多情況下會需要重新加入鏡像隊列,例如:網絡分區(networkpartition))