事務消息與數據庫的事務類似,只是MQ的消息是要保證消息是否會全部發送成功,防止消息丟失的一種策略。
RabbitMQ有兩種策略來解決這個問題:
1.通過AMQP的事務機制實現
2.使用發送者確認模式實現
1.事務
事務的實現主要是對信道(Channel)的設置,主要方法如下:
1. channel.txSelect() 聲明啟動事務模式
2.channel.txCommit() 提交事務
3.channel.txRollback()回滾事務
1.事務性消息發送
開啟事務之后必須手動channel.txCommit();提交或者channel.txRollback();回滾。
package rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static Connection getConnection() throws Exception { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory.newConnection(); } public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = getConnection(); channel = connection.createChannel(); /** * 聲明一個隊列。 * 參數一:隊列名稱 * 參數二:是否持久化 * 參數三:是否排外 如果排外則這個隊列只允許有一個消費者 * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列 * 參數五:隊列的附加屬性 * 注意: * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列; * 2.隊列名可以任意取值,但需要與消息接收者一致。 * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。 */ channel.queueDeclare("myQueue", true, false, false,null); // 啟動事務,必須用txCommit()或者txRollback()回滾 channel.txSelect(); // 假設這里處理業務邏輯 String message = "hello,message!"; /** * 發送消息到MQ * 參數一:交換機名稱,為""表示不用交換機 * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey * 參數三:消息的屬性信息 * 參數四:消息內容的字節數組 */ channel.basicPublish("", "myQueue", null, message.getBytes()); // 提交事務 channel.txCommit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (channel != null) { // 回滾。如果未異常會提交事務,此時回滾無影響 channel.txRollback(); channel.close(); } if (connection != null) { connection.close(); } } catch (Exception ignore) { // ignore } } } }
測試可以注釋掉提交事務的代碼發現mq不會新增消息。
2.消費者事務測試
經測試,自動確認模式下。即使事務不提交,也會讀取到消息並從隊列移除。也就是暫時得出的結論是事務對消費者無效。
package rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class Consumer { public static ConnectionFactory getConnectionFactory() { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory; } public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = getConnectionFactory(); Connection newConnection = null; Channel createChannel = null; try { newConnection = connectionFactory.newConnection(); createChannel = newConnection.createChannel(); /** * 聲明一個隊列。 * 參數一:隊列名稱 * 參數二:是否持久化 * 參數三:是否排外 如果排外則這個隊列只允許有一個消費者 * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列 * 參數五:隊列的附加屬性 * 注意: * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列; * 2.隊列名可以任意取值,但需要與消息接收者一致。 * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。 */ createChannel.queueDeclare("myQueue", true, false, false,null); /** * 開啟事務 * 消費者開啟事務后,即使不提交也會獲取到消息並且從隊列刪除。 * 結論: * 事務對消費者沒有任何影響 */ createChannel.txSelect(); /** * 接收消息。會持續堅挺,不能關閉channel和Connection * 參數一:隊列名稱 * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息 * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。 * 參數四:消息接收者 */ createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String string = new String(body, "UTF-8"); System.out.println("接收到d消息: -》 " + string); } }); } catch (Exception e) { e.printStackTrace(); } finally { } } }
上面是自動確認模式的消費者,不受事務的影響。
如果是手動確認消息的消費者,在開啟事務下,必須手動commit,否則不會移除消息。
如下:手動確認模式+事務的用法
package rabbitmq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer { public static ConnectionFactory getConnectionFactory() { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory; } public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = getConnectionFactory(); Connection newConnection = null; Channel createChannel = null; try { newConnection = connectionFactory.newConnection(); createChannel = newConnection.createChannel(); /** * 聲明一個隊列。 * 參數一:隊列名稱 * 參數二:是否持久化 * 參數三:是否排外 如果排外則這個隊列只允許有一個消費者 * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列 * 參數五:隊列的附加屬性 * 注意: * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列; * 2.隊列名可以任意取值,但需要與消息接收者一致。 * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。 */ createChannel.queueDeclare("myQueue", true, false, false,null); /** * 開啟事務 * 消費者開啟事務后,即使不提交也會獲取到消息並且從隊列刪除。 * 結論: * 如果是手動確認的消費者,開啟事物的情況下必須ack之后再commit,否則不會從隊列移除 */ createChannel.txSelect(); /** * 接收消息。會持續堅挺,不能關閉channel和Connection * 參數一:隊列名稱 * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息 * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。 * 參數四:消息接收者 */ createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // 該消息是否已經被處理過,true表示已經處理過,false表示沒有處理過 boolean redeliver = envelope.isRedeliver(); String string = new String(body, "UTF-8"); // 獲取消息的編號,根據編號確認消息 long deliveryTag = envelope.getDeliveryTag(); // 獲取當前內部類中的通道 Channel channel = this.getChannel(); System.out.println("處理消息成功, 消息: " + string + "\t redeliver: " + redeliver); // 手動確認 channel.basicAck(deliveryTag, true); // 提交事務 channel.txCommit(); } }); } catch (Exception e) { e.printStackTrace(); } finally { } } }
這里envelope.isRedeliver() 可以返回該消息是否已經被處理過。
2. 確認模式
Confirm發送方確認模式使用和事務類似,也是通過設置Channel進行發送方確認的。最終確保所有的消息全部發送成功。confirm確認模式要比事務快。
Confirm的三種實現方式:
方式一:channel.waitForConfirms()普通發送方確認模式;
方式二:channel.waitForConfirmsOrDie()批量確認模式;
方式三:channel.addConfirmListener()異步監聽發送方確認模式;
1. 普通發送方確認模式
package rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static Connection getConnection() throws Exception { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory.newConnection(); } public static void main(String[] args) { Connection connection = null; Channel channel = null; try { try { connection = getConnection(); } catch (Exception e) { // ignore } channel = connection.createChannel(); channel.queueDeclare("myQueue", true, false, false,null); // 啟動發送者確認模式 channel.confirmSelect(); String message = "hello,message! confirmSelect"; /** * 發送消息到MQ * 參數一:交換機名稱,為""表示不用交換機 * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey * 參數三:消息的屬性信息 * 參數四:消息內容的字節數組 */ channel.basicPublish("", "myQueue", null, message.getBytes()); // 阻塞線程,等待服務器返回響應。該方法可以指定一個等待時間,發送成功返回true,否則返回false if (channel.waitForConfirms()) { System.out.print("發送成功"); } else { // 返回false可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發 } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { // channel.waitForConfirms 可能返回超時異常 // 可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發 } finally { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception ignore) { // ignore } } } }
這里需要用confirmSelect() 開啟確認模式,然后channel.waitForConfirms() 阻塞等待發送。返回false或者拋出InterruptedException中斷異常都是發送失敗。可以進行補發,可以用重試機制或者先存到redis,隨后用定時任務發送。
2.批量確認模式
這種用於確認一大批消息模式。但是一旦消息集合有一個沒發送成功就會全部失敗,需要全部進行補發。
package rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static Connection getConnection() throws Exception { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory.newConnection(); } public static void main(String[] args) { Connection connection = null; Channel channel = null; try { try { connection = getConnection(); } catch (Exception e) { // ignore } channel = connection.createChannel(); channel.queueDeclare("myQueue", true, false, false,null); // 啟動發送者確認模式 channel.confirmSelect(); String message = "hello,message! confirmSelect"; /** * 發送消息到MQ * 參數一:交換機名稱,為""表示不用交換機 * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey * 參數三:消息的屬性信息 * 參數四:消息內容的字節數組 */ channel.basicPublish("", "myQueue", null, message.getBytes()); channel.basicPublish("", "myQueue", null, message.getBytes()); channel.basicPublish("", "myQueue", null, message.getBytes()); channel.basicPublish("", "myQueue", null, message.getBytes()); try { // 阻塞線程,等待服務器返回響應。該方法可以指定一個等待時間。該方法無返回值,只能根據拋出的異常進行判斷。 channel.waitForConfirmsOrDie(); } catch (InterruptedException e) { // 可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發 } catch (IOException e) { // 可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發 } System.out.print("發送成功"); } catch (IOException e) { e.printStackTrace(); } finally { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception ignore) { // ignore } } } }
這種模式方法無返回值,只能根據異常進行判斷。如果確認失敗會拋出IOException和InterruptedException。源碼如下:
void waitForConfirmsOrDie() throws IOException, InterruptedException;
3.異步Confirm模式
異步模式的優點,就是執行效率高,不需要等待消息執行完,只需要監聽消息即可。需要注意的是不可以關閉channel和connection。
package rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static Connection getConnection() throws Exception { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory.newConnection(); } public static void main(String[] args) { Connection connection = null; Channel channel = null; try { try { connection = getConnection(); } catch (Exception e) { // ignore } channel = connection.createChannel(); channel.queueDeclare("myQueue", true, false, false,null); // 啟動發送者確認模式 channel.confirmSelect(); /** * 發送消息到MQ * 參數一:交換機名稱,為""表示不用交換機 * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey * 參數三:消息的屬性信息 * 參數四:消息內容的字節數組 */ for (int i = 0; i< 500; i ++) { String message = "hello,message! confirmSelect " + i; channel.basicPublish("", "myQueue", null, message.getBytes()); } //異步監聽確認和未確認的消息 channel.addConfirmListener(new ConfirmListener() { /** * 消息沒有確認的回調方法 * 參數一:沒有確認的消息的編號 * 參數二: 是否沒有確認多個 */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("確認消息,序號:%d,是否多個消息:%b", deliveryTag, multiple)); } /** * 消息確認后回調 * 參數一: 確認的消息的編號,從1開始遞增 * 參數二: 當前消息是否同時確認了多個 */ @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("確認消息,序號:%d,是否多個消息:%b", deliveryTag, multiple)); } }); } catch (IOException e) { e.printStackTrace(); } finally { // try { // if (channel != null) { // channel.close(); // } // if (connection != null) { // connection.close(); // } // } catch (Exception ignore) { // ignore // } } } }
3.消費者確認模式
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(Message Acknowledgment)。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,rabbitMQ會等待消費者顯式發回ack信號后從內存(和磁盤,如果是持久化消息)中刪除消息。這里需要注意。如果一個消息設置了手動確認,就必須應答或者拒絕,否則會一直阻塞。
手動確認主要使用一些方法:
basicAck(long deliveryTag, boolean multiple):用於肯定確認,multiple參數用於確認多個消息。確認后從隊列刪除消息。
basicRecover(bool) :消息重回隊列。參數為true表示盡可能的將消息投遞給其他消費者消費,而不是自己再次消費。false則表示在睡眠5s后消息重新投遞給自己。
basicReject(long deliveryTag, boolean requeue):接收端告訴服務器這個消息我拒絕接受,可以設置是否回到隊列中還是丟棄。true則重新入隊列,該消費者還是會消費到該條被reject的消息。false表示丟棄或者進入死信隊列。
basicNack(long deliveryTag, boolean multiple, boolean requeue):可以一次拒絕N條消息,客戶端可以設置basicNack()的multiple參數為true。與basicReject()的區別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。
例如:
生產者:
package rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { public static Connection getConnection() throws Exception { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory.newConnection(); } public static void main(String[] args) { Connection connection = null; Channel channel = null; try { try { connection = getConnection(); } catch (Exception e) { // ignore } channel = connection.createChannel(); channel.queueDeclare("myQueue", true, false, false,null); // 啟動發送者確認模式 channel.confirmSelect(); /** * 發送消息到MQ * 參數一:交換機名稱,為""表示不用交換機 * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey * 參數三:消息的屬性信息 * 參數四:消息內容的字節數組 */ String message = "hello,message! confirmSelect 1"; channel.basicPublish("", "myQueue", null, message.getBytes()); //異步監聽確認和未確認的消息 channel.waitForConfirms(); } catch (Exception e) { // ignore } finally { try { if (channel != null) { channel.close(); } if (connection != null) { connection.close(); } } catch (Exception ignore) { // ignore } } } }
(1) 手動應答basicAck,源碼如下:
/** * Acknowledge one or several received * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being acknowledged. * @see com.rabbitmq.client.AMQP.Basic.Ack * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param multiple true to acknowledge all messages up to and * including the supplied delivery tag; false to acknowledge just * the supplied delivery tag. * @throws java.io.IOException if an error is encountered */ void basicAck(long deliveryTag, boolean multiple) throws IOException;
測試代碼:
package rabbitmq; import java.io.IOException; import java.util.Date; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer { public static ConnectionFactory getConnectionFactory() { // 創建連接工程,下面給出的是默認的case ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.99.100"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/"); return factory; } public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = getConnectionFactory(); Connection newConnection = null; Channel createChannel = null; try { newConnection = connectionFactory.newConnection(); createChannel = newConnection.createChannel(); /** * 聲明一個隊列。 * 參數一:隊列名稱 * 參數二:是否持久化 * 參數三:是否排外 如果排外則這個隊列只允許有一個消費者 * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列 * 參數五:隊列的附加屬性 * 注意: * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列; * 2.隊列名可以任意取值,但需要與消息接收者一致。 * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。 */ createChannel.queueDeclare("myQueue", true, false, false,null); /** * 接收消息。會持續堅挺,不能關閉channel和Connection * 參數一:隊列名稱 * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息 * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。 * 參數四:消息接收者 */ createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // 該消息是否已經被處理過,true表示已經處理過,false表示沒有處理過 boolean redeliver = envelope.isRedeliver(); String string = new String(body, "UTF-8"); // 獲取消息的編號,根據編號確認消息 long deliveryTag = envelope.getDeliveryTag(); // 獲取當前內部類中的通道 Channel channel = this.getChannel(); System.out.println((new Date()) + "\t處理消息成功, 消息: " + string + "\t redeliver: " + redeliver); // 手動確認 channel.basicAck(deliveryTag, true); } }); } catch (Exception e) { e.printStackTrace(); } } }
生成者生產一條消息后啟動消費者,控制台如下:
Fri Nov 06 22:45:31 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: false
從隊列查看發現消息變為0,也就是消息被刪除
(2)basicRecover(bool)重新回到隊列:true則重新入隊列,並且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。false則消息會重新被投遞給自己。
源碼如下:
/** * Ask the broker to resend unacknowledged messages. In 0-8 * basic.recover is asynchronous; in 0-9-1 it is synchronous, and * the new, deprecated method basic.recover_async is asynchronous. * @param requeue If true, messages will be requeued and possibly * delivered to a different consumer. If false, messages will be * redelivered to the same consumer. */ Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
測試
1》參數為true重新回到隊列:盡可能的推給其他消費者
channel.basicRecover(true);
結果:(會一直收到這條消息,並且不會刪除。當然代碼可以根據是否redeliver進行判斷)
Fri Nov 06 22:47:03 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: false Fri Nov 06 22:47:03 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 22:47:03 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 22:47:03 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true
2》 參數為false重新投遞給自己,只是會進行時間的延遲,推遲五秒后投遞。
Fri Nov 06 22:49:10 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 22:49:15 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 22:49:20 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 22:49:25 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 22:49:30 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true
(3)basicReject 測試
源碼如下:
/** * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method * containing the received message being rejected. * @see com.rabbitmq.client.AMQP.Basic.Reject * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered * @throws java.io.IOException if an error is encountered */ void basicReject(long deliveryTag, boolean requeue) throws IOException;
1》測試第二個參數為true的情況重新回到隊列
channel.basicReject(deliveryTag, true);
結果:
Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: false Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true Fri Nov 06 23:02:26 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true 。。。
2》測試第二個參數為false的情況丟棄
channel.basicReject(deliveryTag, false);
結果:(發現消息被丟棄的同時從隊列刪除)
Fri Nov 06 23:03:39 CST 2020 處理消息成功, 消息: hello,message! confirmSelect 1 redeliver: true
(4) basicNack 測試
源碼如下:
/** * Reject one or several received messages. * * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected. * @see com.rabbitmq.client.AMQP.Basic.Nack * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} * @param multiple true to reject all messages up to and including * the supplied delivery tag; false to reject just the supplied * delivery tag. * @param requeue true if the rejected message(s) should be requeued rather * than discarded/dead-lettered * @throws java.io.IOException if an error is encountered */ void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
測試效果和上面basicReject一樣。