【RabbitMQ學習記錄】- 消息隊列存儲機制源碼分析


本文來自 網易雲社區 。

 

RabbitMQ在金融系統,OpenStack內部組件通信和通信領域應用廣泛,它部署簡單,管理界面內容豐富使用十分方便。筆者最近在研究RabbitMQ部署運維和代碼架構,本篇文章主要記錄下RabbitMQ存儲機制相關內容和源碼分析。

一、RabbitMQ進程架構

Erlang是基於Actor模型的一門天然多進程、分布式和高並發的語言。一個Erlang虛擬機對應一個操作系統進程,一個Erlang進程調度器對應一個操作系統線程,一般來說,有多少個CPU核就有多少個調度器。

RabbitMQ是基於Erlang語言實現的一個分布式消息中間件。下圖是RabbitMQ基本的進程模型:

 

  • tcp_acceptor:負責接受客戶端連接,然后為客戶端連接創建rabbit_reader、rabbit_writer、rabbit_channel進程
  • rabbit_reader:負責解析客戶端AMQP幀,然后將請求發送給rabbit_channel進程
  • rabbit_writer:負責向客戶端返回數據
  • rabbit_channel:負責解析AMQP方法,以及對消息進行路由,然后發送給對應的隊列進程。
  • rabbit_amqqueue_process:rabbit隊列進程,該進程一般在rabbitmq創建隊列時被創建,其主要負責消息的接收/投遞邏輯
  • rabbit_msg_store:存儲服務器進程,主要負責消息的持久化存儲

上述進程中,tcp_acceptor和rabbit_msg_store只會有一個,rabbit_amqqueue_process進程的數量則和隊列數量保持一致,每個客戶端連接對應一個rabbit_reader和rabbit_writer進程,每一個連接的通道對應一個rabbit_channel進程。

通常來說,客戶端發起一條連接的同時,可以打開多條channel,相對連接的open/close來說,對channel進行open和close的操作開銷會更小。最佳實踐是一個生產者/消費者進程對應一個connection,具體發送一個線程對應一個channel即可。

二、消息存在哪里

RabbitMQ的消息持久化實際包括兩部分:隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。rabbit_queue_index負責維護隊列中落盤消息的信息,包括消息的存儲地點、是否已經被交付給消費者、是否已被消費者ack等,每個隊列都有一個與之對應的rabbit_queue_index。

rabbit_msg_store以鍵值對的形式存儲消息,每個節點有且只有一個,所有隊列共享。從技術層面講rabbit_msg_store又可以分為msg_store_persistent和msg_store_transient,其中msg_store_persistent負責持久化消息的存儲,不會丟失,而msg_store_transient負責非持久化消息的存儲,重啟后消息會丟失。

通過配置環境變量RABBITMQ_MNESIA_BASE可以指定存儲目錄,一般配置RABBITMQ_MNESIA_BASE=/srv/rabbitmq。

$ cd /srv/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@node1
$ ls -l
drwxr-xr-x 2 nqs nqs 12288 Jun  1 14:43 msg_store_persistent
drwxr-xr-x 2 nqs nqs  4096 Jul 25  2016 msg_store_transient
drwxr-xr-x 4 nqs nqs  4096 Jul 27  2016 queues
...

其中msg_store_transient、queues和msg_store_persistent就是實際消息的存儲目錄。

2.1 rabbit_msg_store存儲

RabbitMQ通過配置queue_index_embed_msgs_below可以指定根據消息存儲位置,默認queue_index_embed_msgs_below是4096字節(包含消息體、屬性及headers),小於該值的消息存在rabbit_queue_index中。

$ ls msg*
msg_store_persistent:
82680.rdq  97666.rdq

msg_store_transient:
0.rdq

經過rabbit_msg_store處理的消息都會以追加的方式寫入到文件中,文件名從0開始累加,后綴是.rdq,當一個文件的大小超過指定的限制(file_size_limit)后,關閉這個文件再創建一個新的文件存儲。 消息以以下格式存在於文件中:

<<Size:64, MsgId:16/binary, MsgBody>>

MsgId為RabbitMQ通過rabbit_guid:gen()每一個消息生成的GUID,MsgBody會包含消息對應的exchange,routing_keys,消息的內容,消息對應的協議版本,消息內容格式。

在進行消息存儲時,RabbitMQ會在ETS表中記錄消息在文件中的位置映射和文件的相關信息。讀取消息的時候先根據消息的msg_id找到對應的文件,如果文件存在且未被鎖住則直接打開文件,如果文件不存在或者鎖住了則發請求到rabbit_msg_store處理。

2.2 索引文件

查看索引信息

$ cd queues/DMX3PGVA4ZG3HHCXA0ULNIM6P
$ ls 
70083.idx  70084.idx  88155.idx  journal.jif

rabbit_queue_index順序存儲段文件,文件編號從0開始,后綴.idx,且每個段文件包含固定的SEGMENT_ENTRY_COUNT條記錄。SEGMENT_ENTRY_COUNT默認是16384,每個rabbit_queue_index從磁盤讀取消息的時候至少讀取一個段文件。

2.3 過期消息刪除

消息的刪除只是從ETS表刪除執行消息的相關信息,同時更新對應的存儲文件的相關信息,並不立即對文件中的消息進程刪除,后續會有專門的垃圾回收進程負責合並待回收消息文件。

當所有文件中的垃圾消息(已經被刪除的消息)比例大於閾值(GARBAGE_FRACTION = 0.5)時,會觸發文件合並操作(至少有三個文件存在的情況下),以提高磁盤利用率。

publish消息時寫入內容,ack消息時刪除內容(更新該文件的有用數據大小),當一個文件的有用數據等於0時,刪除該文件。

三、消息存儲過程源碼分析

消息流轉示意圖:

rabbit_channel進程確定了消息將要投遞的目標隊列,rabbit_amqqueue_process是隊列進程,每個隊列都有一個對應的進程,實際上rabbit_amqqueue_process進程只是提供了邏輯上對隊列的相關操作,他的真正操作是通過調用指定的backing_queue模塊提供的相關接口實現的,默認情況該backing_queue的實現模塊為rabbit_variable_queue。 RabbitMQ隊列中的消息隨着系統的負載會不斷的變化,一個消息可能會處於以下4種狀態:

%% Definitions:
%%
%% alpha: this is a message where both the message itself, and its
%%        position within the queue are held in RAM(消息本身和消息位置索引都只在內存中)
%%
%% beta:  this is a message where the message itself is only held on
%%        disk (if persisted to the message store) but its position
%%        within the queue is held in RAM.(消息本身存儲在磁盤中,但是消息的位置索引存在內存中)
%%
%% gamma: this is a message where the message itself is only held on
%%        disk, but its position is both in RAM and on disk.(消息本身存儲在磁盤中,但是消息的位置索引存在內存中和磁盤中)
%%
%% delta: this is a collection of messages, represented by a single
%%        term, where the messages and their position are only held on
%%        disk.(消息本身和消息的位置索引都值存儲在磁盤中)

對於普通的沒有設置優先級和鏡像的隊列來說,backing_queue的默認實現是rabbit_variable_queue,其內部通過5個子隊列Q1、Q2、Delta、Q3和Q4來實現這4個狀態的轉換,其關系如下圖所示:

 

其中Q1、Q4只包含alpha狀態的消息,Q2和Q3包含Beta和gamma狀態的消息,Delta只包含delta狀態的消息。具體消息的狀態轉換后續會進行源碼分析。

3.1 消息入隊分析

rabbit_amqqueue_process對消息的主要處理邏輯位於deliver_or_enqueue函數,該方法將消息直接傳遞給消費者,或者將消息存儲到隊列當中。

