分布式Confirm模式解決消息丟失問題


前言:

      本章主要解決MQ中一般情況下丟數據要采取的措施以及Confirm的應用

 一、簡介

      在RabbitMQ中,消息確認主要有生產者發送確認和消費者接收確認

(一)生產者發送確認:指生產者發送消息后到RabbitMQ服務器,如果RabbitMQ服務器收到消息

會給我們生產者一個應答,用於告訴生產者該條消息已經成功到達RabbitMQ服務器中。

 (二)消費者接收確認:用於確認消費者是否成功消費了該條消息。

       消息確認的實現方式主要有兩種,一種是通過事務的方式(channel.txSelect()、channel.txCommit()、

channel.txRollback()),另外一種是confirm確認機制。因為事務模式比較消耗性能,在實際工作中也用的不多,

這里主要介紹通過confirm機制來實現消息的確認,保證消息的准確性。

    因此,按照博主的經驗,生產上用confirm模式的居多
一旦channel進入confirm模式,所有在該信道上面發布的消息都將會被指派一個唯一的ID(從1開始),
一旦消息被投遞到所有匹配的隊列之后,rabbitMQ就會發送一個Ack給生產者(包含消息的唯一ID),
這就使得生產者知道消息已經正確到達目的隊列了.如果rabiitMQ沒能處理該消息,則會發送一個
Nack消息給你,你可以進行重試操作。

二、生產者發送確認

在RabbitMQ中實現生產者發送確認的方法(本文使用springcloud項目),主要有兩點:

【a】配置文件中配置消息發送確認

spring
   rabbitm
     qpublisher-confirms   true

【b】生產者實現 RabbitTemplate.ConfirmCallback接口,重寫方法

confirm(CorrelationData correlationData, boolean isSendSuccess, String error)
當然也可以通過注入的方式自定義confirm listener.

@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback{
 
    @Override
    public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
        .........
    }
}

 

三、消費者接收確認

    為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。

確認模式主要分為下面三種:

AcknowledgeMode.NONE:不確認
AcknowledgeMode.AUTO:自動確認
AcknowledgeMode.MANUAL:手動確認
注意:在springcloud項目中通過在配置文件中指定消息確認的模式,如下指定手動確認模式

spring:
  application:
    name: consumer1-service
  rabbitmq: listener: simple: acknowledge-mode: manual

 

手動確認與自動確認的區別:

       自動確認:這種模式下,當發送者發送完消息之后,它會自動認為消費者已經成功接收到該條消息。

這種方式效率較高,當時如果在發送過程中,如果網絡中斷或者連接斷開,將會導致消息丟失。
手動確認:消費者成功消費完消息之后,會顯式發回一個應答(ack信號),

RabbitMQ只有成功接收到這個應答消息,才將消息從內存或磁盤中移除消息。

這種方式效率較低點,但是能保證絕大部分的消息不會丟失,當然肯定還有一些小概率會發生消息丟失的情況。


(一)手動確認主要使用的方法有下面幾個:

public void basicAck(long deliveryTag, boolean multiple):
deliveryTag 表示該消息的index(long類型的數字);
multiple 表示是否批量(true:將一次性ack所有小於deliveryTag的消息);

如果成功消費消息,一般調用下面的代碼用於確認消息成功處理完

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

 

        public void basicNack(long deliveryTag, boolean multiple, boolean requeue):

告訴服務器這個消息我拒絕接收,basicNack可以一次性拒絕多個消息
  deliveryTag: 表示該消息的index(long類型的數字);
  multiple: 是否批量(true:將一次性拒絕所有小於deliveryTag的消息);
  requeue:指定被拒絕的消息是否重新回到隊列;

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);

 

       public void basicReject(long deliveryTag, boolean requeue):也是用於拒絕消息,

但是只能拒絕一條消息,不能同時拒絕多個消息。
deliveryTag: 表示該消息的index(long類型的數字);
 requeue:指定被拒絕的消息是否重新回到隊列;

