当消息消费失败的时候,Spring-Kafka 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 重新消费消息 。默认情况下,Spring-Kafka 达到配置的重试次数时,【每条消息的失败重试时间,由配置的时间隔决定】Consumer 如果依然消费失败 ,那么该消息就会进入到死信队列。
Spring-Kafka 封装消费重试和死信队列, 将正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
在应用中可以对死信队列中的消息进行监控重发,来使得消费者实例再次进行消费,消费端需要做幂等性的处理。
配置类
@Slf4j @Configuration public class KafkaConfiguration { @Bean @Primary public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) { log.info("kafkaErrorHandler begin to handle"); DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(template); /** * 创建 FixedBackOff 对象 设置重试间隔 10秒 次数为 3次 */ FixedBackOff fixedBackOff = new FixedBackOff(10 * 1000L, 3); /** * 创建 SeekToCurrentErrorHandler 对象 */ return new SeekToCurrentErrorHandler(recover, fixedBackOff); } }
Spring-Kafka 通过实现自定义的 SeekToCurrentErrorHandler ,当 Consumer 消费消息异常的时候,进行拦截处理:
重试小于最大次数时,重新投递该消息给 Consumer
重试到达最大次数时,如果Consumer 还是消费失败时,该消息就会发送到死信队列。 死信队列的 命名规则为: 原有 Topic + .DLT 后缀 = 其死信队列的 Topic
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
创建 DeadLetterPublishingRecoverer 对象,它负责实现,在重试到达最大次数时,Consumer 还是消费失败时,该消息就会发送到死信队列。
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
也可以选择 BackOff 的另一个子类 ExponentialBackOff 实现,提供指数递增的间隔时间
new SeekToCurrentErrorHandler(recoverer, backOff);
创建 SeekToCurrentErrorHandler 对象,负责处理异常,串联整个消费重试的整个过程。
SeekToCurrentErrorHandler
在消息消费失败时,SeekToCurrentErrorHandler 会将 调用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的本地进度设置成该消息的位置。
这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。
同时,Spring-Kafka 使用 FailedRecordTracker 对每个 Topic 的每个 TopicPartition 消费失败次数进行计数,这样相当于对该 TopicPartition 的第一条消费失败的消息的消费失败次数进行计数。
另外,在 FailedRecordTracker 中,会调用 BackOff 来进行计算,该消息的下一次重新消费的时间,通过 Thread#sleep(...) 方法,实现重新消费的时间间隔。
SeekToCurrentErrorHandler 是只针对消息的单条消费失败的消费重试处理。如果想要有消息的批量消费失败的消费重试处理,可以使用 SeekToCurrentBatchErrorHandler 。配置方式如下
@Bean @Primary public BatchErrorHandler kafkaBatchErrorHandler() { // 创建 SeekToCurrentBatchErrorHandler 对象 SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler(); // 创建 FixedBackOff 对象 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); batchErrorHandler.setBackOff(backOff); // 返回 return batchErrorHandler; }
SeekToCurrentBatchErrorHandler
暂时不支持死信队列的机制。
生产者
@Component public class OrderProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public ListenableFuture<SendResult<String, Object>> sendMessageAsync() { Order order = Order.builder().orderId(120000L).memberId(10002L).payType("WeChat").payTime(new Date()).build(); ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(ORDER_TOPIC, order); return result; } }
消费者
@Slf4j @Component public class CommentConsumer { private static final String COMMENT_GROUP = "comment-group"; @KafkaListener(topics = ORDER_TOPIC, groupId = COMMENT_GROUP) public void onMessage(Order order) { log.info("【评论】接受消息内容:{}", order); throw new RuntimeException("handle exception"); } }
进入死信队列