在上一篇博客《RabbitMQ入門:發布/訂閱(Publish/Subscribe)》中,我們認識了fanout類型的exchange,它是一種通過廣播方式發送消息的路由器,所有和exchange建立的綁定關系的隊列都會接收到消息。但是有一些場景只需要訂閱到一部分消息,這個時候就不能使用fanout 類型的exchange了,這個就引出來今天的“豬腳”--Direct Exchange,通過Routing Key來決定需要將消息發送到哪個或者哪些隊列中。
接下來請收看詳細內容:
- Direct Exchange(直接路由器)
- 多重綁定
- 代碼實例
一、Direct Exchange(直接路由器)
在上文中介紹exchange的時候,對direct exchange進行了簡單介紹,它是一種完全按照routing key(路由關鍵字)進行投遞的:當消息中的routing key和隊列中的binding key完全匹配時,才進行會將消息投遞到該隊列中。這里提到了一個routing key和binding key(綁定關鍵字),是什么東東?
- routing key:
在發送消息的時候,basicPublish的第二個參數就是routing key,由於上次是fanout 類型的exchange 進行廣播方式投遞,這個字段不會影響投遞結果,因此我們這里就傳入了“”,但是在direct 類型的exchange中我們就不能傳入""了,需要指定具體的關鍵字。

- binding key:
我們在前文中建立綁定關系的時候,queueBind的第三個參數就是綁定關鍵字

我們聲明direact exchange的時候使用:

二、多重綁定
多個隊列以相同的綁定鍵綁定到同一個路由器的情況,我們稱之為多重綁定。
工作模型為(P代表生產者,X代表路由器,紅色的Q代表隊列,C代表消費者):

三、代碼實例
預備知識了解完了,現在來寫個程序感受下。
- 生產者
public class LogDirectSender { // exchange名字 public static String EXCHANGE_NAME = "directExchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.創建連接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道聲明direct類型的exchange channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.發送消息到指定的exchange,隊列指定為空,由exchange根據情況判斷需要發送到哪些隊列 String routingKey = "debug"; String msg = " hello rabbitmq, I am " + routingKey; channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.關閉連接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
和上次博客中生產者的區別就是黑字粗體部分:1.路由器類型改為direct 2.消息發布的時候指定了routing key
- 消費者
public class LogDirectReciver { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.創建連接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.為通道聲明direct類型的exchange channel.exchangeDeclare(LogDirectSender.EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 3.創建隨機名字的隊列 String queueName = channel.queueDeclare().getQueue(); // 4.建立exchange和隊列的綁定關系 String[] bindingKeys = { "error", "info", "debug" }; // String[] bindingKeys = { "error" }; for (int i = 0; i < bindingKeys.length; i++) { channel.queueBind(queueName, LogDirectSender.EXCHANGE_NAME, bindingKeys[i]); System.out.println(" **** LogDirectReciver keep alive ,waiting for " + bindingKeys[i]); } // 5.通過回調生成消費者並進行監聽 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 獲取消息內容然后處理 String msg = new String(body, "UTF-8"); System.out.println("*********** LogDirectReciver" + " get message :[" + msg + "]"); } }; // 6.消費消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
和上次博客中消費者的區別就是黑字粗體部分:1.路由器類型改為direct 2.建立綁定關系的時候指定了binding key
- 執行消費者,控制台log打印如下:
**** LogDirectReciver keep alive ,waiting for error **** LogDirectReciver keep alive ,waiting for info **** LogDirectReciver keep alive ,waiting for debug
這個消費者我們視為消費者1,它會接收error,info,debug三個關鍵字的消息。
- 將String[] bindingKeys = { "error", "info", "debug" };改為String[] bindingKeys = { "error" };,然后再運行一次消費者。控制台log打印如下:
**** LogDirectReciver keep alive ,waiting for error這個消費者我們視為消費者2,它只會接收error 關鍵字的消息。
- 執行生產者,然后將String routingKey = "debug";的值分別改為“info"和"error",然后分別執行,這樣一共執行了三次生產者
第一次執行: product send a msg: hello rabbitmq, I am debug 第二次執行: product send a msg: hello rabbitmq, I am info 第三次執行: product send a msg: hello rabbitmq, I am error - 再次查看兩個消費者的控制台log:
消費者1: **** LogDirectReciver keep alive ,waiting for error **** LogDirectReciver keep alive ,waiting for info **** LogDirectReciver keep alive ,waiting for debug *********** LogDirectReciver get message :[ hello rabbitmq, I am debug] *********** LogDirectReciver get message :[ hello rabbitmq, I am info] *********** LogDirectReciver get message :[ hello rabbitmq, I am error] 消費者2: **** LogDirectReciver keep alive ,waiting for error *********** LogDirectReciver get message :[ hello rabbitmq, I am error]
- 查看RabbitMQ管理頁面
exchanges標簽頁里面多了個direct類型的路由器。進入詳細頁面:

有4個綁定關系,其中三個的隊列是同一個。切換到Queues標簽頁:

有兩個臨時隊列。
-
如果關掉消費者1和消費者2,會發現隊列自動刪除了,綁定關系也不存在了。
