RabbitMQ 集群之鏡像同步


mirrored

在上個博文中講到了如果做集群,那么集群是成功了,但是queue是如何存放的呢?消息又是怎么同步呢。

默認的,也就是什么也不配置,直接在某個節點中添加一個queue,那么它僅僅是屬於這個節點的。其它節點有的只是它的影子。所以像斷線重連、操作恢復是無法做到的,實驗證明確實是這樣的。聲明queue的節點關閉那么是無法再進行發布消息與消費的。這自然失去了集群的意義

image

 

所以default模式一般是不會進行使用的,我們的選擇是 鏡像節點

queue mirror 也就是說,也就是 會有一個 master 對應零個或多個 slave

也可以說成主從復制,每個節點都有着相同的數據,某個節點掛掉了,另一個可以立即頂上

 

官方的文檔也可以看到,鏡像的配置是通過 policy 策略的方式

image

我們有三種同步的方式,一般來說  all 是我們的最佳選擇

all 所有的節點都將被同步

exactly 指定個數的節點被同步

nodes 指定的名稱的節點被同步

一個簡單的示例,下面使用的是ha-all 是我們的策略名稱,后面的是^ha\. 則是一個正則,queue名滿足這個規則的,將被做鏡像。方式是所有的節點都被同步

這里需要說明的是 策略 可以被應用到exchange與queue上面,也可以選擇只被應用到哪個上面,下面的命令則是exchange與queue都被應用到了

rabbitmqctl set_policy ha-all "^ha\." "{""ha-mode"":""all""}"

image

翻看文檔,似乎沒有提供apply to 的參數,所以下面的操作都將使用WEB UI可以看到WEB UI的創建策略給我們提供了豐富的選項,比如Apply to,優先級,等其它設置

image 

我們需要關注提上圖中 關於 HA的參數

Ha mode 同步方式

Ha params 選擇其它同步方式的參數 比如節點名

Ha async mode 同步方式,這個需要詳細說明的,默認的情況下。當節點斷線后那么這個節點就已經是落后的版本,當我們再去啟動節點的時候數據我們需要去手動的同步,這自然是不好的,做到自動化是最完美的,所以設置成automatic是最佳選擇

下面設置一個作用與queue,集群中的所有節點將被同步,並且斷線重啟后自動同步的的策略

image

 

設置完畢后,就可以通過代碼來看看效果了,因為使用了集群,所以創建connection的時候需要做些變化了,創建連接的時候把節點的ip放了進去。

AutomaticRecoveryEnabled  斷線重連,也就是如果當前的連接斷開了,將會嘗試重連

TopologyRecoveryEnabled 重連后恢復當前的工作進程,比如channel、queue、發布的消息進度等

               var factory = new ConnectionFactory()
                {
                    UserName = "admin",
                    Password = "admin",
                    AutomaticRecoveryEnabled = true,
                    TopologyRecoveryEnabled = true
                };


                Connection = factory.CreateConnection(new string[3] { "192.168.1.115", "192.168.1.113", "192.168.1.123" });

這里聲明了一個test1queue,然后進行發布消息,當節點崩潰,代碼自然也會異常。所以try catch起來,但是沒關系。因為我們已經設置了斷線重連,所以一會兒,就會恢復如常

//創建返回一個新的頻道
            using (var channel = RabbitMqHelper.GetConnection().CreateModel())
            {
                channel.QueueDeclare("test1", true, false, false, null);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                for (var i = 0; i < 10000; i++)
                {
                    try
                    {

                        channel.BasicPublish(string.Empty, "test1", properties, Encoding.UTF8.GetBytes($"這是{i}個消息"));
                        Thread.Sleep(1000);
                        Console.WriteLine($"發布消息 {i}");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                        i--;
                    }
                }

            }

程序運行后,在WEB UI中已經可以看到了,主節點是rabbimq1,剩下的是從節點。

image

可以看到現在連接到的節點是rabbitmq2,現在我們可以手動的把它掛掉

image

D4UIV_$Y2L]VUDC8Y{[3W~8

可以看到應用程序一直在報錯,就這么報了一會,然后又繼續發布消息了。這樣就保證了我們消息隊列的高可用、高可靠

image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM