消息發布者向RabbitMQ進行消息投遞時默認情況下是不返回發布者該條消息在broker中的狀態的,也就是說發布者不知道這條消息是否真的抵達RabbitMQ的broker之上,也因此會發生消息丟失的情況。
對此,RabbitmQ提供了兩種解決方案(以官方提供的SDK為例)
1.通過AMOP提供的事務機制:
C#代碼:
try { channel.TxSelect(); channel.BasicPublish("yu.exchange", "yu.1", props, msg); channel.TxCommit(); } catch (Exception ex) { channel.TxRollback(); }
java代碼是一樣的操作。。。
byte[] msg = "hello".getBytes(); try { channel.txSelect(); channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg); channel.txCommit(); } catch (Exception ex) { channel.txRollback(); }
事務開啟,提交,回滾都有了。。。
2.Conform模式,也就是官網推薦的消息批量提交的方式
Conform模式主要包含兩種編程模式,一種同步的,一種異步通知的:
同步回調的調用方式與事務模式差不多
C#代碼:
channel.ConfirmSelect(); byte[] msg = Encoding.UTF8.GetBytes("hello"); channel.BasicPublish("yu.exchange1", "yu.1", props, msg); bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 10)); Console.WriteLine(success);
Java代碼:
byte[] msg = "hello".getBytes(); channel.confirmSelect(); channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg); boolean success = channel.waitForConfirms(10);
通道Channel開啟Conform,在發送完消息后可以通過WaitForConfirm等待消息的投遞結果,這里有個可選參數,就是阻塞等待的時間,如果返回結果為false則表示消息投遞失敗,則發送端這時候也就可以采取重試之類的策略了。
異步回調的方式也就是通道訂閱RabbitMQ的發送完畢確認事件,消息投遞成功會回調這個方法給發送方,回調的參數包含當前消息在該通道中發送的編號DeliveryTag(批量提交的時候可以根據這個編號對應提交集合的索引,保證對應集合索引上的消息可靠投遞),的最大值是9223372036854775807
C#代碼:
channel.BasicAcks += (sender, eventArgs) => { ulong tag = eventArgs.DeliveryTag; }; channel.BasicReturn += (sender, eventArgs) => { }; byte[] msg = Encoding.UTF8.GetBytes("hello"); channel.BasicPublish("yu.exchange1", "yu.1", true, props, msg);
Java代碼:
channel.addConfirmListener(new ConfirmListener() { public void handleAck(long l, boolean b) throws IOException { System.out.println(l); } public void handleNack(long l, boolean b) throws IOException { System.out.println(l); } }); channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { System.out.println("響應狀態碼-ReplyCode:"+i); System.out.println("響應內容-ReplyText:"+s); System.out.println("Exchange:"+s1); System.out.println("RouteKey"+s2);
System.out.println("投遞失敗的消息:"+ new String(bytes,"UTF-8") );
} }); byte[] msg = "hello".getBytes(); channel.confirmSelect(); channel.basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
代碼中多訂閱了一個BasicReturn事件(addReturnListener),當消息被RabbitMQ拒絕或者說沒有成功投遞的時候,則會觸發這個方法,當然想要獲取詳細信息則需要設置mandatory參數為true,也就是basicPublish("yu.exchange1", "yu.1",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);中的第三個參數。
隊列中新建一個yu.exchange1的交換機然后不綁定隊列的情況下則會投遞失敗的時間;