RabbitMQ中Confirm確認與Return返回消息詳解(八)


理解Confirm消息確認機制:

  消息的確認,是指生產者投遞消息后,如果Broker收到消息,則會給我們生產這一個應答。

  生產者進行接收應答,用來確定這條消息是否正常的發送到Broker,這種方式也是消息的可靠性投遞的核心保障。

   

  如何實現Confirm確認消息?

    第一步:在channel上開啟確認模式:channel.confirmSelect()

    第二步:在channel上添加監聽:addConfirmListener,監聽成功和失敗的返回結果,

        根據具體的結果對消息進行重新發送、或者記錄日志等后續處理。

 1      //生產端代碼   
 2      //1 創建ConnectionFactory
 3         ConnectionFactory connectionFactory = new ConnectionFactory();
 4         connectionFactory.setHost("127.0.0.1");
 5         connectionFactory.setPort(5672);
 6         connectionFactory.setVirtualHost("/");
 7         
 8         //2 獲取C    onnection
 9         Connection connection = connectionFactory.newConnection();
10         
11         //3 通過Connection創建一個新的Channel
12         Channel channel = connection.createChannel();
13         
14         
15         //4 指定我們的消息投遞模式: 消息的確認模式 
16         channel.confirmSelect();
17         
18         String exchangeName = "test_confirm_exchange";
19         String routingKey = "confirm.save";
20         
21         //5 發送一條消息
22         String msg = "Hello RabbitMQ Send confirm message!";
23         channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
24         
25         //6 添加一個確認監聽
26         channel.addConfirmListener(new ConfirmListener() {
27             @Override
28             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
29                 System.err.println("-------no ack!-----------");
30             }
31             
32             @Override
33             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
34                 System.err.println("-------ack!-----------");
35             }
36         });    

 

 1     //消費端代碼
 2     //1 創建ConnectionFactory
 3         ConnectionFactory connectionFactory = new ConnectionFactory();
 4         connectionFactory.setHost("127.0.0.1");
 5         connectionFactory.setPort(5672);
 6         connectionFactory.setVirtualHost("/");
 7         
 8         //2 獲取C    onnection
 9         Connection connection = connectionFactory.newConnection();
10         
11         //3 通過Connection創建一個新的Channel
12         Channel channel = connection.createChannel();
13         
14         String exchangeName = "test_confirm_exchange";
15         String routingKey = "confirm.#";
16         String queueName = "test_confirm_queue";
17         
18         //4 聲明交換機和隊列 然后進行綁定設置, 最后制定路由Key
19         channel.exchangeDeclare(exchangeName, "topic", true);
20         channel.queueDeclare(queueName, true, false, false, null);
21         channel.queueBind(queueName, exchangeName, routingKey);
22         
23         //5 創建消費者 
24         QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
25         channel.basicConsume(queueName, true, queueingConsumer);
26         
27         while(true){
28             Delivery delivery = queueingConsumer.nextDelivery();
29             String msg = new String(delivery.getBody());
30             
31             System.err.println("消費端: " + msg);
32         }

 Return消息機制

  Return Listener用於處理一些不可路由的消息。

  我們的消息生產者,通過指定一個Exchange和Routingkey,把消息送到某一個隊列中,

  然后我們的消費者監聽隊列,進行消息處理操作。

  但是在某些情況下,如果我們在發送消息的時候,當前的exchange不存在或者指定的路由key路由不到,

  這個時候我們需要監聽這種不可達的消息,就要使用return listener。

  

  在基礎API中有一個關鍵的配置項:

    Mandatory:如果為true,則監聽會接收到路由不可達的消息,然后進行后續處理,

          如果為false,那么broker端自動刪除該消息。(默認false)

          

 1                 //生產端代碼   
 2              ConnectionFactory connectionFactory = new ConnectionFactory();
 3         connectionFactory.setHost("127.0.0.1");
 4         connectionFactory.setPort(5672);
 5         connectionFactory.setVirtualHost("/");
 6         
 7         Connection connection = connectionFactory.newConnection();
 8         Channel channel = connection.createChannel();
 9         
10         String exchange = "test_return_exchange";
11         String routingKey = "return.save";
12         String routingKeyError = "abc.save";
13         
14         String msg = "Hello RabbitMQ Return Message";
15         
16         
17         channel.addReturnListener(new ReturnListener() {
18             @Override
19             public void handleReturn(int replyCode, String replyText, String exchange,
20                     String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
21                 
22                 System.err.println("---------handle  return----------");
23                 System.err.println("replyCode: " + replyCode);
24                 System.err.println("replyText: " + replyText);
25                 System.err.println("exchange: " + exchange);
26                 System.err.println("routingKey: " + routingKey);
27                 System.err.println("properties: " + properties);
28                 System.err.println("body: " + new String(body));
29             }
30         });
31         
32         
33         channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
34             
         //消費端代碼
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_return_exchange";
        String routingKey = "return.#";
        String queueName = "test_return_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        
        channel.basicConsume(queueName, true, queueingConsumer);
        
        while(true){
            
            Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消費者: " + msg);
        }

  運行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM