之前提到了RabbitMQ是怎樣維護Queue的data和metadata的.我們知道Queue在RabbitMQ對應Erlang的進程,那么Exchane是不是也是獨立的Erlang進程呢?它的信息是如何維護的呢?
Exchange 本質上是什么
印象中Vhost就像一個容器,Exchange Queue就像一個個零件,這些東西組裝起來成為一個消息的Broker.真正實現的時候並不是每一個東西都會按照一一對應的方式設計實體,這里我想到兩點:
[1] 思考問題的時候恰當的隱喻能夠幫我們快速理解問題,但是在某個時候要拋開隱喻,因為它和你要理解的東西畢竟是兩個獨立的東西;
[2] 奧卡姆剃刀原則,如無必要勿增實體,在Erlang世界中創建進程的成本很低,往往會濫用這一點,把一些沒有必要設計成獨立進程的搞成了進程,Erlang的設計哲學是把獨立的活動用進程表達.進程創建也是有限制的,最后一根稻草壓死駱駝,雪崩的時候每一片雪花都不認為自己有責任,就是這個道理.
Exchange在RabbitMQ就沒有設計為進程而僅僅是單純維護了exchange 名稱以及相關的bingding規則.我們發送消息到Exchange本質上是:你連接到RabbitMQ的channel拿消息中的routing key和binding規則匹配完成消息的路由.換句話說,真正完成route邏輯的實體是channel,exchange只是一堆規則集,很容易在集群內部同步,所以沒有queue面臨的那些問題.下面是rabbit_channel的代碼片段,我們可以窺見一斑:
handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, tx_status = TxStatus, confirm_enabled = ConfirmEnabled, trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), check_user_id_header(DecodedContent#content.properties, State), {MsgSeqNo, State1} = case {TxStatus, ConfirmEnabled} of {none, false} -> {undefined, State}; {_, _} -> SeqNo = State#ch.publish_seqno, {SeqNo, State#ch{publish_seqno = SeqNo + 1}} end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_trace_in(Message, TraceState), Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), {noreply, case TxStatus of none -> deliver_to_queues({Delivery, QNames}, State1); in_progress -> TMQ = State1#ch.uncommitted_message_q, NewTMQ = queue:in({Delivery, QNames}, TMQ), State1#ch{uncommitted_message_q = NewTMQ} end}; {error, Reason} -> rabbit_misc:protocol_error(precondition_failed, "invalid message: ~p", [Reason]) end;
我們在RabbitMQ集群環境創建exchange實際上就是在集群內所有節點的exchange表增加一條數據.這樣連接到所有節點都可以使用這個新exchange.
要是發送到channel的消息還沒有完成路由節點就當掉了怎么辦?
basic.public AMQP命令並沒有返回消息的狀態.這就意味節點當掉的時候,channel還會繼續routing消息.producer會繼續發布消息,這樣就有丟消息的風險.
解決方案有兩個:[1] 使用AMQP transaction (AMQP事務):producer保持阻塞狀態直到消息被路由到queue.[2] 使用publisher confirm 來跟蹤節點當掉的時候哪些消息還沒有確認.
這兩種方法都可以幫助我們檢測消息是否到達目的地(比如節點當掉).
下面我們看看exchange在erlang node里面是怎么維護的:
Exchange in Erlang node
rabbit_durable_exchange rabbit_durable_exchange set 9 507 mnesia_monitor rabbit_durable_queue rabbit_durable_queue set 2 335 mnesia_monitor rabbit_durable_route rabbit_durable_route set 4 445 mnesia_monitor rabbit_exchange rabbit_exchange set 9 507 mnesia_monitor rabbit_exchange_serial rabbit_exchange_serial set 0 283 mnesia_monitor rabbit_listener rabbit_listener bag 1 321 mnesia_monitor rabbit_queue rabbit_queue set 2 335 mnesia_monitor rabbit_registry rabbit_registry set 7 353 rabbit_registry rabbit_reverse_route rabbit_reverse_route ordered_set 4 239 mnesia_monitor rabbit_route rabbit_route ordered_set 4 239 mnesia_monitor rabbit_semi_durable_route rabbit_semi_durable_route ordered_set 4 239 mnesia_monitor rabbit_topic_trie_binding rabbit_topic_trie_binding ordered_set 0 73 mnesia_monitor rabbit_topic_trie_edge rabbit_topic_trie_edge ordered_set 0 73 mnesia_monitor rabbit_topic_trie_node rabbit_topic_trie_node ordered_set 0 73 mnesia_monitor rabbit_user rabbit_user set 1 302 mnesia_monitor rabbit_user_permission rabbit_user_permission set 1 315 mnesia_monitor rabbit_vhost rabbit_vhost set 1 294 mnesia_monitor
(default@zen.com)37> ets:match(rabbit_exchange,'$1'). [[{exchange,{resource,<<"/">>,exchange, <<"amq.rabbitmq.trace">>}, topic,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange, <<"amq.rabbitmq.log">>}, topic,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<"qp_pic_exchange">>}, direct,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<"amq.match">>}, headers,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<"amq.headers">>}, headers,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<"amq.topic">>}, topic,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<"amq.direct">>}, direct,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<"amq.fanout">>}, fanout,true,false,false,[],undefined}], [{exchange,{resource,<<"/">>,exchange,<<>>}, direct,true,false,false,[],undefined}]]
(default@zen.com)32> ets:match(rabbit_queue,'$1'). [[#amqqueue{name = {resource,<<"/">>,queue, <<"zen_qp_pic_queue">>}, durable = true,auto_delete = false,exclusive_owner = none, arguments = [],pid = <0.396.0>,slave_pids = [], mirror_nodes = undefined}], [#amqqueue{name = {resource,<<"/">>,queue, <<"qp_pic_queue2">>}, durable = true,auto_delete = false,exclusive_owner = none, arguments = [],pid = <0.397.0>,slave_pids = [], mirror_nodes = undefined}]] (default@zen.com)33> process_info(pid(0,396,0)). [{current_function,{erlang,hibernate,3}}, {initial_call,{proc_lib,init_p,5}}, {status,waiting}, {message_queue_len,0}, {messages,[]}, {links,[#Port<0.2723>,<0.204.0>]}, {dictionary,[{{"/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif", fhc_file}, {file,1,true}}, {{xtype_to_module,direct},rabbit_exchange_type_direct}, {credit_blocked,[]}, {{credit_from,<0.198.0>},1998}, {'$ancestors',[rabbit_amqqueue_sup,rabbit_sup,<0.110.0>]}, {{#Ref<0.0.0.4414>,fhc_handle}, {handle,{file_descriptor,prim_file,{#Port<0.2723>,18}}, 0,false,0,infinity,[],true, "/var/lib/rabbitmq/mnesia/rabbit@localhost/queues/48ROU6Q8HVQVRA1PVZQ38TZLK/journal.jif", [write,binary|...], [{...}], true,...}}, {fhc_age_tree,{1, {{1352,908329,782846},#Ref<0.0.0.4414>,nil,nil}}}, {guid,{{2942931266,2608231844,2036421255,3742291922},1}}, {'$initial_call',{gen,init_it,6}}]}, {trap_exit,true}, {error_handler,error_handler}, {priority,normal}, {group_leader,<0.109.0>}, {total_heap_size,1709}, {heap_size,1709}, {stack_size,0}, {reductions,8669}, {garbage_collection,[{min_bin_vheap_size,46368}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,10}]}, {suspending,[]}]
rabbit_route
(default@zen.com)35> ets:match(rabbit_route,'$1'). [[{route,{binding,{resource,<<"/">>,exchange,<<>>}, <<"qp_pic_queue2">>, {resource,<<"/">>,queue,<<"qp_pic_queue2">>}, []}, const}], [{route,{binding,{resource,<<"/">>,exchange,<<>>}, <<"zen_qp_pic_queue">>, {resource,<<"/">>,queue,<<"zen_qp_pic_queue">>}, []}, const}], [{route,{binding,{resource,<<"/">>,exchange, <<"qp_pic_exchange">>}, <<"qp_pic2">>, {resource,<<"/">>,queue,<<"qp_pic_queue2">>}, []}, const}], [{route,{binding,{resource,<<"/">>,exchange, <<"qp_pic_exchange">>}, <<"qp_pic2">>, {resource,<<"/">>,queue,<<"zen_qp_pic_queue">>}, []}, const}]]
technology.rabbitmq.ebook
sport.golf.paper
sport.tennis.ebook
Using the topic exchange, the subscription/binding key specified in step 5 can be a sequence
of dot-separated words and/or wildcards. AMQP wildcards are just:
# This matches zero or more words
* This matches exactly one word
So, for example:
#.ebook and *.*.ebook both match the first and the third sent messages
sport.# and sport.*.* both match the second and the third sent messages
# alone matches any message sent
最后小圖一張 Emma Watson 我們倆都是O型血 Petrificus Totalus!