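Scenario:
We typically write a while loop in our code and call consumer.nextDelivery() (the QueueingConsumer style) to fetch the next message, then process it.
In practice:
A custom Consumer is more convenient, decouples message handling from the receive loop, and is the approach used most often in real projects.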
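To make the contrast between the two styles concrete, here is a minimal broker-free sketch. It does not use the RabbitMQ client at all: a `BlockingQueue` stands in for the buffered deliveries, and the class and method names (`DeliveryStyles`, `pullLoop`, `pushAll`) are hypothetical, chosen only for illustration. In the pull style the application owns the loop; in the push style the caller only supplies a handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical simulation: no RabbitMQ classes are involved.
final class DeliveryStyles {

    // Pull style: the application writes the loop and blocks until the next message arrives.
    static List<String> pullLoop(BlockingQueue<String> deliveries, int expected) {
        List<String> handled = new ArrayList<>();
        while (handled.size() < expected) {
            try {
                String body = deliveries.take(); // blocks, like a "next delivery" call
                handled.add("pulled:" + body);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop
                break;
            }
        }
        return handled;
    }

    // Push style: the dispatching side owns the loop and invokes the handler per message.
    static void pushAll(List<String> incoming, Consumer<String> handler) {
        for (String body : incoming) {
            handler.accept(body);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(Arrays.asList("a", "b"));
        System.out.println(pullLoop(queue, 2)); // prints [pulled:a, pulled:b]

        List<String> seen = new ArrayList<>();
        pushAll(Arrays.asList("a", "b"), body -> seen.add("pushed:" + body));
        System.out.println(seen); // prints [pushed:a, pushed:b]
    }
}
```

With the push style, the handling logic lives in one small callback that can be swapped or tested on its own, which is the decoupling a custom Consumer provides.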
Steps:
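// Producer code
// Run the consumer first: it declares the exchange, the queue, and the binding.
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchange = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ Consumer Message";

// mandatory = true: the broker returns unroutable messages to the producer
// (a ReturnListener is needed to actually see them)
for (int i = 0; i < 5; i++) {
    channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}

// Release resources once all messages have been published
channel.close();
connection.close();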
// Consumer code
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();

String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.#";
String queueName = "test_consumer_queue";

// Durable topic exchange, durable non-exclusive queue, bound with the topic pattern
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);

// Use the custom consumer (second argument: autoAck = true)
channel.basicConsume(queueName, true, new MyConsumer(channel));
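The queue above is bound with the pattern "consumer.#", while the producer publishes with the routing key "consumer.save". In a topic exchange, `*` matches exactly one dot-separated word and `#` matches zero or more words. The following is a small stand-alone sketch of that matching rule; `TopicMatcher` is a hypothetical helper written for illustration, not part of the RabbitMQ client, and the broker's real implementation differs.

```java
// Hypothetical helper: illustrates topic-pattern matching, not the broker's actual code.
final class TopicMatcher {

    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    // Recursively match pattern words p[i..] against routing-key words k[j..].
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words; try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still needs a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("consumer.#", "consumer.save"));     // prints true
        System.out.println(matches("consumer.#", "consumer.save.now")); // prints true
        System.out.println(matches("consumer.*", "consumer.save.now")); // prints false
        System.out.println(matches("consumer.#", "order.save"));        // prints false
    }
}
```

Under this rule, every message the producer sends with "consumer.save" is routed to test_consumer_queue.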
// Custom consumer
import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

// Extend the DefaultConsumer class
public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    // Override handleDelivery(); the client calls it once for each delivered message
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}
Result: