RabbitMQ消息確認機制


RabbitMQ消息確認的本質也就是為了解決RabbitMQ消息丟失問題,因為哪怕我們做了RabbitMQ持久化,其實也並不能保證解決我們的消息丟失問題


RabbitMQ的消息確認有兩種

  • 第一種是消息發送確認。這種是用來確認生產者將消息發送給交換器,交換器傳遞給隊列的過程中,消息是否成功投遞。發送確認分為兩步,一是確認是否到達交換器,二是確認是否到達隊列。
  • 第二種是消費接收確認。這種是確認消費者是否成功消費了隊列中的消息。

1.消息發送確認(生產者)

正常情況下,生產者會通過交換機發送消息至隊列中,再由消費者來進行消費,但是其實RabbitMQ在接收到消息后,還需要一段時間消息才能存入磁盤,並且其實也不是每條消息都會存入磁盤,可能僅僅只保存到cache中,這時,如果RabbitMQ正巧發生崩潰,消息則就會丟失,所以為了避免該情況的發生,我們引入了生產者確認機制,rabbitmq對此提供了兩種方式:

  • 通過事務實現
  • 通過發送方確認機制(publisher confirm)實現

事務實現

  • channel.txSelect(): 將當前信道設置成事務模式
  • channel.txCommit(): 用於提交事務
  • channel.txRollback(): 用於回滾事務

通過事務實現機制,只有消息成功被rabbitmq服務器接收,事務才能提交成功,否則便可在捕獲異常之后進行回滾,然后進行消息重發,但是事務非常影響rabbitmq的性能。還有就是事務機制是阻塞的過程,只有等待服務器回應之后才會處理下一條消息

/**
 * 創建生產者
 */
public class Send {public static void main(String[] args) throws IOException, TimeoutException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "你好";

        //准備交換機(已創建的交換機)
        String exchangeName = "direct-exchange";

        //准備路由
        String routekey = "email";

        try{
            //將信道設置為事務模式
            channel.txSelect();
            //發送消息給交換機
            /**
             * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
             * 參數2:routekey
             * 參數3:消息的狀態控制
             * 參數4:消息內容
             */
            //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
            channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
            //事務提交
            channel.txCommit();
            System.out.print("發送成功");
        } catch (Exception e){          
            //如果消息發送給交換機的過程出現異常,則捕捉並進行回滾
            channel.txRollback();
            System.out.print("發送失敗並回滾");
        }
        
        //關閉通道
        channel.close();
        connection.close();
    }
}

confirm實現

confirm方式有三種模式:普通confirm模式、批量confirm模式、異步confirm模式

  • channel.confirmSelect()將當前信道設置成了confirm模式

普通confirm模式

每發送一條消息,就調用waitForConfirms()方法,等待服務端返回Ack或者nack消息

/**
 * 創建生產者
 */
public class Send {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "你好";

        //准備交換機(已創建的交換機)
        String exchangeName = "direct-exchange";

        //准備路由
        String routekey = "email";

        //將當前信道設置成confirm模式
        channel.confirmSelect();
        for(int i = 0;i<20;i++){
            //發送消息給交換機
            /**
             * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
             * 參數2:routekey
             * 參數3:消息的狀態控制
             * 參數4:消息內容
             */
            //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
            channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
            //信道為confirm模式后,即可通過waitForConfirms()接收服務端返回來的信息
            if(channel.waitForConfirms()){
                System.out.print("發送成功");
            }
        }

        final long start = System.currentTimeMillis();
        System.out.println("執行waitForConfirmsOrDie耗費時間"+(System.currentTimeMillis()-start)+"ms");

        //關閉通道
        channel.close();
        connection.close();
    }
}

批量confirm模式

每發送一批消息,就調用waitForConfirmsOrDie()方法,該方法會等到最后一條消息得到ack或者得到nack才會結束,也就是說在waitForConfirmsOrDie處才會造成程序的阻塞

/**
 * 創建生產者
 */
public class Send {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "你好";

        //准備交換機(已創建的交換機)
        String exchangeName = "direct-exchange";

        //准備路由
        String routekey = "email";
        //將當前信道設置成confirm模式
        channel.confirmSelect();
        for(int i = 0;i<20;i++){
            //發送消息給交換機
            /**
             * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
             * 參數2:routekey
             * 參數3:消息的狀態控制
             * 參數4:消息內容
             */
            //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
            channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
        }

        final long start = System.currentTimeMillis();
        //消息批量發送完成后,通過waitForConfirmsOrDie()方法來接收服務端返回的信息
        channel.waitForConfirmsOrDie();
        System.out.println("執行waitForConfirmsOrDie耗費時間"+(System.currentTimeMillis()-start)+"ms");

        //關閉通道
        channel.close();
        connection.close();
    }
}

異步confirm模式

通過channel,addConfirmListener()監聽發送方確認模式,通過信道中的waitForConfirmsOrDie等待傳回ack或者nack

/**
 * 創建生產者
 */
public class Send {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //從MQ工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //創建一個通道
        Channel channel = connection.createChannel();

        //准備發送的消息內容
        String msg = "你好";

        //准備交換機(已創建的交換機)
        String exchangeName = "fanout-exchanges";
        //准備路由
        String routekey = "";
        //將當前信道設置成confirm模式
        channel.confirmSelect();
        for(int i = 0;i<100;i++){
            msg= i + "chen";
            //發送消息給交換機
            /**
             * 參數1:交換機,不定義也會有默認的,因為我們的消息是通過交換機來進行投遞給隊列的,所以交換機不可能沒有
             * 參數2:routekey
             * 參數3:消息的狀態控制
             * 參數4:消息內容
             */
            //該模式因為是由交換機發給該交換機綁定的所有隊列,所以可以不標明隊列名稱
            channel.basicPublish(exchangeName,routekey,null,msg.getBytes());
        }
        final long start = System.currentTimeMillis();
        //通過addConfirmListener來監聽信道
        channel.addConfirmListener(new ConfirmListener() {
            //消息發送成功
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("以確認消息:"+ deliveryTag + " 多個消息:" + multiple);
            }
            //消息發送失敗
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("no ack");
            }
        });
        System.out.println("執行waitForConfirmsOrDie耗費時間"+(System.currentTimeMillis()-start)+"ms");

        //關閉通道
        channel.close();
        connection.close();
    }
}

2.消息接收確認(消費者)

消息接收確認機制,分為消息自動確認模式和消息手動確認模式,當消息確認后,我們隊列中的消息將會移除

那這兩種模式要如何選擇呢?

  • 如果消息不太重要,丟失也沒有影響,那么自動ACK會比較方便。好處就是可以提高吞吐量,缺點就是會丟失消息
  • 如果消息非常重要,不容丟失,則建議手動ACK,正常情況都是更建議使用手動ACK。雖然可以解決消息不會丟失的問題,但是可能會造成消費者過載

消息自動確認模式的實現

注:自動確認模式,消費者不會判斷消費者是否成功接收到消息,也就是當我們程序代碼有問題,我們的消息都會被自動確認,消息被自動確認了,我們隊列就會移除該消息,這就會造成我們的消息丟失

/**
 * 消費者
 */
public class Recv {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該隊列,true代表自動確認
        channel.basicConsume(QUEUE_NAME,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
            }
        });
    }
}

實現效果,消費者會將我們隊列中的消息全部接收然后確認,並移除隊列

消息手動確認模式的實現

/**
 * 消費者
 */
public class Recv {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該隊列,false代表手動確認
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的消息:"+ new String(body,"UTF-8"));
            }
        });
    }
}

手動確認模式下,當我們消費者成功接收到消息后,在隊列中消息會進入Unacked項,也就是待確認模式

所以我們還需要加上下列代碼,來實現消息者在成功接收到消息后,手動確認

#添加紅色字段

/**
 * 消費者
 */
public class Recv {
    //設定隊列名稱(已存在的隊列)
    private static final String QUEUE_NAME = "queue1";
    public static void main(String[] args) throws IOException, TimeoutException {
        //從mq工具類獲取連接信息
        Connection connection = MqConnectionUtils.getConnection();
        //獲取一個通道
        Channel channel = connection.createChannel();
        //監聽該隊列
        channel.basicConsume(QUEUE_NAME,false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] body) throws IOException{
                System.out.println("接收到的消息:"+ new String(body,"UTF-8"));

                //獲取消息的編號,我們需要根據消息的編號來確認消息 long tag = envelope.getDeliveryTag(); //獲取當前內部類中的通道 Channel c = this.getChannel(); //手動確認消息,確認以后,則表示消息已經成功處理,消息就會從隊列中移除,false代表只確認當前一個消息,true確認所有consumer獲得的消息 c.basicAck(tag,false);           
} }); } }

此時,我們的消息才會成功被確認,並移除隊列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM