[Erlang 0090] RabbitMQ 集群: Mirrored Queue


   之前提到過在集群環境中,隊列只有元數據會在集群的所有節點同步,但是隊列中的數據只會存在於一個節點;這不免讓人失望:數據沒有冗余容易丟數據甚至在durable的情況下,如果所在的節點當掉就要等待節點恢復.那么是不是有消息冗余的解決方案呢?是的,RabbitMQ自2.6.0開始就開始支持鏡像隊列(Mirrored Queue).消息會在節點之間復制,和其它的主從設計一樣,它也有master和slave的概念;一旦某個節點當掉,會在其余的節點中選舉一個slave作為master.要注意Mirrored Queue 也不是銀彈,后面會提到它的局限.

 

現在就開始動手  

  

Uri uri = new Uri( "amqp://192.168.10.160:9992/" );
               ......   ......
ch.ExchangeDeclare(exchange, exchangeType, true);//,true,true,false,false, true,null);
ch.QueueDeclare(pic_process_queue, true, false , false, new Dictionary <string, string>() { { "x-ha-policy" , "all" } });
 ch.QueueBind(pic_process_queue, exchange, routingKey);
 ch.QueueDeclare(pic_process_queue2, true, false , false, null);
 ch.QueueBind(pic_process_queue2, exchange, routingKey);

 

  我們使用rabbitmqctl工具檢查集群狀態,注意我新建了一個腳本rabbitmq-util內容基本和rabbitmqctl一樣,只是顯示指定了cookie.

[root@localhost scripts]#  ./rabbitmq-util -n z_91@zen.com cluster_status

Cluster status of node 'z_91@zen.com' ...

[{nodes,[{disc,['z_92@zen.com']},{ram,['z_93@zen.com','z_91@zen.com']}]},

{running_nodes,['z_92@zen.com','z_93@zen.com','z_91@zen.com']}]

...done.

[root@localhost scripts]#  ./rabbitmq-util -n z_93@zen.com list_queues name pid slave_pids

Listing queues ...
qp_pic_queue    <'z_92@zen.com'.2.7008.0>       [<'z_93@zen.com'.2.6931.0>, <'z_91@zen.com'.2.7445.0>]
qp_pic_queue2   <'z_92@zen.com'.2.7013.0>       []
...done.
[root@localhost scripts]# 

 

  可以看到上面連接的Rabbit是92節點,端口9992;我們現在正常關閉92節點(使用rabbitmqctl而不是kill),現在我們修改Client的代碼,連接到93節點 Uri uri = new Uri( "amqp://192.168.10.160:9993/" );運行同樣的代碼,和我們的預期一致:qp_pic_queue 由於聲明了Mirrored 所有在92節點關閉之后,我們可以在93上重建隊列;而qp_pic_queue2 就沒有這么幸運了,拋出了404錯誤.看下面的截圖:

 

如果只是在部分節點復制呢?    

 

   上面的例子是把消息在整個集群內進行復制,如果是指定在幾個節點之間做消息鏡像怎么辦?其實編碼實現上沒有難度,因為x-ha-policy參數支持顯示指定節點;下面是我截取的rabbit_amqqueue.erl中關於創建鏡像隊列的參數處理邏輯;下面是一段C#客戶端代碼顯示指定要在哪些節點復制消息.

..\rabbitmq-server-2.8.7\src\rabbit_amqqueue.erl

determine_queue_nodes(Args) ->
    Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
    PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
    case {Policy, PolicyParams} of
        {{_Type, <<"nodes">>}, {array, Nodes}} ->
            case [list_to_atom(binary_to_list(Node)) ||
                     {longstr, Node} <- Nodes] of
                [Node]         -> {Node,   undefined};
                [First | Rest] -> {First,  [First | Rest]}
            end;
        {{_Type, <<"all">>}, _} ->
            {node(), all};
        _ ->
            {node(), undefined}
    end.

 

聲明一個yaqp_pic_queue

ch.ExchangeDeclare(exchange, exchangeType, true);//,true,true,false,false, true,null);

 ch.QueueDeclare(pic_process_queue, true, false , false,

 new Dictionary <string, object>() { { "x-ha-policy" , "nodes" }, { "x-ha-policy-params", new List<string >() { "z_91@zen.com", "z_93@zen.com" } } });

 ch.QueueBind(pic_process_queue, exchange, routingKey);

   

  用rabbitmqctl看看slave_pids

[root@localhost scripts]#  ./rabbitmq-util -n z_93@zen.com cluster_status
Cluster status of node 'z_93@zen.com' ...
[{nodes,[{disc,['z_92@zen.com']},{ram,['z_93@zen.com','z_91@zen.com']}]},
{running_nodes,['z_92@zen.com','z_91@zen.com','z_93@zen.com']}]
...done.
[root@localhost scripts]#  ./rabbitmq-util -n z_91@zen.com list_queues name pid slave_pids
Listing queues ...
qp_pic_queue    <'z_93@zen.com'.2.6931.0>       [<'z_91@zen.com'.2.7445.0>, <'z_92@zen.com'.3.235.0>]
yaqp_pic_queue  <'z_91@zen.com'.2.7875.0>       [<'z_93@zen.com'.2.7387.0>]
qp_pic_queue2   <'z_92@zen.com'.3.232.0>        []
...done.
[root@localhost scripts]# 

   

    只是實際操作的時候有點兩難了:我們要在代碼編寫的時候就要指定這個參數,而運行時這些節點可是不一定活着的.如果顯示指定的節點不是都處於在線狀態,declare就會失敗.節點調整的時候硬編碼也會讓我們陷入窘境,所以rabbitmq in action推薦的做法是使用x-ha-policy的all選項,即在全集群范圍內復制消息.

  

半路增加新節點會如何?

 

    理想狀態總是簡單,真實環境總是復雜;如果我們要在運行時添加一個新的節點到集群中(添加節點這種事情這再正常不過),消息復制會怎么處理?如果有新節點加入,RabbitMQ不會同步之前的歷史數據,只會復制新消息.這里的假設是隨着消息的被consumer取走,最終所有的節點的數據都會對齊一致.

    一個自然的追問就是:master節點退出集群會選一個slave作為master,那么要是不幸選中了一個剛剛加入集群的節點怎么辦?不就丟消息了么?放心RabbitMQ會維護節點的狀態是否已經同步,使用rabbitmqctl的synchronised_slave_pids參數,就可以查看狀態.看下面的例子,如果slave_pids和synchronised_slave_pids里面的節點是一致的,那說明全都同步了.如果不一致很容易比較出來哪些還沒有同步.

[root@localhost scripts]#  ./rabbitmq-util -n z_91@zen.com list_queues name pid slave_pids synchronised_slave_pids
Listing queues ...
zen_qp_pic_queue        <'z_91@zen.com'.2.8009.0>       [<'z_93@zen.com'.2.7517.0>]     [<'z_93@zen.com'.2.7517.0>]
qp_pic_queue    <'z_93@zen.com'.2.6931.0>       [<'z_91@zen.com'.2.7445.0>, <'z_92@zen.com'.3.235.0>, <'z_94@zen.com'.1.595.0>] [<'z_91@zen.com'.2.7445.0>, <'z_92@zen.com'.3.235.0>, <'z_94@zen.com'.1.595.0>]
yaqp_pic_queue  <'z_91@zen.com'.2.7875.0>       [<'z_93@zen.com'.2.7387.0>]     [<'z_93@zen.com'.2.7387.0>]
qp_pic_queue2   <'z_92@zen.com'.3.232.0>
yy_qp_pic_queue <'z_91@zen.com'.2.7920.0>       [<'z_93@zen.com'.2.7434.0>]     [<'z_93@zen.com'.2.7434.0>]
...done.
[root@localhost scripts]# 

 

 官網資料: Highly Available Queues  http://www.rabbitmq.com/ha.html 

 

最后,小圖一張 每當變幻時 Miss

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM