前言:
本章主要解决MQ中一般情况下丢数据要采取的措施以及Confirm的应用
一、简介
在RabbitMQ中,消息确认主要有生产者发送确认和消费者接收确认:
(一)生产者发送确认:指生产者发送消息后到RabbitMQ服务器,如果RabbitMQ服务器收到消息,
则会给我们生产者一个应答,用于告诉生产者该条消息已经成功到达RabbitMQ服务器中。

(二)消费者接收确认:用于确认消费者是否成功消费了该条消息。
消息确认的实现方式主要有两种,一种是通过事务的方式(channel.txSelect()、channel.txCommit()、
channel.txRollback()),另外一种是confirm确认机制。因为事务模式比较消耗性能,在实际工作中也用的不多,
这里主要介绍通过confirm机制来实现消息的确认,保证消息的准确性。
因此,按照博主的经验,生产上用confirm模式的居多。
一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),
一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),
这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个
Nack消息给你,你可以进行重试操作。
二、生产者发送确认
在RabbitMQ中实现生产者发送确认的方法(本文使用springcloud项目),主要有两点:
【a】配置文件中配置消息发送确认
spring rabbitm qpublisher-confirms true
【b】生产者实现 RabbitTemplate.ConfirmCallback接口,重写方法
confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
当然也可以通过注入的方式自定义confirm listener.
@Component public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{ @Override public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) { ......... } }
三、消费者接收确认
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。
确认模式主要分为下面三种:
AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
注意:在springcloud项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式:
spring: application: name: consumer1-service rabbitmq: listener: simple: acknowledge-mode: manual
手动确认与自动确认的区别:
自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。
这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失。
手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),
RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。
这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况。
(一)手动确认主要使用的方法有下面几个:
public void basicAck(long deliveryTag, boolean multiple):
deliveryTag 表示该消息的index(long类型的数字);
multiple 表示是否批量(true:将一次性ack所有小于deliveryTag的消息);
如果成功消费消息,一般调用下面的代码用于确认消息成功处理完
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
public void basicNack(long deliveryTag, boolean multiple, boolean requeue):
告诉服务器这个消息我拒绝接收,basicNack可以一次性拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
multiple: 是否批量(true:将一次性拒绝所有小于deliveryTag的消息);
requeue:指定被拒绝的消息是否重新回到队列;
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
public void basicReject(long deliveryTag, boolean requeue):也是用于拒绝消息,
但是只能拒绝一条消息,不能同时拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
requeue:指定被拒绝的消息是否重新回到队列;
四、案例
为了更和实际开发接近,本案例创建服务中心、生产者、两个消费者
(一)服务中心
1、依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.lanpo</groupId> <artifactId>fenbushiserver</artifactId> <version>0.0.1-SNAPSHOT</version> <name>fenbushiserver</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR3</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2.配置yml
server: port: 8081 eureka: instance: hostname: localhost client: registerWithEureka: false fetchRegistry: false serviceUrl: defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
3.在启动类上加注解@EnableEurekaServer
(二)生产者
1、添加pom.xml依赖文件
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
2添加配置
spring: application: name: product-service rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / #消息发送确认回调,不加不执行confirm方法 publisher-confirms: true #发送返回监听回调 publisher-returns: true #手动回复ack listener: simple: acknowledge-mode: manual server: port: 8082 #指向注册中心8081 eureka: client: service-url: defaultZone: http://localhost:8081/eureka/
3.自定义消息发送,返回回调监听(初始化rabbitTemplate)
package com.lanpo.fenbushiproduct.config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.nio.charset.StandardCharsets; /* ConfirmCallback:只确认消息是否正确到达交换机,不管是否正确到达,该回调都会执行 ReturnCallback:如果消息未正确从交换机到达队列中将会执行,正确到达不会执行 */ @Component public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { private static final Logger Log = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class); @Autowired private RabbitTemplate rabbitTemplate; //用于在依赖关系注入完成之后,需要执行的方法上,以执行任何初始化 @PostConstruct public void init() { //setConfirmCallback(ConfirmCallback confirmCallback) rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 如果消息没有到达交换机,则该方法中ack = false,error为错误信息; * 如果消息正确到达交换机,则该方法中ack = true; */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { Log.info("confirm 回调方法,回调消息id:" + correlationData.getId()); if (ack) { Log.info("confirm 消息发送到交换机成功。。"); } else { Log.info("confirm 消息发送到交换机失败,原因是:[{}]", cause); } } /** * 消息从交换机成功到达队列,则returnedMessage方法不会执行; * 消息从交换机未能成功到达队列,则returnedMessage方法会执行; */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { Log.info("returnedMessage 消息没有达到队列: " + new String(message.getBody(), StandardCharsets.UTF_8) + " ," + " replyCode= " + replyCode + " , " + " replyText= " + replyText, " exchange= " + exchange + " routingKey= " + routingKey); } }
4.配置rabbitmq配置类
package com.lanpo.fenbushiproduct.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMqConfig { //队列和交换机 public static final String TRAN_TOPIC_QUEUE="TRAN_QUEUE"; public static final String TRAN_TOPIC_EXCHANGE="TRAN_CHANGE"; public static final String ROUTING_KEY = "lanpo.#"; @Bean(TRAN_TOPIC_EXCHANGE)//给每一个bean命名可以在本来配置多个 public Exchange EXCHAGE_TOPIC_TRAN(){ //durable持久化,这是交换机名称持久化 return ExchangeBuilder.topicExchange(TRAN_TOPIC_EXCHANGE).durable(true).build(); } //声明队列 @Bean(TRAN_TOPIC_QUEUE)//给每一个bean命名可以在本来配置多个 public Queue TRAN_TOPIC_QUEUE(){ return new Queue(TRAN_TOPIC_QUEUE); } //绑定交换机和队列 @Bean public Binding BINDING_CHANGE_AND_QUEUE(@Qualifier(TRAN_TOPIC_QUEUE) Queue queue ,@Qualifier(TRAN_TOPIC_EXCHANGE) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs(); } }
4.发送消息服务
package com.lanpo.fenbushiproduct.service; import com.lanpo.fenbushiproduct.config.CustomConfirmAndReturnCallback; import com.lanpo.fenbushiproduct.config.RabbitMqConfig; import com.sun.javafx.scene.control.ReadOnlyUnbackedObservableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Date; import java.util.UUID; @Service public class HelloSend { private static final Logger Log = LoggerFactory.getLogger(HelloSend.class); public static final String ROUTING_KEY = "lanpo.sha"; @Autowired RabbitTemplate rabbitTemplate; public void sendMyMessage() { for (int i = 0; i <= 3; i++) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); Log.info("【product】发送的消息ID:" + correlationData.getId()); String message = " hello lan po " + i; Log.info("【product】发送的消息:{} ", message); rabbitTemplate.convertAndSend(RabbitMqConfig.TRAN_TOPIC_EXCHANGE , ROUTING_KEY, message, correlationData); } } }
(三)消费者
1.配置
server: port: 8083 spring: application: name: consumer1-service rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / #手动回复ack listener: simple: acknowledge-mode: manual eureka: client: service-url: defaultZone: http://localhost:8081/eureka/
2.pom.xml文件
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
2.接收消息
package com.lanpo.fenbushiconsumer.config; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class ReceviceMessage { private static final Logger Log = LoggerFactory.getLogger(ReceviceMessage.class); //监听某一个队列 @RabbitListener(queues = "TRAN_QUEUE") public void getRecevice1(String msg, Message message, Channel channel) throws IOException { try { Log.info("【Consumer1】收到消息 " + msg); //确认收到消息,确认当前消费者的一个消息收到 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { Log.info("【Consumer1】的该消息被拒绝过,不再放回到队列:{}", message); //被拒绝一次之后,不再放到队列中 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { Log.info("【Consumer1】的该消息第一次被拒绝,再放回到队列:{}", message); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } e.printStackTrace(); } } }
(四)、正常调用验证
1.正常的调用

2.查看消费端

(五)、当Routing key 不正确导致消息未从交换机到达队列。
1.解除交换机和routing key解绑

2.修改routing key
@Service public class HelloSend { private static final Logger Log = LoggerFactory.getLogger(HelloSend.class); public static final String ROUTING_KEY = "lanpo2.sha"; @Autowired RabbitTemplate rabbitTemplate; public void sendMyMessage() { for (int i = 0; i <= 3; i++) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); Log.info("【product】发送的消息ID:" + correlationData.getId()); String message = " hello lan po " + i; Log.info("【product】发送的消息:{} ", message); rabbitTemplate.convertAndSend(RabbitMqConfig.TRAN_TOPIC_EXCHANGE , ROUTING_KEY, message, correlationData); } } }
3.四个消息未达到队列

(六)、当交换机不正确时
@Service public class HelloSend { private static final Logger Log = LoggerFactory.getLogger(HelloSend.class); public static final String ROUTING_KEY = "lanpo.sha"; @Autowired RabbitTemplate rabbitTemplate; public void sendMyMessage() { for (int i = 0; i <= 3; i++) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); Log.info("【product】发送的消息ID:" + correlationData.getId()); String message = " hello lan po " + i; Log.info("【product】发送的消息:{} ", message); rabbitTemplate.convertAndSend("CHANGE_ERROR" , ROUTING_KEY, message, correlationData); } } }

这里主要修改一个不存在的交换机名称,这样消息就不能正确到达消费者监听队列所在的交换机
CHANGE_ERROR,从而触发confirmCallback中发送失败的情况,error为错误原因。
(七)、创建Consumer2消费2服务
1.
server: port: 8084 spring: application: name: consumer2-service rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / #手动回复ack listener: simple: acknowledge-mode: manual eureka: client: service-url: defaultZone: http://localhost:8081/eureka/
2.其他参考消费者1
package com.lanpo.fenbushiconsumer.config; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class ReceviceMessage { private static final Logger Log = LoggerFactory.getLogger(ReceviceMessage.class); //监听某一个队列 @RabbitListener(queues = "TRAN_QUEUE") public void getRecevice2(String msg, Message message, Channel channel) throws IOException { try {//报错 String po=null; po.hashCode(); //确认收到消息,确认当前消费者的一个消息收到 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { Log.info("【Consumer2】的该消息被拒绝过,不再放回到队列:{}", message); //被拒绝一次之后,不再放到队列中 channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } else { Log.info("【Consumer2】的该消息第一次被拒绝,再放回到队列:{}", message); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } e.printStackTrace(); } } }

五、防止消息丢失
(一)、生产者丢数据问题
confirm模式是解决生产者消息丢失的有效手段
(二)消息队列数据消失
处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以
和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘
之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。
那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步
①、将queue的持久化标识durable设置为true,则代表是一个持久的队列
②、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,
可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,
但也不能保证消息百分百不丢失(整个集群都挂掉)
/** * 第二个参数:queue的持久化是通过durable=true来实现的。 * 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点: 1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列; 2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同; 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。 * 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 * @param * @return * @Author zxj */ @Bean public Queue queue() { Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 25000);//25秒自动删除 Queue queue = new Queue("topic.messages", true, false, true, arguments); return queue; }
MessageProperties properties=new MessageProperties(); properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE); properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化设置 properties.setExpiration("2018-12-15 23:23:23");//设置到期时间 Message message=new Message("hello".getBytes(),properties); this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);
(本章案例已经实现了消息持久化)
(三)、消费者丢数据
启用手动确认模式可以解决这个问题 ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。
不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让 消息不断重试。 ②手动确认模式 ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。
指定Acknowledge的模式: spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示该监听器手动应答消息 针对手动确认模式,有以下特点: 1.使用手动应答消息,有一点需要特别注意,那就是不能忘记应答消息,因为对于RabbitMQ来说处理消息没有超时,
只要不应答消息,他就会认为仍在正常处理消息,导致消息队列出现阻塞,影响 业务执行。 2.如果消费者来不及处理就死掉时,没有响应ack时,会项目启动后会重复发送一条信息给其他消费者; 3.可以选择丢弃消息,这其实也是一种应答,如下,这样就不会再次收到这条消息。 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 4.如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的 5.如果消费者没有设置手动应答模式,并且设置了重试,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。
重试机制:
spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重试次数 spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息) spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒) spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
如果设置了重试模式,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。
当出现异常时,我们需要把这个消息回滚到消息队列,有两种方式:
//ack返回false,并重新回到队列,api里面解释得很清楚
//ack返回false,并重新回到队列,api里面解释得很清楚 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //拒绝消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
经过开发中的实际测试,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,
这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行回滚,如此反复进行。
这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。对于消息回滚到消息队列,
我们希望比较理想的方式时出现异常的消息到 达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行,
因此我们采取的解决方案是,将消息进行应答,这时消息队列会删除该消息,同时我们再次发送该消息
到消息队列,这时就实现了错误消息进行消息队列尾部的方案。
//手动进行应答 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //重新发送消息到队尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(new Object()));
如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。
解决这个问题可以采取两种方案:
1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。
对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,
直接丢弃该消息方案。
2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,
处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。
消息监听内的异常处理有两种方式:
1.内部catch后直接处理,然后使用channel对消息进行确认
①catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
②给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败
