快速掌握RabbitMQ(五)——搭建高可用的RabbitMQ集群


  RabbitMQ的集群是依賴erlang集群的,而erlang集群是通過.erlang.cookie文件進行通信認證的,所以我們使用RabbitMQ集群時只需要配置一下.erlang.cookie文件即可。下邊簡單演示一下RabbitMQ高可用集群的搭建,附帶一個簡單使用C#驅動RabbtiMQ集群的小栗子。

1 搭建RabbitMQ高可用集群

  首先准備三台設備,這里采用的三台Centos7的虛擬機,測試一下各個虛擬機能不能相互ping通,如果可以相互ping通的話,在每台虛擬機上分別安裝RabbitMQ,可以參考第一篇的安裝方法

第1步 修改主機配置

   為了方便機器間的相互訪問,三台centos都執行  vim /etc/hosts ,添加下邊的配置(注意修改成自己設備的IP):

192.168.70.129 rabbitmq1
192.168.70.131 rabbitmq2
192.168.70.133 rabbitmq3

  一般情況,hosts文件中內容如下:

第2步:修改.erlang.cookie文件

  修改三台設備的.erlang.cookie中的key一致。如果使用的是前邊的安裝方法,.erlang.cookie的位置為 /var/lib/rabbitmq/.erlang.cookie (采用其他安裝方式找不到文件的話,可以使用命令  find / -name '.erlang.cookie' 找到文件位置)。這里三台虛擬機的key都采用192.168.70.129的key(值為CRRQPKHDXEEIUJUOGYKN),在另外兩台設備上 執行命令: vim /var/lib/rabbitmq/.erlang.cookie ,修改文件內容為CRRQPKHDXEEIUJUOGYKN。在修改時如果遇到權限問題,可執行命令 chmod 600 /var/lib/rabbitmq/.erlang.cookie 修改文件的權限為可寫,修改內容完成后,執行命令 chmod 400 /var/lib/rabbitmq/.erlang.cookie 把文件再次改成只讀的。

  完成上邊的兩步后,erlang集群就搭建好了,重啟所有的設備即可。在rabbitmq1節點的虛擬機上執行命令 rabbitmqctl cluster_status 查看集群狀態:

 第3步:添加/刪除節點

添加節點

  把rabbitmq2節點添加到集群中去,在rabbitmq2節點執行以下命令:

rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit1
rabbitmqctl start_app

  執行完成后,查看集群狀態,看到rabbitmq2已經在集群中了,如下:

  重復上邊的步驟,把rabbitmq3也添加到集群中,在rabbitmq3節點執行下邊命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbit2
rabbitmqctl start_app
  查看集群狀態,rabbitmq3也在集群中了,如下:  

  這時我們打開任意一個節點的Web管理界面,顯示如下,看到集群已經配置完成了:

刪除節點
  把某一節點從集群中刪除很簡單,reset一下節點即可。如刪除rabbitmq3節點,在rabbitmq3上執行以下命令:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

  執行完成后,查看集群狀態如下:

   現在搭建的集群是默認的普通集群,普通集群中節點可以共享集群中的exchange,routingKey和queue,但是queue中的消息只保存在首次聲明queue的節點中。任意節點的消費者都可以消費其他節點的消息,比如消費者連接rabbitmq1節點的消費者(代碼中建立Connection時,使用的rabbitmq1的IP)可以消費節點rabbitmq2的隊列myqueue2中的消息,消息傳輸過程是:rabbitmq2把myqueue2中的消息傳輸給rabbtimq1,然后rabbitmq1節點把消息發送給consumer。因為queue中的消息只保存在首次聲明queue的節點中,這樣就有一個問題:如果某一個node節點掛掉了,那么只能等待該節點重新連接才能繼續處理該節點內的消息(如果沒有設置持久化的話,節點掛掉后消息會直接丟失)。如下圖,rabbitmq1節點掛掉后,myqueue隊列就down掉了,不能被訪問。

  針對上邊的問題,我們可能會想到:如果可以讓rabbitmq中的節點像redis集群的節點一樣,每一個節點都保存所有的消息,比如讓rabbitmq1不僅僅保存自己隊列myqueue的消息,還保存其他節點的隊列myqueue2和myqueue3中的消息,rabbitmq2和rabbitmq3節點也一樣,這樣就不用擔心宕機的問題了。rabbitmq也提供了這樣的功能:鏡像隊列。鏡像隊列由一個master和多個slave組成,使用鏡像隊列消息會自動在鏡像節點間同步,而不是在consumer取數據時臨時拉取。

第4步:配置鏡像隊列

  rabbitmq配置鏡像隊列十分簡單,我們在任意一個node節點下執行下邊的命令就可以完成鏡像隊列的配置(當然也可以在Web管理界面上添加policy):

rabbitmqctl set_policy ha-all "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
# ha-all:為策略名稱;
# ^my:為匹配符,只有一個^代表匹配所有,^abc為匹配名稱以abc開頭的queue或exchange;
# ha-mode:為同步模式,一共3種模式:
#    ①all-所有(所有的節點都同步消息),
#    ②exctly-指定節點的數目(需配置ha-params參數,此參數為int類型比如2,在集群中隨機抽取2個節點同步消息)
#    ③nodes-指定具體節點(需配置ha-params參數,此參數為數組類型比如["rabbit@rabbitmq1","rabbit@rabbitmq2"],明確指定在這兩個節點上同步消息)。

  打開Web管理界面,如果效果如下表示鏡像隊列已經配置完成了。當前myqueue的master節點為rabbitmq1:

  如果首次聲明queue的節點(master)掛了,其他節點會自動變成master,如上圖myqueue的master為rabbitmq1,停掉rabbtmq1后,結果如下:rabbitmq2成為了master。

 

  我們發現rabbitmq1節點掛了后,rabbitmq2自動成為了myqueue的master,myqueue不會down掉,可以正常的添加/刪除/獲取消息,這就解決了普通集群宕機的問題。使用鏡像隊列,因為各個節點要同步消息,所以比較耗費資源,一般在可靠性比較高的場景使用鏡像隊列。

還可以配置其他策略的鏡像隊列,也是一行命令即可完成配置,一些其它同步模式的栗子:

#策略名為ha-twe,匹配以“my”開頭的queue或exchange,在集群中隨機挑選鏡像節點,同步的節點為2個
rabbitmqctl set_policy ha-two "^my"    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
#策略名為ha-nodes,匹配以“my”開頭的queue或exchange,指定rabbit@rabbitmq2和rabbit@rabbitmq3為同步節點
rabbitmqctl set_policy ha-nodes "^my" \ '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq2", "rabbit@rabbitmq3"]}'

第5步:C#驅動RabbitMQ集群

  C#驅動RabbitMQ集群與C#驅動單機RabbtiMQ的方式基本一樣,區別在於使用集群時,創建Connection指定的是一個host集合。看一個簡單的栗子:

生產者代碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                UserName = "wyy",//用戶名
                Password = "123456",//密碼
                AutomaticRecoveryEnabled = true,//Connection斷了,自動重新連接
            };
            //集群中的三個rabbitmq節點
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" };
            //隨機連接一個rabbitmq節點
            using (var connection = factory.CreateConnection(hosts))
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    Console.WriteLine("生產者准備就緒....");
                    #region 發布100條消息
                    for (int i = 0; i < 100; i++)
                    {
                        channel.BasicPublish(exchange: "myexchange",
                                             routingKey: "mykey",
                                             basicProperties: null,
                                             body: Encoding.UTF8.GetBytes($"第{i}條消息"));
                    }
                    #endregion
                }
            }
            Console.ReadKey();
        }

消費者代碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                UserName = "wyy",//用戶名
                Password = "123456",//密碼
            };
            //集群中的三個rabbitmq節點
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" };
            //隨機連接一個rabbitmq節點
            using (var connection = factory.CreateConnection(hosts))
            {
                using (var channel = connection.CreateModel())
                {
                    //使用Qos,每次接收1條消息
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false);
                    Console.WriteLine("消費者准備就緒....");
                    #region EventingBasicConsumer

                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (sender, ea) =>
                    {
                        //處理一條消息需要10s時間
                        Thread.Sleep(1000);
                        //顯示確認消息
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        Console.WriteLine($"處理消息【{Encoding.UTF8.GetString(ea.Body)}】完成!");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "myqueue",
                                           autoAck: false,
                                           consumer: consumer);
                    Console.ReadKey();
                    #endregion
                }
            }
        }

   執行這兩個應用程序,結果如下:

 

   到這里RabbtMQ的集群搭建就告一段落了,有一個小問題:RabbitMQ的集群默認不支持負載均衡的。我們可以根據設備的性能,使用Qos給各個消費者指定合適的最大發送條數,這樣可以在一定程度上實現負載均衡。也有園友通過Haproxy實現RabbitMQ集群的負載均衡,有興趣的小伙伴可以研究一下,為什么使用Haprpxy而不用Ngnix呢?這是因為Haproxy支持四層(tcp,udp等)和七層(http,https,email等)的負載均衡,而Nginx只支持七層的負載均衡,而Rabbitmq是通過tcp傳輸的。本節也是RabbitMQ系列的最后一篇,如果文中有錯誤的話,希望大家可以指出,我會及時修改,謝謝。

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM