上篇文章把單機集群搭建好了,可以開始驗證普通集群的相關功能了.
我們首先在管理后台(15672,15673 都可以)添加一個用戶,並用新用戶登錄,添加一個虛擬主機
由於是在一台機器上模擬集群,所以我們把創建連接的工具類小改一下,將端口號作為入參.
public static class ConnectionHelper { public static IConnection GetConnection(int port) { //定義一個連接工廠 ConnectionFactory factory = new ConnectionFactory { HostName = "127.0.0.1",//設置服務器地址 Port = port, //設置端口號 VirtualHost = "/vhost_wjire",//設置虛擬主機 UserName = "wjire",//設置用戶名 Password = "******"//設置密碼 }; //factory.AutomaticRecoveryEnabled //自動恢復連接,默認就是true //factory.NetworkRecoveryInterval //自動恢復連接失敗,默認每 5 秒重試一次 //連接恢復后才會進行拓撲恢復 //factory.TopologyRecoveryEnabled //默認也是true return factory.CreateConnection(); } }
一.非持久化隊列驗證
1.生產者連接到 node1 (5672) 聲明隊列,發送消息,消費者連接到 node2 (5673) 接收消息.
生產者
public class Producer { private const string QueueName = "test_queue"; public static void Send() { using (IConnection connection = ConnectionHelper.GetConnection(5672)) using (IModel channel = connection.CreateModel()) { var msg = "hello world"; channel.QueueDeclare(QueueName, false, false, false, null); channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); } } }
消費者
public class Consumer { private const string QueueName = "test_queue"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(5673); IModel channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { string str = Encoding.Default.GetString(e.Body); Console.WriteLine("consumer receive : " + str); }; channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer); } }
運行結果如下,並且在管理后台我們可以看到該隊列的節點.
2.生產者連接到 node2 ,將消息發送到上面代碼在 node1 聲明的隊列,而消費者則連接到 node2 接收消息.
測試結果一切正常,就不上圖了.
二.持久化隊列及消息驗證
生產者部分代碼
channel.QueueDeclare(QueueName, true, false, false, null);//隊列持久化 var pros = channel.CreateBasicProperties(); pros.Persistent = true;//將消息設置為持久化 channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg));
消息發送后
現在,我們關閉 node1 的RabbitMQ.然后再看管理后台,當然,15672 肯定訪問不了,只能訪問 15673
下面,我們嘗試讓生產者在 node2 也就是連接到 5673 重新聲明一個叫 "test_queue"的隊列.
結果直接異常了,下面是異常的部分截圖:
現在我們重新啟動 node1 ,過了幾秒后,隊列恢復到了 node1 關閉之前的狀態.
三.集群節點介紹
RabbitMQ集群中的節點分內存節點(RAM)和磁盤節點(disc).
- 內存節點:將所有的隊列,交換機,綁定,用戶,權限,vhost的元數據都存儲在內存中;
- 磁盤節點:將數據存放在磁盤上.磁盤節點需要保存集群的配置信息
如果發送的是持久化消息,那么即使是內存節點,數據還是會放在磁盤中.內存節點的性能只能體現在資源管理上,比如增加或刪除隊列,虛擬主機,交換機等,但發送和接受消息速度同磁盤節點一樣.
一個集群至少有一個節點是磁盤節點,其他節點可以都是內存節點,當節點加入或者離開集群時都要將變更通知到至少一個磁盤節點.實際使用時至少要兩個磁盤節點,原因很簡單,如果只有一個磁盤節點,恰巧磁盤節點掛了,那么RabbitMQ將不能創建隊列,創建交換機,創建綁定,添加用戶,更改權限,添加或刪除節點等操作,但是可以正常的發布和消費消息. 在實際使用中必須將集群的配置放到磁盤節點上來保存.
一個集群中的節點可以共享user,vhost,exchange等,所有的數據和狀態都會在所有節點上復制.在集群模式下只要有任何一個節點可以工作,RabbitMQ集群對外就能提供服務.
單機集群只允許磁盤節點,否則每次重啟所有數據將會丟失.