RabbitMQ的集群是依賴erlang集群的,而erlang集群是通過.erlang.cookie文件進行通信認證的,所以我們使用RabbitMQ集群時只需要配置一下.erlang.cookie文件即可。下邊簡單演示一下RabbitMQ高可用集群的搭建,附帶一個簡單使用C#驅動RabbtiMQ集群的小栗子。
1 搭建RabbitMQ高可用集群
首先准備三台設備,這里采用的三台Centos7的虛擬機,測試一下各個虛擬機能不能相互ping通,如果可以相互ping通的話,在每台虛擬機上分別安裝RabbitMQ,可以參考第一篇的安裝方法。
第1步 修改主機配置
為了方便機器間的相互訪問,三台centos都執行 vim /etc/hosts ,添加下邊的配置(注意修改成自己設備的IP):
192.168.70.129 rabbitmq1 192.168.70.131 rabbitmq2 192.168.70.133 rabbitmq3
一般情況,hosts文件中內容如下:
第2步:修改.erlang.cookie文件
修改三台設備的.erlang.cookie中的key一致。如果使用的是前邊的安裝方法,.erlang.cookie的位置為 /var/lib/rabbitmq/.erlang.cookie (采用其他安裝方式找不到文件的話,可以使用命令 find / -name '.erlang.cookie' 找到文件位置)。這里三台虛擬機的key都采用192.168.70.129的key(值為CRRQPKHDXEEIUJUOGYKN),在另外兩台設備上 執行命令: vim /var/lib/rabbitmq/.erlang.cookie ,修改文件內容為CRRQPKHDXEEIUJUOGYKN。在修改時如果遇到權限問題,可執行命令 chmod 600 /var/lib/rabbitmq/.erlang.cookie 修改文件的權限為可寫,修改內容完成后,執行命令 chmod 400 /var/lib/rabbitmq/.erlang.cookie 把文件再次改成只讀的。
完成上邊的兩步后,erlang集群就搭建好了,重啟所有的設備即可。在rabbitmq1節點的虛擬機上執行命令 rabbitmqctl cluster_status 查看集群狀態:
第3步:添加/刪除節點
添加節點
把rabbitmq2節點添加到集群中去,在rabbitmq2節點執行以下命令:
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbit1 rabbitmqctl start_app
執行完成后,查看集群狀態,看到rabbitmq2已經在集群中了,如下:
重復上邊的步驟,把rabbitmq3也添加到集群中,在rabbitmq3節點執行下邊命令:
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbit2 rabbitmqctl start_app
查看集群狀態,rabbitmq3也在集群中了,如下:
這時我們打開任意一個節點的Web管理界面,顯示如下,看到集群已經配置完成了:
刪除節點
把某一節點從集群中刪除很簡單,reset一下節點即可。如刪除rabbitmq3節點,在rabbitmq3上執行以下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
執行完成后,查看集群狀態如下:
現在搭建的集群是默認的普通集群,普通集群中節點可以共享集群中的exchange,routingKey和queue,但是queue中的消息只保存在首次聲明queue的節點中。任意節點的消費者都可以消費其他節點的消息,比如消費者連接rabbitmq1節點的消費者(代碼中建立Connection時,使用的rabbitmq1的IP)可以消費節點rabbitmq2的隊列myqueue2中的消息,消息傳輸過程是:rabbitmq2把myqueue2中的消息傳輸給rabbtimq1,然后rabbitmq1節點把消息發送給consumer。因為queue中的消息只保存在首次聲明queue的節點中,這樣就有一個問題:如果某一個node節點掛掉了,那么只能等待該節點重新連接才能繼續處理該節點內的消息(如果沒有設置持久化的話,節點掛掉后消息會直接丟失)。如下圖,rabbitmq1節點掛掉后,myqueue隊列就down掉了,不能被訪問。
針對上邊的問題,我們可能會想到:如果可以讓rabbitmq中的節點像redis集群的節點一樣,每一個節點都保存所有的消息,比如讓rabbitmq1不僅僅保存自己隊列myqueue的消息,還保存其他節點的隊列myqueue2和myqueue3中的消息,rabbitmq2和rabbitmq3節點也一樣,這樣就不用擔心宕機的問題了。rabbitmq也提供了這樣的功能:鏡像隊列。鏡像隊列由一個master和多個slave組成,使用鏡像隊列消息會自動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。
第4步:配置鏡像隊列
rabbitmq配置鏡像隊列十分簡單,我們在任意一個node節點下執行下邊的命令就可以完成鏡像隊列的配置(當然也可以在Web管理界面上添加policy):
rabbitmqctl set_policy ha-all "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}' # ha-all:為策略名稱; # ^my:為匹配符,只有一個^代表匹配所有,^abc為匹配名稱以abc開頭的queue或exchange; # ha-mode:為同步模式,一共3種模式: # ①all-所有(所有的節點都同步消息), # ②exctly-指定節點的數目(需配置ha-params參數,此參數為int類型比如2,在集群中隨機抽取2個節點同步消息) # ③nodes-指定具體節點(需配置ha-params參數,此參數為數組類型比如["rabbit@rabbitmq1","rabbit@rabbitmq2"],明確指定在這兩個節點上同步消息)。
打開Web管理界面,如果效果如下表示鏡像隊列已經配置完成了。當前myqueue的master節點為rabbitmq1:
如果首次聲明queue的節點(master)掛了,其他節點會自動變成master,如上圖myqueue的master為rabbitmq1,停掉rabbtmq1后,結果如下:rabbitmq2成為了master。
我們發現rabbitmq1節點掛了后,rabbitmq2自動成為了myqueue的master,myqueue不會down掉,可以正常的添加/刪除/獲取消息,這就解決了普通集群宕機的問題。使用鏡像隊列,因為各個節點要同步消息,所以比較耗費資源,一般在可靠性比較高的場景使用鏡像隊列。
還可以配置其他策略的鏡像隊列,也是一行命令即可完成配置,一些其它同步模式的栗子:
#策略名為ha-twe,匹配以“my”開頭的queue或exchange,在集群中隨機挑選鏡像節點,同步的節點為2個 rabbitmqctl set_policy ha-two "^my" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' #策略名為ha-nodes,匹配以“my”開頭的queue或exchange,指定rabbit@rabbitmq2和rabbit@rabbitmq3為同步節點 rabbitmqctl set_policy ha-nodes "^my" \ '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq2", "rabbit@rabbitmq3"]}'
第5步:C#驅動RabbitMQ集群
C#驅動RabbitMQ集群與C#驅動單機RabbtiMQ的方式基本一樣,區別在於使用集群時,創建Connection指定的是一個host集合。看一個簡單的栗子:
生產者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { UserName = "wyy",//用戶名 Password = "123456",//密碼 AutomaticRecoveryEnabled = true,//Connection斷了,自動重新連接 }; //集群中的三個rabbitmq節點 List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" }; //隨機連接一個rabbitmq節點 using (var connection = factory.CreateConnection(hosts)) { //創建通道channel using (var channel = connection.CreateModel()) { Console.WriteLine("生產者准備就緒...."); #region 發布100條消息 for (int i = 0; i < 100; i++) { channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: Encoding.UTF8.GetBytes($"第{i}條消息")); } #endregion } } Console.ReadKey(); }
消費者代碼:
static void Main(string[] args) { var factory = new ConnectionFactory() { UserName = "wyy",//用戶名 Password = "123456",//密碼 }; //集群中的三個rabbitmq節點 List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" }; //隨機連接一個rabbitmq節點 using (var connection = factory.CreateConnection(hosts)) { using (var channel = connection.CreateModel()) { //使用Qos,每次接收1條消息 channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false); Console.WriteLine("消費者准備就緒...."); #region EventingBasicConsumer //定義消費者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { //處理一條消息需要10s時間 Thread.Sleep(1000); //顯示確認消息 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成!"); }; //處理消息 channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }
執行這兩個應用程序,結果如下:
到這里RabbtMQ的集群搭建就告一段落了,有一個小問題:RabbitMQ的集群默認不支持負載均衡的。我們可以根據設備的性能,使用Qos給各個消費者指定合適的最大發送條數,這樣可以在一定程度上實現負載均衡。也有園友通過Haproxy實現RabbitMQ集群的負載均衡,有興趣的小伙伴可以研究一下,為什么使用Haprpxy而不用Ngnix呢?這是因為Haproxy支持四層(tcp,udp等)和七層(http,https,email等)的負載均衡,而Nginx只支持七層的負載均衡,而Rabbitmq是通過tcp傳輸的。本節也是RabbitMQ系列的最后一篇,如果文中有錯誤的話,希望大家可以指出,我會及時修改,謝謝。