mirrored
在上個博文中講到了如果做集群,那么集群是成功了,但是queue是如何存放的呢?消息又是怎么同步呢。
默認的,也就是什么也不配置,直接在某個節點中添加一個queue,那么它僅僅是屬於這個節點的。其它節點有的只是它的影子。所以像斷線重連、操作恢復是無法做到的,實驗證明確實是這樣的。聲明queue的節點關閉那么是無法再進行發布消息與消費的。這自然失去了集群的意義
所以default模式一般是不會進行使用的,我們的選擇是 鏡像節點
queue mirror 也就是說,也就是 會有一個 master 對應零個或多個 slave
也可以說成主從復制,每個節點都有着相同的數據,某個節點掛掉了,另一個可以立即頂上
官方的文檔也可以看到,鏡像的配置是通過 policy 策略的方式
我們有三種同步的方式,一般來說 all 是我們的最佳選擇
all 所有的節點都將被同步
exactly 指定個數的節點被同步
nodes 指定的名稱的節點被同步
一個簡單的示例,下面使用的是ha-all 是我們的策略名稱,后面的是^ha\. 則是一個正則,queue名滿足這個規則的,將被做鏡像。方式是所有的節點都被同步
這里需要說明的是 策略 可以被應用到exchange與queue上面,也可以選擇只被應用到哪個上面,下面的命令則是exchange與queue都被應用到了
rabbitmqctl set_policy ha-all "^ha\." "{""ha-mode"":""all""}"
翻看文檔,似乎沒有提供apply to 的參數,所以下面的操作都將使用WEB UI可以看到WEB UI的創建策略給我們提供了豐富的選項,比如Apply to,優先級,等其它設置
我們需要關注提上圖中 關於 HA的參數
Ha mode 同步方式
Ha params 選擇其它同步方式的參數 比如節點名
Ha async mode 同步方式,這個需要詳細說明的,默認的情況下。當節點斷線后那么這個節點就已經是落后的版本,當我們再去啟動節點的時候數據我們需要去手動的同步,這自然是不好的,做到自動化是最完美的,所以設置成automatic是最佳選擇
下面設置一個作用與queue,集群中的所有節點將被同步,並且斷線重啟后自動同步的的策略
設置完畢后,就可以通過代碼來看看效果了,因為使用了集群,所以創建connection的時候需要做些變化了,創建連接的時候把節點的ip放了進去。
AutomaticRecoveryEnabled 斷線重連,也就是如果當前的連接斷開了,將會嘗試重連
TopologyRecoveryEnabled 重連后恢復當前的工作進程,比如channel、queue、發布的消息進度等
var factory = new ConnectionFactory() { UserName = "admin", Password = "admin", AutomaticRecoveryEnabled = true, TopologyRecoveryEnabled = true }; Connection = factory.CreateConnection(new string[3] { "192.168.1.115", "192.168.1.113", "192.168.1.123" });
這里聲明了一個test1queue,然后進行發布消息,當節點崩潰,代碼自然也會異常。所以try catch起來,但是沒關系。因為我們已經設置了斷線重連,所以一會兒,就會恢復如常
//創建返回一個新的頻道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { channel.QueueDeclare("test1", true, false, false, null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; for (var i = 0; i < 10000; i++) { try { channel.BasicPublish(string.Empty, "test1", properties, Encoding.UTF8.GetBytes($"這是{i}個消息")); Thread.Sleep(1000); Console.WriteLine($"發布消息 {i}"); } catch (Exception ex) { Console.WriteLine(ex.Message); i--;
}
}
}
程序運行后,在WEB UI中已經可以看到了,主節點是rabbimq1,剩下的是從節點。
可以看到現在連接到的節點是rabbitmq2,現在我們可以手動的把它掛掉
可以看到應用程序一直在報錯,就這么報了一會,然后又繼續發布消息了。這樣就保證了我們消息隊列的高可用、高可靠