最近公司需要使用RabbitMQ,但我之前一直使用的是ActiveMQ,對RabbitMQ進行了初步的學習,但是還不系統,自己做了一些小測試,怕自己以后忘了
一. 背景
拿到代碼以后,發現,生產者在向外發送消息時,指定了exchange(交換機)和routing key,但是沒有指定queue(隊列)也沒有將queue(隊列)綁定到exchange,剛開始因為不熟悉rabbitMQ,所有不知道怎么回事,后來知道了:消費者在消費消息時,需要聲明隊列(隊列名字隨便),並將聲明的隊列通過routing key綁定到exchange,這樣才能接收到數據,因此,生產者方需要將exchange和routing key實現告知消費者方。
二. 代碼實例:生產者方指定了exchange(交換機)和routing key,但是不指定queue(隊列)也不將queue(隊列)綁定到exchange,隊列聲明和綁定隊列到exchange的工作由消費者方完成
1. 生產者方
① 生產者方代碼
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE8"; 8 9 public static void main(String[] args) throws IOException { 10 11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost("localhost"); 13 factory.setPort(5672); 14 factory.setUsername("guest"); 15 factory.setPassword("guest"); 16 17 Connection connection = factory.newConnection(); 18 Channel channel = connection.createChannel(); 19 20 String message = "Hello World!"; 21 22 // 指定exchange和routing key,並發送消息到exchange 23 channel.basicPublish("FILETOPIC", "KEY.FILE", null, message.getBytes()); 24 System.out.println(" [x] Sent '" + message + "'"); 25 26 channel.close(); 27 connection.close(); 28 } 29 }
② 生產者方代碼運行后,可在rabbiteMQ managerment 管理界面看到相應exchange,如下圖所示:
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2. 消費者方:消費者聲明隊列,隊列名稱隨便起,並將該隊列通過生產者指定的routing key綁定的其指定的exchange上
① 消費者方代碼
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE8"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setPort(5672); 14 factory.setUsername("guest"); 15 factory.setPassword("guest"); 16 factory.setHost("localhost"); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 // 聲明隊列 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 24 25 // 綁定隊列到交換機 26 channel.queueBind(QUEUE_NAME, "FILETOPIC", "KEY.FILE"); 27 28 QueueingConsumer consumer = new QueueingConsumer(channel); 29 channel.basicConsume(QUEUE_NAME, true, consumer); 30 while(true){ 31 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 32 String message = new String(delivery.getBody(),"UTF-8"); 33 System.out.println(" 【[x] Received 】:" + message); 34 } 35 } 36 }
② 運行效果,消費者方代碼運行后,在rabbiteMQ managerment 管理界面可以看到聲明的隊列,並發現該隊列已經綁定到了生產者指定的exchange上
---------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
---------------------------------------------------------------------------------------------------------------------------------------------------------------
三. 代碼實例:生產者指定exchange和routing key,聲明隊列並將隊列綁定到exchange,消費者只需從生產者綁定的隊列消費即可
1. 生產者
① 生產者代碼
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE1"; 8 9 public static void main(String[] args) throws IOException { 10 11 ConnectionFactory factory = new ConnectionFactory(); 12 factory.setHost("localhost"); 13 factory.setPort(5672); 14 factory.setUsername("guest"); 15 factory.setPassword("guest"); 16 17 Connection connection = factory.newConnection(); 18 Channel channel = connection.createChannel(); 19 20 String message = "Hello World!"; 21 22 // 聲明隊列 23 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 24 25 // 綁定隊列到交換機 26 channel.queueBind(QUEUE_NAME, "FILETOPIC", "KEY.FILE"); 27 28 // 指定exchange和routing key,並發送消息到exchange 29 channel.basicPublish("FILETOPIC", "KEY.FILE", null, message.getBytes()); 30 System.out.println(" [x] Sent '" + message + "'"); 31 32 channel.close(); 33 connection.close(); 34 } 35 }
② 生產者運行效果:生產者代碼運行后,在rabbiteMQ managerment 管理界面可以看到生產者聲明的隊列,並發現該隊列已經綁定到了生產者指定的exchange上,如下圖所示:
2. 消費者
① 消費者代碼:消費者現在直接消費隊列中的消息即可,既不用聲明隊列,也不用綁定
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE1"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setPort(5672); 14 factory.setUsername("guest"); 15 factory.setPassword("guest"); 16 factory.setHost("localhost"); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 QueueingConsumer consumer = new QueueingConsumer(channel); 22 channel.basicConsume(QUEUE_NAME, true, consumer); 23 while(true){ 24 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 25 String message = new String(delivery.getBody(),"UTF-8"); 26 System.out.println(" 【[x] Received 】:" + message); 27 } 28 } 29 }
② 消費者運行效果:在rabbiteMQ managerment 管理界面可以看到,生產者指定的隊列已經有了消費者