queue.declare即申請隊列,首先對隊列名作處理,若未指定隊列名則隨機生成一個,然后查詢數據庫隊列是否已經創建,若創建完成則會申請隊列返回
handle_method(#'queue.declare'{queue = QueueNameBin,
passive = false,
durable = DurableDeclare,
exclusive = ExclusiveDeclare,
auto_delete = AutoDelete,
nowait = NoWait,
arguments = Args} = Declare,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid,
queue_collector_pid = CollectorPid}) ->
Owner = case ExclusiveDeclare of
true -> ConnPid;
false -> none
end,
Durable = DurableDeclare andalso not ExclusiveDeclare,
ActualNameBin = case QueueNameBin of
<<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
"amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
check_configure_permitted(QueueName, State),
%%查找是否隊列是否已經存在
case rabbit_amqqueue:with(
QueueName,
fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
Q, Durable, AutoDelete, Args, Owner),
maybe_stat(NoWait, Q)
end) of
{ok, MessageCount, ConsumerCount} ->
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
{error, not_found} ->
DlxKey = <<"x-dead-letter-exchange">>,
case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
undefined ->
ok;
{error, {invalid_type, Type}} ->
precondition_failed(
"invalid type '~s' for arg '~s' in ~s",
[Type, DlxKey, rabbit_misc:rs(QueueName)]);
DLX ->
check_read_permitted(QueueName, State),
check_write_permitted(DLX, State),
ok
end,
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
{new, #amqqueue{pid = QPid}} ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
%% the connection shuts down.
ok = case Owner of
none -> ok;
_ -> rabbit_queue_collector:register(
CollectorPid, QPid)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
%% must have been created between the stat and the
%% declare. Loop around again.
handle_method(Declare, none, State);
{absent, Q, Reason} ->
rabbit_misc:absent(Q, Reason);
{owner_died, _Q} ->
%% Presumably our own days are numbered since the
%% connection has died. Pretend the queue exists though,
%% just so nothing fails.
return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
end;
{error, {absent, Q, Reason}} ->
rabbit_misc:absent(Q, Reason)
end;
rabbit_amqqueue.erl
其中的node()是為了指明master queue的位置,即收到申請隊列消息的節點
declare(QueueName, Durable, AutoDelete, Args, Owner) -> declare(QueueName, Durable, AutoDelete, Args, Owner, node()).
選擇主節點並對主節點發創建隊列進程的消息
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_queue_decorator:set( rabbit_policy:set(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, exclusive_owner = Owner, pid = none, slave_pids = [], sync_slave_pids = [], recoverable_slaves = [], gm_pids = [], state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), {init, new}, infinity).
rabbit_amqqueue_sup.erl
在啟動rabbit_amqp_process的時候,supervisor使用的Maker來標志此進程是否首次啟動,以區別重啟進程來做不同操作 。
start_link(Q, StartMode) ->
%%Marker存在的意義是什么?標志着是否為第一次啟動
Marker = spawn_link(fun() -> receive stop -> ok end end),
ChildSpec = {rabbit_amqqueue,
{rabbit_prequeue, start_link, [Q, StartMode, Marker]},
intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
rabbit_mirror_queue_slave]},
{ok, SupPid} = supervisor2:start_link(?MODULE, []),
{ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
unlink(Marker),
Marker ! stop,
{ok, SupPid, Qpid}.
之后,主節點會啟動rabbit_amqp_process,用coordinator來完成數據同步(gm),而備節點則會啟動rabbit_mirror_queue_slave進程,后者同時使用了gm behaviour,所以可以和coordinator來進程數據同步,以mq節點之間狀態保持一致。
通過coordinator獲取gm完成可靠同步,然后獲取備節點在備節點增加鏡像隊列
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), depth_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
Self = self(),
ok = rabbit_misc:execute_mnesia_transaction(
fun () ->
[Q1 = #amqqueue{gm_pids = GMPids}]
= mnesia:read({rabbit_queue, QName}),
ok = rabbit_amqqueue:store_queue(
Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
state = live})
end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
在所有的備節點上增加鏡像隊列,即創建備隊列進程
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), #state { name = QName, gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, seen_status = dict:new(), confirmed = [], known_senders = sets:new() }. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> SPid = rabbit_amqqueue_sup_sup:start_queue_process( MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), rabbit_mirror_queue_slave:go(SPid, SyncMode) end); {error, not_found} = E -> E end.
未完成待續