整體處理邏輯如下:

  1. 首先處理消息的mandory標志,和confirm屬性。mandatory標志告訴服務器至少將該消息route到一個隊列中,否則將消息返還給生產者。confirm則是消息的發布確認。
  2. 然后判斷隊列中是否有消費者正在等待,如果有則直接調用backing_queue的接口給客戶端發送消息。
  3. 如果隊列上沒有消費者,根據當前相關設置判斷消息是否需要丟棄,不需要丟棄的情況下調用backing_queue的接口將消息入隊。

deliver_or_enqueue函數代碼:

deliver_or_enqueue(Delivery = #delivery{message = Message,
                                        sender  = SenderPid,
                                        flow    = Flow},
                   Delivered, State = #q{backing_queue       = BQ,
                                         backing_queue_state = BQS}) ->
    %% 如果當前消息mandatory字段為true,則立刻通知該消息對應的rabbit_channel進程
    send_mandatory(Delivery), %% must do this before confirms
    %% 消息隊列記錄要confirm的消息,如果confirm為false,則不記錄要confirm(如果消息需要進行confirm,則將該消息的信息存入msg_id_to_channel字段中)
    {Confirm, State1} = send_or_record_confirm(Delivery, State),
    %% 得到消息特性特性數據結構
    Props = message_properties(Message, Confirm, State1),
    %% 讓backing_queue去判斷當前消息是否重復(rabbit_variable_queue沒有實現,直接返回的false)
    {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
    State2 = State1#q{backing_queue_state = BQS1},
    case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
                                             State2) of
        true ->
            State2;
        %% 已經將消息發送給消費者的情況
        {delivered, State3} ->
            State3;
        %% The next one is an optimisation
        %% 沒有消費者來取消息的情況(discard:拋棄)
        %% 當前消息沒有發送到對應的消費者,同時當前隊列中設置的消息過期時間為0,同時重新發送的exchange交換機為undefined,則立刻將該消息丟棄掉
        {undelivered, State3 = #q{ttl = 0, dlx = undefined,
                                  backing_queue_state = BQS2,
                                  msg_id_to_channel   = MTC}} ->
            %% 直接將消息丟棄掉,如果需要confirm的消息則立刻通知rabbit_channel進程進行confirm操作
            {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
            State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
        %% 沒有消費者來取消息的情況
        {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
            %% 將消息發布到backing_queue中
            BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
            %% 判斷當前隊列中的消息數量超過上限或者消息的占的空間大小超過上限
            {Dropped, State4 = #q{backing_queue_state = BQS4}} =
                maybe_drop_head(State3#q{backing_queue_state = BQS3}),
            %% 得到當前隊列中的消息數量
            QLen = BQ:len(BQS4),
            %% optimisation: it would be perfectly safe to always
            %% invoke drop_expired_msgs here, but that is expensive so
            %% we only do that if a new message that might have an
            %% expiry ends up at the head of the queue. If the head
            %% remains unchanged, or if the newly published message
            %% has no expiry and becomes the head of the queue then
            %% the call is unnecessary.
            case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
                %% 該情況是頭部沒有變化,同時消息隊列消息樹立不為一,則不管當前加入的消息是否設置有超時時間,都不執行drop_expired_msgs函數
                {false, false,         _} -> State4;
                %% 有丟棄消息,同時當前隊列中只有當前這個新的消息,同時消息自己的特性過期時間沒有定義,則不檢查消息過期
                %% 此時消息的頭部有變化,但是消息隊列中只有一個消息,該消息還沒有設置超時時間,則不執行drop_expired_msgs函數
                {true,  true,  undefined} -> State4;
                %% 當向隊列中插入消息后需要做檢查消息過期,同時設置定時器的操作只有三種情況
                %% 1.當消息頭部根據隊列上限有變化,同時消息插入后當前隊列消息數量為一,且該消息設置有過期時間,則需要做一次操作(該情況是消息頭部有刪除消息,都會進行一次消息過期檢查)
                %% 2.當消息頭部根據隊列上限有變化,同時消息插入后當前隊列消息數量不為一,且該消息設置有過期時間,則需要做一次操作(該情況是消息頭部有刪除消息,都會進行一次消息過期檢查)
                %% 3.當消息頭部根據隊列上限沒有變化,同時消息插入后當前隊列消息數量為一,不管消息有沒有過期時間,都要做一次操作(該情況下是當前隊列進入第一條消息)
                %% 最重要的是只要消息隊列的頭部消息有變化,則立刻執行drop_expired_msgs函數,將隊列頭部超時的消息刪除掉
                {_,     _,             _} -> drop_expired_msgs(State4)
            end
    end.

如果調用到該方法的BQ:publish則說明當前隊列沒有消費者正在等待,消息將進入到隊列。backing_queue實現了消息的存儲,他會盡力會durable=true的消息做持久化存儲。初始默認情況下,非持久化消息直接進入內存隊列,此時效率最高,當內存占用逐漸達到一個閾值時,消息和消息索引逐漸往磁盤中移動,隨着消費者的不斷消費,內存占用的減少,消息逐漸又從磁盤中被轉到內存隊列中。

消息在這些Queue中傳遞的"一般"過程q1->q2->delta->q3->q4,一般負載較輕的情況消息不需要走完每個Queue,大部分都可以跳過。rabbit_variable_queue中消息的入隊接口源碼如下:

%% 消息的發布接口
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
        MsgProps = #message_properties { needs_confirming = NeedsConfirming },
        IsDelivered, _ChPid, _Flow,
        State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                           next_seq_id      = SeqId,
                           in_counter       = InCount,
                           durable          = IsDurable,
                           unconfirmed      = UC }) ->
    %% 只有持久化隊列和消息持久化才會對消息進行持久化
    IsPersistent1 = IsDurable andalso IsPersistent,
    %% 組裝消息狀態(該數據結構是實際存儲在隊列中的數據)
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
    %% 如果隊列和消息都是持久化類型,則將消息內容和消息在隊列中的索引寫入磁盤
    {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
    %% 將消息狀態數據結構存入內存(如果Q3隊列不為空,則將新消息存入Q1隊列,如果為空則將新消息存入Q4隊列)
    State2 = case ?QUEUE:is_empty(Q3) of
                 %% 如果Q3隊列不為空,則將當前的消息寫入Q1隊列
                 false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
                 %% 如果Q3隊列為空,則將當前的消息寫入Q4隊列
                 true  -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
             end,
    %% 進入隊列中的消息數量加一
    InCount1 = InCount + 1,
    %% 如果消息需要確認,將該消息加入unconfirmed字段
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    %% 更新隊列進程中的狀態信息
    State3 = stats({1, 0}, {none, MsgStatus1},
                   %% 更新下一個消息在消息中的位置
                   State2#vqstate{ next_seq_id = SeqId + 1,
                                   in_counter  = InCount1,
                                   unconfirmed = UC1 }),
    %% RabbitMQ系統中使用的內存過多,此操作是將內存中的隊列數據寫入到磁盤中
    a(reduce_memory_use(maybe_update_rates(State3))).

消息入隊時先判斷Q3是否為空,如果Q3為空,則直接進入Q4,否則進入Q1,這里思考下為什么?

假如Q3為空,Delta一定為空,因為假如Delta不為空,那么Q3取出最后一個消息的時候Delta已經把消息轉移到Q3了,這樣Q3就不是空了,前后矛盾因此Delta一定是空的。同理可以推測出Q2、Q1都是空的,直接把消息放入Q4即可。

消息入隊后,需要判斷內存使用,調用reduce_memory_use函數:

reduce_memory_use(State = #vqstate {
                                    ram_pending_ack  = RPA,
                                    ram_msg_count    = RamMsgCount,
                                    target_ram_count = TargetRamCount,
                                    rates            = #rates { in      = AvgIngress,
                                                                out     = AvgEgress,
                                                                ack_in  = AvgAckIngress,
                                                                ack_out = AvgAckEgress } }) ->
    State1 = #vqstate { q2 = Q2, q3 = Q3 } =
                          %% 得到當前在內存中的數量超過允許在內存中的最大數量的個數
                          case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
                              0  -> State;
                              %% Reduce memory of pending acks and alphas. The order is
                              %% determined based on which is growing faster. Whichever
                              %% comes second may very well get a quota of 0 if the
                              %% first manages to push out the max number of messages.
                              S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
                                                     (AvgIngress - AvgEgress)) of
                                               %% ack操作進入的流量大於消息進入的流量,則優先將等待ack的消息寫入磁盤文件
                                               true  -> [
                                                         %% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁盤文件)
                                                         fun limit_ram_acks/2,
                                                         %% 將Quota個alphas類型的消息轉化為betas類型的消息(Q1和Q4隊列都是alphas類型的消息)
                                                         fun push_alphas_to_betas/2
                                                        ];
                                               %% 消息進入的流量大於ack操作進入的消息流量,則優先將非等待ack的消息寫入磁盤文件
                                               false -> [
                                                         %% 將Quota個alphas類型的消息轉化為betas類型的消息(Q1和Q4隊列都是alphas類型的消息)
                                                         fun push_alphas_to_betas/2,
                                                         %% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁盤文件)
                                                         fun limit_ram_acks/2
                                                        ]
                                           end,
                                    %% 真正執行轉化的函數
                                    {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
                                                                       ReduceFun(QuotaN, StateN)
                                                              end, {S1, State}, Funs),
                                    State2
                          end,
    %% 當前beta類型的消息大於允許的beta消息的最大值,則將beta類型多余的消息轉化為deltas類型的消息
    case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
                    permitted_beta_count(State1)) of
        S2 when S2 >= ?IO_BATCH_SIZE ->
            %% 將S2個betas類型的消息轉化為deltas類型的消息
            push_betas_to_deltas(S2, State1);
        _  ->
            State1
    end.

%% 將Quota個alphas類型的消息轉化為betas類型的消息(Q1和Q4隊列都是alphas類型的消息)
push_alphas_to_betas(Quota, State) ->
    %% 將Q1隊列中消息轉化為betas類型的消息
    %% 如果磁盤中沒有消息,則將Q1中的消息存儲到Q3隊列,如果磁盤中有消息則將Q3隊列中的消息存儲到Q2隊列(將Q1隊列頭部的元素放入到Q2或者Q3隊列的尾部)
    {Quota1, State1} =
        push_alphas_to_betas(
          fun ?QUEUE:out/1,
          fun (MsgStatus, Q1a,
               %% 如果delta類型的消息的個數為0,則將該消息存入存入Q3隊列
               State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
                   State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
             %% 如果delta類型的消息個數不為0,則將該消息存入Q2隊列
             (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
                  State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
          end,
          Quota, State #vqstate.q1, State),
    %% 將Q4隊列中消息轉化為betas類型的消息(Q4 -> Q3)(將Q4隊列尾部的元素不斷的放入到Q3隊列的頭部)
    {Quota2, State2} =
        push_alphas_to_betas(
          fun ?QUEUE:out_r/1,
          fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
                   State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
          end,
          Quota1, State1 #vqstate.q4, State1),
    {Quota2, State2}.

%% 限制內存中的等待ack的消息(將消息內容在內存中的等待ack的消息的消息內容寫入磁盤文件)
limit_ram_acks(0, State) ->
    {0, State};

limit_ram_acks(Quota, State = #vqstate { ram_pending_ack  = RPA,
                                         disk_pending_ack = DPA }) ->
    case gb_trees:is_empty(RPA) of
        true ->
            {Quota, State};
        false ->
            %% 拿到隊列索引最大的消息
            {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
            %% 內存不足,強制性的將等待ack的SeqId消息內容寫入磁盤
            {MsgStatus1, State1} =
                maybe_write_to_disk(true, false, MsgStatus, State),
            %% 如果成功的將消息寫入磁盤,則將內存中的消息體字段清空
            MsgStatus2 = m(trim_msg_status(MsgStatus1)),
            %% 更新存儲在磁盤中等待ack的消息字段disk_pending_ack,將剛才從存儲在內存中等待ack的消息字段ram_pending_ack中的SeqId存儲到disk_pending_ack字段中
            DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
            %% 更新隊列狀態,同時更新最新的ram_pending_ack和disk_pending_ack字段
            limit_ram_acks(Quota - 1,
                           %% 主要是更新內存中保存的消息大小(ram_bytes減去當前寫入磁盤的消息的大小)
                           stats({0, 0}, {MsgStatus, MsgStatus2},
                                 State1 #vqstate { ram_pending_ack  = RPA1,
                                                   disk_pending_ack = DPA1 }))
    end.

每次入隊消息后,判斷RabbitMQ系統中使用的內存是否過多,此操作是嘗試將內存中的隊列數據寫入到磁盤中.
內存中的消息數量(RamMsgCount)及內存中的等待ack的消息數量(RamAckIndex)的和大於允許的內存消息數量(TargetRamCount)時,多余數量的消息內容會被寫到磁盤中.

3.2 消息出隊源碼分析

獲取消息:

  1. 嘗試從q4隊列中獲取一個消息,如果成功,則返回獲取到的消息,如果失敗,則嘗試通過試用fetch_from_q3/1從q3隊列獲取消息,成功則返回,如果為空則返回空;
  2. 注意fetch_from_q3從Q3獲取消息,如果Q3為空,則說明整個隊列都是空的,無消息,消費者等待即可。

 

取出消息后:

  1. 如果Q4不為空,取出消息后直接返回;
  2. 如果Q4為空,Q3不為空,從Q3取出消息后,判斷Q3是否為空,如果Q3為空,Delta不為空,則將Delta中的消息轉移到Q3中,下次直接從Q3消費;
  3. 如果Q3和Delta都是空的,則可以任務Delta和Q2的消息都是空的,此時將Q1的消息轉移到Q4,下次直接從Q4消費即可。
%% 從隊列中獲取消息
queue_out(State = #vqstate { q4 = Q4 }) ->
    %% 首先嘗試從Q4隊列中取得元素(Q4隊列中的消息類型為alpha)
    case ?QUEUE:out(Q4) of
        {empty, _Q4} ->
            %% 如果Q4隊列為空則從Q3隊列中取得元素(如果Q3也為空,則直接返回空)
            case fetch_from_q3(State) of
                {empty, _State1} = Result     -> Result;
                {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
            end;
        {{value, MsgStatus}, Q4a} ->
            {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
    end.

%% 從隊列Q3中讀取消息
fetch_from_q3(State = #vqstate { q1    = Q1,
                                 q2    = Q2,
                                 delta = #delta { count = DeltaCount },
                                 q3    = Q3,
                                 q4    = Q4 }) ->
    %% 先從Q3隊列中取元素(如果為空,則直接返回為空)
    case ?QUEUE:out(Q3) of
        {empty, _Q3} ->
            {empty, State};
        {{value, MsgStatus}, Q3a} ->
            State1 = State #vqstate { q3 = Q3a },
            State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                         {true, true} ->
                             %% 當這兩個隊列都為空時,可以確認q2也為空,也就是這時候,q2,q3,delta,q4都為空,那么,q1隊列的消息可以直接轉移到q4,下次獲取消息時就可以直接從q4獲取
                             %% q3 is now empty, it wasn't before;
                             %% delta is still empty. So q2 must be
                             %% empty, and we know q4 is empty
                             %% otherwise we wouldn't be loading from
                             %% q3. As such, we can just set q4 to Q1.
                             %% 當Q3隊列為空,且磁盤中的消息數量為空,則斷言Q2隊列為空
                             true = ?QUEUE:is_empty(Q2), %% ASSERTION
                             %% 當Q3隊列為空,且磁盤中的消息數量為空,則斷言Q4隊列為空
                             true = ?QUEUE:is_empty(Q4), %% ASSERTION
                             %% 從Q3隊列中取走消息后發現Q3隊列為空,同時磁盤中沒有消息,則將Q1隊列中的消息放入Q4隊列
                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
                         {true, false} ->
                             %% 從Q3隊列中取走消息后發現Q3隊列為空,q3空,delta非空,這時候就需要從delta隊列(內容與索引都在磁盤上,通過maybe_deltas_to_betas/1調用)讀取消息,並轉移到q3隊列
                             maybe_deltas_to_betas(State1);
                         {false, _} ->
                             %% q3非空,直接返回,下次獲取消息還可以從q3獲取
                             %% q3 still isn't empty, we've not
                             %% touched delta, so the invariants
                             %% between q1, q2, delta and q3 are
                             %% maintained
                             State1
                     end,
            {loaded, {MsgStatus, State2}}
    end.
轉移Delta消息到Q3源碼分析:

%% 從磁盤中讀取隊列數據到內存中來(從隊列消息中最小索引ID讀取出一個索引磁盤文件大小的消息索引信息)
%% 從隊列索引的磁盤文件將單個磁盤文件中的消息索引讀取出來
%% 該操作是將單個隊列索引磁盤文件中的deltas類型消息轉換為beta類型的消息
maybe_deltas_to_betas(State = #vqstate {
                                        q2                   = Q2,
                                        delta                = Delta,
                                        q3                   = Q3,
                                        index_state          = IndexState,
                                        ram_msg_count        = RamMsgCount,
                                        ram_bytes            = RamBytes,
                                        ram_pending_ack      = RPA,
                                        disk_pending_ack     = DPA,
                                        qi_pending_ack       = QPA,
                                        disk_read_count      = DiskReadCount,
                                        transient_threshold  = TransientThreshold }) ->
    #delta { start_seq_id = DeltaSeqId,
             count        = DeltaCount,
             end_seq_id   = DeltaSeqIdEnd } = Delta,
    %% 根據delta中的開始DeltaSeqId得到存在索引磁盤的最小的磁盤索引號
    DeltaSeqId1 =
        lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
                   DeltaSeqIdEnd]),
    %% 從隊列索引中讀取消息索引(從隊列索引的磁盤文件將單個磁盤文件中的消息索引讀取出來)
    {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
                                                  IndexState),
    %% 過濾掉從rabbit_queue_index中讀取過來的消息隊列索引(如果該消息不是持久化的則需要刪除掉),最后得到當前內存中准備好的消息個數以及內存中的消息的總的大小
    {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
        %% RabbitMQ系統關閉以前非持久化消息存儲到磁盤中的索引信息再從磁盤讀取出來的時候必須將他們徹底從RabbitMQ系統中刪除
        betas_from_index_entries(List, TransientThreshold,
                                 RPA, DPA, QPA, IndexState1),
    %% 更新隊列消息索引結構,內存中隊列中的消息個數,隊列內存中消息占的大小,以及從磁盤文件讀取的次數
    State1 = State #vqstate { index_state       = IndexState2,
                              ram_msg_count     = RamMsgCount   + RamCountsInc,
                              ram_bytes         = RamBytes      + RamBytesInc,
                              disk_read_count   = DiskReadCount + RamCountsInc},
    case ?QUEUE:len(Q3a) of
        0 ->
            %% we ignored every message in the segment due to it being
            %% transient and below the threshold
            %% 如果讀取的當前消息隊列索引磁盤文件中的操作項為空,則繼續讀下一個消息索引磁盤文件中的操作項
            maybe_deltas_to_betas(
              State1 #vqstate {
                               delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
        Q3aLen ->
            %% 將從索引中讀取出來的消息索引存儲到Q3隊列(將新從磁盤中讀取的消息隊列添加到老的Q3隊列的后面)
            Q3b = ?QUEUE:join(Q3, Q3a),
            case DeltaCount - Q3aLen of
                0 ->
                    %% 如果讀取出來的長度和隊列索引的總長度相等,則delta信息被重置為消息個數為0,同時q2中的消息轉移到q3隊列
                    %% delta is now empty, but it wasn't before, so
                    %% can now join q2 onto q3
                    State1 #vqstate { q2    = ?QUEUE:new(),
                                      delta = ?BLANK_DELTA,
                                      %% 如果磁盤中已經沒有消息,則將Q2隊列中的消息放入Q3隊列
                                      q3    = ?QUEUE:join(Q3b, Q2) };
                N when N > 0 ->
                    %% 得到最新的隊列消息磁盤中的信息
                    Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
                                        count        = N,
                                        end_seq_id   = DeltaSeqIdEnd }),
                    %% 更新最新的q3隊列和磁盤信息結構
                    State1 #vqstate { delta = Delta1,
                                      q3    = Q3b }
            end
    end.

問題1:為什么Q4,Q3空,隊列就為空?

消費Q3最后一條消息的時候,會調用函數maybe_deltas_to_betas,將磁盤上Delta狀態的消息轉移到Q3,現在Q3是空的,那么Delta狀態的消息一定是空的,否則消息會轉移到Q3;

Delta消息是空的,上述代碼中:

State1 #vqstate { q2  = ?QUEUE:new(),
delta = ?BLANK_DELTA,
%% 如果磁盤中已經沒有消息,則將Q2隊列中的消息放入Q3隊列
q3    = ?QUEUE:join(Q3b, Q2) };

會將Q2隊列的消息轉移到Q3,現在Q3是空的,那么Q2中消息肯定是空的;

現在Q2、Q3、Delta和Q4都是空的,看代碼:

State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
                         {true, true} ->
                             true = ?QUEUE:is_empty(Q2), 
                             true = ?QUEUE:is_empty(Q4), 
                             %% 從Q3隊列中取走消息后發現Q3隊列為空,同時磁盤中沒有消息,則將Q1隊列中的消息放入Q4隊列
                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };

會將Q1消息轉移到Q4,現在Q4是空的,Q1肯定沒有消息了。

綜上所述,Q3和Q4都是空的,那該隊列無消息!

 

問題2:為什么q4,q3,delta為空的時候,q2必空?

