1. RabbitMQ簡介
RabbitMQ是由erlang語言開發,基於AMQP(高級消息隊列協議)協議實現的消息隊列,它是一種應用程序之間的通信方法,消息隊列在分布式系統開發中應用非常廣泛。
RabbitMQ官方地址:http://www.rabbitmq.com
1.1 消息隊列
MQ全稱為Message Queue,即消息隊列。
“消息隊列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。可以看出消息的生產和消費都是異步的,生產者和消費者只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。
1.2 消息隊列使用場景
- 任務異步處理:
高並發環境下,由於來不及同步處理,請求往往會發生堵塞。我們可以異步處理請求,從而緩解系統的壓力。將不需要同步處理的並且耗時長的操作由消息隊列通知消息接收方進行異步處理。減少了應用程序的響應時間。
場景:
用戶注冊后,需要發送注冊郵件和注冊成功短信通知【這兩個操作都不是必須的,只是一個通知,沒必要讓用戶等待收到郵箱和短信后才算注冊成功】。引入消息隊列后,將發送郵件和短信的業務交給消息隊列,實現異步處理。
- 應用程序解耦合:
MQ相當於一個中介,生產方通過MQ與消費方交互,不直接進行較胡,它將生產方與消費方進行解耦合,不至於當時功能里面的某一個操作因為宕機了導致后續操作無法進行。
場景:
雙十一購物狂歡節,用戶下單后,訂單系統需要通知庫存系統減少響應庫存量,若庫存系統出現故障,此筆訂單就不能成功。
引入消息隊列后,訂單系統向消息隊列發送用戶下單的消息,並持久化數據到rabbitMQ【防止rabbitMQ宕機后消息丟失或庫存系統宕機沒消費消息】,庫存系統監聽消息隊列的消息。
- 流量削峰:
場景:
秒殺活動,一般流量會很大,可能導致某個系統直接扛不住而掛掉。
引入消息隊列(一般會在前端系統引入),用戶發起請求時,先來到消息隊列再去秒殺系統。在消息隊列中對消息進行處理(比如請求達到某閾值時,直接拋棄那些請求或跳轉錯誤頁面),如此一來可緩解因高並發請求所導致秒殺系統扛不住掛掉的問題。
2. 為什么要學習RabbitMQ
先來看下面一個電商項目的場景:
商品的原始數據保存在數據庫中,增刪改查都在數據庫中完成。
搜索服務數據來源是索引庫(Elasticsearch),如果數據庫商品發生變化,索引庫數據不能及時更新。
商品詳情做了頁面靜態化處理,靜態頁面數據也不會隨着數據庫商品更新而變化。
如果我們在后台修改了商品的價格,搜索頁面和商品詳情頁顯示的依然是舊的價格,這樣顯然不對。該如何解決?
我們可能會想到這么做:
方案1:每當后台對商品做增刪改操作,同時修改索引庫數據及更新靜態頁面。
方案2:搜索服務和商品頁面靜態化服務對外提供操作接口,后台在商品增刪改后,調用接口。
這兩種方案都有個嚴重的問題:就是代碼耦合,后台服務中需要嵌入搜索和商品頁面服務,違背了微服務的獨立原則。並且是同步操作,耗費時間過長。
這時,我們就會采用另外一種解決辦法,那就是消息隊列:
商品服務對商品增刪改以后,無需去操作索引庫和靜態頁面,只需向MQ發送一條消息(比如包含商品id的消息),也不關心消息被誰接收。
搜索服務和靜態頁面服務監聽MQ,接收消息,然后分別去處理索引庫和靜態頁面(根據商品id去更新索引庫和商品詳情靜態頁面)。
3. 常見MQ產品
- ActiveMQ:基於JMS
- RabbitMQ:基於AMQP協議,erlang語言開發,穩定性好
- RocketMQ:基於JMS,阿里巴巴產品,目前交由Apache基金會
- Kafka:分布式消息系統,高吞吐量
語言的支持:ActiveMQ,RocketMQ只支持Java語言,Kafka可以支持多們語言,RabbitMQ支持多種語言
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒級別,RabbitMQ是微秒級別的
消息丟失,消息重復問題: RabbitMQ針對消息的持久化,和重復問題都有比較成熟的解決方案
4. RabbitMQ架構
4.1 簡單架構圖

4.2 完整架構圖

5. RabbitMQ的通訊方式

6. 用Java方式操作RabbitMQ
6.1 RabbitMQ的連接
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
public static Connection getConnection() {
// 創建Connection工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("47.114.186.28");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/test");
// 創建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回
return conn;
}
@Test
public void getConnection() throws IOException {
Connection connection = RabbitMQClient.getConnection();
connection.close();
}
6.2 RabbitMQ的“Hello World”通訊方式

