分布式Confirm模式解决消息丢失问题


前言:

      本章主要解决MQ中一般情况下丢数据要采取的措施以及Confirm的应用

 一、简介

      在RabbitMQ中,消息确认主要有生产者发送确认和消费者接收确认

(一)生产者发送确认:指生产者发送消息后到RabbitMQ服务器,如果RabbitMQ服务器收到消息

会给我们生产者一个应答,用于告诉生产者该条消息已经成功到达RabbitMQ服务器中。

 (二)消费者接收确认:用于确认消费者是否成功消费了该条消息。

       消息确认的实现方式主要有两种,一种是通过事务的方式(channel.txSelect()、channel.txCommit()、

channel.txRollback()),另外一种是confirm确认机制。因为事务模式比较消耗性能,在实际工作中也用的不多,

这里主要介绍通过confirm机制来实现消息的确认,保证消息的准确性。

    因此,按照博主的经验,生产上用confirm模式的居多
一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),
一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),
这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个
Nack消息给你,你可以进行重试操作。

二、生产者发送确认

在RabbitMQ中实现生产者发送确认的方法(本文使用springcloud项目),主要有两点:

【a】配置文件中配置消息发送确认

spring
   rabbitm
     qpublisher-confirms   true

【b】生产者实现 RabbitTemplate.ConfirmCallback接口,重写方法

confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
当然也可以通过注入的方式自定义confirm listener.

@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{
 
    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
        .........
    }
}

 

三、消费者接收确认

    为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。

确认模式主要分为下面三种:

AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认
注意:在springcloud项目中通过在配置文件中指定消息确认的模式,如下指定手动确认模式

spring:
  application:
    name: consumer1-service
  rabbitmq: listener: simple: acknowledge-mode: manual

 

手动确认与自动确认的区别:

       自动确认:这种模式下,当发送者发送完消息之后,它会自动认为消费者已经成功接收到该条消息。

这种方式效率较高,当时如果在发送过程中,如果网络中断或者连接断开,将会导致消息丢失。
手动确认:消费者成功消费完消息之后,会显式发回一个应答(ack信号),

RabbitMQ只有成功接收到这个应答消息,才将消息从内存或磁盘中移除消息。

这种方式效率较低点,但是能保证绝大部分的消息不会丢失,当然肯定还有一些小概率会发生消息丢失的情况。


(一)手动确认主要使用的方法有下面几个:

public void basicAck(long deliveryTag, boolean multiple):
deliveryTag 表示该消息的index(long类型的数字);
multiple 表示是否批量(true:将一次性ack所有小于deliveryTag的消息);

如果成功消费消息,一般调用下面的代码用于确认消息成功处理完

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 

        public void basicNack(long deliveryTag, boolean multiple, boolean requeue):

告诉服务器这个消息我拒绝接收,basicNack可以一次性拒绝多个消息
  deliveryTag: 表示该消息的index(long类型的数字);
  multiple: 是否批量(true:将一次性拒绝所有小于deliveryTag的消息);
  requeue:指定被拒绝的消息是否重新回到队列;

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

 

       public void basicReject(long deliveryTag, boolean requeue):也是用于拒绝消息,

但是只能拒绝一条消息,不能同时拒绝多个消息。
deliveryTag: 表示该消息的index(long类型的数字);
 requeue:指定被拒绝的消息是否重新回到队列;

四、案例

为了更和实际开发接近,本案例创建服务中心、生产者、两个消费者

(一)服务中心

1、依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lanpo</groupId>
    <artifactId>fenbushiserver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>fenbushiserver</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

2.配置yml

server:
  port: 8081
eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

 

3.在启动类上加注解@EnableEurekaServer

(二)生产者

1、添加pom.xml依赖文件

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

 

2添加配置

spring:
  application:
    name: product-service
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: / #消息发送确认回调,不加不执行confirm方法
    publisher-confirms: true #发送返回监听回调
    publisher-returns: true #手动回复ack
    listener:
      simple:
        acknowledge-mode: manual
server:
  port: 8082 #指向注册中心8081
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8081/eureka/

 

3.自定义消息发送,返回回调监听(初始化rabbitTemplate)

package com.lanpo.fenbushiproduct.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/*
ConfirmCallback:只确认消息是否正确到达交换机,不管是否正确到达,该回调都会执行
ReturnCallback:如果消息未正确从交换机到达队列中将会执行,正确到达不会执行
 */
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback,
        RabbitTemplate.ReturnCallback {
    private static final Logger Log = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //用于在依赖关系注入完成之后,需要执行的方法上,以执行任何初始化
    @PostConstruct
    public void init() {
        //setConfirmCallback(ConfirmCallback confirmCallback)
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 如果消息没有到达交换机,则该方法中ack = false,error为错误信息;
     * 如果消息正确到达交换机,则该方法中ack = true;
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        Log.info("confirm 回调方法,回调消息id:" + correlationData.getId());
        if (ack) {
            Log.info("confirm 消息发送到交换机成功。。");
        } else {
            Log.info("confirm 消息发送到交换机失败,原因是:[{}]", cause);
        }
    }
    /**
     * 消息从交换机成功到达队列,则returnedMessage方法不会执行;
     * 消息从交换机未能成功到达队列,则returnedMessage方法会执行;
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        Log.info("returnedMessage 消息没有达到队列: " + new String(message.getBody(), StandardCharsets.UTF_8) + " ," +
                " replyCode= " + replyCode + " , " + " replyText= " + replyText, " exchange= " + exchange + " routingKey= " + routingKey);
    }
}

 

4.配置rabbitmq配置类

package com.lanpo.fenbushiproduct.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    //队列和交换机
    public static  final String TRAN_TOPIC_QUEUE="TRAN_QUEUE";
    public static  final String TRAN_TOPIC_EXCHANGE="TRAN_CHANGE";
    public static final String ROUTING_KEY = "lanpo.#";
    @Bean(TRAN_TOPIC_EXCHANGE)//给每一个bean命名可以在本来配置多个
   public Exchange EXCHAGE_TOPIC_TRAN(){
        //durable持久化,这是交换机名称持久化
       return ExchangeBuilder.topicExchange(TRAN_TOPIC_EXCHANGE).durable(true).build();
   }
   //声明队列
    @Bean(TRAN_TOPIC_QUEUE)//给每一个bean命名可以在本来配置多个
    public Queue TRAN_TOPIC_QUEUE(){
        return new Queue(TRAN_TOPIC_QUEUE);
    }
    //绑定交换机和队列
    @Bean
    public Binding BINDING_CHANGE_AND_QUEUE(@Qualifier(TRAN_TOPIC_QUEUE) Queue queue
            ,@Qualifier(TRAN_TOPIC_EXCHANGE) Exchange exchange){
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
    }


}

 

4.发送消息服务

package com.lanpo.fenbushiproduct.service;
import com.lanpo.fenbushiproduct.config.CustomConfirmAndReturnCallback;
import com.lanpo.fenbushiproduct.config.RabbitMqConfig;
import com.sun.javafx.scene.control.ReadOnlyUnbackedObservableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.UUID;
@Service
public class HelloSend {
    private static final Logger Log = LoggerFactory.getLogger(HelloSend.class);
    public static final String ROUTING_KEY = "lanpo.sha";
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMyMessage() {
        for (int i = 0; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            Log.info("【product】发送的消息ID:" + correlationData.getId());
            String message = " hello lan po " + i;
            Log.info("【product】发送的消息:{} ", message);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TRAN_TOPIC_EXCHANGE
                    , ROUTING_KEY, message, correlationData);
        }
    }
}

 

(三)消费者

1.配置

server:
  port: 8083
spring:
  application:
    name: consumer1-service
  rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      virtual-host: /
        #手动回复ack
      listener:
        simple:
          acknowledge-mode: manual
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8081/eureka/

 

2.pom.xml文件

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

 

2.接收消息

package com.lanpo.fenbushiconsumer.config;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ReceviceMessage {
    private static final Logger Log = LoggerFactory.getLogger(ReceviceMessage.class);
    //监听某一个队列
    @RabbitListener(queues = "TRAN_QUEUE")
    public void getRecevice1(String msg, Message message, Channel channel) throws IOException {
        try {
            Log.info("【Consumer1】收到消息 " + msg);
            //确认收到消息,确认当前消费者的一个消息收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                Log.info("【Consumer1】的该消息被拒绝过,不再放回到队列:{}", message);
                //被拒绝一次之后,不再放到队列中
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                Log.info("【Consumer1】的该消息第一次被拒绝,再放回到队列:{}", message);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }
}

 

(四)、正常调用验证

1.正常的调用

 

 2.查看消费端

 (五)、当Routing key 不正确导致消息未从交换机到达队列

1.解除交换机和routing key解绑

 

 2.修改routing key


@Service
public class HelloSend {
    private static final Logger Log = LoggerFactory.getLogger(HelloSend.class);
    public static final String ROUTING_KEY = "lanpo2.sha";
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMyMessage() {
        for (int i = 0; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            Log.info("【product】发送的消息ID:" + correlationData.getId());
            String message = " hello lan po " + i;
            Log.info("【product】发送的消息:{} ", message);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TRAN_TOPIC_EXCHANGE
                    , ROUTING_KEY, message, correlationData);
        }
    }
}

 

3.四个消息未达到队列

 

 

(六)、当交换机不正确时

@Service
public class HelloSend {
    private static final Logger Log = LoggerFactory.getLogger(HelloSend.class);
    public static final String ROUTING_KEY = "lanpo.sha";
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMyMessage() {
        for (int i = 0; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            Log.info("【product】发送的消息ID:" + correlationData.getId());
            String message = " hello lan po " + i;
            Log.info("【product】发送的消息:{} ", message);
            rabbitTemplate.convertAndSend("CHANGE_ERROR"
                    , ROUTING_KEY, message, correlationData);
        }
    }
}

 

 

 这里主要修改一个不存在的交换机名称,这样消息就不能正确到达消费者监听队列所在的交换机

CHANGE_ERROR,从而触发confirmCallback中发送失败的情况,error为错误原因。

 

(七)、创建Consumer2消费2服务

1.

server:
  port: 8084
spring:
  application:
    name: consumer2-service
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #手动回复ack
    listener:
      simple:
      acknowledge-mode: manual
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8081/eureka/

 

2.其他参考消费者1

package com.lanpo.fenbushiconsumer.config;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ReceviceMessage {
    private static final Logger Log = LoggerFactory.getLogger(ReceviceMessage.class);
    //监听某一个队列
    @RabbitListener(queues = "TRAN_QUEUE")
    public void getRecevice2(String msg, Message message, Channel channel) throws IOException {
        try {//报错
          String po=null; po.hashCode(); //确认收到消息,确认当前消费者的一个消息收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                Log.info("【Consumer2】的该消息被拒绝过,不再放回到队列:{}", message);
                //被拒绝一次之后,不再放到队列中
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                Log.info("【Consumer2】的该消息第一次被拒绝,再放回到队列:{}", message);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }
}

 

 

五、防止消息丢失

(一)、生产者丢数据问题

confirm模式是解决生产者消息丢失的有效手段

(二)消息队列数据消失

      处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以
和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。
这样,如果消息持久化磁盘

       之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢,这里顺便说一下吧,其实也很容易,就下面两步

①、将queue的持久化标识durable设置为true,则代表是一个持久的队列

②、发送消息的时候将deliveryMode=2

        这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据。在消息还没有持久化到硬盘时,
可能服务已经死掉,这种情况可以通过引入mirrored-queue即镜像队列,
但也不能保证消息百分百不丢失(整个集群都挂掉)

 

/**
     * 第二个参数:queue的持久化是通过durable=true来实现的。
     * 第三个参数:exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。这里需要注意三点:
    1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
    2.“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
    3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。
     * 第四个参数:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
     * @param
     * @return
     * @Author zxj
     */
    @Bean
    public Queue queue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 25000);//25秒自动删除
        Queue queue = new Queue("topic.messages", true, false, true, arguments);
        return queue;
    }

 

 

 