在問題1中已經分析了,Delta消息為空的時候會將Q2放入Q3中,現在Q3是空的,可以反向推出Q2肯定是空的。

 

問題3:為什么Q4、Q3和delta為空的時候,q1不為空會直接轉移到q4?

根據定義Q1和Q4存儲的消息是處於內存中的alpha狀態的消息,這時候直接從Q1轉到Q4就不需要經過磁盤,減少IO延遲;

rabbit_variable_queue.erl源碼關於轉換狀態還有很多細節,這里不再介紹。后續深入學習源碼后再分析。

 

四、總結

節點消息堆積較多時,這些堆積的消息很快就會進入很深的隊列中去,這樣會增加處理每個消息的平均開銷,整個系統的處理能力就會降低。因為要花更多的時間和資源處理堆積的消息,后流入的消息又被擠壓到很深的隊列中了,系統負載越來越惡化。

因此RabbitMQ使用時一定要注意磁盤占用監控和流控監控,這些在控制台上都可以看到,一般來說如果消息堆積過多建議增加消費者或者增強每個消費者的消費能力(比如調高prefetch_count消費者一次收到的消息可以提高單個消費者消費能力)。

 

參考文章:
RabbitMQ源碼分析 - 隊列機制

RabbitMQ實戰指南

RabbitMQ官方文檔

 

本文已由作者李海燕授權網易雲社區發布。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM