rabbitmq網絡框架剖析


 

Rabbitmq是一個MQ系統,也就是消息中間件,它實現了AMQP 0.8規范,簡單來說就是一個TCP的廣播服務器。AMQP協議,你可以類比JMS,不過JMS僅僅是java領域內的API規范,而AMQP比JMS更進一步,它有自己的wire-level protocol,有一套可編程的協議,中立於語言。簡單介紹了Rabbitmq之后,進入正題。

    Rabbitmq充分利用了Erlang的分布式、高可靠性、並發等特性,首先看它的一個結構圖:


這張圖展現了Rabbitmq的主要組件和組件之間的關系,具體到監控樹的結構,我畫了一張圖:


      頂層是rabbit_sup supervisor,它至少有兩個子進程,一個是rabbit_tcp_client_sup,用來監控每個connection的處理進程 rabbit_reader的supervisor;rabbit_tcp_listener_sup是監控tcp_listener和 tcp_acceptor_sup的supervisor,tcp_listener里啟動tcp服務器,監聽端口,並且通過tcp_acceptor_sup啟動N個tcp_accetpor,tcp_acceptor發起accept請求,等待客戶端連接;tcp_acceptor_sup負責監控這些acceptor。這張圖已經能給你一個大體的印象。

講完大概,進入細節,說說幾個我覺的值的注意的地方:
1、tcp_accepto.erl,r對於accept采用的是異步方式,利用prim_inet:async_accept/2方法,此模塊沒有被文檔化,是otp庫內部使用,通常來說沒必要使用這一模塊,gen_tcp:accept/1已經足夠,不過rabbitmq是廣播程序,因此采用了異步方式。使用async_accept,需要打patch,以使得socket好像我們從gen_tcp:accept/1得到的一樣:


  1. handle_info({inet_async, LSock, Ref, {ok, Sock}},
  2.             State = #state{callback={M,F,A}, sock=LSock, ref=Ref}) ->
  3.     %%這里做了patch
  4.     %% patch up the socket so it looks like one we got from
  5.     %% gen_tcp:accept/1 
  6.     {ok, Mod} = inet_db:lookup_socket(LSock),
  7.     inet_db:register_socket(Sock, Mod),
  8.     try
  9.         %% report
  10.         {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
  11.         {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
  12.         error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
  13.                               [inet_parse:ntoa(Address), Port,
  14.                                inet_parse:ntoa(PeerAddress), PeerPort]),
  15.         %% 調用回調模塊,將Sock作為附加參數
  16.         apply(M, F, A ++ [Sock])
  17.     catch {inet_error, Reason} ->
  18.             gen_tcp:close(Sock),
  19.             error_logger:error_msg("unable to accept TCP connection: ~p~n",
  20.                                    [Reason])
  21.     end,
  22.     %% 繼續發起異步調用
  23.     case prim_inet:async_accept(LSock, -1) of
  24.         {ok, NRef} -> {noreply, State#state{ref=NRef}};
  25.         Error -> {stop, {cannot_accept, Error}, none}
  26.     end;
  27. %%處理錯誤情況
  28. handle_info({inet_async, LSock, Ref, {error, closed}},
  29.             State=#state{sock=LSock, ref=Ref}) ->
  30.     %% It would be wrong to attempt to restart the acceptor when we
  31.     %% know this will fail.
  32.     {stop, normal, State};


2、rabbitmq內部是使用了多個並發acceptor,這在高並發下、大量連接情況下有效率優勢,類似java現在的nio框架采用多個reactor類似,查看tcp_listener.erl:

  1. init({IPAddress, Port, SocketOpts,
  2.       ConcurrentAcceptorCount, AcceptorSup,
  3.       {M,F,A} = OnStartup, OnShutdown, Label}) ->
  4.     process_flag(trap_exit, true),
  5.     case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
  6.                                              {active, false}]) of
  7.         {ok, LSock} ->
  8.              %%創建ConcurrentAcceptorCount個並發acceptor
  9.             lists:foreach(fun (_) ->
  10.                                   {ok, _APid} = supervisor:start_child(
  11.                                                   AcceptorSup, [LSock])
  12.                           end,
  13.                           lists:duplicate(ConcurrentAcceptorCount, dummy)),
  14.             {ok, {LIPAddress, LPort}} = inet:sockname(LSock),
  15.             error_logger:info_msg("started ~s on ~s:~p~n",
  16.                                   [Label, inet_parse:ntoa(LIPAddress), LPort]),
  17.             %%調用初始化回調函數
  18.             apply(M, F, A ++ [IPAddress, Port]),
  19.             {ok, #state{sock = LSock,
  20.                         on_startup = OnStartup, on_shutdown = OnShutdown, 
  21.                         label = Label}};
  22.         {error, Reason} ->
  23.             error_logger:error_msg(
  24.               "failed to start ~s on ~s:~p - ~p~n",
  25.               [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
  26.             {stop, {cannot_listen, IPAddress, Port, Reason}}
  27.     end.


這里有一個技巧,如果要循環N次執行某個函數F,可以通過lists:foreach結合lists:duplicate(N,dummy)來處理。

lists:foreach(fun(_)-> F() end,lists:duplicate(N,dummy)).

3、simple_one_for_one策略的使用,可以看到對於tcp_client_sup和tcp_acceptor_sup都采用了simple_one_for_one策略,而非普通的one_fo_one,這是為什么呢?
這牽扯到simple_one_for_one的幾個特點:
1)simple_one_for_one內部保存child是使用dict,而其他策略是使用list,因此simple_one_for_one更適合child頻繁創建銷毀、需要大量child進程的情況,具體來說例如網絡連接的頻繁接入斷開。
2)使用了simple_one_for_one后,無法調用terminate_child/2 delete_child/2 restart_child/2 

3)start_child/2 對於simple_one_for_one來說,不必傳入完整的child spect,傳入參數list,會自動進行參數合並在一個地方定義好child spec之后,其他地方只要start_child傳入參數即可啟動child進程,簡化child都是同一類型進程情況下的編程

在 rabbitmq中,tcp_acceptor_sup的子進程都是tcp_acceptor進程,在tcp_listener中是啟動了 ConcurrentAcceptorCount個tcp_acceptor子進程,通過supervisor:start_child/2方法:


  1. %%創建ConcurrentAcceptorCount個並發acceptor
  2.             lists:foreach(fun (_) ->
  3.                                   {ok, _APid} = supervisor:start_child(
  4.                                                   AcceptorSup, [LSock])
  5.                           end,
  6.                           lists:duplicate(ConcurrentAcceptorCount, dummy)),


注意到,這里調用的start_child只傳入了LSock一個參數,另一個參數CallBack是在定義child spec的時候傳入的,參見tcp_acceptor_sup.erl:


  1. init(Callback) ->
  2.     {ok, {{simple_one_for_one, 10, 10},
  3.           [{tcp_acceptor, {tcp_acceptor, start_link, [Callback]},
  4.             transient, brutal_kill, worker, [tcp_acceptor]}]}}.



Erlang內部自動為simple_one_for_one做了參數合並,最后調用的是tcp_acceptor的init/2:


  1. init({Callback, LSock}) ->
  2.     case prim_inet:async_accept(LSock, -1) of
  3.         {ok, Ref} -> {ok, #state{callback=Callback, sock=LSock, ref=Ref}};
  4.         Error -> {stop, {cannot_accept, Error}}
  5.     end.


對於tcp_client_sup的情況類似,tcp_client_sup監控的子進程都是rabbit_reader類型,在 rabbit_networking.erl中啟動tcp_listenner傳入的處理connect事件的回調方法是是 rabbit_networking:start_client/1:


  1. start_tcp_listener(Host, Port) ->
  2.     start_listener(Host, Port, "TCP Listener",
  3.                    %回調的MFA
  4.                    {?MODULE, start_client, []}).
  5. start_client(Sock) ->
  6.     {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
  7.     ok = rabbit_net:controlling_process(Sock, Child),
  8.     Child ! {go, Sock},
  9.     Child.



start_client調用了supervisor:start_child/2來動態啟動rabbit_reader進程。

4、協議的解析,消息的讀取這部分也非常巧妙,這一部分主要在rabbit_reader.erl中,對於協議的解析沒有采用gen_fsm,而是實現了一個巧妙的狀態機機制,核心代碼在mainloop/4中:


  1. %啟動一個連接
  2. start_connection(Parent, Deb, ClientSock) ->
  3.     process_flag(trap_exit, true),
  4.     {PeerAddressS, PeerPort} = peername(ClientSock),
  5.     ProfilingValue = setup_profiling(),
  6.     try 
  7.         rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
  8.                         [self(), PeerAddressS, PeerPort]),
  9.          %延時發送握手協議
  10.         Erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
  11.                           handshake_timeout),
  12.         %進入主循環,更換callback模塊,魔法就在這個switch_callback
  13.         mainloop(Parent, Deb, switch_callback(
  14.                                 #v1{sock = ClientSock,
  15.                                     connection = #connection{
  16.                                       user = none,
  17.                                       timeout_sec = ?HANDSHAKE_TIMEOUT,
  18.                                       frame_max = ?FRAME_MIN_SIZE,
  19.                                       vhost = none},
  20.                                     callback = uninitialized_callback,
  21.                                     recv_ref = none,
  22.                                     connection_state = pre_init},
  23.                                 %%注意到這里,handshake就是我們的回調模塊,8就是希望接收的數據長度,AMQP協議頭的八個字節。
  24.                                 handshake, 8))


