引入RabbitMQ的jar包
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.1.2</version> </dependency>
創建消息生產者
1 import com.rabbitmq.client.Channel; 2 import com.rabbitmq.client.Connection; 3 import com.rabbitmq.client.ConnectionFactory; 4 5 import java.io.IOException; 6 import java.util.concurrent.TimeoutException; 7 8 public class Producer { 9 private static final String QUEUE_NAME ="queue.test"; 10 11 public static void main(String[] args) throws IOException, TimeoutException { 12 //創建連接工廠 13 ConnectionFactory connectionFactory = new ConnectionFactory(); 14 connectionFactory.setHost("localhost"); 15 //創建一個連接 16 Connection connection = connectionFactory.newConnection(); 17 //創建一個通道 18 Channel channel = connection.createChannel(); 19 //聲明隊列 20 //queueDeclare第一個參數表示隊列名稱、 21 // 第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、 22 // 第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、 23 // 第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數 24 channel.queueDeclare(QUEUE_NAME,false,false,false,null); 25 String msg = "hello rabbit"; 26 //發送消息到隊列 27 //basicPublish第一個參數為交換機名稱、 28 // 第二個參數為隊列映射的路由key、 29 // 第三個參數為消息的其他屬性、 30 // 第四個參數為發送信息的主體 31 channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8")); 32 System.out.println("Producer Send +'" + msg + "'"); 33 channel.close(); 34 connection.close(); 35 } 36 }
創建消費者
package com.ysl.rabbit; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Customer { private static final String QUEUE_NAME ="queue.test"; public static void main(String[] args) throws IOException, TimeoutException { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setUsername("admin"); connectionFactory.setPassword("admin"); //創建一個連接 Connection connection = connectionFactory.newConnection(); //創建一個通道 Channel channel = connection.createChannel(); //聲明隊列 //queueDeclare第一個參數表示隊列名稱、 // 第二個參數為是否持久化(true表示是,隊列將在服務器重啟時生存)、 // 第三個參數為是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除)、 // 第四個參數為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數為隊列的其他參數 channel.queueDeclare(QUEUE_NAME,false,false,false,null); System.out.println("Customer Waiting Received messages"); Consumer consumer = new DefaultConsumer(channel){ /** * envelope主要存放生產者相關信息(比如交換機、路由key等)body是消息實體。 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received '" + message + "'"); } }; //自動回復隊列應答 -- RabbitMQ中的消息確認機制 channel.basicConsume(QUEUE_NAME,true, consumer); } }