RabbitMQ 消息確認機制



生產端 Confirm 消息確認機制

消息的確認,是指生產者投遞消息后,如果 Broker 收到消息,則會給我們生產者一個應答。生產者進行接收應答,用來確定這條消息是否正常的發送到 Broker ,這種方式也是消息的可靠性投遞的核心保障!

Confirm 確認機制流程圖

如何實現Confirm確認消息?

  • 第一步:在 channel 上開啟確認模式: channel.confirmSelect()
  • 第二步:在 channel 上添加監聽: channel.addConfirmListener(ConfirmListener listener);, 監聽成功和失敗的返回結果,根據具體的結果對消息進行重新發送、或記錄日志等后續處理!
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class ConfirmProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_confirm_exchange";
        String routingKey = "item.update";

        //指定消息的投遞模式:confirm 確認模式
        channel.confirmSelect();

        //發送
        final long start = System.currentTimeMillis();
        for (int i = 0; i < 5 ; i++) {
            String msg = "this is confirm msg ";
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
            System.out.println("Send message : " + msg);
        }

        //添加一個確認監聽, 這里就不關閉連接了,為了能保證能收到監聽消息
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功的回調函數
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
                System.out.println(multiple);
                System.out.println("耗時:" + (System.currentTimeMillis() - start) + "ms");
            }
            /**
             * 返回失敗的回調函數
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.printf("defeat ack");
                System.out.println("耗時:" + (System.currentTimeMillis() - start) + "ms");
            }
        });
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;

public class ConfirmConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
      
        String exchangeName = "test_confirm_exchange";
        String queueName = "test_confirm_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代碼綁定,在管理界面手動綁定
        channel.queueBind(queueName, exchangeName, routingKey);

        //創建消費者並接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        //設置 Channel 消費者綁定隊列
        channel.basicConsume(queueName, true, consumer);

    }
}

我們此處只關注生產端輸出消息

Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
Send message : this is confirm msg 
succuss ack
true
耗時:3ms
succuss ack
true
耗時:4ms

注意事項

  • 我們采用的是異步 confirm 模式:提供一個回調方法,服務端 confirm 了一條或者多條消息后 Client 端會回調這個方法。除此之外還有單條同步 confirm 模式、批量同步 confirm 模式,由於現實場景中很少使用我們在此不做介紹,如有興趣直接參考官方文檔。

  • 我們運行生產端會發現每次運行結果都不一樣,會有多種情況出現,因為 Broker 會進行優化,有時會批量一次性 confirm ,有時會分開幾條 confirm。

  succuss ack
  true
  耗時:3ms
  succuss ack
  false
  耗時:4ms
  
  或者
  succuss ack
  true
  耗時:3ms

Return 消息機制

  • Return Listener 用於處理一-些不可路 由的消息!

  • 消息生產者,通過指定一個 ExchangeRoutingkey,把消息送達到某一個隊列中去,然后我們的消費者監聽隊列,進行消費處理操作!

  • 但是在某些情況下,如果我們在發送消息的時候,當前的 exchange 不存在或者指定的路由 key 路由不到,這個時候如果我們需要監聽這種不可達的消息,就要使用 Return Listener !

  • 在基礎API中有一個關鍵的配置項:Mandatory:如果為 true,則監聽器會接收到路由不可達的消息,然后進行后續處理,如果為 false,那么 broker 端自動刪除該消息!

Return 消息機制流程圖

Return 消息示例

  • 首先我們需要發送三條消息,並且故意將第 0 條消息的 routing Key設置為錯誤的,讓他無法正常路由到消費端。

  • mandatory 設置為 true 路由不可達的消息會被監聽到,不會被自動刪除.即channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());

  • 最后添加監聽即可監聽到不可路由到消費端的消息channel.addReturnListener(ReturnListener r))

import com.rabbitmq.client.*;
import java.io.IOException;

public class ReturnListeningProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
      
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_return_exchange";
        String routingKey = "item.update";
        String errRoutingKey = "error.update";

        //指定消息的投遞模式:confirm 確認模式
        channel.confirmSelect();

        //發送
        for (int i = 0; i < 3 ; i++) {
            String msg = "this is return——listening msg ";
            //@param mandatory 設置為 true 路由不可達的消息會被監聽到,不會被自動刪除
            if (i == 0) {
                channel.basicPublish(exchangeName, errRoutingKey, true,null, msg.getBytes());
            } else {
                channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
            }
            System.out.println("Send message : " + msg);
        }

        //添加一個確認監聽, 這里就不關閉連接了,為了能保證能收到監聽消息
        channel.addConfirmListener(new ConfirmListener() {
            /**
             * 返回成功的回調函數
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("succuss ack");
            }
            /**
             * 返回失敗的回調函數
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.printf("defeat ack");
            }
        });

        //添加一個 return 監聽
        channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("return relyCode: " + replyCode);
                System.out.println("return replyText: " + replyText);
                System.out.println("return exchange: " + exchange);
                System.out.println("return routingKey: " + routingKey);
                System.out.println("return properties: " + properties);
                System.out.println("return body: " + new String(body));
            }
        });

    }
}
import com.rabbitmq.client.*;
import java.io.IOException;

public class ReturnListeningConsumer {
    public static void main(String[] args) throws Exception {
        //1. 創建一個 ConnectionFactory 並進行設置
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        //2. 通過連接工廠來創建連接
        Connection connection = factory.newConnection();

        //3. 通過 Connection 來創建 Channel
        Channel channel = connection.createChannel();

        //4. 聲明
        String exchangeName = "test_return_exchange";
        String queueName = "test_return_queue";
        String routingKey = "item.#";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代碼綁定,在管理界面手動綁定
        channel.queueBind(queueName, exchangeName, routingKey);

        //5. 創建消費者並接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };

        //6. 設置 Channel 消費者綁定隊列
        channel.basicConsume(queueName, true, consumer);
    }
}

我們只關注生產端結果,消費端只收到兩條消息。

Send message : this is return——listening msg 
Send message : this is return——listening msg 
Send message : this is return——listening msg 
return relyCode: 312
return replyText: NO_ROUTE
return exchange: test_return_exchange
return routingKey: error.update
return properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
return body: this is return——listening msg 
succuss ack
succuss ack
succuss ack

消費端 Ack 和 Nack 機制

消費端進行消費的時候,如果由於業務異常我們可以進行日志的記錄,然后進行補償!如果由於服務器宕機等嚴重問題,那我們就需要手工進行ACK保障消費端消費成功!消費端重回隊列是為了對沒有處理成功的消息,把消息重新會遞給Broker!一般我們在實際應用中,都會關閉重回隊列,也就是設置為False。

參考 api

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicAck(long deliveryTag, boolean multiple) throws IOException;

如何設置手動 Ack 、Nack 以及重回隊列

  • 首先我們發送五條消息,將每條消息對應的循環下標 i 放入消息的 properties 中作為標記,以便於我們在后面的回調方法中識別。

  • 其次, 我們將消費端的 ·channel.basicConsume(queueName, false, consumer); 中的 autoAck屬性設置為 false,如果設置為true的話 將會正常輸出五條消息。

  • 我們通過 Thread.sleep(2000)來延時一秒,用以看清結果。我們獲取到properties中的num之后,通過channel.basicNack(envelope.getDeliveryTag(), false, true);num為0的消息設置為 nack,即消費失敗,並且將 requeue屬性設置為true,即消費失敗的消息重回隊列末端。

import com.rabbitmq.client.*;
import java.util.HashMap;
import java.util.Map;

public class AckAndNackProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String routingKey = "item.update";

        String msg = "this is ack msg";
        for (int i = 0; i < 5; i++) {
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num" ,i);

            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .headers(headers)
                    .build();

            String tem = msg + ":" + i;

            channel.basicPublish(exchangeName, routingKey, true, properties, tem.getBytes());
            System.out.println("Send message : " + msg);
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;

public class AckAndNackConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(3000);

        Connection connection = factory.newConnection();

        final Channel channel = connection.createChannel();

        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "item.#";
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);

        //一般不用代碼綁定,在管理界面手動綁定
        channel.queueBind(queueName, exchangeName, routingKey);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {

                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if ((Integer) properties.getHeaders().get("num") == 0) {
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        //6. 設置 Channel 消費者綁定隊列
        channel.basicConsume(queueName, false, consumer);

    }
}

我們此處只關心消費端輸出,可以看到第 0 條消費失敗重新回到隊列尾部消費。

 [x] Received 'this is ack msg:1'
 [x] Received 'this is ack msg:2'
 [x] Received 'this is ack msg:3'
 [x] Received 'this is ack msg:4'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'
 [x] Received 'this is ack msg:0'


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM