RabbitMQ詳解(三)------RabbitMQ的五種模式
1.簡單隊列(模式)
上一篇文章末尾的實例給出的代碼就是簡單模式.
一個生產者對應一個消費者!!!
pom.xml
必須導入RabbitMQ依賴包
<!--RabbitMQ-client-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.2</version>
</dependency>
ConnectionUtil.java
package org.alva.Utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <一句話描述>,RabbitMQ的連接工具類
* <詳細介紹>,
*
*/
public class ConnectionUtil {
public static Connection getConnection(String host, int port, String vhost, String username, String password) throws IOException, TimeoutException {
//1.定義連接工廠
ConnectionFactory connectionFactory = new ConnectionFactory();
//2.設置服務器地址
connectionFactory.setHost(host);
//3.設置端口
connectionFactory.setPort(port);
//4.設置虛擬主機,用戶名,密碼
connectionFactory.setVirtualHost(vhost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//5.通過連接工廠獲取連接
Connection connection = connectionFactory.newConnection();
return connection;
}
}
Consumer.java
package org.alva.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import org.alva.Utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <一句話描述>,消費者
* <詳細介紹>,
*
*/
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.獲取連接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2.聲明通道
Channel channel = connection.createChannel();
//3.聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4.定義隊列的消費者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
//5.監聽隊列
/*
true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息成功消費.
false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,
如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態,並且服務器會認為該消費者已經掛掉,不會再給其發送消息,
直到該消費者反饋.
*/
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
//6.獲取消息
while (true){
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received '" + message + "'");
}
}
}
Productor.java
package org.alva.RabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.alva.Utils.ConnectionUtil;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* <一句話描述>,生產者
* <詳細介紹>,
*
*/
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//1.獲取連接
Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
//2.聲明通道
Channel channel = connection.createChannel();
//3.聲明(創建)隊列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//4.定義消息內容
String message = "hello rabbitmq";
//5.發布消息
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("[x] send'"+message+"'");
//6.關閉通道和連接
channel.close();
connection.close();
}
}
2.work模式
一個生產者對應多個消費者,但是只能有一個消費者獲得消息!!!
競爭消費者模式.
-
生產者
package org.alva.RabbitMQ.WorkModel; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,生產者 * <詳細介紹>,Work模式下的生產者 * */ public class Producter { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.定義消息內容,發布多條消息 for (int i = 0; i < 10; i++) { String message = "hello rabbitmq " + i; //5.發布消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("[x] send message is '" + message + "'"); //6.模擬發送消息延時,便於展示多個消費者競爭接受消息 Thread.sleep(i * 10); } //7.關閉信道 channel.close(); //8.關閉連接 connection.close(); } }
-
消費者
需要創建兩個消費者.
消費者1:每接收一條消息后休眠10毫秒.
package org.alva.RabbitMQ.WorkModel; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,消費者 * <詳細介紹>,Work模式下的消費者 * */ public class Consumer1 { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2.聲明通道 Channel channel = connection.createChannel(); //3.聲明隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //同一時刻服務器只會發送一條消息給消費者 // channel.basicQos(1); //4.定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5.監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME,false,queueingConsumer); //6.獲取消息 while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[x] received message : '"+message+"'"); //休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
消費者2:每接收一條消息后休眠1000毫秒
package org.alva.RabbitMQ.WorkModel; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>, * <詳細介紹>, * */ public class Consumer2 { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(1); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME,false,queueingConsumer); while (true){ QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[x] received message : '" + message + "'"); Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } } }
-
測試結果
-
首先生產者一次打印從0-9條消息
-
然后是消費者1:結果為打印偶數條消息(注:先啟動的消費者為消費者1)
-
消費者2:結果為打印奇數條消息
結論:
*消費者1和消費者2獲取到的消息內容是不同的,也就是說同一個消息只能被一個消費者獲取.
*消費者1和消費者2分別獲取奇數條消息和偶數條消息,兩種獲取消息的條數是一樣的.
前面我們說這種模式是競爭消費者模式,一條隊列被多個消費者監聽,這里兩個消費者,其中消費者1和消費者2在獲取消息后分別休眠了10毫秒和1000毫秒,也就是說兩個消費者獲取消息的效率是不一樣的,但是結果卻是兩者獲得的消息條數是一樣的,這根本不構成競爭關系,那么我們應該怎么辦才能讓工作效率更高的消費者獲取消息更多,也就是消費者1獲取消息更多呢?
-
能者多勞
channel.basicQos(1);
增加如上代碼,表示同一時刻服務器只會發送一條消息給消費者.消費者1和消費者2獲取消息結果如下:
-
應用場景
效率高的消費者消費消息多,可以用來進行負載均衡.
-
3.發布/訂閱模式
一個消費者將消息首先發送到交換器,交換器綁定多個隊列,然后被監聽該隊列的消費者所接收並消費.
*在RabbitMQ中,交換器主要有四種類型:direct,fanout,topic,headers,這里的交換器是fanout.
-
生產者
package org.alva.RabbitMQ.PublishModel; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,發布/訂閱模式下的生產者 * <詳細介紹>, * */ public class Producer { private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明交換器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //4.定義消息內容 String message = "hello rabbitmq"; //5.發布消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("[x] send'" + message + "'"); //6.關閉通道和連接 channel.close(); connection.close(); } }
-
消費者
消費者1:
package org.alva.RabbitMQ.PublishModel; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,消費者 * <詳細介紹>,發布/訂閱模式下的消費者 * */ public class Consumer1 { public static final String QUEUE_NAME = "fanout_queue_1"; public static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明交換器 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.綁定隊列到交換器 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5.定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5.監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, queueingConsumer); //6.獲取消息 while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消費者1] received message : '" + message + "'"); //休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者2:
package org.alva.RabbitMQ.PublishModel; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>, * <詳細介紹>, * */ public class Consumer2 { public static final String QUEUE_NAME = "fanout_queue_2"; private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); channel.basicQos(1); QueueingConsumer queueingConsumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, queueingConsumer); while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消費者2] received message : '" + message + "'"); Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
注意:消費者1和消費者2兩者監聽的隊列名稱是不一樣的.
-
測試結果
消費者1和消費者2都消費了該消息.
ps:這是因為消費者1和消費者2都監聽了被同一個交換器綁定的隊列.如果消息發送到沒有隊列綁定的交換器時,消息將丟失,因為交換器沒有存儲消息的能力,消息只能存儲在隊列中.
-
應用場景:
比如一個商城系統需要在管理員上傳新的商品圖片時,前台系統必須更新圖片,日志系統必須記錄相應的日志,那么就可以將兩個隊列綁定到圖片上傳交換器上,一個用於前台系統剛更新圖片,另一個用於日志系統記錄日志.
4.路由模式
生產者將消息發送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發送的消息會指定一個路由key,那么消息只會發送到相應key相同的隊列,接着監聽該隊列的消費者消費信息.
-
生產者
package org.alva.RabbitMQ.DirectExchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,路由模式下的生產者 * <詳細介紹>, * */ public class Producer { private final static String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明交換器,類型為direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //4.定義消息內容 String message = "hello rabbitmq"; //5.發布消息 channel.basicPublish(EXCHANGE_NAME, "update", null, message.getBytes()); System.out.println("[x] send'" + message + "'"); //6.關閉通道和連接 channel.close(); connection.close(); } }
-
消費者
消費者1:
package org.alva.RabbitMQ.DirectExchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,消費者 * <詳細介紹>,路由模式下的消費者1 * <p> * 這種模式添加了一個路由鍵,生產者發布消息的時候添加路由鍵,消費者綁定隊列到交換機時添加鍵值, * 這樣就可以接收到需要接收的消息。 * */ public class Consumer1 { public static final String QUEUE_NAME = "direct_queue_1"; public static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.綁定隊列到交換器,指定路由key為update channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "add"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5.定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5.監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, queueingConsumer); //6.獲取消息 while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消費者1] received message : '" + message + "'"); //休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消費者2:
package org.alva.RabbitMQ.DirectExchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,消費者 * <詳細介紹>,路由模式下的消費者2 * */ public class Consumer2 { public static final String QUEUE_NAME = "direct_queue_2"; public static final String EXCHANGE_NAME = "direct_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.綁定隊列到交換器,指定路由key為select channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5.定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5.監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, queueingConsumer); //6.獲取消息 while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消費者1] received message : '" + message + "'"); //休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
-
測試結果
生產者發布消息,指定的路由key為update,消費者1綁定隊列和交換器時key分別是update/delete/add;消費者2綁定隊列和交換器時key時select.
所以可以猜到生產者發送的消息,只有消費者1能夠接收並消費,而消費者2是不能接收的.
-
應用場景
利用消費者能夠有選擇性的接收消息的特性,比如商場系統的后台管理系統對於商品進行修改、刪除、新增操作都需要更新前台系統的界面展示,而查詢操作不需要,那么這兩個隊列分開接收消息就比較好.
5.主題模式
上面的路由模式是根據路由key進行完整的匹配(完全相等才發送消息),這里的通配符模式通俗的來講就是模糊匹配.
符號"#"表示匹配一個或多個詞,符號"*"表示匹配一個詞.
-
生產者
package org.alva.RabbitMQ.TopicExchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,主題模式下的生產者 * <詳細介紹>, * */ public class Producer { private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5674, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明交換器,類型為direct channel.exchangeDeclare(EXCHANGE_NAME, "topic"); //4.定義消息內容 String message = "hello rabbitmq"; //5.發布消息 channel.basicPublish(EXCHANGE_NAME, "update.Name", null, message.getBytes()); System.out.println("[x] send'" + message + "'"); //6.關閉通道和連接 channel.close(); connection.close(); } }
-
消費者1
package org.alva.RabbitMQ.TopicExchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,消費者 * <詳細介紹>,主題模式下的消費者1 * */ public class Consumer1 { public static final String QUEUE_NAME = "topic_queue_1"; public static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5673, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.綁定隊列到交換器,指定路由key為update channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update.#"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5.定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5.監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, queueingConsumer); //6.獲取消息 while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消費者1] received message : '" + message + "'"); //休眠10毫秒 Thread.sleep(10); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
-
消費者2
package org.alva.RabbitMQ.TopicExchange; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; import org.alva.Utils.ConnectionUtil; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * <一句話描述>,消費者 * <詳細介紹>,主題模式下的消費者2 * * @author 穆國超 * @since 設計wiki | 需求wiki */ public class Consumer2 { public static final String QUEUE_NAME = "topic_queue_2"; public static final String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1.獲取連接 Connection connection = ConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest"); //2.聲明信道 Channel channel = connection.createChannel(); //3.聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4.綁定隊列到交換器,指定路由key為select channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "select.#"); //同一時刻服務器只會發送一條消息給消費者 channel.basicQos(1); //5.定義隊列的消費者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel); //5.監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, queueingConsumer); //6.獲取消息 while (true) { QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("[消費者1] received message : '" + message + "'"); //休眠10毫秒 Thread.sleep(1000); //返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
-
分析結果
生產者發送消息綁定的路由key為update.Name;消費者1監聽的隊列和交換器綁定路由key為update.#;消費者2監聽的隊列和交換器綁定路由key為select.#.
很顯然,消費者1會接收到消息,而消費者2接收不到
6.四種交換器
前面介紹了五種隊列模式,但是實際上只有三種,第一種簡單隊列,第二種工作模式,剩下的三種都是和交換器綁定的合起來稱為一種,這節詳細介紹交換器.
交換器分為四種,分別是:direct,fanout,topic和headers.
前三種分別對應路由模式,發布訂閱模式和通配符模式,headers交換器允許匹配AMQP消息的header而非路由鍵,除此之外,header交換器和direct交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器,這不做詳細介紹.
-
direct
如果路由鍵完全匹配的話,消息才會被投放到相應的隊列.
-
fanout
當發送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器的上的隊列.
-
topic
設置模糊的綁定方式,"*"操作符將"."視為分隔符,匹配單個字符;"#"操作符沒有分塊的概念,它將任意"."均視為關鍵字的匹配部分,能夠匹配多個字符.
7.總結
關於RabbitMQ的五種隊列,其實實際使用最多的是最后一種主題模式,通過模糊匹配,使得操作更加自如.