在rabbitmq中有許多交換機,不同的交換機適用於不同的場景。如下:
這么多交換機中,最常用的交換機有三種:direct、topic、fanout。我分別叫他們:“直接連接交換機”,“主題路由匹配交換機”,“無路由交換機”。以下是詳細的介紹:
Direct 交換機
這個交換機就是一個直接連接交換機,什么叫做直接連接交換機呢?
所謂“直接連接交換機”就是:Producer(生產者)投遞的消息被DirectExchange (交換機)轉發到通過routingkey綁定到具體的某個Queue(隊列),把消息放入隊列,然后Consumer從Queue中訂閱消息。
以下是代碼實例:
消費者: package com.wy.testrabbitmq.testdirect; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * @author wangyan@163.com * @version 1.0 * @date 2019-06-05 10:56 */ public class Consumer { public static void main(String[] args) throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //創建連接 Connection connection = connectionFactory.newConnection(); // 創建通道 Channel channel = connection.createChannel(); //交換機名 String exchangeName = "testDirectExchange"; //隊列 String queueName = "test002"; //routingkey String routingkey = "testDirect"; //交換機類型 String exchangeType = "direct"; /** * 聲明一個交換機 */ channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); /** * 聲明一個隊列 * 第一個參數表示這個信道連接哪個隊列 * 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的 * 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列 * 第四個參數表示隊列脫離綁定時是否自動刪除 * 第五個參數表示擴展參數,可設置為null */ channel.queueDeclare(queueName, true, false, false, null); //建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey) channel.queueBind(queueName, exchangeName, routingkey); //創建消費者,指定建立在那個連接上 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //設置channel // 第二個參數 是否自動簽收 // 第三個參數表示消費對象 channel.basicConsume(queueName, true, queueingConsumer); //獲取消息 //int i=0; while (true) { // 沒有消息就阻塞 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("消費端接收消息:" + message); } } } 生產者: package com.wy.testrabbitmq.testdirect; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /**Direct交換機 * @author wangyan@163.com * @version 1.0 * @date 2019-06-05 10:16 */ public class Producer { public static void main(String[] args) throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //創建連接 Connection connection = connectionFactory.newConnection(); // 創建通道 Channel channel=connection.createChannel(); //聲明交換機 String exchangeName="testDirectExchange"; // routingkey String routingkey="testDirect"; // 發送消息 String message="這是一條測試direct的數據"; channel.basicPublish(exchangeName,routingkey,null,message.getBytes()); //注意:關閉連接 channel.close(); connection.close(); } }
在管控台Exchange中可以看到多了一個交換機:
點擊testDirectExchange中Bindings可以看到我們的Routingkey:testDirect和綁定的隊列test002
點擊test002可以快速進入到隊列中,點擊binding可以查看到隊列綁定的交換機。
想一下,是不是:生產者發送消息到DirectExchange交換機,交換機根據routingkey轉發消息到綁定的Queue,供消費者消費。
Topic 交換機
舉個現實生活中的栗子:
假如你想在淘寶上買一雙運動鞋,那么你是不是會在搜索框中搜“XXX運動鞋”,這個時候系統將會模糊匹配的所有符合要求的運動鞋,然后展示給你。
所謂“主題路由匹配交換機”也是這樣一個道理,但是使用時也有一定的規則。
比如:
String routingkey = “testTopic.#”;
String routingkey = “testTopic.*”;
- *表示只匹配一個詞
- #表示匹配多個詞
具體看示例代碼和演示效果
消費者: package com.wy.testrabbitmq.testtopic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * @author wangyan@163.com * @version 1.0 * @date 2019-06-05 10:56 */ public class Consumer { public static void main(String[] args) throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //創建連接 Connection connection = connectionFactory.newConnection(); // 創建通道 Channel channel = connection.createChannel(); //交換機名 String exchangeName = "testTopicExchange"; //隊列 String queueName = "test002"; //routingkey //String routingkey = "testTopic.#"; String routingkey = "testTopic.#"; //交換機類型 String exchangeType = "topic"; /** * 聲明一個交換機 */ channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); /** * 聲明一個隊列 * 第一個參數表示這個信道連接哪個隊列 * 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的 * 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列 * 第四個參數表示隊列脫離綁定時是否自動刪除 * 第五個參數表示擴展參數,可設置為null */ channel.queueDeclare(queueName, true, false, false, null); //建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey) channel.queueBind(queueName, exchangeName, routingkey); //創建消費者,指定建立在那個連接上 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //設置channel // 第二個參數 是否自動簽收 // 第三個參數表示消費對象 channel.basicConsume(queueName, true, queueingConsumer); //獲取消息 //int i=0; while (true) { // 沒有消息就阻塞 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("消費端接收消息:" + message); } } } 生產者: package com.wy.testrabbitmq.testtopic; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /**交換機 * @author wangyan@163.com * @version 1.0 * @date 2019-06-05 10:16 */ public class Producer { public static void main(String[] args) throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //創建連接 Connection connection = connectionFactory.newConnection(); // 創建通道 Channel channel=connection.createChannel(); //聲明交換機 String exchangeName="testTopicExchange"; // routingkey String routingkey="testTopic.qqq"; String routingkey1 = "testTopic.test1"; String routingkey2 = "testTopic.test2.test"; String routingkey3 = "testTopic.test.test"; // 發送消息 String message="這是一條測試direct的數據"; channel.basicPublish(exchangeName,routingkey,null,message.getBytes()); channel.basicPublish(exchangeName,routingkey1,null,message.getBytes()); channel.basicPublish(exchangeName,routingkey2,null,message.getBytes()); channel.basicPublish(exchangeName,routingkey3,null,message.getBytes()); //注意:關閉連接 channel.close(); connection.close(); } }
#運行效果:可以看到以testTopic.開頭的所有routingkey都匹配成功了,有四條數據。
把代碼中#改成*運行效果:可以看到以testTopic.開頭的routingkey只匹配了一個詞,有兩條數據。
總結:
#號與 *號就好像我們sql里面的%與_ ,表示匹配多個和只能匹配一個。
注意:
如果你路由匹配了#又不想匹配#,換成了匹配*,請記得去解綁。
如下:查看管控台你會發現他綁定了兩個。
如果你換綁了,請記得解綁,不然出來的數據是既符合#,又符合*,就像並集一樣。
Fanout 交換機
“無路由交換機”,說白了就是,使用這個交換機不需要routingkey綁定,和路由沒有關系,它是直接綁定到隊列的。
消費者: package com.wy.testrabbitmq.testfanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; /** * @author wangyan@163.com * @version 1.0 * @date 2019-06-05 10:56 */ public class Consumer { public static void main(String[] args) throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //創建連接 Connection connection = connectionFactory.newConnection(); // 創建通道 Channel channel = connection.createChannel(); //交換機名 String exchangeName = "testFanoutExchange"; //隊列 String queueName = "test002"; //routingkey String routingkey = ""; //交換機類型 String exchangeType = "fanout"; /** * 聲明一個交換機 */ channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null); /** * 聲明一個隊列 * 第一個參數表示這個信道連接哪個隊列 * 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的 * 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列 * 第四個參數表示隊列脫離綁定時是否自動刪除 * 第五個參數表示擴展參數,可設置為null */ channel.queueDeclare(queueName, true, false, false, null); //建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey) channel.queueBind(queueName, exchangeName, routingkey); //創建消費者,指定建立在那個連接上 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //設置channel // 第二個參數 是否自動簽收 // 第三個參數表示消費對象 channel.basicConsume(queueName, true, queueingConsumer); //獲取消息 //int i=0; while (true) { // 沒有消息就阻塞 QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("消費端接收消息:" + message); } } } 生產者: package com.wy.testrabbitmq.testfanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /**交換機 * @author wangyan@163.com * @version 1.0 * @date 2019-06-05 10:16 */ public class Producer { public static void main(String[] args) throws Exception { //創建連接工廠 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //創建連接 Connection connection = connectionFactory.newConnection(); // 創建通道 Channel channel=connection.createChannel(); //聲明交換機 String exchangeName="testFanoutExchange"; // 發送消息 String message="這是一條測試direct的數據"; //wewe是隨便寫的routingkey,這里為了驗證fanout交換機和路由沒關系 for (int i = 1; i <= 7; i++) { channel.basicPublish(exchangeName,"wewe",null,message.getBytes()); } //注意:關閉連接 channel.close(); connection.close(); } }
示例代碼中,我在生產端寫了一個不存在的routingkey,如下:
//wewe是隨便寫的routingkey,這里為了驗證fanout交換機和路由沒關系
for (int i = 1; i <= 7; i++) {
channel.basicPublish(exchangeName,“wewe”,null,message.getBytes());
}
消費端並未指定routingkey,如下:
String routingkey = “”;
我們只在消費端綁定了Queue,運行結果證明在fanout交換機下,不使用路由並不影響我們生產者投遞信息,消費者訂閱信息。
再看一下我們的管控台:生成了交換機
我們Bindings中並沒有Routingkey
這個也充分說明了fanout交換機是“無路由交換機”。
總結:
fanout交換機不需要routingkey綁定,和路由沒有關系,它是直接綁定到隊列的。
fanout交換機直接綁定了隊列,沒有經過routingkey進行匹配之類的,相對來說是所有交換機中最快的。
原文地址: 地址一