RabbitMQ初學之二:直接發送消息到隊列


一. 背景

  總前提:隊列無論是在生產者聲明還是在消費者聲明,只有聲明了,才能在RabbitMQ的管理界面看到該隊列

  生產者直接發送消息到隊列,消費者直接消費隊列中的消息,而不用指定exchange並綁定。這種需求下,分三種情況:① 生產者聲明隊列(指定隊列名稱),消費者不指定隊列,而是直接消費生產者指定的隊列;② 生產者聲指定隊列,但不聲明隊列,而是直接將消息發送到該隊列,消費生聲明該隊列,並從該隊列接收消息;③ 生產者聲明隊列並將消息發送到該隊列,消費者也聲明該隊列,並從該隊列消費消息,但是:生產者和消費者聲明隊列時指定的參數要一致,否則會報錯。下面分別進行說明:

1. 生產者聲明隊列(指定隊列名稱),消費者不指定隊列,而是直接消費生產者指定的隊列,但是此時,聲明隊列的一方要先運行,否則消費者連不上隊列,要報錯

  ① 生產者代碼

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE1";  
 8  
 9     public static void main(String[] args) throws IOException {  
10         ConnectionFactory factory = new ConnectionFactory();  
11         factory.setHost("localhost");
12         factory.setUsername("guest");
13         factory.setPassword("guest");
14         factory.setPort(5672);
15         Connection connection = factory.newConnection();  
16         Channel channel = connection.createChannel();  
17  
18         // 聲明隊列
19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
20         String message = "Hello World!";  
21         
22         // 發行消息到隊列
23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
24         System.out.println(" [x] Sent '" + message + "'");  
25  
26         channel.close();  
27         connection.close();  
28     }  
29 }

  2. 消費者

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE1";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setUsername("guest");
14         factory.setPassword("guest");
15         factory.setHost("localhost");
16         factory.setPort(5672);  
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
22         
23         QueueingConsumer consumer = new QueueingConsumer(channel);  
24         
25         // 消費者不聲明隊列,直接從隊列中消費
26         channel.basicConsume(QUEUE_NAME, true, consumer);  
27         while(true){  
28             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
29             String message = new String(delivery.getBody(),"UTF-8");  
30             System.out.println(" 【[x] Received 】:" + message);  
31         }  
32     }  
33 }

2.  生產者聲指定隊列,但不聲明隊列,而是直接將消息發送到該隊列,消費生聲明該隊列,並從該隊列接收消息,生產者可先運行)(不報錯),但是發的消息無效(被丟棄),只有聲明隊列的一方運行后,在管理界面才能看到該隊列

  ① 生產者

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE2";  
 8  
 9     public static void main(String[] args) throws IOException {  
10         ConnectionFactory factory = new ConnectionFactory();  
11         factory.setHost("localhost");
12         factory.setUsername("guest");
13         factory.setPassword("guest");
14         factory.setPort(5672);
15         Connection connection = factory.newConnection();  
16         Channel channel = connection.createChannel();  
17  
18         String message = "Hello World!";  
19         
20         // 發行消息到隊列
21         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
22         System.out.println(" [x] Sent '" + message + "'");  
23  
24         channel.close();  
25         connection.close();  
26     }  
27 }

 

  ② 消費者

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE2";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setUsername("guest");
14         factory.setPassword("guest");
15         factory.setHost("localhost");
16         factory.setPort(5672);  
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         // 聲明隊列
22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
24         
25         QueueingConsumer consumer = new QueueingConsumer(channel);  
26         
27         // 消費者不聲明隊列,直接從隊列中消費
28         channel.basicConsume(QUEUE_NAME, true, consumer);  
29         while(true){  
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
31             String message = new String(delivery.getBody(),"UTF-8");  
32             System.out.println(" 【[x] Received 】:" + message);  
33         }  
34     }  
35 }

 

3. 生產者聲明隊列並將消息發送到該隊列,消費者也聲明該隊列,並從該隊列消費消息,但是:生產者和消費者聲明隊列時指定的參數要一致,否則會報錯。

  ① 生產者

 1 import java.io.IOException;
 2 import com.rabbitmq.client.Channel;
 3 import com.rabbitmq.client.Connection;
 4 import com.rabbitmq.client.ConnectionFactory;
 5  
 6 public class Producer {
 7     private final static String QUEUE_NAME = "QUEUE2";  
 8  
 9     public static void main(String[] args) throws IOException {  
10         ConnectionFactory factory = new ConnectionFactory();  
11         factory.setHost("localhost");
12         factory.setUsername("guest");
13         factory.setPassword("guest");
14         factory.setPort(5672);
15         Connection connection = factory.newConnection();  
16         Channel channel = connection.createChannel();  
17  
18         // 聲明隊列
19         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
20         String message = "Hello World!";  
21         
22         // 發行消息到隊列
23         channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
24         System.out.println(" [x] Sent '" + message + "'");  
25  
26         channel.close();  
27         connection.close();  
28     }  
29 }

  ② 消費者

 1 import com.rabbitmq.client.ConnectionFactory;  
 2 import com.rabbitmq.client.QueueingConsumer;  
 3 import com.rabbitmq.client.Channel;  
 4 import com.rabbitmq.client.Connection;  
 5 
 6 public class Reqv {
 7     
 8     private final static String QUEUE_NAME = "QUEUE2";  
 9       
10     public static void main(String[] argv) throws Exception {  
11         
12         ConnectionFactory factory = new ConnectionFactory();
13         factory.setUsername("guest");
14         factory.setPassword("guest");
15         factory.setHost("localhost");
16         factory.setPort(5672);  
17         
18         Connection connection = factory.newConnection();  
19         Channel channel = connection.createChannel();  
20         
21         // 聲明隊列
22         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
23         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
24         
25         QueueingConsumer consumer = new QueueingConsumer(channel);  
26         
27         // 消費者不聲明隊列,直接從隊列中消費
28         channel.basicConsume(QUEUE_NAME, true, consumer);  
29         while(true){  
30             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
31             String message = new String(delivery.getBody(),"UTF-8");  
32             System.out.println(" 【[x] Received 】:" + message);  
33         }  
34     }  
35 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM