文章目錄
1. 事務機制2. Confirm模式2.1 生產者2.1.1 普通Confirm模式2.1.2 批量Confirm模式2.1.3 異步Confirm模式2.2 消費者3. 其他
消費者如何確保消息一定能夠消費成功呢?
由於在前面工作隊列模式里面我們了解了應答模式,所以我們可以很自信的回答如上題目。
通過應答形式,默認自動應答,可以修改為手動應答來保證消息消費成功。
其實應答形式就是 RabbitMQ 消息確認機制的一種體現,我們再來看看問題的產生背景:
生產者發送消息出去之后,不知道到底有沒有發送到 RabbitMQ 服務器, 默認是不知道的。而且有的時候我們在發送消息之后,后面的邏輯出問題了,我們不想要發送之前的消息了,需要撤回該怎么做。
兩種解決方案:
- AMQP 事務機制
- Confirm 模式
1. 事務機制
事務機制分為三部分,開啟事務,提交事務,事務回滾,如下:
- txSelect 將當前 channel 通道設置為 transaction 模式(開啟事務)
- txCommit 提交當前事務
- txRollback 事務回滾
我們通過一個例子模擬消息生產者發送消息過程發生異常,進行事務回滾的過程。
public class Producer {
/** 隊列名稱 */
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.獲取連接 */
Connection newConnection = MQConnectionUtils.newConnection();
/** 2.創建通道 */
Channel channel = newConnection.createChannel();
/** 3.創建隊列聲明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.發送消息 */
try {
/** 4.1 開啟事務 */
channel.txSelect();
String msg = "我是生產者生成的消息";
System.out.println("生產者發送消息:"+msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
/** 4.2 提交事務 - 模擬異常 */
int i = 1/0;
channel.txCommit();
}catch (Exception e){
e.printStackTrace();
System.out.println("發生異常,我要進行事務回滾了!");
/** 4.3 事務回滾 */
channel.txRollback();
}finally {
channel.close();
newConnection.close();
}
}
}
打印結果:
生產者發送消息:我是生產者生成的消息
java.lang.ArithmeticException: / by zero at club.sscai.producer.Producer.main(Producer.java:37)
發生異常,我要進行事務回滾了!
2. Confirm模式
像上方這種采用 AMQP 事務機制來保證消息的准確到達,在一定程度上是消耗了性能的,所以我們再來看看 Confirm 模式。
Confirm 模式分為兩塊,一是生產者的 Confirm 模式,再就是消費者的 Confirm 模式。
2.1 生產者
通過生產者的確認模式我們是要保證消息准確達到客戶端,而與 AMQP 事務不同的是 Confirm 是針對一條消息的,而事務是可以針對多條消息的。
Confirm 模式最大的好處在於它是異步的,一旦發布一條消息,生產者應用程序就可以在等信道返回確認的同時繼續發送下一條消息,當消息最終得到確認之后,生產者應用便可以通過回調方法來處理該確認消息。
Confirm 的三種實現方式:
- channel.waitForConfirms() 普通發送方確認模式;
- channel.waitForConfirmsOrDie() 批量確認模式;
- channel.addConfirmListener() 異步監聽發送方確認模式
2.1.1 普通Confirm模式
public class Producer11 {
/** 隊列名稱 */
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.獲取連接 */
Connection newConnection = MQConnectionUtils.newConnection();
/** 2.創建通道 */
Channel channel = newConnection.createChannel();
/** 3.創建隊列聲明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.開啟發送方確認模式 */
channel.confirmSelect();
/** 5.發送消息 */
for (int i = 0; i < 5; i++) {
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "條消息").getBytes());
try {
if (channel.waitForConfirms()) {
System.out.println("發送成功");
}else{
System.out.println("進行消息重發");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** 5.關閉通道、連接 */
channel.close();
newConnection.close();
}
}
在推送消息之前,channel.confirmSelect() 聲明開啟發送方確認模式,再使用channel.waitForConfirms() 等待消息被服務器確認即可。
2.1.2 批量Confirm模式
public class Producer22 {
/** 隊列名稱 */
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
/** 1.獲取連接 */
Connection newConnection = MQConnectionUtils.newConnection();
/** 2.創建通道 */
Channel channel = newConnection.createChannel();
/** 3.創建隊列聲明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.開啟發送方確認模式 */
channel.confirmSelect();
/** 5.發送消息 */
for (int i = 0; i < 5; i++) {
channel.basicPublish("", QUEUE_NAME, null, (" Confirm模式, 第" + (i + 1) + "條消息").getBytes());
}
/** 6.直到所有信息都發布,只要有一個未確認就會IOException */
channel.waitForConfirmsOrDie();
System.out.println("全部執行完成");
/** 5.關閉通道、連接 */
channel.close();
newConnection.close();
}
}
channel.waitForConfirmsOrDie() 使用同步方式等所有的消息發送之后才會執行后面代碼,只要有一個消息未被確認就會拋出 IOException 異常。
2.1.3 異步Confirm模式
public class Producer33 {
/** 隊列名稱 */
private static final String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
/** 1.獲取連接 */
Connection newConnection = MQConnectionUtils.newConnection();
/** 2.創建通道 */
Channel channel = newConnection.createChannel();
/** 3.創建隊列聲明 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** 4.開啟發送方確認模式 */
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
String message = "我是生產者生成的消息:" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
/** 5.發送消息 異步監聽確認和未確認的消息 */
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未確認消息,標識:" + deliveryTag);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple));
}
});
/** 6.關閉通道、連接 */
/** channel.close();*/
/** newConnection.close();*/
}
}
異步模式的優點,就是執行效率高,不需要等待消息執行完,只需要監聽消息即可,以上異步返回的信息如下:

