RabbitMQ事務性消息和確認模式


事務消息與數據庫的事務類似,只是MQ的消息是要保證消息是否會全部發送成功,防止消息丟失的一種策略。

RabbitMQ有兩種策略來解決這個問題:

1.通過AMQP的事務機制實現

2.使用發送者確認模式實現

1.事務

事務的實現主要是對信道(Channel)的設置,主要方法如下:

1. channel.txSelect()  聲明啟動事務模式

2.channel.txCommit() 提交事務

3.channel.txRollback()回滾事務

1.事務性消息發送

開啟事務之后必須手動channel.txCommit();提交或者channel.txRollback();回滾。

package rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            connection = getConnection();
            channel = connection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 啟動事務,必須用txCommit()或者txRollback()回滾
            channel.txSelect();

            // 假設這里處理業務邏輯
            String message = "hello,message!";
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            // 提交事務
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    // 回滾。如果未異常會提交事務,此時回滾無影響
                    channel.txRollback();
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}

  測試可以注釋掉提交事務的代碼發現mq不會新增消息。

2.消費者事務測試

  經測試,自動確認模式下。即使事務不提交,也會讀取到消息並從隊列移除。也就是暫時得出的結論是事務對消費者無效。

package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            /**
             * 開啟事務
             * 消費者開啟事務后,即使不提交也會獲取到消息並且從隊列刪除。
             * 結論:
             *     事務對消費者沒有任何影響
             */
            createChannel.txSelect();
            
            /**
             * 接收消息。會持續堅挺,不能關閉channel和Connection
             * 參數一:隊列名稱
             * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息
             * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。
             * 參數四:消息接收者
             */
            createChannel.basicConsume("myQueue", true, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    String string = new String(body, "UTF-8");
                    System.out.println("接收到d消息: -》 " + string);
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}

  上面是自動確認模式的消費者,不受事務的影響。

如果是手動確認消息的消費者,在開啟事務下,必須手動commit,否則不會移除消息。

如下:手動確認模式+事務的用法

package rabbitmq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            /**
             * 開啟事務
             * 消費者開啟事務后,即使不提交也會獲取到消息並且從隊列刪除。
             * 結論:
             *     如果是手動確認的消費者,開啟事物的情況下必須ack之后再commit,否則不會從隊列移除
             */
            createChannel.txSelect();
            
            /**
             * 接收消息。會持續堅挺,不能關閉channel和Connection
             * 參數一:隊列名稱
             * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息
             * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。
             * 參數四:消息接收者
             */
            createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    // 該消息是否已經被處理過,true表示已經處理過,false表示沒有處理過
                    boolean redeliver = envelope.isRedeliver();
                    
                    String string = new String(body, "UTF-8");
                    // 獲取消息的編號,根據編號確認消息
                    long deliveryTag = envelope.getDeliveryTag();
                    // 獲取當前內部類中的通道
                    Channel channel = this.getChannel();
                    System.out.println("處理消息成功, 消息: " + string + "\t redeliver: " + redeliver);
                    
                    // 手動確認
                    channel.basicAck(deliveryTag, true);
                    
                    // 提交事務
                    channel.txCommit();
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
        
    }
}

這里envelope.isRedeliver() 可以返回該消息是否已經被處理過。

2. 確認模式

Confirm發送方確認模式使用和事務類似,也是通過設置Channel進行發送方確認的。最終確保所有的消息全部發送成功。confirm確認模式要比事務快。

Confirm的三種實現方式:

方式一:channel.waitForConfirms()普通發送方確認模式;

方式二:channel.waitForConfirmsOrDie()批量確認模式;

方式三:channel.addConfirmListener()異步監聽發送方確認模式;

1. 普通發送方確認模式

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 啟動發送者確認模式
            channel.confirmSelect();
    
            String message = "hello,message! confirmSelect";
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            // 阻塞線程,等待服務器返回響應。該方法可以指定一個等待時間,發送成功返回true,否則返回false
            if (channel.waitForConfirms()) {
                System.out.print("發送成功");
            } else {
                // 返回false可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // channel.waitForConfirms 可能返回超時異常
            // 可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}

  這里需要用confirmSelect() 開啟確認模式,然后channel.waitForConfirms() 阻塞等待發送。返回false或者拋出InterruptedException中斷異常都是發送失敗。可以進行補發,可以用重試機制或者先存到redis,隨后用定時任務發送。

2.批量確認模式

  這種用於確認一大批消息模式。但是一旦消息集合有一個沒發送成功就會全部失敗,需要全部進行補發。

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 啟動發送者確認模式
            channel.confirmSelect();
    
            String message = "hello,message! confirmSelect";
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            channel.basicPublish("", "myQueue", null, message.getBytes());
            channel.basicPublish("", "myQueue", null, message.getBytes());
            channel.basicPublish("", "myQueue", null, message.getBytes());
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            try {
                // 阻塞線程,等待服務器返回響應。該方法可以指定一個等待時間。該方法無返回值,只能根據拋出的異常進行判斷。
                channel.waitForConfirmsOrDie();
            } catch (InterruptedException e) {
                // 可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發
            } catch (IOException e) {
                // 可以進行補發。重試幾次發送或者利用redis+定時任務來完成補發                
            }
            System.out.print("發送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                // ignore
            }
        }
    }
}

  這種模式方法無返回值,只能根據異常進行判斷。如果確認失敗會拋出IOException和InterruptedException。源碼如下:

void waitForConfirmsOrDie() throws IOException, InterruptedException;

3.異步Confirm模式

  異步模式的優點,就是執行效率高,不需要等待消息執行完,只需要監聽消息即可。需要注意的是不可以關閉channel和connection。

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 啟動發送者確認模式
            channel.confirmSelect();
            
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            for (int i = 0; i< 500; i ++) {
                String message = "hello,message! confirmSelect " + i;
                channel.basicPublish("", "myQueue", null, message.getBytes());
            }
            
            //異步監聽確認和未確認的消息
            channel.addConfirmListener(new ConfirmListener() {
                /**
                 * 消息沒有確認的回調方法
                 * 參數一:沒有確認的消息的編號
                 * 參數二: 是否沒有確認多個
                 */
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("確認消息,序號:%d,是否多個消息:%b", deliveryTag, multiple));
                }
                
                /**
                 * 消息確認后回調
                 * 參數一: 確認的消息的編號,從1開始遞增
                 * 參數二: 當前消息是否同時確認了多個
                 */
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println(String.format("確認消息,序號:%d,是否多個消息:%b", deliveryTag, multiple));
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
//            try {
//                if (channel != null) {
//                    channel.close();
//                }
//                if (connection != null) {
//                    connection.close();
//                }
//            } catch (Exception ignore) {
                // ignore
//            }
        }
    }
}

3.消費者確認模式

  為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(Message Acknowledgment)。消費者在聲明隊列時,可以指定noAck參數,當noAck=false時,rabbitMQ會等待消費者顯式發回ack信號后從內存(和磁盤,如果是持久化消息)中刪除消息。這里需要注意。如果一個消息設置了手動確認,就必須應答或者拒絕,否則會一直阻塞。

手動確認主要使用一些方法:

basicAck(long deliveryTag, boolean multiple):用於肯定確認,multiple參數用於確認多個消息。確認后從隊列刪除消息。

basicRecover(bool) :消息重回隊列。參數為true表示盡可能的將消息投遞給其他消費者消費,而不是自己再次消費。false則表示在睡眠5s后消息重新投遞給自己。

basicReject(long deliveryTag, boolean requeue):接收端告訴服務器這個消息我拒絕接受,可以設置是否回到隊列中還是丟棄。true則重新入隊列,該消費者還是會消費到該條被reject的消息。false表示丟棄或者進入死信隊列。

basicNack(long deliveryTag, boolean multiple, boolean requeue):可以一次拒絕N條消息,客戶端可以設置basicNack()的multiple參數為true。與basicReject()的區別就是同時支持多個消息,可以nack該消費者先前接收未ack的所有消息。nack后的消息也會被自己消費到。

例如:

生產者:

package rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    
    public static Connection getConnection() throws Exception {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory.newConnection();
    }

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            try {
                connection = getConnection();
            } catch (Exception e) {
                // ignore
            }
            channel = connection.createChannel();
            channel.queueDeclare("myQueue", true, false, false,null);
            
            // 啟動發送者確認模式
            channel.confirmSelect();
            
            /**
             * 發送消息到MQ
             * 參數一:交換機名稱,為""表示不用交換機
             * 參數二:為隊列名稱或者routingKey.當指定了交換機就是routingKey
             * 參數三:消息的屬性信息
             * 參數四:消息內容的字節數組
             */
            String message = "hello,message! confirmSelect 1";
            channel.basicPublish("", "myQueue", null, message.getBytes());
            
            //異步監聽確認和未確認的消息
            channel.waitForConfirms();
        } catch (Exception e) {
            // ignore
        } finally {
            try {
                if (channel != null) {
                    channel.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (Exception ignore) {
                 // ignore
            }
        }
    }
}

(1) 手動應答basicAck,源碼如下:

    /**
     * Acknowledge one or several received
     * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
     * containing the received message being acknowledged.
     * @see com.rabbitmq.client.AMQP.Basic.Ack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to acknowledge all messages up to and
     * including the supplied delivery tag; false to acknowledge just
     * the supplied delivery tag.
     * @throws java.io.IOException if an error is encountered
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

測試代碼:

package rabbitmq;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer {

    public static ConnectionFactory getConnectionFactory() {
        // 創建連接工程,下面給出的是默認的case
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.99.100");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        return factory;
    }

    public static void main(String[] args) throws IOException, TimeoutException  {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection newConnection = null;
        Channel createChannel = null;
        try {
            newConnection = connectionFactory.newConnection();
            createChannel = newConnection.createChannel();
            /**
             * 聲明一個隊列。
             * 參數一:隊列名稱
             * 參數二:是否持久化
             * 參數三:是否排外  如果排外則這個隊列只允許有一個消費者
             * 參數四:是否自動刪除隊列,如果為true表示沒有消息也沒有消費者連接自動刪除隊列
             * 參數五:隊列的附加屬性
             * 注意:
             * 1.聲明隊列時,如果已經存在則放棄聲明,如果不存在則會聲明一個新隊列;
             * 2.隊列名可以任意取值,但需要與消息接收者一致。
             * 3.下面的代碼可有可無,一定在發送消息前確認隊列名稱已經存在RabbitMQ中,否則消息會發送失敗。
             */
            createChannel.queueDeclare("myQueue", true, false, false,null);
            
            /**
             * 接收消息。會持續堅挺,不能關閉channel和Connection
             * 參數一:隊列名稱
             * 參數二:消息是否自動確認,true表示自動確認接收完消息以后會自動將消息從隊列移除。否則需要手動ack消息
             * 參數三:消息接收者的標簽,用於多個消費者同時監聽一個隊列時用於確認不同消費者。
             * 參數四:消息接收者
             */
            createChannel.basicConsume("myQueue", false, "", new DefaultConsumer(createChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {

                    // 該消息是否已經被處理過,true表示已經處理過,false表示沒有處理過
                    boolean redeliver = envelope.isRedeliver();
                    
                    String string = new String(body, "UTF-8");
                    // 獲取消息的編號,根據編號確認消息
                    long deliveryTag = envelope.getDeliveryTag();
                    // 獲取當前內部類中的通道
                    Channel channel = this.getChannel();
                    System.out.println((new Date()) + "\t處理消息成功, 消息: " + string + "\t redeliver: " + redeliver);
                    
                    // 手動確認
                    channel.basicAck(deliveryTag, true);
                }
            });
            
        } catch (Exception e) {
            e.printStackTrace();
        } 
    }
}

生成者生產一條消息后啟動消費者,控制台如下:

Fri Nov 06 22:45:31 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: false

從隊列查看發現消息變為0,也就是消息被刪除

(2)basicRecover(bool)重新回到隊列:true則重新入隊列,並且盡可能的將之前recover的消息投遞給其他消費者消費,而不是自己再次消費。false則消息會重新被投遞給自己。

源碼如下:

    /**
     * Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * @param requeue If true, messages will be requeued and possibly
     * delivered to a different consumer. If false, messages will be
     * redelivered to the same consumer.
     */
    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

測試

1》參數為true重新回到隊列:盡可能的推給其他消費者

channel.basicRecover(true);

結果:(會一直收到這條消息,並且不會刪除。當然代碼可以根據是否redeliver進行判斷)

Fri Nov 06 22:47:03 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: false
Fri Nov 06 22:47:03 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 22:47:03 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 22:47:03 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true

2》 參數為false重新投遞給自己,只是會進行時間的延遲,推遲五秒后投遞。

Fri Nov 06 22:49:10 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 22:49:15 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 22:49:20 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 22:49:25 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 22:49:30 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true

(3)basicReject 測試

源碼如下:

    /**
     * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
     * containing the received message being rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Reject
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

1》測試第二個參數為true的情況重新回到隊列

channel.basicReject(deliveryTag, true);

結果:

Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: false
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
Fri Nov 06 23:02:26 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true
。。。

2》測試第二個參數為false的情況丟棄

                    channel.basicReject(deliveryTag, false);

結果:(發現消息被丟棄的同時從隊列刪除)

Fri Nov 06 23:03:39 CST 2020    處理消息成功, 消息: hello,message! confirmSelect 1     redeliver: true

(4) basicNack 測試

源碼如下:

    /**
     * Reject one or several received messages.
     *
     * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Nack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to reject all messages up to and including
     * the supplied delivery tag; false to reject just the supplied
     * delivery tag.
     * @param requeue true if the rejected message(s) should be requeued rather
     * than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

  測試效果和上面basicReject一樣。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM