RabbitMQ消息確認的本質也就是為了解決RabbitMQ消息丟失問題,因為哪怕我們做了RabbitMQ持久化,其實也並不能保證解決我們的消息丟失問題
RabbitMQ的消息確認有兩種
- 第一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
- 第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。
1.消息發送確認(生產者)
正常情況下,生產者會通過交換機發送消息至隊列中,再由消費者來進行消費,但是其實RabbitMQ在接收到消息后,還需要一段時間消息才能存入磁盤,並且其實也不是每條消息都會存入磁盤,可能僅僅只保存到cache中,這時,如果RabbitMQ正巧發生崩潰,消息則就會丟失,所以為了避免該情況的發生,我們引入了生產者確認機制,rabbitmq對此提供了兩種方式:
- 通過事務實現
- 通過發送方確認機制(
publisher confirm
)實現
事務實現
channel.txSelect()
: 將當前信道設置成事務模式channel.txCommit()
: 用於提交事務channel.txRollback()
: 用於回滾事務
通過事務實現機制,只有消息成功被rabbitmq服務器接收,事務才能提交成功,否則便可在捕獲異常之后進行回滾,然后進行消息重發,但是事務非常影響rabbitmq的性能。還有就是事務機制是阻塞的過程,只有等待服務器回應之后才會處理下一條消息
/** * 創建生產者 */ public class Send {public static void main(String[] args) throws IOException, TimeoutException { //從MQ工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //創建一個通道 Channel channel = connection.createChannel(); //准備發送的消息內容 String msg = "你好"; //准備交換機(已創建的交換機) String exchangeName = "direct-exchange"; //准備路由 String routekey = "email"; try{ //將信道設置為事務模式 channel.txSelect(); //發送消息給交換機 /** * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有 * 參數2:routekey * 參數3:消息的狀態控制 * 參數4:消息內容 */ //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱 channel.basicPublish(exchangeName,routekey,null,msg.getBytes()); //事務提交 channel.txCommit(); System.out.print("發送成功"); } catch (Exception e){ //如果消息發送給交換機的過程出現異常,則捕捉並進行回滾 channel.txRollback(); System.out.print("發送失敗並回滾"); } //關閉通道 channel.close(); connection.close(); } }
confirm實現
confirm方式有三種模式:普通confirm模式、批量confirm模式、異步confirm模式
channel.confirmSelect()
: 將當前信道設置成了confirm模式
普通confirm模式
每發送一條消息,就調用waitForConfirms()方法,等待服務端返回Ack或者nack消息
/** * 創建生產者 */ public class Send { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //從MQ工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //創建一個通道 Channel channel = connection.createChannel(); //准備發送的消息內容 String msg = "你好"; //准備交換機(已創建的交換機) String exchangeName = "direct-exchange"; //准備路由 String routekey = "email"; //將當前信道設置成confirm模式 channel.confirmSelect(); for(int i = 0;i<20;i++){ //發送消息給交換機 /** * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有 * 參數2:routekey * 參數3:消息的狀態控制 * 參數4:消息內容 */ //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱 channel.basicPublish(exchangeName,routekey,null,msg.getBytes()); //信道為confirm模式后,即可通過waitForConfirms()接收服務端返回來的信息 if(channel.waitForConfirms()){ System.out.print("發送成功"); } } final long start = System.currentTimeMillis(); System.out.println("執行waitForConfirmsOrDie耗費時間"+(System.currentTimeMillis()-start)+"ms"); //關閉通道 channel.close(); connection.close(); } }
批量confirm模式
每發送一批消息,就調用waitForConfirmsOrDie()方法,該方法會等到最后一條消息得到ack或者得到nack才會結束,也就是說在waitForConfirmsOrDie處才會造成程序的阻塞
/** * 創建生產者 */ public class Send { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //從MQ工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //創建一個通道 Channel channel = connection.createChannel(); //准備發送的消息內容 String msg = "你好"; //准備交換機(已創建的交換機) String exchangeName = "direct-exchange"; //准備路由 String routekey = "email"; //將當前信道設置成confirm模式 channel.confirmSelect(); for(int i = 0;i<20;i++){ //發送消息給交換機 /** * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有 * 參數2:routekey * 參數3:消息的狀態控制 * 參數4:消息內容 */ //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱 channel.basicPublish(exchangeName,routekey,null,msg.getBytes()); } final long start = System.currentTimeMillis(); //消息批量發送完成后,通過waitForConfirmsOrDie()方法來接收服務端返回的信息 channel.waitForConfirmsOrDie(); System.out.println("執行waitForConfirmsOrDie耗費時間"+(System.currentTimeMillis()-start)+"ms"); //關閉通道 channel.close(); connection.close(); } }
異步confirm模式
通過channel,addConfirmListener()監聽發送方確認模式,通過信道中的waitForConfirmsOrDie等待傳回ack或者nack
/** * 創建生產者 */ public class Send { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //從MQ工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //創建一個通道 Channel channel = connection.createChannel(); //准備發送的消息內容 String msg = "你好"; //准備交換機(已創建的交換機) String exchangeName = "fanout-exchanges"; //准備路由 String routekey = ""; //將當前信道設置成confirm模式 channel.confirmSelect(); for(int i = 0;i<100;i++){ msg= i + "chen"; //發送消息給交換機 /** * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有 * 參數2:routekey * 參數3:消息的狀態控制 * 參數4:消息內容 */ //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱 channel.basicPublish(exchangeName,routekey,null,msg.getBytes()); } final long start = System.currentTimeMillis(); //通過addConfirmListener來監聽信道 channel.addConfirmListener(new ConfirmListener() { //消息發送成功 @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("以確認消息:"+ deliveryTag + " 多個消息:" + multiple); } //消息發送失敗 @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("no ack"); } }); System.out.println("執行waitForConfirmsOrDie耗費時間"+(System.currentTimeMillis()-start)+"ms"); //關閉通道 channel.close(); connection.close(); } }
2.消息接收確認(消費者)
消息接收確認機制,分為消息自動確認模式和消息手動確認模式,當消息確認后,我們隊列中的消息將會移除
那這兩種模式要如何選擇呢?
- 如果消息不太重要,丟失也沒有影響,那么自動ACK會比較方便。好處就是可以提高吞吐量,缺點就是會丟失消息
- 如果消息非常重要,不容丟失,則建議手動ACK,正常情況都是更建議使用手動ACK。雖然可以解決消息不會丟失的問題,但是可能會造成消費者過載
消息自動確認模式的實現
注:自動確認模式,消費者不會判斷消費者是否成功接收到消息,也就是當我們程序代碼有問題,我們的消息都會被自動確認,消息被自動確認了,我們隊列就會移除該消息,這就會造成我們的消息丟失
/** * 消費者 */ public class Recv { //設定隊列名稱(已存在的隊列) private static final String QUEUE_NAME = "queue1"; public static void main(String[] args) throws IOException, TimeoutException { //從mq工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //獲取一個通道 Channel channel = connection.createChannel(); //監聽該隊列,true代表自動確認 channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ System.out.println("接收到的消息:"+ new String(body,"UTF-8")); } }); } }
實現效果,消費者會將我們隊列中的消息全部接收然后確認,並移除隊列
消息手動確認模式的實現
/** * 消費者 */ public class Recv { //設定隊列名稱(已存在的隊列) private static final String QUEUE_NAME = "queue1"; public static void main(String[] args) throws IOException, TimeoutException { //從mq工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //獲取一個通道 Channel channel = connection.createChannel(); //監聽該隊列,false代表手動確認 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ System.out.println("接收到的消息:"+ new String(body,"UTF-8")); } }); } }
手動確認模式下,當我們消費者成功接收到消息后,在隊列中消息會進入Unacked項,也就是待確認模式
所以我們還需要加上下列代碼,來實現消息者在成功接收到消息后,手動確認
#添加紅色字段
/** * 消費者 */ public class Recv { //設定隊列名稱(已存在的隊列) private static final String QUEUE_NAME = "queue1"; public static void main(String[] args) throws IOException, TimeoutException { //從mq工具類獲取連接信息 Connection connection = MqConnectionUtils.getConnection(); //獲取一個通道 Channel channel = connection.createChannel(); //監聽該隊列 channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{ System.out.println("接收到的消息:"+ new String(body,"UTF-8")); //獲取消息的編號,我們需要根據消息的編號來確認消息 long tag = envelope.getDeliveryTag(); //獲取當前內部類中的通道 Channel c = this.getChannel(); //手動確認消息,確認以后,則表示消息已經成功處理,消息就會從隊列中移除,false代表只確認當前一個消息,true確認所有consumer獲得的消息 c.basicAck(tag,false);
} }); } }
此時,我們的消息才會成功被確認,並移除隊列