可以看出,代碼是異步執行的,消息確認有可能是批量確認的,是否批量確認在於返回的 multiple 的參數,此參數為 bool 值,如果 true 表示批量執行了 deliveryTag 這個值以前的所有消息,如果為 false 的話表示單條確認。
維持異步調用要求我們不能斷掉連接,因此注釋掉第6步。
2.2 消費者
為了保證消息從隊列可靠地到達消費者,RabbitMQ 提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,可以指定 noAck 參數,當 noAck=false 時, RabbitMQ 會等待消費者顯式發回ack信號后才從內存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ 會在隊列中消息被消費后立即刪除它。
在消費者中 Confirm 模式又分為手動確認和自動確認。
關於兩者的介紹:
自動確認: 在自動確認模式下,消息在發送后立即被認為是發送成功。 這種模式可以提高吞吐量(只要消費者能夠跟上),不過會降低投遞和消費者處理的安全性。 這種模式通常被稱為“發后即忘”。 與手動確認模式不同,如果消費者的TCP連接或信道在成功投遞之前關閉,該消息則會丟失。
手動確認: 使用自動確認模式時需要考慮的另一件事是消費者過載。 手動確認模式通常與有限的信道預取一起使用,限制信道上未完成(“進行中”)傳送的數量。 然而,對於自動確認,根據定義沒有這樣的限制。 因此,消費者可能會被交付速度所壓倒,可能積壓在內存中,堆積如山,或者被操作系統終止。 某些客戶端庫將應用TCP反壓(直到未處理的交付積壓下降超過一定的限制時才停止從套接字讀取)。 因此,只建議當消費者可以有效且穩定地處理投遞時才使用自動投遞方式。
綜上:盡量選擇手動確認方式。
主要實現代碼:
// 手動確認消息
channel.basicAck(envelope.getDeliveryTag(), false);
// 關閉自動確認
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
3. 其他
1、如果 RabbitMQ 服務器宕機了,消息會丟失嗎?
不會丟失,RabbitMQ 服務器支持消息持久化機制,會把消息持久化到硬盤上。
2、如何確保消息正確地發送至RabbitMQ?
RabbitMQ 使用發送方確認模式,確保消息正確地發送到 RabbitMQ。
發送方確認模式:將信道設置成 confirm 模式(發送方確認模式),則所有在信道上發布的消息都會被指派一個唯一的ID。一旦消息被投遞到目的隊列后,或者消息被寫入磁盤后(可持久化的消息),信道會發送一個確認給生產者(包含消息唯一ID)。如果RabbitMQ發生內部錯誤從而導致消息丟失,會發送一條nack(not acknowledged,未確認)消息。
發送方確認模式是異步的,生產者應用程序在等待確認的同時,可以繼續發送消息。當確認消息到達生產者應用程序,生產者應用程序的回調方法就會被觸發來處理確認消息。
我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下微信公眾號哈:niceyoo
