一、Java項目創建並整合RabbitMQ
1、創建Maven項目
2、添加依賴
官方地址: https://www.rabbitmq.com/java-client.html
依賴地址: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.10.0
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency> </dependencies>
二、RabbitMQ簡單隊列實戰
⽂檔:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
1、消息生產者
package com.xdclass.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); try ( //JDK7語法 或⾃動關閉 connnection 和channel //創建連接 Connection connection = factory.newConnection(); //創建信道 Channel channel = connection.createChannel()) { /** * 隊列名稱 * 持久化配置: mq重啟后還在 * 是否獨占:只能有⼀個消費者監聽隊列;當connection關閉是否刪除隊列,⼀般是false,發布訂閱是獨占 * ⾃動刪除: 當沒有消費者的時候,⾃動刪除掉,⼀般是false * 其他參數 * * 隊列不存在則會⾃動創建,如果存在則不會覆蓋,所以此時的時候需要注意屬性 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!222222"; /** * 參數說明: * 交換機名稱:不寫則是默認的交換機,那路由鍵需要和隊列名稱⼀樣才可以被路由, * 路由鍵名稱 * 配置信息 * 發送的消息數據:字節數組 */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
2、消息消費者
會⼀直監聽隊列
package com.xdclass.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); //消費者⼀般不增加⾃動關閉 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages.To exit press CTRL + C"); //回調方法,下⾯兩種都⾏ Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // consumerTag 是固定的 可以做此會話的名字,deliveryTag 每次接收消息 +1 System.out.println("consumerTag 消息標識 = " + consumerTag); //可以獲取交換機,路由鍵等System.out.println("envelope元數據 = "+envelope); System.out.println("properties配置信息 = " + properties); System.out.println("body=" + new String(body, "utf-8")); } }; channel.basicConsume(QUEUE_NAME, true, consumer); //也可以用下面這種 /* DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; //⾃動確認消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); */ } }
三、RabbitMQ工作隊列—輪詢策略
1、工作隊列
- 消息生產能力大於消費能力,增加多幾個消費節點
- 和簡單隊列類似,增加多個幾個消費節點,處於競爭關系
- 默認策略: round robin 輪詢
2、生產者代碼
package com.xdclass.work.rr; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); try ( //JDK7語法 或⾃動關閉 connnection 和channel //創建連接 Connection connection = factory.newConnection(); //創建信道 Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 10; i++) { String message = "Hello World! i==" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } } }
3、消費者代碼,延遲1s消費
public class Recv1 { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*]Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模擬消費緩慢 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); //⼿工確認消息消費,不是多條確認 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //關閉⾃動確認消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
4、消費者代碼,延遲3s消費
public class Recv2 { private final static String QUEUE_NAME = "work_mq_rr"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.216.130"); factory.setUsername("admin"); factory.setPassword("password"); factory.setVirtualHost("/dev"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*]Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { //模擬消費緩慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } String message = new String(delivery.getBody(), "UTF-8"); System.out.println("[x] Received '" + message + "'"); //⼿工確認消息消費,不是多條確認 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; //關閉⾃動確認消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
5、輪詢策略驗證
- 先啟動兩個消費者,再啟動生產者,消息會平分到兩個消費端
- 缺點:存在部分節點消費過快,部分節點消費慢,導致不能合理處理消息
四、RabbitMQ工作隊列—公平策略
1、公平策略驗證
- 修改消費者策略
- 解決消費者能力消費不足的問題,降低消費時間問題