RabbitMQ學習總結 第六篇:Topic類型的exchange


目錄

RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld

RabbitMQ學習總結 第三篇:工作隊列Work Queue

RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe

RabbitMQ學習總結 第五篇:路由Routing

RabbitMQ學習總結 第六篇:Topic類型的exchange

RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)

 

在上篇博文中,我們對之前的日志系統做了稍許的完善。沒有使用fanout類型的exchange來廣播,而是使用了direct類型的exchange來選擇性的接收日志消息。

盡管使用了direct類型的exchange對日志系統有所提升,但還是有一些限制(消息不能夠基於多重因素來路由)。

在我們的日志系統中,希望不僅僅能夠根據日志級別來訂閱,還可以根據指定的routing key來訂閱。你應該可以理解的,就如unix的系統日志工具,日志消息路由規則不僅僅基於日志級別(info/warn/crit…),還可以基於設備(auth/cron/kern...)。

這樣大大的提高的靈活性,例如我們可以只監聽kern推送出來的error級別的日志。

為了在我們的日志記錄系統中實現這樣的功能,我們需要了解更多關於topic類型的exchange。

1、Topic類型的exchange

消息發送到topic類型的exchange上時不能隨意指定routing_key(一定是指由一系列由點號連接單詞的字符串,單詞可以是任意的,但一般都會與消息或多或少的有些關聯)。Routing key的長度不能超過255個字節。

Binding key也一定要是同樣的方式。Topic類型的exchange就像一個直接的交換:一個由生產者指定了確定routing key的消息將會被推送給所有Binding key能與之匹配的消費者。然而這種綁定有兩種特殊的情況:

  • *(星號):可以(只能)匹配一個單詞
  • #(井號):可以匹配多個單詞(或者零個)

下邊來舉個例子:

在這個例子中,我們將會發送一些描述動物的消息。Routing key的第一個單詞是描述速度的,第二個單詞是描述顏色的,第三個是描述物種的:“<speed>.<colour>.<species>”。

這里我們創建三個Binding:Binding key為”*.orange.*”的Q1,和binding key為”*.*.rabbit”和”lazy.#”的Q2。

這些binding可以總結為:

  • Q1對所有橘色的(orange)的動物感興趣;
  • Q2希望能拿到所有兔子的(rabbit)信息,還有比較懶惰的(lazy.#)動物信息。

一條以” quick.orange.rabbit”為routing key的消息將會推送到Q1和Q2兩個queue上,routing key為“lazy.orange.elephant”的消息同樣會被推送到Q1和Q2上。但如果routing key為”quick.orange.fox”的話,消息只會被推送到Q1上;routing key為”lazy.brown.fox”的消息會被推送到Q2上,routing key為"lazy.pink.rabbit”的消息也會被推送到Q2上,但同一條消息只會被推送到Q2上一次。

如果在發送消息時所指定的exchange和routing key在消費者端沒有對應的exchange和binding key與之綁定的話,那么這條消息將會被丟棄掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing為”lazy.orange.male.rabbit”的消息,將會被推到Q2上。

Topic類型的exchange

Topic類型的exchange是很強大的,也可以實現其它類型的exchange。

  • 當一個隊列被綁定為binding key為”#”時,它將會接收所有的消息,此時和fanout類型的exchange很像。
  • 當binding key不包含”*”和”#”時,這時候就很像direct類型的exchange。

2、最終實現

我們准備在日志系統中使用topic類型的exchange。開始我們准備routing keys使用兩個單詞:"<facility>.<severity>"。代碼和上篇博文里的差不多,EmitLogTopic.java:

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv)
                  throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //指定一個topic類型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        //這里拿到routing key
        String routingKey = getRouting(argv);
        String message = getMessage(argv);
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

        connection.close();
    }
    //...
}

ReceiveLogsTopic.java的代碼:

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] argv)
                  throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //指定一個topic類型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1){
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        //綁定binding key
        for(String bindingKey : argv){
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();

            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

運行情況如下:

3、總結

在上邊的基礎上,只是豐富了routing key和binding key的寫法。

 

參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-five-java.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM