在rabbitmq中有許多交換機,不同的交換機適用於不同的場景。如下:

這么多交換機中,最常用的交換機有三種:direct、topic、fanout。我分別叫他們:“直接連接交換機”,“主題路由匹配交換機”,“無路由交換機”。以下是詳細的介紹:
Direct 交換機
這個交換機就是一個直接連接交換機,什么叫做直接連接交換機呢?
所謂“直接連接交換機”就是:Producer(生產者)投遞的消息被DirectExchange (交換機)轉發到通過routingkey綁定到具體的某個Queue(隊列),把消息放入隊列,然后Consumer從Queue中訂閱消息。
以下是代碼實例:
消費者:
package com.wy.testrabbitmq.testdirect;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:56
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
//交換機名
String exchangeName = "testDirectExchange";
//隊列
String queueName = "test002";
//routingkey
String routingkey = "testDirect";
//交換機類型
String exchangeType = "direct";
/**
* 聲明一個交換機
*/
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
/**
* 聲明一個隊列
* 第一個參數表示這個信道連接哪個隊列
* 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的
* 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列
* 第四個參數表示隊列脫離綁定時是否自動刪除
* 第五個參數表示擴展參數,可設置為null
*/
channel.queueDeclare(queueName, true, false, false, null);
//建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey)
channel.queueBind(queueName, exchangeName, routingkey);
//創建消費者,指定建立在那個連接上
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//設置channel
// 第二個參數 是否自動簽收
// 第三個參數表示消費對象
channel.basicConsume(queueName, true, queueingConsumer);
//獲取消息
//int i=0;
while (true) {
// 沒有消息就阻塞
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消費端接收消息:" + message);
}
}
}
生產者:
package com.wy.testrabbitmq.testdirect;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**Direct交換機
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:16
*/
public class Producer {
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel=connection.createChannel();
//聲明交換機
String exchangeName="testDirectExchange";
// routingkey
String routingkey="testDirect";
// 發送消息
String message="這是一條測試direct的數據";
channel.basicPublish(exchangeName,routingkey,null,message.getBytes());
//注意:關閉連接
channel.close();
connection.close();
}
}

在管控台Exchange中可以看到多了一個交換機:

點擊testDirectExchange中Bindings可以看到我們的Routingkey:testDirect和綁定的隊列test002

點擊test002可以快速進入到隊列中,點擊binding可以查看到隊列綁定的交換機。

想一下,是不是:生產者發送消息到DirectExchange交換機,交換機根據routingkey轉發消息到綁定的Queue,供消費者消費。
Topic 交換機
舉個現實生活中的栗子:
假如你想在淘寶上買一雙運動鞋,那么你是不是會在搜索框中搜“XXX運動鞋”,這個時候系統將會模糊匹配的所有符合要求的運動鞋,然后展示給你。
所謂“主題路由匹配交換機”也是這樣一個道理,但是使用時也有一定的規則。
比如:
String routingkey = “testTopic.#”;
String routingkey = “testTopic.*”;
- *表示只匹配一個詞
- #表示匹配多個詞
具體看示例代碼和演示效果
消費者:
package com.wy.testrabbitmq.testtopic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:56
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
//交換機名
String exchangeName = "testTopicExchange";
//隊列
String queueName = "test002";
//routingkey
//String routingkey = "testTopic.#";
String routingkey = "testTopic.#";
//交換機類型
String exchangeType = "topic";
/**
* 聲明一個交換機
*/
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
/**
* 聲明一個隊列
* 第一個參數表示這個信道連接哪個隊列
* 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的
* 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列
* 第四個參數表示隊列脫離綁定時是否自動刪除
* 第五個參數表示擴展參數,可設置為null
*/
channel.queueDeclare(queueName, true, false, false, null);
//建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey)
channel.queueBind(queueName, exchangeName, routingkey);
//創建消費者,指定建立在那個連接上
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//設置channel
// 第二個參數 是否自動簽收
// 第三個參數表示消費對象
channel.basicConsume(queueName, true, queueingConsumer);
//獲取消息
//int i=0;
while (true) {
// 沒有消息就阻塞
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消費端接收消息:" + message);
}
}
}
生產者:
package com.wy.testrabbitmq.testtopic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**交換機
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:16
*/
public class Producer {
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel=connection.createChannel();
//聲明交換機
String exchangeName="testTopicExchange";
// routingkey
String routingkey="testTopic.qqq";
String routingkey1 = "testTopic.test1";
String routingkey2 = "testTopic.test2.test";
String routingkey3 = "testTopic.test.test";
// 發送消息
String message="這是一條測試direct的數據";
channel.basicPublish(exchangeName,routingkey,null,message.getBytes());
channel.basicPublish(exchangeName,routingkey1,null,message.getBytes());
channel.basicPublish(exchangeName,routingkey2,null,message.getBytes());
channel.basicPublish(exchangeName,routingkey3,null,message.getBytes());
//注意:關閉連接
channel.close();
connection.close();
}
}
#運行效果:可以看到以testTopic.開頭的所有routingkey都匹配成功了,有四條數據。
把代碼中#改成*運行效果:可以看到以testTopic.開頭的routingkey只匹配了一個詞,有兩條數據。
總結:
#號與 *號就好像我們sql里面的%與_ ,表示匹配多個和只能匹配一個。
注意:
如果你路由匹配了#又不想匹配#,換成了匹配*,請記得去解綁。
如下:查看管控台你會發現他綁定了兩個。

如果你換綁了,請記得解綁,不然出來的數據是既符合#,又符合*,就像並集一樣。
Fanout 交換機
“無路由交換機”,說白了就是,使用這個交換機不需要routingkey綁定,和路由沒有關系,它是直接綁定到隊列的。
消費者:
package com.wy.testrabbitmq.testfanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:56
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel = connection.createChannel();
//交換機名
String exchangeName = "testFanoutExchange";
//隊列
String queueName = "test002";
//routingkey
String routingkey = "";
//交換機類型
String exchangeType = "fanout";
/**
* 聲明一個交換機
*/
channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
/**
* 聲明一個隊列
* 第一個參數表示這個信道連接哪個隊列
* 第二個參數表示是否持久化,當這個參數設置為true,即使你的服務器關了從新開數據還是存在的
* 第三個參數表示是否獨占隊列,也就是所只能自己去監聽這個隊列
* 第四個參數表示隊列脫離綁定時是否自動刪除
* 第五個參數表示擴展參數,可設置為null
*/
channel.queueDeclare(queueName, true, false, false, null);
//建立綁定關系(哪個隊列,哪個交換機,綁定哪個routingkey)
channel.queueBind(queueName, exchangeName, routingkey);
//創建消費者,指定建立在那個連接上
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//設置channel
// 第二個參數 是否自動簽收
// 第三個參數表示消費對象
channel.basicConsume(queueName, true, queueingConsumer);
//獲取消息
//int i=0;
while (true) {
// 沒有消息就阻塞
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消費端接收消息:" + message);
}
}
}
生產者:
package com.wy.testrabbitmq.testfanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**交換機
* @author wangyan@163.com
* @version 1.0
* @date 2019-06-05 10:16
*/
public class Producer {
public static void main(String[] args) throws Exception {
//創建連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
//創建連接
Connection connection = connectionFactory.newConnection();
// 創建通道
Channel channel=connection.createChannel();
//聲明交換機
String exchangeName="testFanoutExchange";
// 發送消息
String message="這是一條測試direct的數據";
//wewe是隨便寫的routingkey,這里為了驗證fanout交換機和路由沒關系
for (int i = 1; i <= 7; i++) {
channel.basicPublish(exchangeName,"wewe",null,message.getBytes());
}
//注意:關閉連接
channel.close();
connection.close();
}
}
示例代碼中,我在生產端寫了一個不存在的routingkey,如下:
//wewe是隨便寫的routingkey,這里為了驗證fanout交換機和路由沒關系
for (int i = 1; i <= 7; i++) {
channel.basicPublish(exchangeName,“wewe”,null,message.getBytes());
}
消費端並未指定routingkey,如下:
String routingkey = “”;
我們只在消費端綁定了Queue,運行結果證明在fanout交換機下,不使用路由並不影響我們生產者投遞信息,消費者訂閱信息。
再看一下我們的管控台:生成了交換機
我們Bindings中並沒有Routingkey
這個也充分說明了fanout交換機是“無路由交換機”。
總結:
fanout交換機不需要routingkey綁定,和路由沒有關系,它是直接綁定到隊列的。
fanout交換機直接綁定了隊列,沒有經過routingkey進行匹配之類的,相對來說是所有交換機中最快的。
原文地址: 地址一