四、案例

為了更和實際開發接近,本案例創建服務中心、生產者、兩個消費者

(一)服務中心

1、依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.lanpo</groupId>
    <artifactId>fenbushiserver</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>fenbushiserver</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

2.配置yml

server:
  port: 8081
eureka:
  instance:
    hostname: localhost
  client:
    registerWithEureka: false
    fetchRegistry: false
    serviceUrl:
      defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

 

3.在啟動類上加注解@EnableEurekaServer

(二)生產者

1、添加pom.xml依賴文件

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

 

2添加配置

spring:
  application:
    name: product-service
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: / #消息發送確認回調,不加不執行confirm方法
    publisher-confirms: true #發送返回監聽回調
    publisher-returns: true #手動回復ack
    listener:
      simple:
        acknowledge-mode: manual
server:
  port: 8082 #指向注冊中心8081
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8081/eureka/

 

3.自定義消息發送,返回回調監聽(初始化rabbitTemplate)

package com.lanpo.fenbushiproduct.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/*
ConfirmCallback:只確認消息是否正確到達交換機,不管是否正確到達,該回調都會執行
ReturnCallback:如果消息未正確從交換機到達隊列中將會執行,正確到達不會執行
 */
@Component
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback,
        RabbitTemplate.ReturnCallback {
    private static final Logger Log = LoggerFactory.getLogger(CustomConfirmAndReturnCallback.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //用於在依賴關系注入完成之后,需要執行的方法上,以執行任何初始化
    @PostConstruct
    public void init() {
        //setConfirmCallback(ConfirmCallback confirmCallback)
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    /**
     * 如果消息沒有到達交換機,則該方法中ack = false,error為錯誤信息;
     * 如果消息正確到達交換機,則該方法中ack = true;
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        Log.info("confirm 回調方法,回調消息id:" + correlationData.getId());
        if (ack) {
            Log.info("confirm 消息發送到交換機成功。。");
        } else {
            Log.info("confirm 消息發送到交換機失敗,原因是:[{}]", cause);
        }
    }
    /**
     * 消息從交換機成功到達隊列,則returnedMessage方法不會執行;
     * 消息從交換機未能成功到達隊列,則returnedMessage方法會執行;
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        Log.info("returnedMessage 消息沒有達到隊列: " + new String(message.getBody(), StandardCharsets.UTF_8) + " ," +
                " replyCode= " + replyCode + " , " + " replyText= " + replyText, " exchange= " + exchange + " routingKey= " + routingKey);
    }
}

 

4.配置rabbitmq配置類

package com.lanpo.fenbushiproduct.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {
    //隊列和交換機
    public static  final String TRAN_TOPIC_QUEUE="TRAN_QUEUE";
    public static  final String TRAN_TOPIC_EXCHANGE="TRAN_CHANGE";
    public static final String ROUTING_KEY = "lanpo.#";
    @Bean(TRAN_TOPIC_EXCHANGE)//給每一個bean命名可以在本來配置多個
   public Exchange EXCHAGE_TOPIC_TRAN(){
        //durable持久化,這是交換機名稱持久化
       return ExchangeBuilder.topicExchange(TRAN_TOPIC_EXCHANGE).durable(true).build();
   }
   //聲明隊列
    @Bean(TRAN_TOPIC_QUEUE)//給每一個bean命名可以在本來配置多個
    public Queue TRAN_TOPIC_QUEUE(){
        return new Queue(TRAN_TOPIC_QUEUE);
    }
    //綁定交換機和隊列
    @Bean
    public Binding BINDING_CHANGE_AND_QUEUE(@Qualifier(TRAN_TOPIC_QUEUE) Queue queue
            ,@Qualifier(TRAN_TOPIC_EXCHANGE) Exchange exchange){
      return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY).noargs();
    }


}

 

4.發送消息服務

package com.lanpo.fenbushiproduct.service;
import com.lanpo.fenbushiproduct.config.CustomConfirmAndReturnCallback;
import com.lanpo.fenbushiproduct.config.RabbitMqConfig;
import com.sun.javafx.scene.control.ReadOnlyUnbackedObservableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.UUID;
@Service
public class HelloSend {
    private static final Logger Log = LoggerFactory.getLogger(HelloSend.class);
    public static final String ROUTING_KEY = "lanpo.sha";
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMyMessage() {
        for (int i = 0; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            Log.info("【product】發送的消息ID:" + correlationData.getId());
            String message = " hello lan po " + i;
            Log.info("【product】發送的消息:{} ", message);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TRAN_TOPIC_EXCHANGE
                    , ROUTING_KEY, message, correlationData);
        }
    }
}

 

(三)消費者

1.配置

server:
  port: 8083
spring:
  application:
    name: consumer1-service
  rabbitmq:
      host: 127.0.0.1
      port: 5672
      username: guest
      password: guest
      virtual-host: /
        #手動回復ack
      listener:
        simple:
          acknowledge-mode: manual
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8081/eureka/

 

2.pom.xml文件

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

 

2.接收消息

package com.lanpo.fenbushiconsumer.config;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ReceviceMessage {
    private static final Logger Log = LoggerFactory.getLogger(ReceviceMessage.class);
    //監聽某一個隊列
    @RabbitListener(queues = "TRAN_QUEUE")
    public void getRecevice1(String msg, Message message, Channel channel) throws IOException {
        try {
            Log.info("【Consumer1】收到消息 " + msg);
            //確認收到消息,確認當前消費者的一個消息收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                Log.info("【Consumer1】的該消息被拒絕過,不再放回到隊列:{}", message);
                //被拒絕一次之后,不再放到隊列中
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                Log.info("【Consumer1】的該消息第一次被拒絕,再放回到隊列:{}", message);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }
}

 

(四)、正常調用驗證

1.正常的調用

 

 2.查看消費端

 (五)、當Routing key 不正確導致消息未從交換機到達隊列

1.解除交換機和routing key解綁

 

 2.修改routing key


@Service
public class HelloSend {
    private static final Logger Log = LoggerFactory.getLogger(HelloSend.class);
    public static final String ROUTING_KEY = "lanpo2.sha";
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMyMessage() {
        for (int i = 0; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            Log.info("【product】發送的消息ID:" + correlationData.getId());
            String message = " hello lan po " + i;
            Log.info("【product】發送的消息:{} ", message);
            rabbitTemplate.convertAndSend(RabbitMqConfig.TRAN_TOPIC_EXCHANGE
                    , ROUTING_KEY, message, correlationData);
        }
    }
}

 

3.四個消息未達到隊列

 

 

(六)、當交換機不正確時

@Service
public class HelloSend {
    private static final Logger Log = LoggerFactory.getLogger(HelloSend.class);
    public static final String ROUTING_KEY = "lanpo.sha";
    @Autowired
    RabbitTemplate rabbitTemplate;
    public void sendMyMessage() {
        for (int i = 0; i <= 3; i++) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            Log.info("【product】發送的消息ID:" + correlationData.getId());
            String message = " hello lan po " + i;
            Log.info("【product】發送的消息:{} ", message);
            rabbitTemplate.convertAndSend("CHANGE_ERROR"
                    , ROUTING_KEY, message, correlationData);
        }
    }
}

 

 

 這里主要修改一個不存在的交換機名稱,這樣消息就不能正確到達消費者監聽隊列所在的交換機

CHANGE_ERROR,從而觸發confirmCallback中發送失敗的情況,error為錯誤原因。

 

(七)、創建Consumer2消費2服務

1.

server:
  port: 8084
spring:
  application:
    name: consumer2-service
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    #手動回復ack
    listener:
      simple:
      acknowledge-mode: manual
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8081/eureka/

 

2.其他參考消費者1

package com.lanpo.fenbushiconsumer.config;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ReceviceMessage {
    private static final Logger Log = LoggerFactory.getLogger(ReceviceMessage.class);
    //監聽某一個隊列
    @RabbitListener(queues = "TRAN_QUEUE")
    public void getRecevice2(String msg, Message message, Channel channel) throws IOException {
        try {//報錯
          String po=null; po.hashCode(); //確認收到消息,確認當前消費者的一個消息收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                Log.info("【Consumer2】的該消息被拒絕過,不再放回到隊列:{}", message);
                //被拒絕一次之后,不再放到隊列中
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                Log.info("【Consumer2】的該消息第一次被拒絕,再放回到隊列:{}", message);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
            e.printStackTrace();
        }
    }
}

 

 

五、防止消息丟失

(一)、生產者丟數據問題

confirm模式是解決生產者消息丟失的有效手段

(二)消息隊列數據消失

      處理消息隊列丟數據的情況,一般是開啟持久化磁盤的配置。這個持久化配置可以
和confirm機制配合使用,你可以在消息持久化磁盤后,再給生產者發送一個Ack信號。
這樣,如果消息持久化磁盤

       之前,rabbitMQ陣亡了,那么生產者收不到Ack信號,生產者會自動重發。

那么如何持久化呢,這里順便說一下吧,其實也很容易,就下面兩步

①、將queue的持久化標識durable設置為true,則代表是一個持久的隊列

②、發送消息的時候將deliveryMode=2

        這樣設置以后,rabbitMQ就算掛了,重啟后也能恢復數據。在消息還沒有持久化到硬盤時,
可能服務已經死掉,這種情況可以通過引入mirrored-queue即鏡像隊列,
但也不能保證消息百分百不丟失(整個集群都掛掉)

 

/**
     * 第二個參數:queue的持久化是通過durable=true來實現的。
     * 第三個參數:exclusive:排他隊列,如果一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。這里需要注意三點:
    1. 排他隊列是基於連接可見的,同一連接的不同信道是可以同時訪問同一連接創建的排他隊列;
    2.“首次”,如果一個連接已經聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
    3.即使該隊列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用於一個客戶端發送讀取消息的應用場景。
     * 第四個參數:自動刪除,如果該隊列沒有任何訂閱的消費者的話,該隊列會被自動刪除。這種隊列適用於臨時隊列。
     * @param
     * @return
     * @Author zxj
     */
    @Bean
    public Queue queue() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-message-ttl", 25000);//25秒自動刪除
        Queue queue = new Queue("topic.messages", true, false, true, arguments);
        return queue;
    }

 

 

 

MessageProperties properties=new MessageProperties();
        properties.setContentType(MessageProperties.DEFAULT_CONTENT_TYPE);
        properties.setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);//持久化設置
        properties.setExpiration("2018-12-15 23:23:23");//設置到期時間
        Message message=new Message("hello".getBytes(),properties);
        this.rabbitTemplate.sendAndReceive("exchange","topic.message",message);

 

(本章案例已經實現了消息持久化)

(三)、消費者丟數據

啟用手動確認模式可以解決這個問題 ①自動確認模式,消費者掛掉,待ack的消息回歸到隊列中。消費者拋出異常,消息會不斷的被重發,直到處理成功。
不會丟失消息,即便服務掛掉,沒有處理完成的消息會重回隊列,但是異常會讓 消息不斷重試。 ②手動確認模式 ③不確認模式,acknowledge
="none" 不使用確認機制,只要消息發送完成會立即在隊列移除,無論客戶端異常還是斷開,只要發送完就移除,不會重發。

 

指定Acknowledge的模式: spring.rabbitmq.listener.direct.acknowledge-mode=manual,表示該監聽器手動應答消息 針對手動確認模式,有以下特點: 1.使用手動應答消息,有一點需要特別注意,那就是不能忘記應答消息,因為對於RabbitMQ來說處理消息沒有超時,
只要不應答消息,他就會認為仍在正常處理消息,導致消息隊列出現阻塞,影響 業務執行。
2.如果消費者來不及處理就死掉時,沒有響應ack時,會項目啟動后會重復發送一條信息給其他消費者; 3.可以選擇丟棄消息,這其實也是一種應答,如下,這樣就不會再次收到這條消息。 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); 4.如果消費者設置了手動應答模式,並且設置了重試,出現異常時無論是否捕獲了異常,都是不會重試的 5.如果消費者沒有設置手動應答模式,並且設置了重試,那么在出現異常時沒有捕獲異常會進行重試,如果捕獲了異常不會重試。

 

重試機制:

spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重試次數
spring.rabbitmq.listener.simple.retry.enabled=true 是否開啟消費者重試(為false時關閉消費者重試,這時消費端代碼異常會一直重復收到消息)
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重試間隔時間(單位毫秒)
spring.rabbitmq.listener.simple.default-requeue-rejected=false試次數超過面的設置之后是否丟棄(false不丟棄時需要寫相應代碼將該消息加入死信隊列

 

       如果設置了重試模式,那么在出現異常時沒有捕獲異常會進行重試,如果捕獲了異常不會重試。

當出現異常時,我們需要把這個消息回滾到消息隊列,有兩種方式:

//ack返回false,並重新回到隊列,api里面解釋得很清楚

//ack返回false,並重新回到隊列,api里面解釋得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒絕消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

         經過開發中的實際測試,當消息回滾到消息隊列時,這條消息不會回到隊列尾部,而是仍是在隊列頭部,

這時消費者會立馬又接收到這條消息進行處理,接着拋出異常,進行回滾如此反復進行。

           這種情況會導致消息隊列處理出現阻塞,消息堆積,導致正常消息也無法運行。對於消息回滾到消息隊列,

我們希望比較理想的方式時出現異常的消息  達消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行,

因此我們采取的解決方案是,將消息進行應答,這時消息隊列會刪除該消息,同時我們再次發送該消息      

  到消息隊列,這時就實現了錯誤消息進行消息隊列尾部的方案

                //手動進行應答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            //重新發送消息到隊尾
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(new Object()));

 

      如果一個消息體本身有誤,會導致該消息體,一直無法進行處理,而服務器中刷出大量無用日志。

解決這個問題可以采取兩種方案:

1.一種是對於日常細致處理,分清哪些是可以恢復的異常,哪些是不可以恢復的異常。

對於可以恢復的異常我們采取第三條中的解決方案,對於不可以處理的異常,我們采用記錄日志,

直接丟棄該消息方案。

2.另一種是我們對每條消息進行標記,記錄每條消息的處理次數,當一條消息,多次處理仍不能成功時,

處理次數到達我們設置的值時,我們就丟棄該消息,但需要記錄詳細的日志。

消息監聽內的異常處理有兩種方式:

1.內部catch后直接處理,然后使用channel對消息進行確認

 2.配置RepublishMessageRecoverer將處理異常的消息發送到指定隊列專門處理或記錄。
監聽的方法內拋出異常貌似沒有太大用處。因為 拋出異常就算是重試也非常有可能會繼續出現異常,
當重試次數完了之后消息就只有重啟應用才能接收到了,很有可能導致消息消費不及時。
當然可以配置RepublishMessageRecoverer來解決,但是萬一RepublishMessageRecoverer發送失敗了呢。。
那就可能造成消息消費不及時了。所以 即使需要將處理出現異常的消息統一放到另外隊列去處理,個人建議兩種方式

①catch異常后,手動發送到指定隊列,然后使用channel給rabbitmq確認消息已消費
②給Queue綁定死信隊列,使用nack(requque為false)確認消息消費失敗

本文參考1

本文參考2

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM