消费端消费重试和死信队列


       当消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 重新消费消息 。默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。

    Spring-Kafka 封装消费重试和死信队列, 将正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

      在应用中可以对死信队列中的消息进行监控重发,来使得消费者实例再次进行消费,消费端需要做幂等性的处理。

配置类

  写一个配置类,用于处理消费异常 ErrorHandler

@Slf4j
@Configuration
public class KafkaConfiguration {

    @Bean
    @Primary
    public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
        log.info("kafkaErrorHandler begin to handle");
        DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(template);
        /**
         * 创建 FixedBackOff 对象   设置重试间隔 10秒 次数为 3次
         */
        FixedBackOff fixedBackOff = new FixedBackOff(10 * 1000L, 3);
        /**
         * 创建 SeekToCurrentErrorHandler 对象
         */
        return new SeekToCurrentErrorHandler(recover, fixedBackOff);
    }

}

       Spring-Kafka 通过实现自定义的 SeekToCurrentErrorHandler ,当 Consumer 消费消息异常的时候,进行拦截处理:

  重试小于最大次数时,重新投递该消息给 Consumer
  重试到达最大次数时,如果Consumer 还是消费失败时,该消息就会发送到死信队列。 死信队列的 命名规则为: 原有 Topic + .DLT 后缀 = 其死信队列的 Topic

 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

       创建 DeadLetterPublishingRecoverer 对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。

BackOff backOff = new FixedBackOff(10 * 1000L, 3L);

  也可以选择 BackOff 的另一个子类 ExponentialBackOff 实现,提供指数递增的间隔时间

new SeekToCurrentErrorHandler(recoverer, backOff);

  创建 SeekToCurrentErrorHandler 对象,负责处理异常,串联整个消费重试的整个过程。

SeekToCurrentErrorHandler
  在消息消费失败时,SeekToCurrentErrorHandler 会将 调用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。

  这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。

  同时,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。

  另外,在 FailedRecordTracker 中,会调用 BackOff 来进行计算,该消息的下一次重新消费的时间,通过 Thread#sleep(...) 方法,实现重新消费的时间间隔。
  SeekToCurrentErrorHandler 是只针对消息的单条消费失败的消费重试处理。如果想要有消息的批量消费失败的消费重试处理,可以使用 SeekToCurrentBatchErrorHandler 。配置方式如下

@Bean
@Primary
public BatchErrorHandler kafkaBatchErrorHandler() {
    // 创建 SeekToCurrentBatchErrorHandler 对象
    SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
    // 创建 FixedBackOff 对象
    BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
    batchErrorHandler.setBackOff(backOff);
    // 返回
    return batchErrorHandler;
}

  SeekToCurrentBatchErrorHandler 暂时不支持死信队列的机制。

生产者

@Component
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public ListenableFuture<SendResult<String, Object>> sendMessageAsync() {
        Order order = Order.builder().orderId(120000L).memberId(10002L).payType("WeChat").payTime(new Date()).build();
        ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(ORDER_TOPIC, order);
        return result;
    }
}

消费者

@Slf4j
@Component
public class CommentConsumer {

    private static final String COMMENT_GROUP = "comment-group";

    @KafkaListener(topics = ORDER_TOPIC, groupId = COMMENT_GROUP)
    public void onMessage(Order order) {
        log.info("【评论】接受消息内容:{}", order);

        throw new RuntimeException("handle exception");
    }
}

进入死信队列

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM