RabbitMQ消費端自定義監聽(九)


  場景

    我們一般在代碼中編寫while循環,進行consumer.nextDelivery方法進行獲取下一條消息,然后進行消費處理。

  實際環境

    我們使用自定義的Consumer更加的方便,解耦性更強,也在實際工作中最常用。

  操作  

        //生產端代碼
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchange = "test_consumer_exchange";
        String routingKey = "consumer.save";
        
        String msg = "Hello RabbitMQ Consumer Message";
        
        for(int i =0; i<5; i ++){
            channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
        }
        //消費端代碼
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        //使用自定義consumer
        channel.basicConsume(queueName, true, new MyConsumer(channel));    
       //自定義消費端
        //繼承DefaultConsumer類
        public class MyConsumer extends DefaultConsumer {


               public MyConsumer(Channel channel) {
                       super(channel);
               }
    
               //重寫handleDelivery()
              @Override
              public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                      System.err.println("-----------consume message----------");
                      System.err.println("consumerTag: " + consumerTag);
                      System.err.println("envelope: " + envelope);
                      System.err.println("properties: " + properties);
                      System.err.println("body: " + new String(body));
               }

  
          }    

    運行結果:

    

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM