准備工作
1、安裝RabbitMQ,參考【RabbitMQ】 RabbitMQ安裝
2、新建Java項目,引入RabbitMQ的Maven依賴
1 <dependency> 2 <groupId>com.rabbitmq</groupId> 3 <artifactId>amqp-client</artifactId> 4 <version>5.5.0</version> 5 </dependency>
單生產者消費者
流程圖:
1、創建生產者Producer1
1 public class Producer1 { 2 3 private final static String QUEUE_NAME = "rabbitMQ.test.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 7 // 創建連接工廠 8 ConnectionFactory factory = new ConnectionFactory(); 9 // 設置RabbitMQ相關信息 10 factory.setHost("localhost"); 11 factory.setUsername("guest"); 12 factory.setPassword("guest"); 13 factory.setPort(5672); 14 // 創建一個新的連接 15 Connection connection = factory.newConnection(); 16 // 創建一個通道 17 Channel channel = connection.createChannel(); 18 19 // 聲明一個隊列 20 // queueDeclare(隊列名稱,是否持久化(true表示是,隊列將在服務器重啟時生存),是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除), 21 // 當所有消費者客戶端連接斷開時是否自動刪除隊列,隊列的其他參數) 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 String message = "Hello RabbitMQ ~"; 24 25 // 發送消息到隊列中 26 // basicPublish(交換機名稱,隊列映射的路由key,消息的其他屬性,發送信息的主體) 27 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); 28 System.out.println("Producer Send +'" + message + "'"); 29 // 關閉通道和連接 30 channel.close(); 31 connection.close(); 32 33 } 34 35 }
2、創建消費者Customer1
1 public class Customer1 { 2 3 private final static String QUEUE_NAME = "rabbitMQ.test.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 // 創建連接工廠 7 ConnectionFactory factory = new ConnectionFactory(); 8 // 設置RabbitMQ地址 9 factory.setHost("localhost"); 10 factory.setUsername("guest"); 11 factory.setPassword("guest"); 12 factory.setPort(5672); 13 // 創建一個新的連接 14 Connection connection = factory.newConnection(); 15 // 創建一個通道 16 final Channel channel = connection.createChannel(); 17 // 聲明要關注的隊列 18 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 19 channel.basicQos(1);//保證一次只分發一個 20 System.out.println("Customer Waiting Received messages"); 21 22 // DefaultConsumer類實現了Consumer接口,通過傳入一個頻道, 23 // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery 24 Consumer consumer = new DefaultConsumer(channel) { 25 @Override 26 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 27 byte[] body) throws IOException { 28 String message = new String(body, "UTF-8"); 29 System.out.println("Customer Received '" + message + "'"); 30 } 31 }; 32 33 // 自動回復隊列應答 -- RabbitMQ中的消息確認機制 34 channel.basicConsume(QUEUE_NAME, true, consumer); 35 } 36 37 }
3、運行結果
a、生產者
b、消費者
推送確認和消費應答
流程圖
1、創建推送確認生產者Producer2
1 public class Producer2 { 2 3 private final static String QUEUE_NAME = "rabbitMQ.test.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 7 // 創建連接工廠 8 ConnectionFactory factory = new ConnectionFactory(); 9 // 設置RabbitMQ相關信息 10 factory.setHost("localhost"); 11 factory.setUsername("guest"); 12 factory.setPassword("guest"); 13 factory.setPort(5672); 14 // 創建一個新的連接 15 Connection connection = factory.newConnection(); 16 // 創建一個通道 17 Channel channel = connection.createChannel(); 18 19 // 聲明一個隊列 20 // queueDeclare(隊列名稱,是否持久化(true表示是,隊列將在服務器重啟時生存),是否是獨占隊列(創建者可以使用的私有隊列,斷開后自動刪除), 21 // 當所有消費者客戶端連接斷開時是否自動刪除隊列,隊列的其他參數) 22 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 23 24 // 開啟發送方確認模式 25 channel.confirmSelect(); 26 27 // 存儲未確認的消息標識tag 28 final SortedSet<Long> confirmSet = Collections.synchronizedNavigableSet(new TreeSet<Long>()); 29 30 // 異步監聽確認和未確認的消息 31 channel.addConfirmListener(new ConfirmListener() { 32 33 /** 34 * 處理返回確認成功 35 * 36 * @param deliveryTag 如果是多條,這個就是最后一條消息的tag 37 * @param multiple 是否多條 38 * @throws IOException 39 */ 40 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 41 System.out.println("消息發送成功, deliveryTag:" + deliveryTag + " multiple:" + multiple + ""); 42 if (multiple) { 43 // 移除發送成功的多條消息標識tag 44 confirmSet.headSet(deliveryTag + 1).clear(); 45 } else { 46 // 移除發送成功的一條消息標識tag 47 confirmSet.remove(deliveryTag); 48 } 49 } 50 51 /** 52 * 處理返回確認失敗 53 * 54 * @param deliveryTag 如果是多條,這個就是最后一條消息的tag 55 * @param multiple 是否多條 56 * @throws IOException 57 */ 58 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 59 System.out.println("失敗,deliveryTag:" + deliveryTag + "multiple:" + multiple + ""); 60 if (multiple) { 61 confirmSet.headSet(deliveryTag + 1).clear(); 62 } else { 63 confirmSet.remove(deliveryTag); 64 } 65 } 66 67 }); 68 69 String message = "Hello RabbitMQ ~ "; 70 71 // 發送消息到隊列中 72 // basicPublish(交換機名稱,隊列映射的路由key,消息的其他屬性,發送信息的主體) 73 for (int i = 1; i <= 10; i++) { 74 String msg = message + i; 75 long tag = channel.getNextPublishSeqNo(); 76 confirmSet.add(tag); 77 System.out.println("tag:" + tag); 78 79 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes("UTF-8")); 80 System.out.println("Producer Send +'" + msg + "'"); 81 82 } 83 84 System.out.println("============================"); 85 // 關閉通道和連接 86 // channel.close(); 87 // connection.close(); 88 89 } 90 }
2、創建消費應答消費者Customer2
1 public class Customer2 { 2 3 private final static String QUEUE_NAME = "rabbitMQ.test.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 // 創建連接工廠 7 ConnectionFactory factory = new ConnectionFactory(); 8 // 設置RabbitMQ地址 9 factory.setHost("localhost"); 10 factory.setUsername("guest"); 11 factory.setPassword("guest"); 12 factory.setPort(5672); 13 // 創建一個新的連接 14 Connection connection = factory.newConnection(); 15 // 創建一個通道 16 final Channel channel = connection.createChannel(); 17 // 聲明要關注的隊列 18 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 19 channel.basicQos(1);// 保證一次只分發一個 20 System.out.println("Customer Waiting Received messages"); 21 22 // DefaultConsumer類實現了Consumer接口,通過傳入一個頻道, 23 // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery 24 Consumer consumer = new DefaultConsumer(channel) { 25 @Override 26 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 27 byte[] body) throws IOException { 28 String message = new String(body, "UTF-8"); 29 System.out.println("Customer Received '" + message + "'"); 30 31 // 返回消費確認狀態 32 channel.basicAck(envelope.getDeliveryTag(), false); 33 } 34 }; 35 36 // 消費手動確認 -- RabbitMQ中的消息確認機制 37 channel.basicConsume(QUEUE_NAME, false, consumer); 38 } 39 40 }
3、運行結果
生產者:
消費者:
多消費者
流程圖
1、創建推送確認生產者Producer3(與示例Producer2一樣)
2、創建消費應答消費者Customer3
1 public class Customer3 { 2 3 private final static String QUEUE_NAME = "rabbitMQ.test.queue"; 4 5 public static void main(String[] args) throws IOException, TimeoutException { 6 Customer3 customer3 = new Customer3(); 7 customer3.createCustomer("customer1"); 8 customer3.createCustomer("customer2"); 9 } 10 11 public void createCustomer(final String customerName) throws IOException, TimeoutException{ 12 13 // 創建連接工廠 14 ConnectionFactory factory = new ConnectionFactory(); 15 // 設置RabbitMQ地址 16 factory.setHost("localhost"); 17 factory.setUsername("guest"); 18 factory.setPassword("guest"); 19 factory.setPort(5672); 20 // 創建一個新的連接 21 Connection connection = factory.newConnection(); 22 // 創建一個通道 23 final Channel channel = connection.createChannel(); 24 // 聲明要關注的隊列 25 channel.queueDeclare(QUEUE_NAME, false, false, false, null); 26 channel.basicQos(1);// 保證一次只分發一個 27 System.out.println(customerName + " Waiting Received messages"); 28 29 // DefaultConsumer類實現了Consumer接口,通過傳入一個頻道, 30 // 告訴服務器我們需要那個頻道的消息,如果頻道中有消息,就會執行回調函數handleDelivery 31 Consumer consumer = new DefaultConsumer(channel) { 32 @Override 33 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 34 byte[] body) throws IOException { 35 String message = new String(body, "UTF-8"); 36 System.out.println(customerName + " Received '" + message + "'"); 37 38 // doWork處理任務 39 doWork(customerName); 40 41 // 返回消費確認狀態 42 channel.basicAck(envelope.getDeliveryTag(), false); 43 } 44 }; 45 46 // 消費手動確認 -- RabbitMQ中的消息確認機制 47 channel.basicConsume(QUEUE_NAME, false, consumer); 48 } 49 50 51 private void doWork(String customer) { 52 try { 53 Thread.sleep(2000); // 暫停2秒鍾 54 System.out.println(customer + ": completion of the job!"); 55 } catch (InterruptedException _ignored) { 56 Thread.currentThread().interrupt(); 57 } 58 } 59 }
3、運行結果
生產者與前面相同
消費者