MessageProperties properties=new MessageProperties();
        properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);
        properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化设置
        properties.setExpiration("2018-12-15 23:23:23");//设置到期时间
        Message message=new Message("hello".getBytes(),properties);
        this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

 

(本章案例已经实现了消息持久化)

(三)、消费者丢数据

启用手动确认模式可以解决这个问题 ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直到处理成功。
不会丢失消息,即便服务挂掉,没有处理完成的消息会重回队列,但是异常会让 消息不断重试。 ②手动确认模式 ③不确认模式,acknowledge
="none" 不使用确认机制,只要消息发送完成会立即在队列移除,无论客户端异常还是断开,只要发送完就移除,不会重发。

 

指定Acknowledge的模式: spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示该监听器手动应答消息 针对手动确认模式,有以下特点: 1.使用手动应答消息,有一点需要特别注意,那就是不能忘记应答消息,因为对于RabbitMQ来说处理消息没有超时,
只要不应答消息,他就会认为仍在正常处理消息,导致消息队列出现阻塞,影响 业务执行。
2.如果消费者来不及处理就死掉时,没有响应ack时,会项目启动后会重复发送一条信息给其他消费者; 3.可以选择丢弃消息,这其实也是一种应答,如下,这样就不会再次收到这条消息。 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 4.如果消费者设置了手动应答模式,并且设置了重试,出现异常时无论是否捕获了异常,都是不会重试的 5.如果消费者没有设置手动应答模式,并且设置了重试,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。

 

重试机制:

spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重试次数
spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.default-requeue-rejected=false试次数超过面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列

 

       如果设置了重试模式,那么在出现异常时没有捕获异常会进行重试,如果捕获了异常不会重试。

当出现异常时,我们需要把这个消息回滚到消息队列,有两种方式:

//ack返回false,并重新回到队列,api里面解释得很清楚

//ack返回false,并重新回到队列,api里面解释得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

         经过开发中的实际测试,当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,

这时消费者会立马又接收到这条消息进行处理,接着抛出异常,进行回滚如此反复进行。

           这种情况会导致消息队列处理出现阻塞,消息堆积,导致正常消息也无法运行。对于消息回滚到消息队列,

我们希望比较理想的方式时出现异常的消息  达消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行,

因此我们采取的解决方案是,将消息进行应答,这时消息队列会删除该消息,同时我们再次发送该消息      

  到消息队列,这时就实现了错误消息进行消息队列尾部的方案

                //手动进行应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //重新发送消息到队尾
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(new Object()));

 

      如果一个消息体本身有误,会导致该消息体,一直无法进行处理,而服务器中刷出大量无用日志。

解决这个问题可以采取两种方案:

1.一种是对于日常细致处理,分清哪些是可以恢复的异常,哪些是不可以恢复的异常。

对于可以恢复的异常我们采取第三条中的解决方案,对于不可以处理的异常,我们采用记录日志,

直接丢弃该消息方案。

2.另一种是我们对每条消息进行标记,记录每条消息的处理次数,当一条消息,多次处理仍不能成功时,

处理次数到达我们设置的值时,我们就丢弃该消息,但需要记录详细的日志。

消息监听内的异常处理有两种方式:

1.内部catch后直接处理,然后使用channel对消息进行确认

 2.配置RepublishMessageRecoverer将处理异常的消息发送到指定队列专门处理或记录。
监听的方法内抛出异常貌似没有太大用处。因为 抛出异常就算是重试也非常有可能会继续出现异常,
当重试次数完了之后消息就只有重启应用才能接收到了,很有可能导致消息消费不及时。
当然可以配置RepublishMessageRecoverer来解决,但是万一RepublishMessageRecoverer发送失败了呢。。
那就可能造成消息消费不及时了。所以 即使需要将处理出现异常的消息统一放到另外队列去处理,个人建议两种方式

①catch异常后,手动发送到指定队列,然后使用channel给rabbitmq确认消息已消费
②给Queue绑定死信队列,使用nack(requque为false)确认消息消费失败

本文参考1

本文参考2

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM