A. 用廣播的方式實現發布訂閱
參考資料:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
Fanout 類型的 Exchange 以廣播的方式向所有綁定到該 Exchange 的隊列推送消息。
下面樣例代碼試圖使用 fanout 將狀態變更消息推送給所有接入系統:
gordon.study.rabbitmq.fanout.Fanout.java
public class Fanout {
private static final String EXCHANGE_NAME = "StatusUpdateFanout";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel senderChannel = connection.createChannel();
senderChannel.exchangeDeclare(EXCHANGE_NAME, "fanout");
final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 10;) {
String message = "NO. " + ++i;
TimeUnit.MILLISECONDS.sleep(100);
senderChannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.printf("(%1$s)[===>%2$s ] %3$s\n", "S", EXCHANGE_NAME + ":", message);
if (i == 4) {
latch.countDown();
}
}
senderChannel.close();
} catch (Exception e) {
}
}
}).start();
final Channel consumerChannel1 = connection.createChannel();
consumerChannel1.queueDeclare("SystemA", false, false, true, null);
consumerChannel1.queueBind("SystemA", EXCHANGE_NAME, "");
consumerChannel1.basicQos(3);
Consumer consumer1 = new DefaultConsumer(consumerChannel1) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", "A", "SystemA", message);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
}
consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
}
};
consumerChannel1.basicConsume("SystemA", false, consumer1);
latch.await();
final Channel consumerChannel2 = connection.createChannel();
consumerChannel2.queueDeclare("SystemB", false, false, true, null);
consumerChannel2.queueBind("SystemB", EXCHANGE_NAME, "");
consumerChannel2.basicQos(3);
Consumer consumer2 = new DefaultConsumer(consumerChannel2) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [ %2$s<===](%1$s) %3$s\n", "B -- won't receive first 4 messages", "SystemB", message);
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
}
consumerChannel2.basicAck(envelope.getDeliveryTag(), false);
}
};
consumerChannel2.basicConsume("SystemB", false, consumer2);
}
}
代碼第11行申明了名字叫 StatusUpdateFanout 的 fanout 類型 Exchange。
第33行申明了 SystemA 隊列,第34行將 SystemA 隊列綁定到 StatusUpdateFanout Exchange。注意到綁定路由鍵為空,路由鍵對於 fanout 類型 Exchange 無意義。
同理,第20行代碼發送消息時,也不用設置路由鍵。所有發向 fanout 類型 Exchange 的消息都會無視路由鍵,廣播給每個綁定隊列。
B. 用 Topic 實現發布訂閱
參考資料:https://www.rabbitmq.com/tutorials/tutorial-five-java.html
Topic 類型的 Exchange 通過支持通配符的路由鍵管理復雜的發布訂閱關系。
發送消息時指定的路由鍵必須是點號(.)分隔的單詞,例如 sourceA.statusUpdate.systemA。
隊列綁定 Exchange 時指定的路由鍵可以使用通配符:
- *(星號)替代一個單詞
- #(井號)替代 0~n 個單詞
對每一條消息,Exchange 會遍歷所有的綁定關系,確認消息指定的路由鍵(例如 sourceA.statusUpdate.systemA)是否匹配綁定關系中的路由鍵,如果匹配,則將消息推送到相應隊列(例如 sourceA.statusUpdate.systemA、*.*.systemA、*.statusUpdate.* 和 sourceA.# 都匹配,但是 sourceB.# 和 *.systemA 都不匹配)。
gordon.study.rabbitmq.topic.Topic.java
senderChannel.exchangeDeclare(EXCHANGE_NAME, "topic");
consumerChannel1.queueBind("SystemA", EXCHANGE_NAME, "#.SystemA");
consumerChannel2.queueBind("SystemB", EXCHANGE_NAME, "*.*.SystemB");
senderChannel.basicPublish(EXCHANGE_NAME, "preOrder.statusUpdate.SystemA", null, message.getBytes("UTF-8"));
C. 沒有歷史數據?
對於中途創建的隊列(例如上面的 SystemB 隊列),是沒有辦法獲得之前的消息的。但是如果隊列提前創建好,就算沒有消費者,隊列里依然會有全量的數據。