目錄
RabbitMQ學習總結 第一篇:理論篇
RabbitMQ學習總結 第二篇:快速入門HelloWorld
RabbitMQ學習總結 第三篇:工作隊列Work Queue
RabbitMQ學習總結 第四篇:發布/訂閱 Publish/Subscribe
RabbitMQ學習總結 第六篇:Topic類型的exchange
RabbitMQ學習總結 第七篇:RCP(遠程過程調用協議)
上一篇中我們構建了一個簡單的日志系統,我們可以把日志消息廣播給多個接受者。
這篇中我們將來添加一個特性只接收部分消息。例如我只將一些錯誤log存到文件中,把所有的log都打印到控制台里。
1、綁定(Bindings)
在上篇博文中,我們已經創建了一個binding,代碼如下:
channel.queueBind(queueName, EXCHANGE_NAME, "");
一個binding就是exchange和Queue之間的一個關系。可以簡單的理解為:這個Queue對其相對於的exchange的消息感興趣(原文是the queue is interested in messages from this exchange,能理解什么意思,但總覺得怪怪的)。
Binding可以使用一個已經存在的routingKey參數。為了避免和basic_publish參數混淆,我們稱之為binding key。下邊就是我們怎么用key來創建一個binding:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
binding key的意義有時候取決於exchange的類型。對於Fanout類型的exchange,會忽略binding key。
2、Direct類型的exchange
我們上篇博文中的日志系統會把所有的log消息廣播給所有的消費者。我們想擴展來根據他們的日志級別來過濾log消息。例如:我們只想把error級別的日志寫到磁盤文件中,而其它級別的日志消息則過濾掉。
我們之前使用的fanout類型的exchange,但這樣就不會有太多的靈活性。
在這里我們將要使用direct類型的exchange。Direct類型exchange的路由算法是很簡單的:要想一個消息能到達這個隊列,需要binding key和routing key正好能匹配得上。
為了說明這個道理,可以看看下邊的描述:
在這樣的結構中,我們可以看到direct類型的exchange X,有兩個queue綁定到它。第一個queue是以orange為binding key綁定到exchange X上的,第二個queue是由兩個binding key(black和green)綁定到exchange X的。
在這樣的設置中,一條消息被推送到exchange,如果使用的routing key是orange,那么消息就會被路由到Q1中;如果使用的routing key是black或green,那么該消息將會被路由到Q2中。其它的消息都將會被丟棄掉。
3、多重綁定(Multiple bindings)
用同一個binding來把多個queue綁定到同一個exchange也是可行的。例如在之前例子的基礎上,在X和Q1之間添加binding key名字為black,這樣的話,這里的direct類型的exchange就和fanout類型的一樣了,可以把消息推送給所有的queue。帶有routing key為black的消息將會被推送到Q1和Q2中。
4、發送日志(Emitting logs)
我們將會使用這種模型,不使用fanout類型的exchange,而是使用direct類型的。我們使用日志級別做為routing key,接收端根據設置的日志級別做為binding key來接收消息。首先來看看發射日志:
如之前一樣,首先來創建一個exchange:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
然后准備發送消息;
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
這里的”severity”可以是”info”、“warning”、”error”等。
5、訂閱(Subscribing)
這里接收消息和上篇博文中的一樣,只是有一點例外:我們將會為每一個感興趣的日志級別進行綁定。
String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
6、最終實現
- EmitLogDirect.java的代碼:
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明direct類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//拿到日志級別 String severity = getSeverity(argv); //拿到日志消息 String message = getMessage(argv); //指定routing key,發送消息 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close(); connection.close(); } //.. }
- ReceiveLogsDirect.java的代碼:
public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明direct類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } //綁定我們需要接收的日志級別 for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
- 運行三個日志接收器:
接收error和info級別的日志:
接收error級別的日志:
接收info級別的日志:
- 運行兩個日志發生器:
產生error級別的日志:
、
產生info級別的日志:
- 觀察接收器端的變化:
接收error級別的接收器,只接收error級別的日志:
接收info級別的接收器,只接收info級別的日志:
Error和info級別日志都接收的接收器,info和error級別的日志都接收:
7、總結:
要記住生產者端的routing key,那么在消費者端設置binding key和之前的routing key一樣,就可以用direct類型的exchange了,以此來獲取到自己需要的消息。
參考鏈接:http://www.rabbitmq.com/tutorials/tutorial-four-java.html