生產者和消費者,具有相同的交換機名稱(Exchange)、交換機類型和相同的密匙(routingKey),那么消費者即可成功獲取到消息。
(PS:相對比只要交換機名稱即可接收到消息的廣播模式(fanout),direct模式在其基礎上,多加了一層密碼限制(routingKey)。)
一、什么是direct(直接交換模式)
RabbitMQ消息模型的核心思想(core idea): 生產者會把消息發送給RabbitMQ的交換中心(Exchange),Exchange的一側是生產者,另一側則是一個或多個隊列,由Exchange決定一條消息的生命周期–發送給某些隊列,或者直接丟棄掉。
二、代碼域
1. 生產者【DirectBoss】
package com.iyungu.phantaci.test.rabbitmq; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.apache.log4j.Logger; import java.io.IOException; import java.util.concurrent.TimeoutException; //消息生產者 public class DirectBoss { private static final Logger logger = Logger.getLogger(DirectBoss.class); public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); try { factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //聲明交換機(名稱和類型) channel.exchangeDeclare("directLogs", BuiltinExchangeType.DIRECT); String message = "2018年8月8日14:03:48"; //消息發布(其中"directLogs"為交換機名稱,"jay"為routingKey) channel.basicPublish("directLogs","jay",null,message.getBytes()); logger.info("********Message********:發送成功"); channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } }
2. 消費者【DirectWorker】
package com.iyungu.phantaci.test.rabbitmq; import com.rabbitmq.client.*; import org.apache.log4j.Logger; import java.io.IOException; import java.util.concurrent.TimeoutException; //消息消費者 public class DirectWorker { private static final Logger logger = Logger.getLogger(DirectWorker.class); public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try { Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //交換機聲明(參數為:交換機名稱;交換機類型) channel.exchangeDeclare("directLogs", BuiltinExchangeType.DIRECT); //獲取一個臨時隊列 String queueName = channel.queueDeclare().getQueue(); //隊列與交換機綁定(參數為:隊列名稱;交換機名稱;密匙-routingKey) channel.queueBind(queueName,"directLogs","jay"); logger.info("********Waiting for messages********"); //這里重寫了DefaultConsumer的handleDelivery方法,因為發送的時候對消息進行了getByte(),在這里要重新組裝成String Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); String message = new String(body,"UTF-8"); logger.info("received:" + message); } }; //聲明隊列中被消費掉的消息(參數為:隊列名稱;消息是否自動確認;consumer主體) channel.basicConsume(queueName,true,consumer); //這里不能關閉連接,調用了消費方法后,消費者會一直連接着rabbitMQ等待消費 } catch (IOException |TimeoutException e) { e.printStackTrace(); } } }
三、direct模式效果
1.
1)先運行一個消費者,即【DirectWorker】
2)把消費者【DirectWorker】里的routingKey進行修改
把channel.queueBind(queueName,"directLogs","jay");修改為channel.queueBind(queueName,"directLogs","jjlin");。修改完畢后,再運行該消費者
RabbitMQ網頁控制台如下,可看到兩個消費者隊列
2. 再運行一個生產者,即【DirectBoss】
控制台效果圖如下,一條消息發布后,和交換機routingKey一致的消費者收到了消息,不一致的無消息;
消息是由生產者發送給交換機,所以要以生產者發布消息時的routingKey為准。此時,生產者里的routingKey為:”jay”
消費者的routingKey為:”jay“,與生產者一致。消息接收成功
消費者的routingKey為:”jjlin“,與生產者不一致。消息接收失敗
四、多路由(routingKey)接收
通過給消費者綁定多個路由(routingKey),可以使該消費者同時接收多個路由獲取的消息。
如給消費者代碼【DirectWorker】同時綁定兩個routingKey,其余不變
channel.queueBind(queueName, "directLogs", "jjlin"); //routingKey為jjlin channel.queueBind(queueName, "directLogs", "jay"); //routingKey為jay
1.綁定兩個routingKey后,運行一個消費者【DirectWorker】
2.運行一個生產者【DirectBoss】,代碼默認其routingKey為”jay”
3.修改生產者routingKey為”jjlin”,然后再運行生產者【DirectBoss】
把生產者代碼里的routingKey進行修改,即channel.basicPublish("directLogs","jay",null,message.getBytes());改為channel.basicPublish("directLogs","jjlin",null,message.getBytes());,修改完畢后再運行生產者。
4.局限性
通過多路由綁定的例子,可以體會到direct模式相對比fanout模式,可以選擇性的接收消息;但局限是面對更多、更復雜的路由匹配時,仍舊會力不從心,這時可以使用更全面的topic主題模式。
原文鏈接:https://blog.csdn.net/fakerswe/article/details/81508963