說起消息重入隊列還得從隊列注冊消費者說起,客戶端在向隊列注冊消費者之后,創建的channel也會被主隊列進程monitor,當channel掛掉后,主隊列進程(rabbit_amqqueue_process)收到'DOWN'通知,將未ack的消息重入隊列,並根據消息的deliver tag,也就是消費入隊列的順序,將消息重入隊列中
主要代碼如下:
1.注冊消費者
handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ConsumerTag,
no_local = _, % FIXME: implement
no_ack = NoAck,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
_, State = #ch{consumer_prefetch = ConsumerPrefetch,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, State),
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
"amq.ctag");
Other -> Other
end,
case basic_consume(
QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ExclusiveConsume, Args, NoWait, State) of
{ok, State1} ->
{noreply, State1};
{error, exclusive_consume_unavailable} ->
rabbit_misc:protocol_error(
access_refused, "~s in exclusive use",
[rabbit_misc:rs(QueueName)])
end;
{ok, _} ->
%% Attempted reuse of consumer tag.
rabbit_misc:protocol_error(
not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
end;
2.主隊列進程增加消費者,並對channel進程監控
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use -> reply({error, exclusive_consume_unavailable}, State);
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
PrefetchCount, Args, is_empty(State),
Consumers),
end;
ch_record(ChPid, LimiterPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = priority_queue:new(),
limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
C;
C = #cr{} -> C
end.
3.主隊列進程收到channel 'DOWN'的消息后,刪除消費者,獲取此被此channel ack的消息
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
{ok, State1} -> noreply(State1);
{stop, State1} -> stop(State1)
end;
handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder,
senders = Senders}) ->
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
false -> Senders;
true -> credit_flow:peer_down(DownPid),
pmon:demonitor(DownPid, Senders)
end},
case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
not_found ->
{ok, State1};
{ChAckTags, ChCTags, Consumers1} ->
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
ensure_expiry_timer(State2))}
end
end.
4.涉及重入隊列時,需要了解backing queue,即消息是如何在鏡像隊列之間內部以及消息如何在本地內存和磁盤資源之間按需切換,或此部分涉及內容較多,后序會專門列出一個專題來分析此實現。rabbit_amqqueue_process主隊列進程的backing_queue是rabbit_mirror_queue_master(鏡像隊列消息同步),后者因為需要將消息按需放置,所以也有backing_queueu,即rabbit_variable_queue。
根據sequeue_id來判斷消息在隊列中的位置,從當前隊列中pop出隊頭的消息(最早入隊列的消息),若未ack的消息較晚(seqid相對大),則將pop隊頭的隊列再與未ack的消息比較,將消息pop出的消息放置在front隊列中,直到符合,插入隊列,並將front隊列與此隊列合入。
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
Limit, PubFun, State).
queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
Limit, PubFun, State)
when Limit == undefined orelse SeqId < Limit ->
case ?QUEUE:out(Q) of
{{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
when SeqIdQ < SeqId ->
%% enqueue from the remaining queue
queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
{MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
{#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
PubFun(MsgStatus, State1),
queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
Limit, PubFun, State2)
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
{SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
