我們知道,消費者有兩種方式從消息中間件獲取消息:
- 推模式:消息中間件主動將消息推送給消費者
- 拉模式:消費者主動從消息中間件拉取消息
推模式將消息提前推送給消費者,消費者必須設置一個緩沖區緩存這些消息。好處很明顯,消費者總是有一堆在內存中待處理的消息,所以效率高。缺點是緩沖區可能會溢出。
拉模式在消費者需要時才去消息中間件拉取消息,這段網絡開銷會明顯增加消息延遲,降低系統吞吐量。
選擇推模式還是拉模式需要考慮使用場景。
前面 HelloWorld 例子中使用 Consumer 接口的方式是推模式。RabbitMQ 會把隊列中的消息全部推送給消費者。如果刪除 Sender 的 sleep 代碼行,只要 Receiver 連着 hello 隊列,你就會發現 hello 隊列根本存不住消息,全都發送給了 Receiver,哪怕 Receiver 處理的再慢。
RabbitMQ 同時提供了拉模式,代碼如下:
gordon.study.rabbitmq.helloworld.ReceiverByGet.java
public class ReceiverByGet {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
while (true) {
GetResponse resp = channel.basicGet(QUEUE_NAME, true);
if (resp == null) {
System.out.println("Get Nothing!");
TimeUnit.MILLISECONDS.sleep(1000);
} else {
String message = new String(resp.getBody(), "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", "Receiver", QUEUE_NAME, message);
TimeUnit.MILLISECONDS.sleep(500);
}
}
}
}
代碼第14行通過 basicGet 方法,主動去第一個參數指定的隊列(hello隊列)嘗試獲取一個消息,這是一個非阻塞方法,當從隊列中獲取消息失敗時,會返回 null,成功則返回 GetResponse 實例。第二個參數 autoAck 指定消息確認模式,作用同前文 basicConsume 方法同名參數。
代碼第15行判斷返回為 null 的場景。當返回為 null 時一般是因為隊列中無可用消息,所以需要讓自己 sleep 一小段時間,以防止過於頻繁地嘗試獲取消息。
