一. 背景
總前提:隊列無論是在生產者聲明還是在消費者聲明,只有聲明了,才能在RabbitMQ的管理界面看到該隊列
生產者直接發送消息到隊列,消費者直接消費隊列中的消息,而不用指定exchange並綁定。這種需求下,分三種情況:① 生產者聲明隊列(指定隊列名稱),消費者不指定隊列,而是直接消費生產者指定的隊列;② 生產者聲指定隊列,但不聲明隊列,而是直接將消息發送到該隊列,消費生聲明該隊列,並從該隊列接收消息;③ 生產者聲明隊列並將消息發送到該隊列,消費者也聲明該隊列,並從該隊列消費消息,但是:生產者和消費者聲明隊列時指定的參數要一致,否則會報錯。下面分別進行說明:
1. 生產者聲明隊列(指定隊列名稱),消費者不指定隊列,而是直接消費生產者指定的隊列,但是此時,聲明隊列的一方要先運行,否則消費者連不上隊列,要報錯
① 生產者代碼
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE1"; 8 9 public static void main(String[] args) throws IOException { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 factory.setUsername("guest"); 13 factory.setPassword("guest"); 14 factory.setPort(5672); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 // 聲明隊列 19 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 20 String message = "Hello World!"; 21 22 // 發行消息到隊列 23 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 24 System.out.println(" [x] Sent '" + message + "'"); 25 26 channel.close(); 27 connection.close(); 28 } 29 }
2. 消費者
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE1"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setUsername("guest"); 14 factory.setPassword("guest"); 15 factory.setHost("localhost"); 16 factory.setPort(5672); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 22 23 QueueingConsumer consumer = new QueueingConsumer(channel); 24 25 // 消費者不聲明隊列,直接從隊列中消費 26 channel.basicConsume(QUEUE_NAME, true, consumer); 27 while(true){ 28 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 29 String message = new String(delivery.getBody(),"UTF-8"); 30 System.out.println(" 【[x] Received 】:" + message); 31 } 32 } 33 }
2. 生產者聲指定隊列,但不聲明隊列,而是直接將消息發送到該隊列,消費生聲明該隊列,並從該隊列接收消息,生產者可先運行)(不報錯),但是發的消息無效(被丟棄),只有聲明隊列的一方運行后,在管理界面才能看到該隊列
① 生產者
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE2"; 8 9 public static void main(String[] args) throws IOException { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 factory.setUsername("guest"); 13 factory.setPassword("guest"); 14 factory.setPort(5672); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 String message = "Hello World!"; 19 20 // 發行消息到隊列 21 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 22 System.out.println(" [x] Sent '" + message + "'"); 23 24 channel.close(); 25 connection.close(); 26 } 27 }
② 消費者
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE2"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setUsername("guest"); 14 factory.setPassword("guest"); 15 factory.setHost("localhost"); 16 factory.setPort(5672); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 // 聲明隊列 22 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 23 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 24 25 QueueingConsumer consumer = new QueueingConsumer(channel); 26 27 // 消費者不聲明隊列,直接從隊列中消費 28 channel.basicConsume(QUEUE_NAME, true, consumer); 29 while(true){ 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody(),"UTF-8"); 32 System.out.println(" 【[x] Received 】:" + message); 33 } 34 } 35 }
3. 生產者聲明隊列並將消息發送到該隊列,消費者也聲明該隊列,並從該隊列消費消息,但是:生產者和消費者聲明隊列時指定的參數要一致,否則會報錯。
① 生產者
1 import java.io.IOException; 2 import com.rabbitmq.client.Channel; 3 import com.rabbitmq.client.Connection; 4 import com.rabbitmq.client.ConnectionFactory; 5 6 public class Producer { 7 private final static String QUEUE_NAME = "QUEUE2"; 8 9 public static void main(String[] args) throws IOException { 10 ConnectionFactory factory = new ConnectionFactory(); 11 factory.setHost("localhost"); 12 factory.setUsername("guest"); 13 factory.setPassword("guest"); 14 factory.setPort(5672); 15 Connection connection = factory.newConnection(); 16 Channel channel = connection.createChannel(); 17 18 // 聲明隊列 19 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 20 String message = "Hello World!"; 21 22 // 發行消息到隊列 23 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 24 System.out.println(" [x] Sent '" + message + "'"); 25 26 channel.close(); 27 connection.close(); 28 } 29 }
② 消費者
1 import com.rabbitmq.client.ConnectionFactory; 2 import com.rabbitmq.client.QueueingConsumer; 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 6 public class Reqv { 7 8 private final static String QUEUE_NAME = "QUEUE2"; 9 10 public static void main(String[] argv) throws Exception { 11 12 ConnectionFactory factory = new ConnectionFactory(); 13 factory.setUsername("guest"); 14 factory.setPassword("guest"); 15 factory.setHost("localhost"); 16 factory.setPort(5672); 17 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 21 // 聲明隊列 22 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 23 System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); 24 25 QueueingConsumer consumer = new QueueingConsumer(channel); 26 27 // 消費者不聲明隊列,直接從隊列中消費 28 channel.basicConsume(QUEUE_NAME, true, consumer); 29 while(true){ 30 QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 31 String message = new String(delivery.getBody(),"UTF-8"); 32 System.out.println(" 【[x] Received 】:" + message); 33 } 34 } 35 }