生產者
public class Publisher {
@Test
public void publish() throws Exception {
//1. 獲取Connection對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建Channel通道
Channel channel = connection.createChannel();
//3. 發布消息到exchange,發布的同時需要指定路由的規則
// 參數1:指定exchange,如果使用""【表示使用默認exchange】。
// 參數2:指定路由規則【“Hello World”通訊方式 直接使用具體的隊列名稱即可】。
// 參數3:指定傳遞的消息所攜帶的properties【即:傳遞的消息的額外設置】,沒有就寫null。
// 參數4:指定發布的具體消息內容,byte[]類型
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
// Ps:exchange是不會幫你將消息持久化到本地的,Queue才會幫你持久化消息。發布者是和交換機打交道的,所以這里不能幫助實現持久化本地
System.out.println("生產者發布消息成功!");
//4. 釋放資源
channel.close();
connection.close();
}
}
消費者
public class Consumer {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明[創建]隊列-HelloWorld
//參數1:queue - 指定隊列的名稱 【若該隊列不存在,則自動創建】
//參數2:durable - 當前隊列是否需要持久化 【持久化:將隊列接收到的消息持久化到硬盤,若隊列中的消息還沒被消費,就算RabbitMQ重啟了該消息也不會丟失】
//參數3:exclusive - 是否獨占 【當前隊列只允許一個消費者連接可用,其他消費者再來連接時不能再用】【當連接對象Connection被close()之后,當前隊列會自動刪除】
//參數4:autoDelete - 是否自動刪除 【如果這個隊列沒有消費者在消費,隊列自動刪除】
//參數5:arguments - 指定當前隊列的其他信息
channel.queueDeclare("HelloWorld", true, false, false, null);
DefaultConsumer consume = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body, "UTF-8"));
}
};
//4. 開啟監聽隊列
//參數1:queue - 指定消費哪個隊列
//參數2:autoAck - 指定是否自動ACK (true:接收到消息后,會立即自動告訴RabbitMQ消息已消費了)
//參數3:consumer - 指定消費時的消費回調(收到消息后,消費者要干點什么事)
channel.basicConsume("HelloWorld", true, consume); // 這個才是開啟監聽的方法
System.out.println("消費者開始監聽隊列!");
// 可以實現輸入字符 , 用來將程序在這里等着,相當於debug
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
6.3 RabbitMQ的“Work”通訊方式
“Hello world”那種通訊方式存在一個弊端,若消費者消費消息的速度很慢,可能會導致生產者發布的消息形成堆積。
我們接下來介紹一種通訊方式,就是一個生產者發布消息,有多個消費者監聽,誰收到消息,誰就消費【該通訊方式中,消息一旦被消費了,就消失。所以不會出現重復消費的情況】。這樣就解決了上述所說的問題。

public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列-HelloWorld
//參數1:queue - 指定隊列的名稱 【若該隊列不存在,則自動創建】
//參數2:durable - 當前隊列是否需要持久化 【持久化:將隊列接收到的消息持久化到硬盤,若隊列中的消息還沒被消費,就算RabbitMQ重啟了該消息也不會丟失】
//參數3:exclusive - 是否獨占 【當前隊列只允許一個消費者連接可用,其他消費者再來連接時不能再用】【當連接對象Connection被close()之后,當前隊列會自動刪除】
//參數4:autoDelete - 是否自動刪除 【如果這個隊列沒有消費者在消費,隊列自動刪除】
//參數5:arguments - 指定當前隊列的其他信息
channel.queueDeclare("Work", true, false, false, null);
//3.1 默認情況下是平均消費,有10個消息,那么2個消費者各消費5個消息。 如果想指定當前消費者一次消費多少個消息,可通過basicQos設置。
//【【如果要想讓某個消費能力強的消費者消費更多的消息,就可以指定該消費者的消費能力。PS: 此時必須改為手動ACK】】
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者1號接收到消息:" + new String(body, "UTF-8"));
// 【【手動ack 表示我已經消費完了】】
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 開啟監聽Queue
// 將自動消費改為false 表示手動消費
//參數1:queue - 指定消費哪個隊列
//參數2:autoAck - 指定是否自動ACK (true:接收到消息后,會立即自動告訴RabbitMQ消息已消費了)
//參數3:consumer - 指定消費時的消費回調(收到消息后,消費者要干點什么事)
channel.basicConsume("Work", false, consumer);
System.out.println("開始消費消息。。。。");
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列-HelloWorld
channel.queueDeclare("Work", true, false, false, null);
//3.1 指定當前消費者,一次消費多少個消息。
channel.basicQos(1);
//4. 開啟監聽Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者2號接收到消息:" + new String(body, "UTF-8"));
// 手動ack,消費完了告訴rabbitMQ我消費完了
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("Work", false, consumer);
System.out.println("開始消費消息。。。。");
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 獲取Connection對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建Channel通道
Channel channel = connection.createChannel();
//3. 發布消息到exchange,同時指定路由的規則
for (int i = 0; i < 10; i++) {
// 參數1:指定exchange,如果使用""【表示使用默認exchange】。
// 參數2:指定路由規則【“Hello World”通訊方式 直接使用具體的隊列名稱即可】。
// 參數3:指定傳遞的消息所攜帶的properties【即:傳遞的消息的額外設置】,沒有就寫null。
// 參數4:指定發布的具體消息內容,byte[]類型
String msg = "Hello-World!" + i;
channel.basicPublish("", "Work", null, msg.getBytes());
}
System.out.println("生產者發布消息成功!");
//4. 釋放資源
channel.close();
connection.close();
}
}
6.4 RabbitMQ的“Publish/Subscribe”通訊方式 (廣播通訊方式)

public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列要消費的隊列:pubsub-queue1
//參數1:queue - 指定隊列的名稱 【若該隊列不存在,則自動創建】
//參數2:durable - 當前隊列是否需要持久化 【持久化:將隊列接收到的消息持久化到硬盤,若隊列中的消息還沒被消費,就算RabbitMQ重啟了該消息也不會丟失】
//參數3:exclusive - 是否獨占 【當前隊列只允許一個消費者連接可用,其他消費者再來連接時不能再用】【當連接對象Connection被close()之后,當前隊列會自動刪除】
//參數4:autoDelete - 是否自動刪除 【如果這個隊列沒有消費者在消費,隊列自動刪除】
//參數5:arguments - 指定當前隊列的其他信息
channel.queueDeclare("pubsub-queue1", true, false, false, null);
//3.5 指定當前消費者,一次消費多少個消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者1號接收到消息:" + new String(body, "UTF-8"));
// 手動ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 開啟監聽Queue
//參數1:queue - 指定消費哪個隊列
//參數2:autoAck - 指定是否自動ACK (true:接收到消息后,會立即自動告訴RabbitMQ消息已消費了)
//參數3:consumer - 指定消費時的消費回調(收到消息后,消費者要干點什么事)
channel.basicConsume("pubsub-queue1", false, consumer);
System.out.println("開始消費消息。。。。");
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列要消費的隊列:pubsub-queue1
channel.queueDeclare("pubsub-queue2", true, false, false, null);
//3.5 指定當前消費者,一次消費多少個消息
channel.basicQos(1);
//4. 開啟監聽Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者1號接收到消息:" + new String(body, "UTF-8"));
// 手動ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("pubsub-queue2", false, consumer);
System.out.println("開始消費消息。。。。");
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 獲取Connection對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建Channel
Channel channel = connection.createChannel();
//3. 創建exchange
//參數1: exchange的名稱 若該交換機不存在,則創建
//參數2: 指定exchange的類型 常用的有默認的,direct,fanout,topic。前面2種我們都是使用的默認的交換機。 FANOUT:Publish/Subscribe通訊方式 DIRECT:Routing通訊方式 TOPIC:Topic通訊方式
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
//3.1 exchange綁定某一個隊列 該操作可以在生產者干 也 可以在消費者干
//參數1:指定隊列
//參數2:指定交換機
//參數3:指定routingKey規則
channel.queueBind("pubsub-queue1", "pubsub-exchange", "");
channel.queueBind("pubsub-queue2", "pubsub-exchange", "");
//4. 發布消息到exchange,同時指定路由的規則
for (int i = 0; i < 10; i++) {
String msg = "Hello-World!" + i;
// 注意這里,之前發布消息采用默認的交換機 我們使用的是 ""。 現在我們要指定交換機了。
// 參數1:指定exchange,如果使用""【表示使用默認exchange】。
// 參數2:指定路由規則【“Hello World”通訊方式 直接使用具體的隊列名稱即可】。
// 參數3:指定傳遞的消息所攜帶的properties【即:傳遞的消息的額外設置】,沒有就寫null。
// 參數4:指定發布的具體消息內容,byte[]類型
channel.basicPublish("pubsub-exchange", "", null, msg.getBytes()); // 表示綁定到pubsub-exchange交換機,該交換機有兩個隊列:pubsub-queue1和pubsub-queue2
}
System.out.println("生產者發布消息成功!");
//5. 釋放資源
channel.close();
connection.close();
}
}
6.5 RabbitMQ的“Routing”通訊方式

public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列-HelloWorld
//參數1:queue - 指定隊列的名稱 【若該隊列不存在,則自動創建】
//參數2:durable - 當前隊列是否需要持久化 【持久化:將隊列接收到的消息持久化到硬盤,若隊列中的消息還沒被消費,就算RabbitMQ重啟了該消息也不會丟失】
//參數3:exclusive - 是否獨占 【當前隊列只允許一個消費者連接可用,其他消費者再來連接時不能再用】【當連接對象Connection被close()之后,當前隊列會自動刪除】
//參數4:autoDelete - 是否自動刪除 【如果這個隊列沒有消費者在消費,隊列自動刪除】
//參數5:arguments - 指定當前隊列的其他信息
channel.queueDeclare("routing-queue-error", true, false, false, null);
//3.5 指定當前消費者,一次消費多少個消息
channel.basicQos(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者ERROR接收到消息:" + new String(body, "UTF-8"));
// 手動ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
//4. 開啟監聽Queue
//參數1:queue - 指定消費哪個隊列
//參數2:autoAck - 指定是否自動ACK (true:接收到消息后,會立即自動告訴RabbitMQ消息已消費了)
//參數3:consumer - 指定消費時的消費回調(收到消息后,消費者要干點什么事)
channel.basicConsume("routing-queue-error", false, consumer);
System.out.println("開始消費消息。。。。");
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列-HelloWorld
channel.queueDeclare("routing-queue-info", true, false, false, null);
//3.5 指定當前消費者,一次消費多少個消息
channel.basicQos(1);
//4. 開啟監聽Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消費者INFO接收到消息:" + new String(body, "UTF-8"));
// 手動ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("routing-queue-info", false, consumer);
System.out.println("開始消費消息。。。。");
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 獲取Connection
Connection connection = RabbitMQClient.getConnection();
//2. 創建Channel
Channel channel = connection.createChannel();
//3. 創建exchange, 綁定隊列:routing-queue-error 和 routing-queue-info
//參數1: exchange的名稱 若該交換機不存在,則創建
//參數2: 指定exchange的類型 常用的有默認的,direct,fanout,topic。前面2種我們都是使用的默認的交換機。 FANOUT:Publish/Subscribe通訊方式 DIRECT:Routing通訊方式 TOPIC:Topic通訊方式
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
//3.1 exchange綁定某一個隊列 該操作可以在生產者干 也 可以在消費者干
//參數1:指定隊列
//參數2:指定交換機
//參數3:指定routingKey規則
channel.queueBind("routing-queue-error", "routing-exchange", "ERROR");
channel.queueBind("routing-queue-info", "routing-exchange", "INFO");
//4. 發布消息到exchange,同時指定路由的規則
// 參數1:指定exchange,如果使用""【表示使用默認exchange】。
// 參數2:指定路由規則【“Hello World”通訊方式 直接使用具體的隊列名稱即可】。
// 參數3:指定傳遞的消息所攜帶的properties【即:傳遞的消息的額外設置】,沒有就寫null。
// 參數4:指定發布的具體消息內容,byte[]類型
channel.basicPublish("routing-exchange", "ERROR", null, "ERROR".getBytes());// 發布到"routing-exchange",並且要求該交換機的隊列的routingKey是 ERROR
channel.basicPublish("routing-exchange", "INFO", null, "INFO-1".getBytes());
channel.basicPublish("routing-exchange", "INFO", null, "INFO-2".getBytes());
channel.basicPublish("routing-exchange", "INFO", null, "INFO-3".getBytes());
System.out.println("生產者發布消息成功!");
//4. 釋放資源
channel.close();
connection.close();
}
}
6.6 RabbitMQ的“Topic”通訊方式



public class Consumer1 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列-HelloWorld
channel.queueDeclare("topic-queue-1", true, false, false, null);
//3.5 指定當前消費者,一次消費多少個消息
channel.basicQos(1);
//4. 開啟監聽Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("對紅色動物感興趣接收到消息:" + new String(body, "UTF-8"));
// 手動ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("topic-queue-1", false, consumer);
System.out.println("開始消費消息。。。。");
// System.in.read();
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Consumer2 {
@Test
public void consume() throws Exception {
//1. 獲取連接對象
Connection connection = RabbitMQClient.getConnection();
//2. 創建channel
Channel channel = connection.createChannel();
//3. 聲明隊列-HelloWorld
channel.queueDeclare("topic-queue-2", true, false, false, null);
//3.5 指定當前消費者,一次消費多少個消息
channel.basicQos(1);
//4. 開啟監聽Queue
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("對快的和兔子感興趣接收到消息:" + new String(body, "UTF-8"));
// 手動ack
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("topic-queue-2", false, consumer);
System.out.println("開始消費消息。。。。");
// System.in.read();
System.in.read();
//5. 釋放資源
channel.close();
connection.close();
}
}
public class Publisher {
@Test
public void publish() throws Exception {
//1. 獲取Connection
Connection connection = RabbitMQClient.getConnection();
//2. 創建Channel
Channel channel = connection.createChannel();
// 3. 創建exchange綁定隊列 topic-queue-1 topic-queue-2
//參數1: exchange的名稱 若該交換機不存在,則創建
//參數2: 指定exchange的類型 常用的有默認的,direct,fanout,topic。前面2種我們都是使用的默認的交換機。 FANOUT:Publish/Subscribe通訊方式 DIRECT:Routing通訊方式 TOPIC:Topic通訊方式
channel.exchangeDeclare("topic-exchange", BuiltinExchangeType.TOPIC);
//3.1 exchange綁定某一個隊列 該操作可以在生產者干 也 可以在消費者干
//參數1:指定隊列
//參數2:指定交換機
//參數3:指定routingKey規則
// 該模式與routing模式的區別就在於 他們的routingKey有所不同
// 需求:我們要發布動物的信息,該動物有3個屬性 <speed>.<color>.<what>
// *.red.* -> *占位符 即:一個*代表一個內容
// fast.# -> #通配符 即:一個#代表多個內容
// 如: *.*.rabbit 等同於 #.rabbit
channel.queueBind("topic-queue-1", "topic-exchange", "*.red.*"); // topic-queue-1隊列只對紅色動物感興趣
channel.queueBind("topic-queue-2", "topic-exchange", "fast.#");
channel.queueBind("topic-queue-2", "topic-exchange", "*.*.rabbit"); // topic-queue-2隊列對快的動物感興趣 和 對兔子感興趣。 注意: 一個隊列可以被多次綁定
//4. 發布消息到exchange,同時指定路由的規則
// 參數1:指定exchange,如果使用""【表示使用默認exchange】。
// 參數2:指定路由規則【“Hello World”通訊方式 直接使用具體的隊列名稱即可】。
// 參數3:指定傳遞的消息所攜帶的properties【即:傳遞的消息的額外設置】,沒有就寫null。
// 參數4:指定發布的具體消息內容,byte[]類型
channel.basicPublish("topic-exchange", "fast.red.monkey", null, "紅快猴子".getBytes());
channel.basicPublish("topic-exchange", "slow.black.dog", null, "黑漫狗".getBytes());
channel.basicPublish("topic-exchange", "fast.white.cat", null, "快白貓".getBytes());
System.out.println("生產者發布消息成功!");
//4. 釋放資源
channel.close();
connection.close();
}
}

