目錄
RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld
RabbitMQ學習總結 第三篇:工作隊列Work Queue
RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe
RabbitMQ學習總結 第六篇:Topic類型的exchange
RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)
在上篇博文中,我們對之前的日志系統做了稍許的完善。沒有使用fanout類型的exchange來廣播,而是使用了direct類型的exchange來選擇性的接收日志消息。
盡管使用了direct類型的exchange對日志系統有所提升,但還是有一些限制(消息不能夠基於多重因素來路由)。
在我們的日志系統中,希望不僅僅能夠根據日志級別來訂閱,還可以根據指定的routing key來訂閱。你應該可以理解的,就如unix的系統日志工具,日志消息路由規則不僅僅基於日志級別(info/warn/crit…),還可以基於設備(auth/cron/kern...)。
這樣大大的提高的靈活性,例如我們可以只監聽kern推送出來的error級別的日志。
為了在我們的日志記錄系統中實現這樣的功能,我們需要了解更多關於topic類型的exchange。
1、Topic類型的exchange
消息發送到topic類型的exchange上時不能隨意指定routing_key(一定是指由一系列由點號連接單詞的字符串,單詞可以是任意的,但一般都會與消息或多或少的有些關聯)。Routing key的長度不能超過255個字節。
Binding key也一定要是同樣的方式。Topic類型的exchange就像一個直接的交換:一個由生產者指定了確定routing key的消息將會被推送給所有Binding key能與之匹配的消費者。然而這種綁定有兩種特殊的情況:
- *(星號):可以(只能)匹配一個單詞
- #(井號):可以匹配多個單詞(或者零個)
下邊來舉個例子:
在這個例子中,我們將會發送一些描述動物的消息。Routing key的第一個單詞是描述速度的,第二個單詞是描述顏色的,第三個是描述物種的:“<speed>.<colour>.<species>”。
這里我們創建三個Binding:Binding key為”*.orange.*”的Q1,和binding key為”*.*.rabbit”和”lazy.#”的Q2。
這些binding可以總結為:
- Q1對所有橘色的(orange)的動物感興趣;
- Q2希望能拿到所有兔子的(rabbit)信息,還有比較懶惰的(lazy.#)動物信息。
一條以” quick.orange.rabbit”為routing key的消息將會推送到Q1和Q2兩個queue上,routing key為“lazy.orange.elephant”的消息同樣會被推送到Q1和Q2上。但如果routing key為”quick.orange.fox”的話,消息只會被推送到Q1上;routing key為”lazy.brown.fox”的消息會被推送到Q2上,routing key為"lazy.pink.rabbit”的消息也會被推送到Q2上,但同一條消息只會被推送到Q2上一次。
如果在發送消息時所指定的exchange和routing key在消費者端沒有對應的exchange和binding key與之綁定的話,那么這條消息將會被丟棄掉。例如:"orange"和"quick.orange.male.rabbit"。但是routing為”lazy.orange.male.rabbit”的消息,將會被推到Q2上。
Topic類型的exchange:
Topic類型的exchange是很強大的,也可以實現其它類型的exchange。
- 當一個隊列被綁定為binding key為”#”時,它將會接收所有的消息,此時和fanout類型的exchange很像。
- 當binding key不包含”*”和”#”時,這時候就很像direct類型的exchange。
2、最終實現
我們准備在日志系統中使用topic類型的exchange。開始我們准備routing keys使用兩個單詞:"<facility>.<severity>"。代碼和上篇博文里的差不多,EmitLogTopic.java:
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //指定一個topic類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //這里拿到routing key String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... }
ReceiveLogsTopic.java的代碼:
public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //指定一個topic類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } //綁定binding key for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
運行情況如下:
3、總結
在上邊的基礎上,只是豐富了routing key和binding key的寫法。
參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-five-java.html