之前提到過在集群環境中,隊列只有元數據會在集群的所有節點同步,但是隊列中的數據只會存在於一個節點;這不免讓人失望:數據沒有冗余容易丟數據甚至在durable的情況下,如果所在的節點當掉就要等待節點恢復.那么是不是有消息冗余的解決方案呢?是的,RabbitMQ自2.6.0開始就開始支持鏡像隊列(Mirrored Queue).消息會在節點之間復制,和其它的主從設計一樣,它也有master和slave的概念;一旦某個節點當掉,會在其余的節點中選舉一個slave作為master.要注意Mirrored Queue 也不是銀彈,后面會提到它的局限.
現在就開始動手
Uri uri = new Uri( "amqp://192.168.10.160:9992/" ); ...... ...... ch.ExchangeDeclare(exchange, exchangeType, true);//,true,true,false,false, true,null); ch.QueueDeclare(pic_process_queue, true, false , false, new Dictionary <string, string>() { { "x-ha-policy" , "all" } }); ch.QueueBind(pic_process_queue, exchange, routingKey); ch.QueueDeclare(pic_process_queue2, true, false , false, null); ch.QueueBind(pic_process_queue2, exchange, routingKey);
我們使用rabbitmqctl工具檢查集群狀態,注意我新建了一個腳本rabbitmq-util內容基本和rabbitmqctl一樣,只是顯示指定了cookie.
[root@localhost scripts]# ./rabbitmq-util -n z_91@zen.com cluster_status Cluster status of node 'z_91@zen.com' ... [{nodes,[{disc,['z_92@zen.com']},{ram,['z_93@zen.com','z_91@zen.com']}]}, {running_nodes,['z_92@zen.com','z_93@zen.com','z_91@zen.com']}] ...done. [root@localhost scripts]# ./rabbitmq-util -n z_93@zen.com list_queues name pid slave_pids Listing queues ... qp_pic_queue <'z_92@zen.com'.2.7008.0> [<'z_93@zen.com'.2.6931.0>, <'z_91@zen.com'.2.7445.0>] qp_pic_queue2 <'z_92@zen.com'.2.7013.0> [] ...done. [root@localhost scripts]#
可以看到上面連接的Rabbit是92節點,端口9992;我們現在正常關閉92節點(使用rabbitmqctl而不是kill),現在我們修改Client的代碼,連接到93節點 Uri uri = new Uri( "amqp://192.168.10.160:9993/" );運行同樣的代碼,和我們的預期一致:qp_pic_queue 由於聲明了Mirrored 所有在92節點關閉之后,我們可以在93上重建隊列;而qp_pic_queue2 就沒有這么幸運了,拋出了404錯誤.看下面的截圖:
如果只是在部分節點復制呢?
上面的例子是把消息在整個集群內進行復制,如果是指定在幾個節點之間做消息鏡像怎么辦?其實編碼實現上沒有難度,因為x-ha-policy參數支持顯示指定節點;下面是我截取的rabbit_amqqueue.erl中關於創建鏡像隊列的參數處理邏輯;下面是一段C#客戶端代碼顯示指定要在哪些節點復制消息.
..\rabbitmq-server-2.8.7\src\rabbit_amqqueue.erl determine_queue_nodes(Args) -> Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), case {Policy, PolicyParams} of {{_Type, <<"nodes">>}, {array, Nodes}} -> case [list_to_atom(binary_to_list(Node)) || {longstr, Node} <- Nodes] of [Node] -> {Node, undefined}; [First | Rest] -> {First, [First | Rest]} end; {{_Type, <<"all">>}, _} -> {node(), all}; _ -> {node(), undefined} end.
聲明一個yaqp_pic_queue
ch.ExchangeDeclare(exchange, exchangeType, true);//,true,true,false,false, true,null); ch.QueueDeclare(pic_process_queue, true, false , false, new Dictionary <string, object>() { { "x-ha-policy" , "nodes" }, { "x-ha-policy-params", new List<string >() { "z_91@zen.com", "z_93@zen.com" } } }); ch.QueueBind(pic_process_queue, exchange, routingKey);
用rabbitmqctl看看slave_pids
[root@localhost scripts]# ./rabbitmq-util -n z_93@zen.com cluster_status Cluster status of node 'z_93@zen.com' ... [{nodes,[{disc,['z_92@zen.com']},{ram,['z_93@zen.com','z_91@zen.com']}]}, {running_nodes,['z_92@zen.com','z_91@zen.com','z_93@zen.com']}] ...done. [root@localhost scripts]# ./rabbitmq-util -n z_91@zen.com list_queues name pid slave_pids Listing queues ... qp_pic_queue <'z_93@zen.com'.2.6931.0> [<'z_91@zen.com'.2.7445.0>, <'z_92@zen.com'.3.235.0>] yaqp_pic_queue <'z_91@zen.com'.2.7875.0> [<'z_93@zen.com'.2.7387.0>] qp_pic_queue2 <'z_92@zen.com'.3.232.0> [] ...done. [root@localhost scripts]#
只是實際操作的時候有點兩難了:我們要在代碼編寫的時候就要指定這個參數,而運行時這些節點可是不一定活着的.如果顯示指定的節點不是都處於在線狀態,declare就會失敗.節點調整的時候硬編碼也會讓我們陷入窘境,所以rabbitmq in action推薦的做法是使用x-ha-policy的all選項,即在全集群范圍內復制消息.
半路增加新節點會如何?
理想狀態總是簡單,真實環境總是復雜;如果我們要在運行時添加一個新的節點到集群中(添加節點這種事情這再正常不過),消息復制會怎么處理?如果有新節點加入,RabbitMQ不會同步之前的歷史數據,只會復制新消息.這里的假設是隨着消息的被consumer取走,最終所有的節點的數據都會對齊一致.
一個自然的追問就是:master節點退出集群會選一個slave作為master,那么要是不幸選中了一個剛剛加入集群的節點怎么辦?不就丟消息了么?放心RabbitMQ會維護節點的狀態是否已經同步,使用rabbitmqctl的synchronised_slave_pids參數,就可以查看狀態.看下面的例子,如果slave_pids和synchronised_slave_pids里面的節點是一致的,那說明全都同步了.如果不一致很容易比較出來哪些還沒有同步.
[root@localhost scripts]# ./rabbitmq-util -n z_91@zen.com list_queues name pid slave_pids synchronised_slave_pids Listing queues ... zen_qp_pic_queue <'z_91@zen.com'.2.8009.0> [<'z_93@zen.com'.2.7517.0>] [<'z_93@zen.com'.2.7517.0>] qp_pic_queue <'z_93@zen.com'.2.6931.0> [<'z_91@zen.com'.2.7445.0>, <'z_92@zen.com'.3.235.0>, <'z_94@zen.com'.1.595.0>] [<'z_91@zen.com'.2.7445.0>, <'z_92@zen.com'.3.235.0>, <'z_94@zen.com'.1.595.0>] yaqp_pic_queue <'z_91@zen.com'.2.7875.0> [<'z_93@zen.com'.2.7387.0>] [<'z_93@zen.com'.2.7387.0>] qp_pic_queue2 <'z_92@zen.com'.3.232.0> yy_qp_pic_queue <'z_91@zen.com'.2.7920.0> [<'z_93@zen.com'.2.7434.0>] [<'z_93@zen.com'.2.7434.0>] ...done. [root@localhost scripts]#
官網資料: Highly Available Queues http://www.rabbitmq.com/ha.html
最后,小圖一張 每當變幻時 Miss