7. SpringBoot整合RabbitMQ[基於配置類]
整合各種模式可以參考:https://juejin.cn/post/6976033887449251876
7.1 導入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.2 編寫配置文件
spring:
rabbitmq:
host: 47.114.216.28
port: 5672
username: admin
password: admin
virtual-host: /test
7.3 聲明exchange,queue
@Configuration
public class RabbitMQConfig {
// @Bean即表示聲明該方法返回的實例是受 Spring 管理的 Bean。 如果想自定義Bean名稱,可以再@Bean()注解里面配置。
//1. 創建exchange - topic 以topic通訊方式為例,因為其最靈活
@Bean
public TopicExchange getTopicExchange(){
// 參數1:交換機名稱 參數2:是否持久化數據 參數3:是否自動刪除
return new TopicExchange("boot-topic-exchange",true,false);
}
//2. 創建queue
@Bean
public Queue getQueue(){
// 參數1:隊列名稱 參數2:是否持久化 參數3:是否獨占 參數4:是否自動刪除 參數5:指定當前隊列的其他信息
//參數1:queue - 指定隊列的名稱 【若該隊列不存在,則自動創建】
//參數2:durable - 當前隊列是否需要持久化 【持久化:將隊列接收到的消息持久化到硬盤,若隊列中的消息還沒被消費,就算RabbitMQ重啟了該消息也不會丟失】
//參數3:exclusive - 是否獨占 【當前隊列只允許一個消費者連接可用,其他消費者再來連接時不能再用】【當連接對象Connection被close()之后,當前隊列會自動刪除】
//參數4:autoDelete - 是否自動刪除 【如果這個隊列沒有消費者在消費,隊列自動刪除】
//參數5:arguments - 指定當前隊列的其他信息
return new Queue("boot-queue",true,false,false,null);
}
//3. 綁定在一起【創建Binding】
@Bean
public Binding getBinding(TopicExchange topicExchange,Queue queue){
// .bind() 綁定哪個隊列 .to() 到哪個交換機 .with() 指定routingKey
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
// 上述寫法可以寫成下面這種
@Configuration
public class RabbitMQConfig {
public static final String TOPIC_NAME = "topicExchange";
public static final String QUEUE_NAME = "topicQueue";
// @Bean即表示聲明該方法返回的實例是受 Spring 管理的 Bean。 如果想自定義Bean名稱,可以再@Bean()注解里面配置。
//1. 創建exchange - topic 以topic通訊方式為例,因為其最靈活
@Bean("exchange")
public Exchange getTopicExchange(){
return ExchangeBuilder.topicExchange(TOPIC_NAME).durable(true).build(); // 它是這種構建者的方式構建來指定 是否持久化 是否自動刪除 ...
}
//2. 創建queue
@Bean("queue")
public Queue getQueue(){
return QueueBuilder.durable(QUEUE_NAME).build(); // 他也是這種構建者
}
//3. 綁定在一起【創建Binding】
@Bean
public Binding getBinding(@Qualifier("exchange")Exchange exchange,@Qualifier("queue")Queue queue){ // 這個配置類可能又很多交換機和配置類,所以一般都會用 @Bean注解指定它的名稱。而這里在綁定的時候,也要區分不同的交換機與隊列.
// .bind() 綁定哪個隊列 .to() 到哪個交換機 .with() 指定routingKey .noargs() 不需要指定參數-如果不寫這個也代表不需要指定參數
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*").noargs();
}
}
7.4 發布者發布消息到rabbitMQ
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() throws IOException {
// 參數1:指定交換機 參數2:指定routingKey 參數3:指定發送的數據內容
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!");
}
}
7.5 消費者監聽rabbitMQ接收消息
// 編寫監聽類
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue") // 指定要監聽的隊列
public void getMessage(Message message) throws IOException { // 將來來消息了,就會被這個Message對象接收
System.out.println("接收到消息:" + message);
}
}
8. 持久化
思考幾個問題?
- 如果消息已經到達了RabbitMQ,還沒發送給消費者時,RabbitMQ宕機了,消息會丟失嗎?————————————————————不會,因為RabbitMQ的隊列有持久化機制,若消息到了RabbitMQ已經到了隊列那里了,就能持久化。當RabbitMQ重連的時候消息就能發送給消費者了。
- 消費者在消費消息的時候,還沒消費完,此時消費者宕機了,消息會丟失嗎?————————————————————不會,因為RabbitMQ提供了手動ACK。當成功消費完消息的時候再手動ACK告訴生產者我消費完了。
- 生產者在發布消息到RabbitMQ的交換機時,由於網絡問題,導致沒有真發送成功到交換機,消息會丟失嗎?————————————————————會,因為生產者已經執行了發布消息的方法,就會認為已經發布過去了。可利用Confirm(確認)機制實現或利用其提供的事務操作機制【影響效率,這里不介紹它】。 PS:Confirm機制是保證了消息發送到Exchange上。而消費者監聽的不是Exchange,而是隊列。
- 生產者成功發布消息到交換機了,但是交換機分發消息到隊列的時候出現了問題,導致沒有真分發成功,消息會丟失嗎?————————————————————會,因為消費者是與隊列交互的,如果消息沒有分發到隊列,隊列就沒有消息。可利用Return機制實現。 PS:Return機制是保證Exchange的消息分發到隊列。
持久化可以提高 RabbitMQ 的可靠性,以防在異常情況(重啟、關閉、宕機等)下的數據丟失。
持久化可分為以下幾種情況:
- 交換機的持久化
- 隊列的持久化
- 消息的持久化
8.1 交換機的持久化
交換器的持久化是在聲明交換器的時候,將 durable 屬性設置為 true。如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換器就會被刪除。對於長期使用的交換器來說,建議將其置為持久化。
//原生Api
/**
* 參數1:交換機名稱
* 參數2:交換機類型
* 參數3:是否持久化 默認 false
*/
channel.exchangeDeclare("logs_direct", BuiltinExchangeType.DIRECT,true);
// springboot方式
@Bean
public TopicExchange payTopicExchange(){
/**
* 參數1:交換機名稱
* 參數2:是否持久化 true是, 默認為 true
* 參數3:是否自動刪除 true是, 默認為 false
*/
return new TopicExchange(exchangeMame,true,false);
}
8.2 隊列的持久化
隊列的持久化也是在聲明隊列的時候,將durable參數設置為true。如果隊列不設置持久化,那么 RabbitMQ服務重啟之后,隊列就會被刪除,既然隊列都不存在了,隊列中的消息也會丟失。
// 原生Api
/**
* 參數1:String queue 隊列名稱 如果隊列不存在會自動創建
* 參數2:boolean durable 隊列是否持久化 true 持久化 false 不持久化 默認:false
* 參數3:boolean exclusive 是否獨占隊列 true 獨占隊列 false 不獨占 默認:true
* 參數4:boolean autoDelete 是否在消費完成后自動刪除 true 自動刪除 默認:true
* 參數5:Map<String, Object> arguments 額外附加參數
*/
channel.queueDeclare("hello-1",true,false,false,null);
@Bean
public Queue dlQueue(){
/**
* 參數1:隊列名稱
* 參數2:是否持久化 默認:true
*/
return new Queue(dlQueue,true);
}
8.3 消息的持久化
隊列的持久化能保證其本身不會因重啟、關閉、宕機的情況而丟失,但是並不能保證內部所存儲的消息不會丟失。要確保消息不會丟失,需要將消息設置為持久化。
在發送消息的時候,通過將BasicProperties中的屬性deliveryMode(投遞模式)設置為 2 即可實現消息的持久化。
// 原生Api
channel.basicPublish("exchangeName" , "routingKey",
new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build(),
"ddf".getBytes());
// springboot方式
MessagePostProcessor messagePostProcessor = message -> {
MessageProperties messageProperties = message.getMessageProperties();
//設置消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};
rabbitTemplate.convertAndSend("exchangeName","routingKey","消息內容",messagePostProcessor);
可以將所有的消息都設置為持久化,但是這樣會嚴重影響 RabbitMQ 的性能。寫入磁盤的速度比寫入內存的速度慢得不只一點點。對於可靠性不是那么高的消息可以不采用持久化處理,以提高整體的吞吐量。在選擇是否要將消息持久化時,需要在可靠性和吞吐量之間做權衡。
一般的系統也用不到對消息進行持久化。不過交換機和隊列的持久化還是要支持的。
上述幾種方式我們只是保證了消息發送到交換機、隊列時不會由於RabbitMQ的重啟、關閉、宕機的情況而丟失消息。但如果消費者在消費的時候出現問題了呢?
對於消費者來說,如果在訂閱消息的時候,將autoAck設置為了true,消費者接收到相關消息后,還沒有正式處理消息邏輯之前,就出現了異常掛掉了,但消息已經被自動確認了,這樣也算數據丟失。
對此有如下幾種方式解決:
1. 可以用手動Ack,消費者成功消費后告訴mq我成功消費了。
2. 將消息重試並設置死信隊列
9. SpringBoot實現手動ACK
RabbitMQ的確認機制是自動確認的,消費者收到消息后立馬確認。自動確認可能會出現消費者最后沒有成功消費信息的可能,所以我們一般需要手動確認,在成功消費后再告訴MQ。
如果消費者在消費過程中,出現了異常,我們可以調用basicNack或basicReject拒絕消息,讓MQ重新發送。
在上述操作基礎上更改
8.1 更改配置文件
spring:
rabbitmq:
host: 47.174.116.28
port: 5672
username: admin
password: admin
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 默認是auto manual:表示手動ACK
8.2 更改消費者
如果此時不更改消費者,雖然消費者能拿到消息消費,但是在MQ中會顯示該消息未被消費。
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Message message, Channel channel,String msg) throws IOException {
System.out.println("接收到消息:" + message);
// ======== 手動ACK 告訴rabbitMQ我消費成功 ===========
// void basicAck(long deliveryTag, boolean multiple) throws IOException; 用於確認當前消息
// deliveryTag :消息的編號 multiple:是否批量進行簽收。設置為 false 時,則表示確認消費編號為 deliveryTag 的這一條消息,該參數為 true 時,則可以一次性確認消費小於等於 deliveryTag 值的所有消息。
// void basicNack(long deliveryTag, boolean multiple, boolean requeue); 用於否打當前消息
// deliveryTag:消息的編號
// multiple:設置為 false 時,則表示拒絕編號為 deliveryTag 的這一條消息,這時候 basicNack 方法和 basicReject 方法一樣, multiple 參數設置為 true 則表示拒絕小於 deliveryTag 編號之前所有未被當前消費者確認的消息。
// 如:channel.BasicNack(3, true, false); 第一個參數DeliveryTag中如果輸入3,則消息DeliveryTag小於等於3的,這個Channel的,都會被拒收
// requeue:參數設置為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者;如果 requeue 參數設置為 false,則 RabbitMQ 會立即把消息從隊列中移除。
// void basicReject(long deliveryTag, boolean requeue); 用於拒絕當前消息
// deliveryTag: 消息的編號 requeue: 參數設置為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者;如果 requeue 參數設置為 false,則 RabbitMQ 會立即把消息從隊列中移除。
// 注:Basic.Reject命令一次只能拒絕一條消息,如果想要批量拒絕消息,則可以使用 Basic.Nack 這個命令。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
10. 如何保證RabbitMQ的可靠性【confirm機制與return機制】
10.1 可靠性
這里的可靠性是指:保證生產者每次消息最終都能成功發送給消費者。
實際上消息隊列可以說是沒法百分之百保證可靠性的!RabbitMQ 提供的相關機制也只是在於縮小消息丟失的概率,或者說提供了消息丟失后的我們可以記錄日志的功能。 在解決這些問題時有必要明白一點,其實小公司業務量不大,並發量不高的情況下這些問題是幾乎不會發生的......即使偶爾出現,開發人員手動修復數據處理就好。所以可結合公司實際業務場景看有沒有必要解決這些問題。
思考幾個問題?
- 如果消息已經到達了RabbitMQ,還沒發送給消費者時,RabbitMQ宕機了,消息會丟失嗎?————————————————————不會,因為RabbitMQ的隊列有持久化機制,若消息到了RabbitMQ已經到了隊列那里了,就能持久化。當RabbitMQ重連的時候消息就能發送給消費者了。
- 消費者在消費消息的時候,還沒消費完,此時消費者宕機了,消息會丟失嗎?————————————————————不會,因為RabbitMQ提供了手動ACK。當成功消費完消息的時候再手動ACK告訴生產者我消費完了。
- 生產者在發布消息到RabbitMQ的交換機時,由於網絡問題,導致沒有真發送成功到交換機,消息會丟失嗎?————————————————————會,因為生產者已經執行了發布消息的方法,就會認為已經發布過去了。可利用Confirm(確認)機制實現或利用其提供的事務操作機制【影響效率,這里不介紹它】。 PS:Confirm機制是保證了消息發送到Exchange上。而消費者監聽的不是Exchange,而是隊列。
- 生產者成功發布消息到交換機了,但是交換機分發消息到隊列的時候出現了問題,導致沒有真分發成功,消息會丟失嗎?————————————————————會,因為消費者是與隊列交互的,如果消息沒有分發到隊列,隊列就沒有消息。可利用Return機制實現。 PS:Return機制是保證Exchange的消息分發到隊列。
10.2 SpringBoot整合RabbitMQ實現可靠性


10.2.1 Java方式實現RabbitMQ的Confirm機制

異步 confirm 方法的編程實現最為復雜,也是最高效的。
在客戶端 Channel 接口中提供的addConfirmListener方法可以添加 ConfirmListener這個回調接口.
這個ConfirmListener 接口包含兩個方法: handleAck、handleNack,分別用來處理 RabbitMQ 回傳的 Basic.Ack、Basic.Nack 。
在這兩個方法中都包含有兩個參數 deliveryTag(標記消息的唯一有序序號) 、multiple(是否批量confirm true代表是)
普通confirm:同步等待確認,簡單,但吞吐量非常有限。
批量confirm:批量同步等待確認,簡單,合理的吞吐量,一旦出現問題但很難推斷出是哪條消息出現了問題。
異步confirm:最佳性能和資源使用,在出現錯誤的情況下可以很好地控制,但是實現起來稍微有些麻煩。
10.2.2 Java方式實現RabbitMQ的Return機制

在客戶端 Channel 接口中提供的addReturnListener方法,可以添加 ReturnListener這個回調接口。
這個ReturnListener接口包含一個方法:handleReturn,用來處理交換機發送消息到隊列失敗,則執行此方法。
/*
* 參數1:響應code
* 參數2:響應文本
* 參數3:交換機名稱
* 參數4:路由key
* 參數5:消息的基本屬性集
* 參數6:消息內容
*/
public void handleReturn(int replyCode, String replyText,
String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
這里的basicPublish()方法要選帶有mandatory的那個重載,並設置為true。mandatory的默認值是:false。
當 mandatory 參數設為 true 時,交換器無法根據自身的類型和路由鍵找到一個符合條件的隊列時, 那么 RabbitMQ 會調用 Basic.Return 命令將消息返回給生產者。
當 mandatory參數設置為 false 時,出現上述情形,則消息直接被丟棄。
10.2.3 SpringBoot方式實現RabbitMQ的confirm與return機制
編寫配置文件
spring:
rabbitmq:
host: 47.144.116.28
port: 5672
username: admin
password: admin
virtual-host: /test
listener:
simple:
acknowledge-mode: manual # 默認是auto manual:表示手動ACK
publisher-confirm-type: simple # 開啟confirm機制
publisher-returns: true # 開啟return機制
# 關於publisher-confirm-type的取值
NONE: 禁用發布確認模式,是默認值
CORRELATED: 發布消息成功到交換器后會觸發回調方法
SIMPLE: 經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在發布消息成功后使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待
broker節點返回發送結果,根據返回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果返回false則會關閉channel,則接下來無法發送消息到broker。
//創建關於 SpringBoot 實現可靠性 的配置類
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // 表示在初始化PublisherConfirmAndReturnConfig對象時,會執行該方法。
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
// 基於confirm的回調 此方法用於監聽消息是否發送到交換機
// correlationData:對象內部只有一個 id 屬性,用來表示當前消息的唯一性。
// ack:消息投遞到broker 的狀態,true表示成功。
// cause:表示投遞失敗的原因
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已經送達到Exchange");
}else{
System.out.println("消息沒有送達到Exchange");
}
}
@Override // 基於return的回調 // 消息沒有送達隊列時才會執行
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息沒有送達到Queue");
}
}
生產者發布消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() throws IOException {
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!");
}
消費者消費消息
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Message message, Channel channel,String msg) throws IOException {
System.out.println("接收到消息:" + message);
// 手動ACK 告訴rabbitMQ我消費成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
11. 如何對消息設置過期時間
https://www.cnblogs.com/itlihao/p/14961534.html
12. 什么是死信交換機與死信隊列
出處:https://juejin.cn/post/6976778266472366087
DLX ,全稱為 Dead-Letter-Exchange ,可以稱之為死信交換機。它其實也是一個正常的交換機,和一般交換機沒什么區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列中存在死信時 RabbitMQ 就會自動地將這個消息新發布到設置的 DLX 上去,進而被路由到另一個隊列,即死信隊列。我們可以監聽這個死信交換機的死信隊列中的消息、以進行相應的處理。
當消息在一個隊列中變成死信之后,如果你對該隊列配置了死信隊列,那么它能被重新發送到另一個交換機中,這個交換器就是 DLX(死信交換機),而於 DLX 綁定的隊列就稱之為死信隊列。
那種消息會變成死信?
- 消息過期,也就是筆者在上篇提到的 TTL。消息在隊列的存活時間超過所設置的 TTL 時間。
- 消息被拒絕。調用了 channel.basicNack 或 channel.basicReject方法,井且設置 requeue 參數為false。 requeue: 參數設置為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者;如果 requeue 參數設置為 false,則 RabbitMQ 會立即把消息從隊列中移除。
- 隊列的接收消息數長度達到最大長度。
基於消息過期配置死信隊列案例:
mq:
queueBinding:
queue: prod_queue_pay
dlQueue: dl-queue
exchange:
name: exchang_prod_pay
dlTopicExchange: dl-topic-exchange
key: prod_pay
dlRoutingKey: dl-routing-key
//==============創建死信交換機並於死信隊列進行綁定====================
@Value("${mq.queueBinding.exchange.dlTopicExchange}")
private String dlTopicExchange;
@Value("${mq.queueBinding.dlRoutingKey}")
private String dlRoutingKey;
@Value("${mq.queueBinding.dlQueue}")
private String dlQueue;
//創建死信交換機 【可以是任意類型的交換機,這里采用的是topic類型的】
@Bean
public TopicExchange dlTopicExchange(){
return new TopicExchange(dlTopicExchange,true,false);
}
//創建死信隊列
@Bean
public Queue dlQueue(){
return new Queue(dlQueue,true);
}
//死信隊列與死信交換機進行綁定
@Bean
public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
//==============創建要執行我們義務的交換機與隊列====================
//==============我們要基於隊列設置過期時間=========================
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
private final String dle = "x-dead-letter-exchange"; // 必須叫這個名稱
private final String dlk = "x-dead-letter-routing-key"; // 必須叫這個名稱
private final String ttl = "x-message-ttl";
/**
* 業務隊列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//設置隊列的過期時間
//隊列中所有消息都有相同的過期時間
params.put(ttl,10000);
//==============================================================聲明當前隊列綁定的死信交換機============================================================================================
params.put(dle,dlTopicExchange);
//聲明當前隊列的死信路由鍵 如果沒有指定,則使用原隊列的路由鍵。因為我們指定的死信交換機是topic,所以會有路由鍵。如果是finaot模式,就可不配路由鍵。
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(exchangeName,true,false);
}
//隊列與交換機進行綁定
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
// 生產者發送消息
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
// 將消息發送給業務交換機
rabbitTemplate.convertAndSend(exchangeName,key,msg);
}
}
啟動服務,可以看到同時創建了業務隊列、業務交換機以及死信隊列、死信交換機。而且可以看到業務隊列上帶了 DLX、DLK標簽。


然后調用接口:http://localhost:8080/?msg=紅紅火火 ,消息會被發送到 prod_queue_pay這

如果 10s 內沒有消費者消費這條消息,那么判定這條消息為過期消息。由於設置了 DLX ,過期時消息被丟給 dlxExchange 交換機中,根據所配置的dlRoutingKey 找到與 dlxExchange 匹配的隊列 dlQueue后,消息被存儲在 dlxQueue這個死信隊列中。

13. RabbitMQ 的重試機制
原文:https://juejin.cn/post/6979390382371143694#heading-0
前言:以下2種情況,是沒有開啟手動ack的前提下。
情況一:消費者在處理消息的過程中可能會發生異常,此時,rabbitMQ會不斷重試。由於沒有我們沒有給rabbirMQ明確重試次數,會造就無限重試,這是一個致命的問題,最終導致宕機。
情況二:手動告訴MQ拒絕消息,channel.basicNack()並設置requeue為true。 requeue:參數設置為 true,則 RabbitMQ 會重新將這條消息存入隊列,以便可以發送給下一個訂閱的消費者。
但是為了保證消息被消費與解決上述問題,我們控制MQ讓它知道他該重試幾次。所以一般有如下方式解決:
- 我們一般用Springboot提供retry功能告訴rabbitMQ重試幾次,還不行最后將配合死信隊列來解決。【默認重試后還是失敗時,會自動刪除該消息,就導致消息丟失了】
- 用redis,使用redis記數,若超過指定次數,直接拒絕消息,並且設置不讓其回到隊列。並把該消息記錄下,后期來由人工處理。==
下面展示retry功能的開啟:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者進行重試 默認情況下是 false ====配置yml重試策略=====
max-attempts: 5 # 最大重試次數
initial-interval: 3000 # 重試時間間隔
default-requeue-rejected: false # 重試次數超過上面的設置后,是否丟棄 默認true,指定為false時,應編寫相應代碼將該消息加入死信隊列
# 重試並不是RabbitMQ重新發送了消息,僅僅是消費者內部進行的重試,換句話說就是重試跟mq沒有任何關系
14. 如何保證RabbitMQ重復消費消息
14.1 為什么要解決重復消費問題
冪等性操作:就是指比如刪除操作,這類操作執行多少次都沒影響。
非冪等性操作:添加操作,而且數據庫還是自增的,這類操作執行多次和執行一次差別是很大的!
所以,針對非冪等性操作,一定要保證消息不被重復消費。
14.2 重復消費消息的原因
- 生產時消息重復
- 消費時消息重復
生產消息時重復生產
原文:https://www.cnblogs.com/zhixie/p/13444213.html
由於生產者發送消息給MQ,在MQ確認的時候出現了網絡波動,生產者沒有收到確認,實際上MQ已經接收到了消息。這時候生產者就會重新發送一遍這條消息。
生產者中如果消息未被確認,或確認失敗,我們可以使用定時任務+(redis/db)來進行消息重試。
消費消息時重復消費
情況一:消費者消費成功后,再給MQ確認的時候出現了網絡波動,MQ沒有接收到確認【手動ack】。為了保證消息被消費(我們配置了重試機制),MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息。
情況二:生產者將消息發送給消費者后,斷開了連接,等連接恢復后,生產者又重新發送消息給消費者。這時候消費者就接收到了兩條一樣的消息。
解決思路:
讓每個消息攜帶一個全局的唯一ID,引入redis,在消息被消費之前,將消息的唯一ID放到redis中,並設置它的值,如值為0:正在執行業務,值為1:執行業務成功。
注意:一個比較極端的情況,消費者設置redis值為0后,執行業務,出現死鎖,一直執行下去。所以,我們可以為這個redis設置一個過期時間,比如10秒之后,這個redis就消失。
具體消費過程為:
- 消費者獲取到消息后先根據id去查詢redis/db是否存在該消息
- 如果不存在,則設置redis的值為0(執行業務中,並設置過期時間),消費完畢后設置redis值為1(執行業務成功,並設置過期時間),並ack告訴MQ。
- 如果存在,判斷其狀態,若為1則證明消息被消費過,直接ack,若為0不執行任何操作。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 開啟消費者進行重試
max-attempts: 5 # 最大重試次數
initial-interval: 3000 # 重試時間間隔
redis:
host: 192.168.199.109
port: 6379
// 生產者發布消息
@Test
void contextLoads() throws IOException {
// 用於創建消息的唯一標識
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
// rabbitMQ的convertAndSend() 方法就有一個帶消息唯一標識的重載
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","紅色大狼狗!!",messageId);
System.in.read();
}
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 獲取MessageId 是從消息頭里面的 spring_returned_message_correlation 獲得的
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 設置key到Redis setIfAbsent:就相當於 setnx【在指定的 key 不存在時,為 key 設置指定的值】
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消費消息
System.out.println("接收到消息:" + msg);
//3. 設置key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 獲取Redis中的value即可 如果是1,手動ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
5. 延遲隊列
延遲隊列:消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。
注意:在RabbitMQ中,沒有延遲隊列的概念,但是我們可以利用ttl和死信隊列達到延遲的效果。這種需求往往在某些應用場景中出現。當然還可以使用插件。

實現原理:
- 生產者生產一個消息到隊列1
- 隊列1中的消息過期轉發到死信隊列
- 消費者獲取死信隊列的信息進行消費
場景舉例:
下訂單后,30分鍾如果還沒支付,則取消訂單回滾庫存。
5.1 使用插件實現延遲隊列
如果我現在有不同的場景,比如分別5s、10s、15s之后延遲消費,那就需要創建三個隊列。每次有一個不同的時間段的需求過來,我都需要創建一個隊列,這肯定不行。
https://juejin.cn/post/6977516798828609567#heading-9
