RabbitMQ五種工作模式
在SpringBoot環境下做的代碼測試,RabbitMQ的包是用SpringBoot的starter-amqp包引入的。
1、簡單隊列
一個生產者對應一個消費者!!!
1、pom文件
SpringBoot導入rabbitmq 啟動包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、工具類
/**
* 〈簡述〉<br>
* 〈連接RabbitMQ的工具類〉
*
* @create 2020/7/1
* @since 1.0.0
*/
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
return getConnection(new Properties());
}
private static Connection getConnection(Properties properties) throws Exception {
return getConnection(properties.getHost(),
properties.getPort(),
properties.getvHost(),
properties.getUserName(),
properties.getPassWord());
}
public static Connection getConnection(String host, int port, String vHost, String userName, String passWord) throws Exception {
//1、定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2、設置服務器地址
factory.setHost(host);
//3、設置端口
factory.setPort(port);
//4、設置虛擬主機、用戶名、密碼
factory.setVirtualHost(vHost);
factory.setUsername(userName);
factory.setPassword(passWord);
//5、通過連接工廠獲取連接
Connection connection = factory.newConnection();
return connection;
}
public static class Properties implements Serializable {
String host = "192.168.1.103";
int port = 5672;
String vHost = "/";
String userName = "guest";
String passWord = "guest";
public Properties() {
}
public Properties(String host, int port, String vHost, String userName, String passWord) {
this.host = host;
this.port = port;
this.vHost = vHost;
this.userName = userName;
this.passWord = passWord;
}
public String getHost() {
return host;
}
public Properties setHost(String host) {
this.host = host;
return self();
}
public int getPort() {
return port;
}
public Properties setPort(int port) {
this.port = port;
return self();
}
public String getvHost() {
return vHost;
}
public Properties setvHost(String vHost) {
this.vHost = vHost;
return self();
}
public String getUserName() {
return userName;
}
public Properties setUserName(String userName) {
this.userName = userName;
return self();
}
public String getPassWord() {
return passWord;
}
public Properties setPassWord(String passWord) {
this.passWord = passWord;
return self();
}
private Properties self(){
return this;
}
}
}
3、生產者 Producer
/**
* 〈簡述〉<br>
* 〈簡單隊列——消息生產者〉
*
* @create 2020/7/1
* @since 1.0.0
*/
public class Producer {
private final static String QUEUE_NAME = QueueName.test_simple_queue.toString();
public static void main(String[] args) throws Exception {
sendMessage();
}
public static void sendMessage() throws Exception {
//1、獲取連接
Connection connection = ConnectionUtil.getConnection();
//2、聲明信道
Channel channel = connection.createChannel();
//3、聲明(創建)隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、定義消息內容
String message = "hello rabbitmq ";
//5、發布消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent'" + message + "'");
//6、關閉通道
channel.close();
//7、關閉連接
connection.close();
}
}
4、消費者Consumer
/**
* 〈簡述〉<br>
* 〈消息消費者〉
*
* @create 2020/7/1
* @since 1.0.0
*/
public class Customer {
private final static String QUEUE_NAME = QueueName.test_simple_queue.toString();
public static void main(String[] args) throws Exception {
getMessage();
}
public static void getMessage() throws Exception {
//1、獲取連接
Connection connection = ConnectionUtil.getConnection();
//2、聲明通道
Channel channel = connection.createChannel();
//3、聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//4、定義隊列的消費者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String msgString = new String(body, "utf-8");
System.out.println("接收的消息:" + msgString);
}
};
//5、監聽隊列
/*
true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息已經成功消費
false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,
如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態,並且服務器會認為該消費者已經掛掉,不會再給其
發送消息,直到該消費者反饋。
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
注意這里消費者有自動確認消息和手動確認消息兩種模式。
2、work 模式
一個生產者對應多個消費者,但是一條消息只能有一個消費者獲得消息!!!
2.1輪詢分發
1、生產者
/**
* 〈簡述〉<br>
* 〈輪詢分發——生產者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Send {
private static final String QUEUE_NAME = QueueName.test_work_queue.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
String msg = "hello " + i;
System.out.println("[mq] send:" + msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i * 20);
}
channel.close();
connection.close();
}
}
2、消費者
這里創建兩個消費者
消費者1:每接收一條消息后休眠1秒
/**
* 〈簡述〉<br>
* 〈接收者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Receive1 {
private static final String QUEUE_NAME = QueueName.test_work_queue.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel、
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定義一個消費這
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Receive1 msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done");
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消費者2:每接收一條消息后休眠2秒
/**
* 〈簡述〉<br>
* 〈接收者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Receive2 {
private static final String QUEUE_NAME = QueueName.test_work_queue.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定義一個消費這
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Receive2 msg:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done");
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
3、測試結果
生產者從0-49依次發送消息
消費者1:接收到偶數
消費者2:接收到奇數
4、結論
輪詢分發就是將消息隊列中的消息,依次發送給所有消費者。一個消息只能被一個消費者獲取。
2.2公平分發
消費者關閉自動應答,開啟手動回執
/**
* 〈簡述〉<br>
* 〈接收者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Receive2 {
private static final String QUEUE_NAME = QueueName.test_work_queue.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);//保證一次只分發一個消息
//定義一個消費這
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Receive2 msg:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done");
//手動回執
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;//自動應答
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
手動回執:消費者完成業務接口方法后可以告知消息隊列處理完成,消息隊列從隊列中取一條消息發送給消費者。
能者多勞:效率高的消費者消費消息多。
3、發布/訂閱模式
一個消費者將消息首先發送到交換器,交換器綁定到多個隊列,然后被監聽該隊列的消費者所接收並消費。
ps:X表示交換器,在RabbitMQ中,交換器主要有四種類型:direct、fanout、topic、headers,這里的交換器是 fanout。下面我們會詳細介紹這幾種交換器。
1、生產者
/**
* 〈簡述〉<br>
* 〈訂閱模式——生產者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Send {
private static final String EXCHANGE_NAME = MqName.exchange_fanout.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//分發
//發送消息
String msg = "hello exchange";
System.out.println("[mq] send:" + msg);
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
2、消費者
注意:兩個消費者綁定不同的隊列,綁定相同的交換機;
消費者1:綁定隊列名=queue_fanout_email1
/**
* 〈簡述〉<br>
* 〈接收者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Receive1 {
private static final String QUEUE_NAME = MqName.queue_fanout_email.toString() + "1";
private static final String EXCHANGE_NAME = MqName.exchange_fanout.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//綁定到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//定義一個消費這
Consumer consumer = new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] Receive1 msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done");
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
消費者2:綁定隊列名=queue_fanout_email2
/**
* 〈簡述〉<br>
* 〈接收者〉
*
* @create 2020/7/3
* @since 1.0.0
*/
public class Receive2 {
private static final String QUEUE_NAME = MqName.queue_fanout_email.toString() + "2";
private static final String EXCHANGE_NAME = MqName.exchange_fanout.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//綁定到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//定義一個消費這
Consumer consumer = new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] Receive2 msg:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done");
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
3、測試結果
如上圖,兩個消費者獲得了同一條消息。即就是,一個消息從交換機同時發送給了兩個隊列中,監聽這兩個隊列的消費者消費了這個消息;
如果沒有隊列綁定交換機,則消息將丟失。因為交換機沒有存儲能力,消息只能存儲在隊列中。
4、路由模式
生產者將消息發送到direct交換器,在綁定隊列和交換器的時候有一個路由key,生產者發送的消息會指定一個路由key,那么消息只會發送到相應key相同的隊列,接着監聽該隊列的消費者消費消息。
也就是讓消費者有選擇性的接收消息。
1、生產者
/**
* 〈簡述〉<br>
* 〈路由模式-消息發送者〉
*
* @create 2020/7/8
* @since 1.0.0
*/
public class Send {
public static final String EXCHANGE_NAME = MqName.exchange_routing.toString();
public static final String ROUTING_KEY = MqName.routing_world.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String msg = "route message ->" + ROUTING_KEY;
System.out.println("對 " + ROUTING_KEY + " 發送消息:" + msg);
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, msg.getBytes());
//關閉連接
channel.close();
connection.close();
}
}
2、消費者
注意:兩個消費者,綁定相同的交換機,不同的隊列,不一樣的路由
消費者1:路由=routing_hello
/**
* 〈簡述〉<br>
* 〈接收消息1〉
*
* @create 2020/7/8
* @since 1.0.0
*/
public class Receive1 {
public static final String QUEUE_NAME = MqName.queue_routing_001.toString();
public static final String EXCHANGE_NAME = MqName.exchange_routing.toString();
public static final String ROUTING_KEY_hello = MqName.routing_hello.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//設置預讀取數
channel.basicQos(1);
//綁定交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_hello);
Consumer consumer = new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者2:路由1=routing_world 路由2=routing_hello
/**
* 〈簡述〉<br>
* 〈接收消息2〉
*
* @create 2020/7/8
* @since 1.0.0
*/
public class Receive2 {
public static final String QUEUE_NAME = MqName.queue_routing_002.toString();
public static final String EXCHANGE_NAME = MqName.exchange_routing.toString();
public static final String ROUTING_KEY_world = MqName.routing_world.toString();
public static final String ROUTING_KEY_hello = MqName.routing_hello.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//設置預讀取數
channel.basicQos(1);
//綁定交換機和路由器,可以綁定多個路由
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_world);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_hello);
//定義消息消費者
Consumer consumer = new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(envelope.getRoutingKey() + " [2] Receive1 msg:" + msg);
}
};
//接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、測試結果
生產者發送:routing_world
消費者1:沒有接收到
消費者2:接收到了
生產者發送:routing_hello
消費者1:接收到了
消費者2:接收到了
路由模式,是以路由規則為導向,引導消息存入符合規則的隊列中。再由隊列的消費者進行消費的。
5、主題模式
上面的路由模式是根據路由key進行完整的匹配(完全相等才發送消息),這里的通配符模式通俗的來講就是模糊匹配。
符號“#”表示匹配一個或多個詞,符號“*”表示匹配一個詞。
1、生產者
/**
* 〈簡述〉<br>
* 〈主題模式-消息發送者〉
*
* @create 2020/7/8
* @since 1.0.0
*/
public class Send {
public static final String EXCHANGE_NAME = MqName.exchange_topic.toString();
public static final String ROUTING_KEY = MqName.routing_goods.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// String routingKey = ROUTING_KEY + ".add";
String routingKey = ROUTING_KEY + ".publish";
// String routingKey = ROUTING_KEY + ".update";
String msg = "route message ->" + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("對 " + routingKey + " 發送消息:" + msg);
//關閉連接
channel.close();
connection.close();
}
}
2、消費者
注意兩個消費者,路由的不同
消費者1:路由1=routing_goods.add 路由2=routing_goods.update
/**
* 〈簡述〉<br>
* 〈接收消息1〉
*
* @create 2020/7/8
* @since 1.0.0
*/
public class Receive1 {
public static final String QUEUE_NAME = MqName.queue_topic_001.toString();
public static final String EXCHANGE_NAME = MqName.exchange_topic.toString();
public static final String ROUTING_KEY = MqName.routing_goods.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//設置預讀取數
channel.basicQos(1);
//綁定交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".add");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".update");
Consumer consumer = new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(envelope.getRoutingKey() + " [1] Receive1 msg:" + msg);
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消費者2:路由=routing_goods.*
/**
* 〈簡述〉<br>
* 〈接收消息2〉
*
* @create 2020/7/8
* @since 1.0.0
*/
public class Receive2 {
public static final String QUEUE_NAME = MqName.queue_routing_002.toString();
public static final String EXCHANGE_NAME = MqName.exchange_topic.toString();
public static final String ROUTING_KEY = MqName.routing_goods.toString();
public static void main(String args[]) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//獲取channel
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//設置預讀取數
channel.basicQos(1);
//綁定交換機和路由器,可以綁定多個路由
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY + ".*");
//定義消息消費者
Consumer consumer = new DefaultConsumer(channel) {
/**
* No-op implementation of {@link Consumer#handleDelivery}.
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println(envelope.getRoutingKey() + " [2] Receive1 msg:" + msg);
}
};
//接收消息
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
3、測試結果
消費者1只能接收到.add 和 .update
的消息
消費者2可以接收到.add .publish 和 .update
的消息
與路由模式相似,但是,主題模式是一種模糊的匹配方式。
6.工作模式總結
這五種工作模式,可以歸為三類:
- 生產者,消息隊列,一個消費者;
- 生產者,消息隊列,多個消費者;
- 生產者,交換機,多個消息隊列,多個消費者;
7、四種交換器
1、direct 如果路由鍵完全匹配的話,消息才會被投放到相應的隊列。
2、fanout 當發送一條消息到fanout交換器上時,它會把消息投放到所有附加在此交換器上的隊列。
3、topic 設置模糊的綁定方式,“*”操作符將“.”視為分隔符,匹配單個字符;“#”操作符沒有分塊的概念,它將任意“.”均視為關鍵字的匹配部分,能夠匹配多個字符。
4、header headers 交換器允許匹配 AMQP 消息的 header 而非路由鍵,除此之外,header 交換器和 direct 交換器完全一致,但是性能卻差很多,因此基本上不會用到該交換器