(轉)RabbitMQ學習之主題topic(java)


http://blog.csdn.net/zhu_tianwei/article/details/40887775

參考:http://blog.csdn.NET/lmj623565791/article/details/37706355

direct類型的消息通過綁定鍵轉發到隊列,但是存在一些局限性:它不能夠基於多重條件進行路由選擇,有可能希望不僅根據日志的級別而且想根據日志的來源進行訂閱,這就需要主題類型的轉發器來實現。

發往主題類型的轉發器的消息不能隨意的設置選擇鍵(routing_key),必須是由點隔開的一系列的標識符組成。標識符可以是任何東西,但是一般都與消息的某些特性相關。一些合法的選擇鍵的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定義任何數量的標識符,上限為255個字節。
綁定鍵和選擇鍵的形式一樣。主題類型的轉發器背后的邏輯和直接類型的轉發器很類似:一個附帶特殊的選擇鍵將會被轉發到綁定鍵與之匹配的隊列中。需要注意的是:關於綁定鍵有兩種特殊的情況。
*可以匹配一個標識符。
#可以匹配0個或多個標識符。

1.發送日志消息SendLogTopic,發送4個消息綁定不同的綁定鍵, "kernal.info", "cron.warning",  "auth.info", "kernel.critical" 

 

[java]  view plain  copy
 
 print?
  1. package cn.slimsmart.rabbitmq.demo.topic;  
  2.   
  3. import java.util.UUID;  
  4.   
  5. import com.rabbitmq.client.AMQP;  
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9.   
  10. //發送消息端  
  11. public class SendLogTopic {  
  12.     private static final String EXCHANGE_NAME = "topic_logs";  
  13.     public static void main(String[] args) throws Exception {  
  14.         // 創建連接和頻道    
  15.         ConnectionFactory factory = new ConnectionFactory();    
  16.         factory.setHost("192.168.101.174");  
  17.     factory.setUsername("admin");  
  18.         factory.setPassword("admin");  
  19.         factory.setPort(AMQP.PROTOCOL.PORT);  
  20.         Connection connection = factory.newConnection();    
  21.         Channel channel = connection.createChannel();    
  22.         // 聲明轉發器  
  23.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
  24.         //定義綁定鍵     
  25.         String[] routing_keys = new String[] { "kernal.info", "cron.warning",    
  26.                 "auth.info", "kernel.critical" };    
  27.         for (String routing_key : routing_keys)    
  28.         {     
  29.             //發送4條不同綁定鍵的消息  
  30.             String msg = UUID.randomUUID().toString();    
  31.             channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg    
  32.                     .getBytes());    
  33.             System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");    
  34.         }    
  35.     
  36.         channel.close();    
  37.         connection.close();    
  38.     }  
  39.   
  40. }  


2.定義接收kernel.*消息的消費者

 

 

[java]  view plain  copy
 
 print?
  1. package cn.slimsmart.rabbitmq.demo.topic;  
  2.   
  3. import com.rabbitmq.client.AMQP;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8.   
  9. //接收kernel.*消息  
  10. public class ReceiveLogsTopicForKernel {  
  11.     private static final String EXCHANGE_NAME = "topic_logs";    
  12.     public static void main(String[] args) throws Exception {  
  13.         // 創建連接和頻道    
  14.         ConnectionFactory factory = new ConnectionFactory();    
  15.         factory.setHost("192.168.101.174");  
  16.     factory.setUsername("admin");  
  17.        factory.setPassword("admin");  
  18.     factory.setPort(AMQP.PROTOCOL.PORT);  
  19.         Connection connection = factory.newConnection();    
  20.         Channel channel = connection.createChannel();    
  21.         // 聲明轉發器    
  22.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
  23.         // 隨機生成一個隊列    
  24.         String queueName = channel.queueDeclare().getQueue();    
  25.             
  26.         //接收所有與kernel相關的消息    
  27.         channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");    
  28.     
  29.         System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");    
  30.     
  31.         QueueingConsumer consumer = new QueueingConsumer(channel);    
  32.         channel.basicConsume(queueName, true, consumer);    
  33.     
  34.         while (true)    
  35.         {    
  36.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
  37.             String message = new String(delivery.getBody());    
  38.             String routingKey = delivery.getEnvelope().getRoutingKey();    
  39.     
  40.             System.out.println(" [x] Received routingKey = " + routingKey    
  41.                     + ",msg = " + message + ".");    
  42.         }    
  43.     }  
  44. }  

3.接收*.critical消息消費者

 

 

[java]  view plain  copy
 
 print?
  1. package cn.slimsmart.rabbitmq.demo.topic;  
  2.   
  3. import com.rabbitmq.client.AMQP;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8.   
  9. //接收*.critical消息  
  10. public class ReceiveLogsTopicForCritical {  
  11.       
  12.      private static final String EXCHANGE_NAME = "topic_logs";    
  13.   
  14.     public static void main(String[] args) throws Exception {  
  15.         // 創建連接和頻道    
  16.         ConnectionFactory factory = new ConnectionFactory();    
  17.         factory.setHost("192.168.101.174");  
  18.     factory.setUsername("admin");  
  19.         factory.setPassword("admin");  
  20.     factory.setPort(AMQP.PROTOCOL.PORT);  
  21.         Connection connection = factory.newConnection();    
  22.         Channel channel = connection.createChannel();    
  23.         // 聲明轉發器    
  24.         channel.exchangeDeclare(EXCHANGE_NAME, "topic");    
  25.         // 隨機生成一個隊列    
  26.         String queueName = channel.queueDeclare().getQueue();    
  27.         // 接收所有與kernel相關的消息    
  28.         channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");    
  29.     
  30.         System.out    
  31.                 .println(" [*] Waiting for critical messages. To exit press CTRL+C");    
  32.     
  33.         QueueingConsumer consumer = new QueueingConsumer(channel);    
  34.         channel.basicConsume(queueName, true, consumer);    
  35.     
  36.         while (true)    
  37.         {    
  38.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();    
  39.             String message = new String(delivery.getBody());    
  40.             String routingKey = delivery.getEnvelope().getRoutingKey();    
  41.     
  42.             System.out.println(" [x] Received routingKey = " + routingKey    
  43.                     + ",msg = " + message + ".");    
  44.         }    
  45.     }  
  46.   
  47. }  


啟動2個消費者,再啟動發送4類消息生產者。觀察接收到的消息,都收到對應的消息。可以看出使用topic類型的轉發器,成功實現了多重條件選擇的訂閱。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM