[Erlang 0089] RabbitMQ Exchange



   
 之前提到了RabbitMQ是怎樣維護Queue的data和metadata的.我們知道Queue在RabbitMQ對應Erlang的進程,那么Exchane是不是也是獨立的Erlang進程呢?它的信息是如何維護的呢?

 

Exchange 本質上是什么

 

   印象中Vhost就像一個容器,Exchange Queue就像一個個零件,這些東西組裝起來成為一個消息的Broker.真正實現的時候並不是每一個東西都會按照一一對應的方式設計實體,這里我想到兩點:

  [1] 思考問題的時候恰當的隱喻能夠幫我們快速理解問題,但是在某個時候要拋開隱喻,因為它和你要理解的東西畢竟是兩個獨立的東西;

  [2] 奧卡姆剃刀原則,如無必要勿增實體,在Erlang世界中創建進程的成本很低,往往會濫用這一點,把一些沒有必要設計成獨立進程的搞成了進程,Erlang的設計哲學是把獨立的活動用進程表達.進程創建也是有限制的,最后一根稻草壓死駱駝,雪崩的時候每一片雪花都不認為自己有責任,就是這個道理.

 

   Exchange在RabbitMQ就沒有設計為進程而僅僅是單純維護了exchange 名稱以及相關的bingding規則.我們發送消息到Exchange本質上是:你連接到RabbitMQ的channel拿消息中的routing key和binding規則匹配完成消息的路由.換句話說,真正完成route邏輯的實體是channel,exchange只是一堆規則集,很容易在集群內部同步,所以沒有queue面臨的那些問題.下面是rabbit_channel的代碼片段,我們可以窺見一斑:

  

handle_method(#'basic.publish'{exchange    = ExchangeNameBin,
                               routing_key = RoutingKey,
                               mandatory   = Mandatory,
                               immediate   = Immediate},
              Content, State = #ch{virtual_host    = VHostPath,
                                   tx_status       = TxStatus,
                                   confirm_enabled = ConfirmEnabled,
                                   trace_state     = TraceState}) ->
    ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
    check_write_permitted(ExchangeName, State),
    Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
    check_internal_exchange(Exchange),
    %% We decode the content's properties here because we're almost
    %% certain to want to look at delivery-mode and priority.
    DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
    check_user_id_header(DecodedContent#content.properties, State),
    {MsgSeqNo, State1} =
        case {TxStatus, ConfirmEnabled} of
            {none, false} -> {undefined, State};
            {_, _}        -> SeqNo = State#ch.publish_seqno,
                             {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
        end,
    case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
        {ok, Message} ->
            rabbit_trace:tap_trace_in(Message, TraceState),
            Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
                                             MsgSeqNo),
            QNames = rabbit_exchange:route(Exchange, Delivery),
            {noreply,
             case TxStatus of
                 none        -> deliver_to_queues({Delivery, QNames}, State1);
                 in_progress -> TMQ = State1#ch.uncommitted_message_q,
                                NewTMQ = queue:in({Delivery, QNames}, TMQ),
                                State1#ch{uncommitted_message_q = NewTMQ}
             end};
        {error, Reason} ->
            rabbit_misc:protocol_error(precondition_failed,
                                       "invalid message: ~p", [Reason])
    end;

 

  我們在RabbitMQ集群環境創建exchange實際上就是在集群內所有節點的exchange表增加一條數據.這樣連接到所有節點都可以使用這個新exchange.

 

  要是發送到channel的消息還沒有完成路由節點就當掉了怎么辦?

 

  basic.public AMQP命令並沒有返回消息的狀態.這就意味節點當掉的時候,channel還會繼續routing消息.producer會繼續發布消息,這樣就有丟消息的風險.

 

  解決方案有兩個:[1] 使用AMQP transaction (AMQP事務):producer保持阻塞狀態直到消息被路由到queue.[2] 使用publisher confirm 來跟蹤節點當掉的時候哪些消息還沒有確認.

  這兩種方法都可以幫助我們檢測消息是否到達目的地(比如節點當掉).

  下面我們看看exchange在erlang node里面是怎么維護的:

 

Exchange in Erlang node

 
  通過ets:i()查看節點的ETS表,能夠看到一系列Rabbit前綴的表:
 
rabbit_durable_exchange rabbit_durable_exchange set   9      507      mnesia_monitor
rabbit_durable_queue rabbit_durable_queue set   2      335      mnesia_monitor
rabbit_durable_route rabbit_durable_route set   4      445      mnesia_monitor
rabbit_exchange rabbit_exchange   set   9      507      mnesia_monitor
rabbit_exchange_serial rabbit_exchange_serial set   0      283      mnesia_monitor
rabbit_listener rabbit_listener   bag   1      321      mnesia_monitor
rabbit_queue    rabbit_queue      set   2      335      mnesia_monitor
rabbit_registry rabbit_registry   set   7      353      rabbit_registry
rabbit_reverse_route rabbit_reverse_route ordered_set 4      239      mnesia_monitor
rabbit_route    rabbit_route      ordered_set 4      239      mnesia_monitor
rabbit_semi_durable_route rabbit_semi_durable_route ordered_set 4      239      mnesia_monitor
rabbit_topic_trie_binding rabbit_topic_trie_binding ordered_set 0      73       mnesia_monitor
rabbit_topic_trie_edge rabbit_topic_trie_edge ordered_set 0      73       mnesia_monitor
rabbit_topic_trie_node rabbit_topic_trie_node ordered_set 0      73       mnesia_monitor
rabbit_user     rabbit_user       set   1      302      mnesia_monitor
rabbit_user_permission rabbit_user_permission set   1      315      mnesia_monitor
rabbit_vhost    rabbit_vhost      set   1      294      mnesia_monitor

 

 
  下面我們看幾個我們比較關注的ets表
 
  rabbit_exchange :
 
(default@zen.com)37> ets:match(rabbit_exchange,'$1').
[[{exchange,{resource,<<"/">>,exchange,
                      <<"amq.rabbitmq.trace">>},
            topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,
                      <<"amq.rabbitmq.log">>},
            topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"qp_pic_exchange">>},
            direct,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.match">>},
            headers,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.headers">>},
            headers,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.topic">>},
            topic,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.direct">>},
            direct,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<"amq.fanout">>},
            fanout,true,false,false,[],undefined}],
[{exchange,{resource,<<"/">>,exchange,<<>>},
            direct,true,false,false,[],undefined}]]

 

 
rabbit_queue
 
(default@zen.com)32> ets:match(rabbit_queue,'$1').
[[#amqqueue{name = {resource,<<"/">>,queue,
                             <<"zen_qp_pic_queue">>},
            durable = true,auto_delete = false,exclusive_owner = none,
            arguments = [],pid = <0.396.0>,slave_pids = [],
            mirror_nodes = undefined}],
[#amqqueue{name = {resource,<<"/">>,queue,
                             <<"qp_pic_queue2">>},
            durable = true,auto_delete = false,exclusive_owner = none,
            arguments = [],pid = <0.397.0>,slave_pids = [],
            mirror_nodes = undefined}]]
(default@zen.com)33> process_info(pid(0,396,0)).
[{current_function,{erlang,hibernate,3}},
{initial_call,{proc_lib,init_p,5}},
{status,waiting},
{message_queue_len,0},
{messages,[]},
{links,[#Port<0.2723>,<0.204.0>]},
{dictionary,[{{"/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif",
                fhc_file},
               {file,1,true}},
              {{xtype_to_module,direct},rabbit_exchange_type_direct},
              {credit_blocked,[]},
              {{credit_from,<0.198.0>},1998},
              {'$ancestors',[rabbit_amqqueue_sup,rabbit_sup,<0.110.0>]},
              {{#Ref<0.0.0.4414>,fhc_handle},
               {handle,{file_descriptor,prim_file,{#Port<0.2723>,18}},
                       0,false,0,infinity,[],true,
                       "/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif",
                       [write,binary|...],
                       [{...}],
                       true,...}},
              {fhc_age_tree,{1,
                             {{1352,908329,782846},#Ref<0.0.0.4414>,nil,nil}}},
              {guid,{{2942931266,2608231844,2036421255,3742291922},1}},
              {'$initial_call',{gen,init_it,6}}]},
{trap_exit,true},
{error_handler,error_handler},
{priority,normal},
{group_leader,<0.109.0>},
{total_heap_size,1709},
{heap_size,1709},
{stack_size,0},
{reductions,8669},
{garbage_collection,[{min_bin_vheap_size,46368},
                      {min_heap_size,233},
                      {fullsweep_after,65535},
                      {minor_gcs,10}]},
{suspending,[]}]

 

 rabbit_route

(default@zen.com)35> ets:match(rabbit_route,'$1').
[[{route,{binding,{resource,<<"/">>,exchange,<<>>},
                  <<"qp_pic_queue2">>,
                  {resource,<<"/">>,queue,<<"qp_pic_queue2">>},
                  []},
         const}],
[{route,{binding,{resource,<<"/">>,exchange,<<>>},
                  <<"zen_qp_pic_queue">>,
                  {resource,<<"/">>,queue,<<"zen_qp_pic_queue">>},
                  []},
         const}],
[{route,{binding,{resource,<<"/">>,exchange,
                            <<"qp_pic_exchange">>},
                  <<"qp_pic2">>,
                  {resource,<<"/">>,queue,<<"qp_pic_queue2">>},
                  []},
         const}],
[{route,{binding,{resource,<<"/">>,exchange,
                            <<"qp_pic_exchange">>},
                  <<"qp_pic2">>,
                  {resource,<<"/">>,queue,<<"zen_qp_pic_queue">>},
                  []},
         const}]]

 

 

technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook

 

Using the topic exchange, the subscription/binding key specified in step 5 can be a sequence 

of dot-separated words and/or wildcards. AMQP wildcards are just:
 #  This matches zero or more words
 *  This matches exactly one word


So, for example:
 #.ebook and *.*.ebook both match the first and the third sent messages
 sport.# and sport.*.* both match the second and the third sent messages
 # alone matches any message sent

 

 

最后小圖一張 Emma Watson 我們倆都是O型血 Petrificus Totalus!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM