RabbitMQ (十四) 普通集群


上篇文章把單機集群搭建好了,可以開始驗證普通集群的相關功能了.

我們首先在管理后台(15672,15673 都可以)添加一個用戶,並用新用戶登錄,添加一個虛擬主機

由於是在一台機器上模擬集群,所以我們把創建連接的工具類小改一下,將端口號作為入參.

    public static class ConnectionHelper
    {
        public static IConnection GetConnection(int port)
        {
            //定義一個連接工廠
            ConnectionFactory factory = new ConnectionFactory
            {  
                HostName = "127.0.0.1",//設置服務器地址
                Port = port,  //設置端口號
                VirtualHost = "/vhost_wjire",//設置虛擬主機
                UserName = "wjire",//設置用戶名
                Password = "******"//設置密碼
            };
            //factory.AutomaticRecoveryEnabled  //自動恢復連接,默認就是true
            //factory.NetworkRecoveryInterval //自動恢復連接失敗,默認每 5 秒重試一次

            //連接恢復后才會進行拓撲恢復
            //factory.TopologyRecoveryEnabled //默認也是true
            return factory.CreateConnection();
        }
    }

 

一.非持久化隊列驗證

1.生產者連接到 node1 (5672) 聲明隊列,發送消息,消費者連接到 node2 (5673) 接收消息.

生產者

    public class Producer
    {
        private const string QueueName = "test_queue";
        public static void Send()
        {
            using (IConnection connection = ConnectionHelper.GetConnection(5672))
            using (IModel channel = connection.CreateModel())
            {
                var msg = "hello world";
                channel.QueueDeclare(QueueName, false, false, false, null);
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"send {msg}");
            }
        }
    }

消費者

    public class Consumer
    {
        private const string QueueName = "test_queue";
        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection(5673);
            IModel channel = connection.CreateModel();
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (s, e) =>
            {
                string str = Encoding.Default.GetString(e.Body);
                Console.WriteLine("consumer receive : " + str);
            };
            channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer);
        }
    }

運行結果如下,並且在管理后台我們可以看到該隊列的節點.

       

2.生產者連接到 node2 ,將消息發送到上面代碼在 node1 聲明的隊列,而消費者則連接到 node2 接收消息.

測試結果一切正常,就不上圖了.

二.持久化隊列及消息驗證 

生產者部分代碼

                channel.QueueDeclare(QueueName, true, false, false, null);//隊列持久化 var pros = channel.CreateBasicProperties();
                pros.Persistent = true;//將消息設置為持久化
                channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg));

消息發送后

現在,我們關閉 node1 的RabbitMQ.然后再看管理后台,當然,15672 肯定訪問不了,只能訪問 15673

下面,我們嘗試讓生產者在 node2 也就是連接到 5673 重新聲明一個叫 "test_queue"的隊列.

結果直接異常了,下面是異常的部分截圖:

現在我們重新啟動 node1 ,過了幾秒后,隊列恢復到了 node1 關閉之前的狀態.

三.集群節點介紹

RabbitMQ集群中的節點分內存節點(RAM)和磁盤節點(disc).

  • 內存節點:將所有的隊列,交換機,綁定,用戶,權限,vhost的元數據都存儲在內存中;
  • 磁盤節點:將數據存放在磁盤上.磁盤節點需要保存集群的配置信息

如果發送的是持久化消息,那么即使是內存節點,數據還是會放在磁盤中.內存節點的性能只能體現在資源管理上,比如增加或刪除隊列,虛擬主機,交換機等,但發送和接受消息速度同磁盤節點一樣.

一個集群至少有一個節點是磁盤節點,其他節點可以都是內存節點,當節點加入或者離開集群時都要將變更通知到至少一個磁盤節點.實際使用時至少要兩個磁盤節點,原因很簡單,如果只有一個磁盤節點,恰巧磁盤節點掛了,那么RabbitMQ將不能創建隊列,創建交換機,創建綁定,添加用戶,更改權限,添加或刪除節點等操作,但是可以正常的發布和消費消息. 在實際使用中必須將集群的配置放到磁盤節點上來保存.

一個集群中的節點可以共享user,vhost,exchange等,所有的數據和狀態都會在所有節點上復制.在集群模式下只要有任何一個節點可以工作,RabbitMQ集群對外就能提供服務.

單機集群只允許磁盤節點,否則每次重啟所有數據將會丟失.

 

 



 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM