RabbitMQ:Confirm確認消息 Return返回消息


1.Confirm消息確認機制

消息的確認:是指生產者投遞消息后,如果Broker收到消息,則會給生產者一個應答。

生產者進行接收應答,用來確定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞的核心保障。

生產端

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建一個連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.132");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //通過連接創建一個Channel
        Channel channel = connection.createChannel();
        //指定消息的投遞模式:消息的確認模式
        channel.confirmSelect();

        //通過Channel發送數據
        channel.basicPublish("","hello",null,"hello world".getBytes());
        //添加一個確認監聽
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("----handleAck---");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("----handleNack---");
            }
        });
    }

消費端:

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //創建一個連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.132");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //通過連接創建一個Channel
        Channel channel = connection.createChannel();
        //創建一個隊列
        String queueName = "hello";
        channel.queueDeclare(queueName,true,false,false,null);
        //創建一個消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //設置Channel
        channel.basicConsume(queueName,true,consumer);
        //獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("消費端:"+msg);
        }
    }

 運行結果:

消費端:

 

 生產端:

2.Return返回消息機制

某些情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,這時候如果我們需要監聽這種不可達的消息,就需要使用Return Listener

在API中有個一重要配置項:

Mandatory:如果為true,則監聽器會接收到路由不可達的消息,然后進行后續處理,如果為false,則broker端自動刪除該消息。

Return消息機制流程:

 消費端跟上文一樣,

生產端:

    public static void main(String[] args) throws IOException, TimeoutException {
        //創建一個連接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.10.132");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        //創建連接
        Connection connection = connectionFactory.newConnection();
        //通過連接創建一個Channel
        Channel channel = connection.createChannel();


        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange,
                                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
                System.out.println(properties);
                System.out.println(Arrays.toString(body));
            }
        });

        //通過Channel發送數據
        // 在這里要設置Mandatory(第三個參數)為true,否則broker會自動刪除消息
        channel.basicPublish("","return",true,null,"hello world".getBytes());
    }  

 打印結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM