上一篇講了個 哈嘍World,現在來看看如果存在多個消費者的情況。
生產者:
package com.example.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 競爭消費者模式 */ public class CompetingSend { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 連接工廠 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 獲取連接 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 聲明隊列,只有他不存在的時候創建 String msg = "Hello World!"; // 發送多條消息 for (int i = 0; i < 5; i++){ channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes()); System.out.println("Sending:" + msg); } channel.close(); connection.close(); } }
消費者:
package com.example.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 一個生產者,多個消費者 */ public class CompetingReceiveA { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 連接工廠 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 獲取連接 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 聲明隊列,只有他不存在的時候創建 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("Receive:" + recv); try { doWork(recv); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Done"); } } }; // true代表接收到消息后,給兔子發消息,讓這條消息失效 channel.basicConsume(QUEUE_NAME, true, consumer); } // 模擬每條消息處理時間不一樣 private static void doWork(String msg) throws InterruptedException { char c = msg.charAt(msg.length() - 1); for (int i = 0; i < Integer.parseInt(c+""); i++) Thread.sleep(1000); } }
先啟動兩個消費者,再啟動生產者,查看控制台:
消費者A
消費者B
生產者(這里不必有疑問,這里打印的是修改之前的消息)
要說明的是什么觀點呢?
默認情況下,RabbitMQ將按順序將每條消息發送給下一個使用者。一般來說,每個消費者得到的消息是一樣多。但是,並不是說每個消費者的任務重量是平均的。很有可能出現A總在處理耗時任務,B一直吃西瓜的情況。
因為兔子不知道每個消息的耗時,他就會傻傻的派遣任務。
不過,官方也有解決辦法。
為了解決這個問題,我們可以使用basicQos方法,設置prefetchCount = 1。這告訴RabbitMQ不要向消費者發送多於一條消息。換句話說,在它處理並確認了前一個消息之前,不要向工作人員發送新消息。
如果當前消費者正在忙碌(沒有確認消息),它會將其分派給空閑下一個消費者。
int prefetchCount = 1; channel.basicQos(prefetchCount);