RabbitMQ初學之一:exchange與queue的綁定


最近公司需要使用RabbitMQ,但我之前一直使用的是ActiveMQ,對RabbitMQ進行了初步的學習,但是還不系統,自己做了一些小測試,怕自己以后忘了

一. 背景

  拿到代碼以后,發現,生產者在向外發送消息時,指定了exchange(交換機)和routing key,但是沒有指定queue(隊列)也沒有將queue(隊列)綁定到exchange,剛開始因為不熟悉rabbitMQ,所有不知道怎么回事,后來知道了:消費者在消費消息時,需要聲明隊列(隊列名字隨便),並將聲明的隊列通過routing key綁定到exchange,這樣才能接收到數據,因此,生產者方需要將exchange和routing key實現告知消費者方。

二. 代碼實例:生產者方指定了exchange(交換機)和routing key,但是不指定queue(隊列)也不將queue(隊列)綁定到exchange,隊列聲明和綁定隊列到exchange的工作由消費者方完成

  1. 生產者方

    ① 生產者方代碼

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE8";  
 8  
 9     public static void main(String[] args) throws IOException { 
10         
11         ConnectionFactory factory = new ConnectionFactory();  
12         factory.setHost("localhost");
13         factory.setPort(5672);
14         factory.setUsername("guest");
15         factory.setPassword("guest");
16         
17         Connection connection = factory.newConnection();  
18         Channel channel = connection.createChannel();  
19  
20         String message = "Hello World!"; 
21         
22         // 指定exchange和routing key,並發送消息到exchange
23         channel.basicPublish("FILETOPIC", "KEY.FILE", null, message.getBytes());  
24         System.out.println(" [x] Sent '" + message + "'");  
25  
26         channel.close();  
27         connection.close();  
28     }  
29 }

    ② 生產者方代碼運行后,可在rabbiteMQ managerment 管理界面看到相應exchange,如下圖所示:

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

  2. 消費者方:消費者聲明隊列,隊列名稱隨便起,並將該隊列通過生產者指定的routing key綁定的其指定的exchange上

    ① 消費者方代碼

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE8";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setPort(5672);  
14         factory.setUsername("guest");
15         factory.setPassword("guest");
16         factory.setHost("localhost");
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         // 聲明隊列
22         channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
24         
25         // 綁定隊列到交換機
26         channel.queueBind(QUEUE_NAME, "FILETOPIC", "KEY.FILE");
27         
28         QueueingConsumer consumer = new QueueingConsumer(channel);  
29         channel.basicConsume(QUEUE_NAME, true, consumer);  
30         while(true){  
31             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
32             String message = new String(delivery.getBody(),"UTF-8");  
33             System.out.println(" 【[x] Received 】:" + message);  
34         }  
35     }  
36 }

    ② 運行效果,消費者方代碼運行后,在rabbiteMQ managerment 管理界面可以看到聲明的隊列,並發現該隊列已經綁定到了生產者指定的exchange上

---------------------------------------------------------------------------------------------------------------------------------------------------------------

--------------------------------------------------------------------------------------------------------------------------------------------------------------------

 

---------------------------------------------------------------------------------------------------------------------------------------------------------------

 三. 代碼實例:生產者指定exchange和routing key,聲明隊列並將隊列綁定到exchange,消費者只需從生產者綁定的隊列消費即可

  1. 生產者

    ① 生產者代碼

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE1";  
 8  
 9     public static void main(String[] args) throws IOException { 
10         
11         ConnectionFactory factory = new ConnectionFactory();  
12         factory.setHost("localhost");
13         factory.setPort(5672);
14         factory.setUsername("guest");
15         factory.setPassword("guest");
16         
17         Connection connection = factory.newConnection();  
18         Channel channel = connection.createChannel();  
19  
20         String message = "Hello World!"; 
21         
22         // 聲明隊列
23         channel.queueDeclare(QUEUE_NAME, true, false, false, null);
24         
25         // 綁定隊列到交換機
26         channel.queueBind(QUEUE_NAME, "FILETOPIC", "KEY.FILE");
27         
28         // 指定exchange和routing key,並發送消息到exchange
29         channel.basicPublish("FILETOPIC", "KEY.FILE", null, message.getBytes());  
30         System.out.println(" [x] Sent '" + message + "'");  
31  
32         channel.close();  
33         connection.close();  
34     }  
35 }

    ② 生產者運行效果:生產者代碼運行后,在rabbiteMQ managerment 管理界面可以看到生產者聲明的隊列,並發現該隊列已經綁定到了生產者指定的exchange上,如下圖所示:

  2. 消費者

    ① 消費者代碼:消費者現在直接消費隊列中的消息即可,既不用聲明隊列,也不用綁定

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE1";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setPort(5672);  
14         factory.setUsername("guest");
15         factory.setPassword("guest");
16         factory.setHost("localhost");
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         QueueingConsumer consumer = new QueueingConsumer(channel);  
22         channel.basicConsume(QUEUE_NAME, true, consumer);  
23         while(true){  
24             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
25             String message = new String(delivery.getBody(),"UTF-8");  
26             System.out.println(" 【[x] Received 】:" + message);  
27         }  
28     }  
29 }

    ② 消費者運行效果:在rabbiteMQ managerment 管理界面可以看到,生產者指定的隊列已經有了消費者

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM