RabbitMQ如何保證發送端消息的可靠投遞


消息發布者向RabbitMQ進行消息投遞時默認情況下是不返回發布者該條消息在broker中的狀態的,也就是說發布者不知道這條消息是否真的抵達RabbitMQ的broker之上,也因此會發生消息丟失的情況。

對此,RabbitmQ提供了兩種解決方案(以官方提供的SDK為例)

1.通過AMOP提供的事務機制:

C#代碼:

                try
                {
                    channel.TxSelect();
                    channel.BasicPublish("yu.exchange", "yu.1", props, msg);
                    channel.TxCommit();
                }
                catch (Exception ex)
                {
                    channel.TxRollback();
                }

java代碼是一樣的操作。。。

        byte[] msg = "hello".getBytes();
        try {
            channel.txSelect();
            channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
            channel.txCommit();
        } catch (Exception ex) {
            channel.txRollback();
        }

事務開啟,提交,回滾都有了。。。

2.Conform模式,也就是官網推薦的消息批量提交的方式

Conform模式主要包含兩種編程模式,一種同步的,一種異步通知的:

同步回調的調用方式與事務模式差不多

C#代碼:

                channel.ConfirmSelect();
                byte[] msg = Encoding.UTF8.GetBytes("hello");
                channel.BasicPublish("yu.exchange1", "yu.1", props, msg);
                bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 10));
                Console.WriteLine(success);

Java代碼:

        byte[] msg = "hello".getBytes();
        channel.confirmSelect();
        channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
        boolean success = channel.waitForConfirms(10);

通道Channel開啟Conform,在發送完消息后可以通過WaitForConfirm等待消息的投遞結果,這里有個可選參數,就是阻塞等待的時間,如果返回結果為false則表示消息投遞失敗,則發送端這時候也就可以采取重試之類的策略了。

異步回調的方式也就是通道訂閱RabbitMQ的發送完畢確認事件,消息投遞成功會回調這個方法給發送方,回調的參數包含當前消息在該通道中發送的編號DeliveryTag(批量提交的時候可以根據這個編號對應提交集合的索引,保證對應集合索引上的消息可靠投遞),的最大值是9223372036854775807

C#代碼:

           channel.BasicAcks += (sender, eventArgs) =>
                {
                    ulong tag = eventArgs.DeliveryTag;
                };
                channel.BasicReturn += (sender, eventArgs) =>
                {
                   
                };
                byte[] msg = Encoding.UTF8.GetBytes("hello");
                channel.BasicPublish("yu.exchange1", "yu.1", true, props, msg);

Java代碼:

 channel.addConfirmListener(new ConfirmListener() {
            public void handleAck(long l, boolean b) throws IOException {
                System.out.println(l);
            }

            public void handleNack(long l, boolean b) throws IOException {
                System.out.println(l);
            }
        });
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                System.out.println("響應狀態碼-ReplyCode:"+i);
                System.out.println("響應內容-ReplyText:"+s);
                System.out.println("Exchange:"+s1);
                System.out.println("RouteKey"+s2);
System.out.println("投遞失敗的消息:"+ new String(bytes,"UTF-8") );
            }
        });
        byte[] msg = "hello".getBytes();
        channel.confirmSelect();
        channel.basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);

代碼中多訂閱了一個BasicReturn事件(addReturnListener),當消息被RabbitMQ拒絕或者說沒有成功投遞的時候,則會觸發這個方法,當然想要獲取詳細信息則需要設置mandatory參數為true,也就是basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);中的第三個參數。

隊列中新建一個yu.exchange1的交換機然后不綁定隊列的情況下則會投遞失敗的時間;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM