前言
RabbitMQ六種隊列模式-簡單隊列
RabbitMQ六種隊列模式-工作隊列
RabbitMQ六種隊列模式-發布訂閱
RabbitMQ六種隊列模式-路由模式 [本文]
RabbitMQ六種隊列模式-主題模式
本文帶大家了解 RabbitMQ 隊列模式中的路由模式。
其實只要看過上篇發布模式后,相信路由模式上手就非常 easy 了,唯一差距就是兩個參數,exchange類型和 routingKey 。
文章目錄
1. 什么是路由模式2. 代碼部分2.1 日志生產者2.2 info消費者2.3 error消費者2.4 運行截圖3. 路由模式總結
1. 什么是路由模式
官網鏈接:https://msd.misuland.com/pd/2884250137616455578
路由模式跟發布訂閱模式類似,然后在訂閱模式的基礎上加上了類型,訂閱模式是分發到所有綁定到交換機的隊列,路由模式只分發到綁定在交換機上面指定路由鍵的隊列,我們可以看一下下面這張圖:
P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示隊列。
上圖是一個結合日志消費級別的配圖,在路由模式它會把消息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的 direct 模式。
以上圖的配置為例,我們以 routingKey="error" 發送消息到 Exchange,則消息會路由到Queue1(amqp.gen-S9b…,這是由RabbitMQ自動生成的Queue名稱)和Queue2(amqp.gen-Agl…)。如果我們以 routingKey="info" 或 routingKey="warning" 來發送消息,則消息只會路由到 Queue2。如果我們以其他 routingKey 發送消息,則消息不會路由到這兩個 Queue 中。
相對於發布訂閱模式,我們可以看到不再是廣播似的接收全部消息,而是有選擇性的消費。
我們就以接收不同日志級別的隊列為例吧。
2. 代碼部分
2.1 日志生產者
public class ProdecerRouting {
private static final String EXCHANGE_NAME = "my_fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.創建新的連接 */
Connection connection = MQConnectionUtils.newConnection();
/** 2.創建通道 */
Channel channel = connection.createChannel();
/** 3.綁定的交換機 參數1交互機名稱 參數2 exchange類型 */
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
/** 4.發送消息 */
String message = "",sendType="";
for (int i = 0; i < 10; i++)
{
if(i%2==0){
sendType = "info";
message = "我是 info 級別的消息類型:" + i;
}else{
sendType = "error";
message = "我是 error 級別的消息類型:" + i;
}
System.out.println("[send]:" + message + " " +sendType);
channel.basicPublish(EXCHANGE_NAME, sendType, null, message.getBytes("utf-8"));
try {
Thread.sleep(5 * i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** 5.關閉通道、連接 */
channel.close();
connection.close();
/** 注意:如果消費沒有綁定交換機和隊列,則消息會丟失 */
}
}
注意:exchangeDeclare() 方法 exchange 類型為 direct
2.2 info消費者
public class ConsumerInfo {
private static final String QUEUE_NAME = "consumer_info";
private static final String EXCHANGE_NAME = "my_fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("info消費者啟動");
/* 1.創建新的連接 */
Connection connection = MQConnectionUtils.newConnection();
/* 2.創建通道 */
Channel channel = connection.createChannel();
/* 3.消費者關聯隊列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/* 4.消費者綁定交換機 參數1 隊列 參數2交換機 參數3 routingKey */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者獲取生產者消息:" + msg);
}
};
/* 5.消費者監聽隊列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.3 error消費者
public class ConsumerError {
private static final String QUEUE_NAME = "consumer_error";
private static final String EXCHANGE_NAME = "my_fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("error消費者啟動");
/* 1.創建新的連接 */
Connection connection = MQConnectionUtils.newConnection();
/* 2.創建通道 */
Channel channel = connection.createChannel();
/* 3.消費者關聯隊列 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/* 4.消費者綁定交換機 參數1 隊列 參數2交換機 參數3 routingKey */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消費者獲取生產者消息:" + msg);
}
};
/* 5.消費者監聽隊列消息 */
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2.4 運行截圖
先運行兩個消費者,再運行生產者。如果沒有提前將隊列綁定到交換機,那么直接運行生產者的話,消息是不會發到任何隊列里的。
生產者
info消費者
error消費者
3. 路由模式總結
1、兩個隊列消費者設置的路由不一樣,接收到的消息就不一樣。路由模式下,決定消息向隊列推送的主要取決於路由,而不是交換機了。
2、該模式必須設置交換機,且聲明路由模式 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
生產者發送消息到交換機,同時定義了一個路由 routingKey,多個消費者聲明多個隊列,與交換機進行綁定,同時定義路由 routingKey,只有路由 routingKey相同的消費者才能消費數據
案例代碼:https://www.lanzous.com/i5ydu6d
我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下微信公眾號哈:niceyoo
