參考:http://www.rabbitmq.com/tutorials/tutorial-five-java.html
源碼:https://github.com/zuzhaoyue/JavaDemo
主題
(使用Java客戶端)
先決條件
本教程假定RabbitMQ 在標准端口(5672)上的本地主機上安裝並運行。如果您使用不同的主機,端口或證書,則連接設置需要進行調整。
在教程四中,我們改進了日志記錄系統。我們沒有使用只有虛擬廣播的fanout交換機,而是使用了direct交換機,並獲得了選擇性接收日志的可能性。
盡管使用直接交換改進了我們的系統,但它仍然有局限性 - 它不能根據多個規則進行路由。
在我們的日志系統中,我們可能不僅需要根據嚴重等級來訂閱日志,還要根據發布日志的來源進行訂閱。您可能從syslog unix工具知道這個概念,該 工具根據嚴重性(info/warn/error...)和工具(auth / cron / kern ...)來路由日志。
這會給我們很大的靈活性 - 我們可能想聽取來自'cron'的嚴重錯誤,而且還聽取來自'kern'的所有日志。
為了在我們的日志系統中實現這一點,我們需要了解更復雜的topic交換。
Topic exchange
發送到topic exchange的消息不是一個隨意的 routing_key - 它必須是由點分隔的單詞列表。單詞可以是任何東西,但通常它們指定了與該消息相關的一些功能。一些有效的路由鍵例子如下:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。只要您願意,路由鍵中可以有多少個字,最多255個字節。
綁定鍵也必須是相同的形式。topic交換背后的邏輯 類似於direct exchange - 使用特定路由鍵發送的消息將被傳遞到與匹配綁定鍵綁定的所有隊列。綁定鍵有兩個重要的特殊用法:
- *(星號)可以代替一個單詞。
- #(散列)可以替代零個或多個單詞。
在一個例子中解釋這個很簡單:

在這個例子中,我們將發送所有描述動物的消息。消息將使用由三個字(兩個點)組成的路由鍵發送。路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個種類:“ <speed>.<color>.<species>”。
我們創建了三個綁定:Q1綁定了綁定鍵“ * .orange。* ”,Q2 綁定了“ *。*。rabbit ”和“ lazy。# ”。
這些綁定可以概括為:
- Q1對所有的橙色動物都感興趣。
- Q2希望聽到關於兔子的一切,以及關於懶惰的一切。
將路由鍵設置為“ quick.orange.rabbit ”的消息將傳遞到兩個隊列。消息“ lazy.orange.elephant ”也會傳送到他們兩個。另一方面,“ quick.orange.fox ”只會進入第一個隊列,而“ lazy.brown.fox ”只會進入第二個隊列。“ lazy.pink.rabbit ”只會傳遞到第二個隊列一次,即使第二個隊列匹配了兩個綁定。“ quick.brown.fox ”不匹配任何綁定,因此將被丟棄。
如果我們違反我們的規則並發送帶有一個或四個單詞的消息,如“ orange ”或“ quick.orange.male.rabbit ” ,這些消息將不匹配任何綁定,於是會被丟失。
另一方面,“ lazy.orange.male.rabbit ”即使有四個單詞,也會匹配最后一個綁定,並將傳遞到第二個隊列。
topic exchange
topic exchage功能強大,可以像其他exchange一樣行事。
當使用“ # ”(hash)綁定鍵綁定隊列時,它將接收所有消息,而不管路由密鑰如何 - 就像在fanout exchange中一樣。
當在綁定中沒有使用特殊字符“ * ”(星號)和“ # ”(hash)時,topic將像direct一樣。
把以上放在一起
我們將在我們的日志系統中使用topic。我們首先假定日志的路由鍵有兩個詞:“ <facility>。<severity> ”。
生產者EmitLogTopic.java代碼如下:
//package rmq.topics; /** * Created by zuzhaoyue on 18/5/17. */ import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { System.out.println("參數是:" + argv.toString()); Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) {} } } } //若沒有參數 ,則返回anoymous.info //若有參數 ,則返回參數第一個 private static String getRouting(String[] strings){ if (strings.length < 1) return "anonymous.info"; return strings[0]; } //若參數個數小於2,則返回hello world,否則返回joinstring()相應的值 private static String getMessage(String[] strings){ if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); } //返回輸入的數組中從startindex開始的值,這些值以delimeter為分隔符。 private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0 ) return ""; if (length < startIndex ) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
消費者ReceiveLogsTopic.java代碼如下:
//package rmq.topics; /** * Created by zuzhaoyue on 18/5/17. */ import com.rabbitmq.client.*; import java.io.IOException; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'"); } }; channel.basicConsume(queueName, true, consumer); } }
測試
1.編譯
javac -cp /data/amqp-client-4.2.0.jar EmitLogTopic.java ReceiveLogsTopic.java
2.執行
1)啟動消費者
打開三個窗口,分別輸入以下命令:
第一個窗口輸入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic ""(表示接收所有的消息)
第二個窗口輸入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic "*.critical"(表示接收后綴為critical的消息)
第三個窗口輸入java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. ReceiveLogsTopic "kern.*" "*.critical"(表示接收前綴為kern和后綴為critical的消息)
2)啟動生產者
依次輸入以下命令
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "aa.critical"
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "kern.0"
java -cp /data/amqp-client-4.2.0.jar:/data/slf4j-api-1.7.21.jar:. EmitLogTopic "a"
觀察消費者的打印情況,發現已經按照不同的規則進行了接收:
調試成功~