魔法就在switch_callback這個方法上:


  1. switch_callback(OldState, NewCallback, Length) ->
  2.     %發起一個異步recv請求,請求Length字節的數據
  3.     Ref = inet_op(fun () -> rabbit_net:async_recv(
  4.                               OldState#v1.sock, Length, infinity) end),
  5.     %更新狀態,替換ref和處理模塊
  6.     OldState#v1{callback = NewCallback,
  7.                 recv_ref = Ref}.


異步接收Length個數據,如果有,erlang會通知你處理。處理模塊是什么概念呢?其實就是一個狀態的概念,表示當前協議解析進行到哪一步,起一個label的作用,看看mainloop/4中的應用:


  1. mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
  2.     %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
  3.     receive
  4.         %%接收到數據,交給handle_input處理,注意handle_input的第一個參數就是callback
  5.         {inet_async, Sock, Ref, {ok, Data}} ->
  6.             %handle_input處理
  7.             {State1, Callback1, Length1} =
  8.                 handle_input(State#v1.callback, Data,
  9.                              State#v1{recv_ref = none}),
  10.             %更新回調模塊,再次發起異步請求,並進入主循環
  11.             mainloop(Parent, Deb,
  12.                      switch_callback(State1, Callback1, Length1));


handle_input有多個分支,每個分支都對應一個處理模塊,例如我們剛才提到的握手協議:


  1. %handshake模塊,注意到第一個參數,第二個參數就是我們得到的數據
  2. handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
  3.              State = #v1{sock = Sock, connection = Connection}) ->
  4.      %檢測協議是否兼容
  5.     case check_version({ProtocolMajor, ProtocolMinor},
  6.                        {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
  7.         true ->
  8.             {ok, Product} = application:get_key(id),
  9.             {ok, Version} = application:get_key(vsn),
  10.             %兼容的話,進入connections start,協商參數
  11.             ok = send_on_channel0(
  12.                    Sock,
  13.                    #'connection.start'{
  14.                      version_major = ?PROTOCOL_VERSION_MAJOR,
  15.                      version_minor = ?PROTOCOL_VERSION_MINOR,
  16.                      server_properties =
  17.                      [{list_to_binary(K), longstr, list_to_binary(V)} ||
  18.                          {K, V} <-
  19.                              [{"product", Product},
  20.                               {"version", Version},
  21.                               {"platform", "Erlang/OTP"},
  22.                               {"copyright", ?COPYRIGHT_MESSAGE},
  23.                               {"information", ?INFORMATION_MESSAGE}]],
  24.                      mechanisms = <<"PLAIN AMQPLAIN">>,
  25.                      locales = <<"en_US">> }),
  26.             {State#v1{connection = Connection#connection{
  27.                                      timeout_sec = ?NORMAL_TIMEOUT},
  28.                       connection_state = starting},
  29.              frame_header, 7};
  30.          %否則,斷開連接,返回可以接受的協議
  31.         false ->
  32.             throw({bad_version, ProtocolMajor, ProtocolMinor})
  33.     end;


其他協議的處理也是類似,通過動態替換callback的方式來模擬狀態機做協議的解析和數據的接收,真的很巧妙!讓我們體會到Erlang的魅力,FP的魅力。

5、序列圖:
1)tcp server的啟動過程:






2)一個client連接上來的處理過程:









小結:從上面的分析可以看出,rabbitmq的網絡層是非常健壯和高效的,通過層層監控,對每個可能出現的風險點都做了考慮,並且利用了prim_net模塊做異步IO處理。分層也是很清晰,將業務處理模塊隔離到client_sup監控下的子進程,將網絡處理細節和業務邏輯分離。在協議的解析和業務處理上雖然沒有采用gen_fsm,但是也實現了一套類似的狀態機機制,通過動態替換Callback來模擬狀態的變遷,非常巧妙。如果你要實現一個tcp server,強烈推薦從rabbitmq中扣出這個網絡層,你只需要實現自己的業務處理模塊即可擁有一個高效、健壯、分層清晰的TCP服務器。



網友討論:
prim_inet,按照余鋒老大的說法是可以用的,基本上接口不會有大的變更,gen_tcp其實是基於prim_net實現的。

使用simple_one_for_one,可以有多個child的,只不過這些child的是同一種類型的,看supervisor.erl的源碼就知道,內部是動態保存在一個dict結構里dynamics = ?DICT:new(),因此是可保存多個:


  1. {ok, Pid} ->
  2. NState = State#state{dynamics = 
  3. ?DICT:store(Pid, Args, State#state.dynamics)},
  4. {reply, {ok, Pid}, NState};

文章來自:http://blog.chinaunix.net/uid-429659-id-3536524.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM