RabbitMQ如何保證發送端消息的可靠投遞-發生鏡像隊列發生故障轉移時


上一篇最后提到了mandatory這個參數,對於設置mandatory參數個人感覺還是很重要的,尤其在RabbitMQ鏡像隊列發生故障轉移時。

模擬個測試環境如下:

首先在集群隊列中增加兩個鏡像隊列的策略:

對於ha-promote-on-shutdown這個參數,可以參考文檔,其作用就是當集群中master出現故障時強制進行故障轉移從而選出新的master節點,這里的master出現故障表示的是人為的故障比如通過命令行rabbitmqctl.bat start_app之類的關閉RabbitMQ實例或者說是關閉電腦之類的。因為這種強制切換master節點的情況通常發生在斷電之類的非可控因素上,所以通過設置這個參數為always模擬非可控因素。

當然設置這個參數會存在一定風險,文檔里也說了,會發生消息不同步也就是會丟消息。

然后創建四個隊列和兩個Echange,采用綁定Exchange的topic模式

然后先貼一下測試代碼在進行說明

C#代碼

       List<string> hosts = new List<string>();
            hosts.Add("192.168.1.1");
            hosts.Add("192.168.1.2"); 
            int curHostIndex = 0;
            string exchange = "always.exchange";
            string touteKey = "yu.1";
            byte[] msg = Encoding.UTF8.GetBytes("hello");
            ConnectAgain:
            ConnectionFactory factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = hosts[curHostIndex];
            IConnection conn = factory.CreateConnection(); 
            IModel channel = conn.CreateModel();
            IBasicProperties props = channel.CreateBasicProperties();
            props.ContentType = "text/plain";
            props.DeliveryMode = 2;
            for (int i = 0; i < 5000000; i++)
            {
                try
                {
                    channel.ConfirmSelect();
                    channel.BasicAcks += (sender, eventArgs) => { };
                    channel.BasicReturn += (sender, eventArgs) => Console.WriteLine("消息投遞失敗 " + eventArgs.ReplyText); 
                    channel.BasicPublish(exchange, touteKey, true, props, msg);
                    bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 0, 0, 1));
                    if (!success)
                        Console.WriteLine("表示消息投遞失敗 ");
                }
                catch (Exception ex)
                {   //發生鏈接異常時換個IP進行連接
                    channel.Close();
                    conn.Close();
                    if (curHostIndex == 0)
                        curHostIndex = 1;
                    else
                        curHostIndex = 0;
                    goto ConnectAgain;
                }
            }

Java代碼:

 public static void publish() throws Exception {
        List<String> hosts = new ArrayList<String>();
        hosts.add("192.168.1.1");
        hosts.add("192.168.1.2");
        int curHostIndex = 1;
        String exchange = "common.exchange";
        String routeKey = "yu.1";
        byte[] msg = "hello".getBytes("UTF-8");
        ConnectAgain:
        while (true) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(hosts.get(curHostIndex));
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");
            // 創建一個新的連接
            Connection connection = factory.newConnection();
            // 創建一個頻道
            Channel channel = connection.createChannel();
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println(l);
                }

                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println(l);
                }
            });
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                    System.out.println("響應狀態碼-ReplyCode:" + i);
                    System.out.println("響應內容-ReplyText:" + s);
                    System.out.println("Exchange:" + s1);
                    System.out.println("RouteKey" + s2);
                    System.out.println("投遞失敗的消息:" + new String(bytes, "UTF-8"));
                }
            });
            for (int i = 0; i < 5000000; i++) {
                try {
                    channel.confirmSelect();
                    channel.basicPublish(exchange, routeKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
                    boolean sucess = channel.waitForConfirms(10);
                    System.out.println(sucess);
                } catch (Exception ex) {
                    try {
                        connection.abort();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (curHostIndex == 0)
                        curHostIndex = 1;
                    else
                        curHostIndex = 0;
                    continue ConnectAgain;
                }
            }
        }

    }

 

先測試下always.exchange也就是非人為因素導致的故障轉移的情況,先開啟客戶端讓客戶端跑着,然后通過命令行停止master節點(也就是Node為WinServer12-1),停止時消息隊列的消息狀態為

圖片消息的總數雖然不准確(頁面存在延遲) ,但截取的是master停止時刻的消息狀態,也夠用了,這時候發現slaver節點會切換為master節點(也就是Node為DESKTOP-078UA43),並繼續接受消息,客戶點也沒有發生異常通知(因為訂閱了BasicReturn事件並且開啟了madartory,如果消息投遞失敗,我們可以得到通知,待會也會測試到)。

然后我們讓集群多跑會然后在消息有明顯變化的時候在開啟老的掛掉的當前為slaver的節點,當前隊列消息的狀態如下,master為DESKTOP-078UA43

在開啟slaver后我們在立即停止當前的master節點(也就是Node為WinServer12-1),這時候發現集群的master又切回到了DESKTOP-078UA43同時隊列中的消息也跟着清除了。。也就是說在DESKTOP-078UA43之前掛掉到重啟啟動期間WinServer12-1接收到的消息全部丟掉了。。。由此我們可知,RabbitMQ鏡像集群發生非可控因素造成的master故障為了保證可用性,會丟消息。

而對於客戶端而已,消息都是可靠投遞的,所以監聽事件並不會觸發。

當然也可以通過設置ha-sync-mode參數進行調整,默認情況下,新加入的節點不會同步已存在節點內的消息,設置為automatic后會進行同步。不過如果沒同步完master掛掉的話消息還是會丟掉的

然后測試下common.exchange會發生的情況,測試這個的時候就是體現mandatory作用的時刻了!

還是先在集群正常的情況下選取個時間點關掉主節點,當前master為DESKTOP-078UA43

 

然后WinServer12-1變為新的master,此刻發現正常接收消息,而且對客戶端而言,消息也是正常投遞的。然后打開被關閉的DESKTOP-078UA43節點,它會以slaver身份回歸集群,開啟前觀察下當前隊列狀態

然后開始操作!發現隊列狀態如下,NaN,難道說隊列停止接受數據了么!!!(如果停止接受數據,客戶端同步調用發送時會發送失敗么?)而且無法將master進行切換了。

這時候如果在啟動WinServer12-1會發現,消息還是WinServer12-1關閉時刻的消息,WinServer12-1關閉期間DESKTOP-078UA43盡管在接受消息,但實際消息並沒有被RabbitMQ可靠存儲(比較master都沒有了。。);

觀察下調試的代碼,發現消息還是在正常向RabbitMQ投遞。

 

客戶端為了保證向RabbitMQ投遞消息的可靠,及開啟了Conform模式,但此刻同步返回的RabbitMQ處理結果是消息處理完成。那豈不是NaN期間RabbitMQ把消息都吞了?而客戶端還傻傻的以為發送成功了。。

這時候就體現開啟mandatory同時訂閱 channel.BasicReturn += (sender, eventArgs) => Console.WriteLine("消息投遞失敗 " + eventArgs.ReplyText);事件的作用了。。因為這時候RabbitMQ會反饋給你消息實際上並沒有投遞成功的信息。

 

這里包含了持久化失敗的原因,同時包含發送消息的詳細信息,方便客戶端對消息進行在處理。

其實說了這么多,最后想說的是對於消息的一致性,最好還是不要全部依賴於RabbitMQ,實現最終一致性並保證冪等性才是相對可靠的方案。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM