http://blog.csdn.net/zhu_tianwei/article/details/40887775
參考:http://blog.csdn.NET/lmj623565791/article/details/37706355
direct類型的消息通過綁定鍵轉發到隊列,但是存在一些局限性:它不能夠基於多重條件進行路由選擇,有可能希望不僅根據日志的級別而且想根據日志的來源進行訂閱,這就需要主題類型的轉發器來實現。
發往主題類型的轉發器的消息不能隨意的設置選擇鍵(routing_key),必須是由點隔開的一系列的標識符組成。標識符可以是任何東西,但是一般都與消息的某些特性相關。一些合法的選擇鍵的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定義任何數量的標識符,上限為255個字節。
綁定鍵和選擇鍵的形式一樣。主題類型的轉發器背后的邏輯和直接類型的轉發器很類似:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。需要注意的是:關於綁定鍵有兩種特殊的情況。
*可以匹配一個標識符。
#可以匹配0個或多個標識符。
1.發送日志消息SendLogTopic,發送4個消息綁定不同的綁定鍵, "kernal.info", "cron.warning", "auth.info", "kernel.critical"
- package cn.slimsmart.rabbitmq.demo.topic;
- import java.util.UUID;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- //發送消息端
- public class SendLogTopic {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] args) throws Exception {
- // 創建連接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.101.174");
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setPort(AMQP.PROTOCOL.PORT);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 聲明轉發器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- //定義綁定鍵
- String[] routing_keys = new String[] { "kernal.info", "cron.warning",
- "auth.info", "kernel.critical" };
- for (String routing_key : routing_keys)
- {
- //發送4條不同綁定鍵的消息
- String msg = UUID.randomUUID().toString();
- channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
- .getBytes());
- System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
- }
- channel.close();
- connection.close();
- }
- }
2.定義接收kernel.*消息的消費者
- package cn.slimsmart.rabbitmq.demo.topic;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- //接收kernel.*消息
- public class ReceiveLogsTopicForKernel {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] args) throws Exception {
- // 創建連接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.101.174");
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setPort(AMQP.PROTOCOL.PORT);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 聲明轉發器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 隨機生成一個隊列
- String queueName = channel.queueDeclare().getQueue();
- //接收所有與kernel相關的消息
- channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");
- System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println(" [x] Received routingKey = " + routingKey
- + ",msg = " + message + ".");
- }
- }
- }
3.接收*.critical消息消費者
- package cn.slimsmart.rabbitmq.demo.topic;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- //接收*.critical消息
- public class ReceiveLogsTopicForCritical {
- private static final String EXCHANGE_NAME = "topic_logs";
- public static void main(String[] args) throws Exception {
- // 創建連接和頻道
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.101.174");
- factory.setUsername("admin");
- factory.setPassword("admin");
- factory.setPort(AMQP.PROTOCOL.PORT);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- // 聲明轉發器
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 隨機生成一個隊列
- String queueName = channel.queueDeclare().getQueue();
- // 接收所有與kernel相關的消息
- channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");
- System.out
- .println(" [*] Waiting for critical messages. To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true)
- {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println(" [x] Received routingKey = " + routingKey
- + ",msg = " + message + ".");
- }
- }
- }
啟動2個消費者,再啟動發送4類消息生產者。觀察接收到的消息,都收到對應的消息。可以看出使用topic類型的轉發器,成功實現了多重條件選擇的訂閱。