RabbitMQ消息確定機制(自動ACK和手動ACK)


原文地址:https://blog.csdn.net/qq_29595463/article/details/109527115

文章目錄
1. 消息確定機制及其配置
2. 自動確認和手動確認細節
2.1 測試代碼配置
2.2 自動確認
2.3 手動確認
2.3.1 常用API
2.3.2 代碼案例
3. 消息重回隊列
3.1 默認—消息nack后重回隊頭
3.2 解決方案—消息重發送到隊尾
3.3 如何處理異常消息
推薦閱讀
1. 消息確定機制及其配置
RabbitMq消費者的消息確定機制:

NONE:無應答,rabbitmq默認consumer正確處理所有請求。
AUTO:consumer自動應答,處理成功(注意:此處的成功確認是沒有發生異常)發出ack,處理失敗發出nack。rabbitmq發出消息后會等待consumer端應答,只有收到ack確定信息后才會將消息在rabbitmq清除掉。收到nack異常信息的處理方法由setDefaultRequeueReject()方法設置,這種模式下,發送錯誤的消息可以恢復。
MANUAL:基本等同於AUTO模式,區別是需要人為調用方法確認。
在配置文件中:

spring:
profiles:
active: test
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
template:
receive-timeout: 2000
reply-timeout: 2000
listener:
simple:
concurrency: 1
max-concurrency: 3
# 消費者預取1條數據到內存,默認為250條
prefetch: 1
# 確定機制
acknowledge-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
可以進行配置,也可以在代碼中進行配置:

@Configuration
public class RabbitConfig {

@Autowired
private CachingConnectionFactory connectionFactory;

@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
/* setConnectionFactory:設置spring-amqp的ConnectionFactory。 */
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(1);
factory.setDefaultRequeueRejected(true);
//手動確認。
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2. 自動確認和手動確認細節
2.1 測試代碼配置
隊列配置:

@Configuration
public class RabbitMqConfig {
// 簡單的聲明一個隊列
@Bean
public Queue kinsonQueue1() {
return new Queue("kinson1");
}
}
1
2
3
4
5
6
7
8
生產者配置:

@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
//直接向隊列中發送數據
@GetMapping("send")
public String send() {
for (int i = 0; i < 10; i++) {
String content = "Date:" + System.currentTimeMillis();
content = content + ":::" + i;
rabbitTemplate.convertAndSend("kinson1",i+"");
}
return "success";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
消費者配置

@Component
@Slf4j
public class CustomerRev {

//消費者處理消息緩慢
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
Thread.sleep(30000);
log.info("【結束】:{}", message);
} catch (Exception e) {
log.error("錯誤信息:{}", e.getMessage());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2.2 自動確認
生產者產生10筆消息,自動確認模式下,消息處理成功,消費者才會去獲取下一筆消息;消息處理拋出異常,那么將會消息重回隊列。

 

注:此時的prefetch=1,該消費者unack的消息只有一條。其余9條均在MQ中。

若是設置prefetch=250(默認),那么消費者會預期10條消息,unack的消息是10條,但是日志中依舊只是消費了一條消息。

【開始】:Date:1604557983496:::0
1


**總結:**對於自動確認來說,當方法沒有異常執行完畢后,會對MQ發出ACK;若方法出現異常,會對MQ發出nack,消息會重回隊列。要分清哪些是可以恢復的異常,哪些是不可以恢復的異常。不可恢復的異常,在消費者代碼中捕獲異常,並記錄日志表或放入死信隊列。可恢復的異常,那么放入業務隊列中重試。

2.3 手動確認
2.3.1 常用API
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息

channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
Reject表示拒絕消息。requeue:false表示被拒絕的消息是丟棄;true表示重回隊列

channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
nack表示拒絕消息。multiple表示拒絕指定了delivery_tag的所有未確認的消息,requeue表示不是重回隊列

2.3.2 代碼案例
案例:消費者只有發送ACK后才會開始消費下一條消息。

@Component
@Slf4j
public class CustomerRev {

//消費者處理消息緩慢
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
if("0".equals(message)){
Thread.sleep(1000);
}else {
Thread.sleep(300000000L);
}
log.info("【結束】:{}", message);
//ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
log.error("錯誤信息:{}", e.getMessage());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
3. 消息重回隊列
自動確認模式下拋出異常,和手動確認模式下nack消息都可以重回隊列。

3.1 默認—消息nack后重回隊頭
@Component
@Slf4j
public class CustomerRev {

//消費者處理消息緩慢
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消費異常");
}
log.info("【結束】:{}", message);
//ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
(自動ACK下)當消息為0時,拋出異常,生產者產生10筆消息。

 

代碼輪詢打印0的消息的消費異常,其他消息均不消費。


當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,這時消費者會立馬又接收到這條消息進行處理,接着拋出異常,進行 回滾,如此反復進行。這種情況會導致消息隊列處理出現阻塞,消息堆積,導致正常消息也無法運行。

3.2 解決方案—消息重發送到隊尾
//消費者處理消息緩慢
@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) throws IOException {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消費異常");
}
log.info("【結束】:{}", message);
//ack表示確認消息。multiple:false只確認該delivery_tag的消息,true確認該delivery_tag的所有消息
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
} catch (Exception e) {
//捕獲異常后,重新發送到指定隊列,自動ack不拋出異常即為ack
channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
msg.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
msg.getBody());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
手動確認的模式:

//手動進行應答
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//重新發送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(new Object()));
1
2
3
4
5
6


這個不會導致其他消息被阻塞消費,但是會大量的拋出:

 

3.3 如何處理異常消息
如果一個消息體本身有誤,會導致該消息體,一直無法進行處理,而服務器中刷出大量無用日志。解決這個問題可以采取兩種方案:

1.一種是對於日常細致處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。對於可以恢復的異常我們采取第三條中的解決方案,對於不可以處理的異常,我們采用記錄日志,直接丟棄該消息方案。

2.另一種是我們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,多次處理仍不能成功時,處理次數到達我們設置的值時,我們就丟棄該消息,但需要記錄詳細的日志。

將業務隊列綁定死信隊列,當消息被丟棄后,進入到死信隊列(代碼修復后監聽死信隊列補償消息)。可以避免我們手動的恢復消息。

重試機制(自動確認模式):

@Component
@Slf4j
public class CustomerRev {

@RabbitListener(queues = {"kinson1"})
public void receiver3(Message msg, Channel channel) throws IOException {
try {
//打印數據
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【開始】:{}",message);
if("0".equals(message)){
throw new RuntimeException("0的消息消費異常");
}
log.info("【結束】:{}", message);
} catch (Exception e) {
//捕獲異常后,重新發送到指定隊列,自動確認不拋出異常即為ack
Integer retryCount;
Map<String, Object> headers = msg.getMessageProperties().getHeaders();
if(!headers.containsKey("retry-count")){
retryCount=0;
}else {
retryCount = (Integer)headers.get("retry-count");
}
//判斷是否滿足最大重試次數(重試3次)
if(retryCount++<3) {
headers.put("retry-count",retryCount);
//重新發送到MQ中
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().contentType("text/plain").headers(headers).build();
channel.basicPublish(msg.getMessageProperties().getReceivedExchange(),
msg.getMessageProperties().getReceivedRoutingKey(), basicProperties,
msg.getBody());
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
每次收到消息后,修改消息的header信息。來記錄重試的次數。

推薦閱讀
rabbitmq消息重回隊列
————————————————
版權聲明:本文為CSDN博主「小胖學編程」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_29595463/article/details/109527115


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM