RabbitMQ入門_07_Fanout 與 Topic


A. 用廣播的方式實現發布訂閱

參考資料:https://www.rabbitmq.com/tutorials/tutorial-three-java.html

Fanout 類型的 Exchange 以廣播的方式向所有綁定到該 Exchange 的隊列推送消息

下面樣例代碼試圖使用 fanout 將狀態變更消息推送給所有接入系統:

gordon.study.rabbitmq.fanout.Fanout.java

public class Fanout {
 
    private static final String EXCHANGE_NAME = "StatusUpdateFanout";
 
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
 
        final Channel senderChannel = connection.createChannel();
        senderChannel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        final CountDownLatch latch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 10;) {
                        String message = "NO. " + ++i;
                        TimeUnit.MILLISECONDS.sleep(100);
                        senderChannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                        System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", "S", EXCHANGE_NAME + ":", message);
                        if (i == 4) {
                            latch.countDown();
                        }
                    }
                    senderChannel.close();
                } catch (Exception e) {
                }
            }
        }).start();
 
        final Channel consumerChannel1 = connection.createChannel();
        consumerChannel1.queueDeclare("SystemA", false, false, true, null);
        consumerChannel1.queueBind("SystemA", EXCHANGE_NAME, "");
        consumerChannel1.basicQos(3);
        Consumer consumer1 = new DefaultConsumer(consumerChannel1) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "A", "SystemA", message);
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
                consumerChannel1.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        consumerChannel1.basicConsume("SystemA", false, consumer1);
 
        latch.await();
        final Channel consumerChannel2 = connection.createChannel();
        consumerChannel2.queueDeclare("SystemB", false, false, true, null);
        consumerChannel2.queueBind("SystemB", EXCHANGE_NAME, "");
        consumerChannel2.basicQos(3);
        Consumer consumer2 = new DefaultConsumer(consumerChannel2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.printf(" [    %2$s<===](%1$s) %3$s\n", "B -- won't receive first 4 messages", "SystemB", message);
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                }
                consumerChannel2.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        consumerChannel2.basicConsume("SystemB", false, consumer2);
    }
}

代碼第11行申明了名字叫 StatusUpdateFanout 的 fanout 類型 Exchange。

第33行申明了 SystemA 隊列,第34行將 SystemA 隊列綁定到 StatusUpdateFanout Exchange。注意到綁定路由鍵為空,路由鍵對於 fanout 類型 Exchange 無意義

同理,第20行代碼發送消息時,也不用設置路由鍵。所有發向 fanout 類型 Exchange 的消息都會無視路由鍵,廣播給每個綁定隊列

B. 用 Topic 實現發布訂閱

參考資料:https://www.rabbitmq.com/tutorials/tutorial-five-java.html

Topic 類型的 Exchange 通過支持通配符的路由鍵管理復雜的發布訂閱關系

發送消息時指定的路由鍵必須是點號(.)分隔的單詞,例如 sourceA.statusUpdate.systemA。

隊列綁定 Exchange 時指定的路由鍵可以使用通配符:

  • *(星號)替代一個單詞
  • #(井號)替代 0~n 個單詞

對每一條消息,Exchange 會遍歷所有的綁定關系,確認消息指定的路由鍵(例如 sourceA.statusUpdate.systemA)是否匹配綁定關系中的路由鍵,如果匹配,則將消息推送到相應隊列(例如 sourceA.statusUpdate.systemA、*.*.systemA、*.statusUpdate.* 和 sourceA.# 都匹配,但是 sourceB.# 和 *.systemA 都不匹配)。

gordon.study.rabbitmq.topic.Topic.java

senderChannel.exchangeDeclare(EXCHANGE_NAME, "topic");

consumerChannel1.queueBind("SystemA", EXCHANGE_NAME, "#.SystemA");

consumerChannel2.queueBind("SystemB", EXCHANGE_NAME, "*.*.SystemB");

senderChannel.basicPublish(EXCHANGE_NAME, "preOrder.statusUpdate.SystemA", null, message.getBytes("UTF-8"));

C. 沒有歷史數據?

對於中途創建的隊列(例如上面的 SystemB 隊列),是沒有辦法獲得之前的消息的。但是如果隊列提前創建好,就算沒有消費者,隊列里依然會有全量的數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM