當消息消費失敗的時候,Spring-Kafka 會通過消費重試機制,重新投遞該消息給 Consumer ,讓 Consumer 重新消費消息 。默認情況下,Spring-Kafka 達到配置的重試次數時,【每條消息的失敗重試時間,由配置的時間隔決定】Consumer 如果依然消費失敗 ,那么該消息就會進入到死信隊列。
Spring-Kafka 封裝消費重試和死信隊列, 將正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
在應用中可以對死信隊列中的消息進行監控重發,來使得消費者實例再次進行消費,消費端需要做冪等性的處理。
配置類
@Slf4j @Configuration public class KafkaConfiguration { @Bean @Primary public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) { log.info("kafkaErrorHandler begin to handle"); DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(template); /** * 創建 FixedBackOff 對象 設置重試間隔 10秒 次數為 3次 */ FixedBackOff fixedBackOff = new FixedBackOff(10 * 1000L, 3); /** * 創建 SeekToCurrentErrorHandler 對象 */ return new SeekToCurrentErrorHandler(recover, fixedBackOff); } }
Spring-Kafka 通過實現自定義的 SeekToCurrentErrorHandler ,當 Consumer 消費消息異常的時候,進行攔截處理:
重試小於最大次數時,重新投遞該消息給 Consumer
重試到達最大次數時,如果Consumer 還是消費失敗時,該消息就會發送到死信隊列。 死信隊列的 命名規則為: 原有 Topic + .DLT 后綴 = 其死信隊列的 Topic
ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
創建 DeadLetterPublishingRecoverer 對象,它負責實現,在重試到達最大次數時,Consumer 還是消費失敗時,該消息就會發送到死信隊列。
BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
也可以選擇 BackOff 的另一個子類 ExponentialBackOff 實現,提供指數遞增的間隔時間
new SeekToCurrentErrorHandler(recoverer, backOff);
創建 SeekToCurrentErrorHandler 對象,負責處理異常,串聯整個消費重試的整個過程。
SeekToCurrentErrorHandler
在消息消費失敗時,SeekToCurrentErrorHandler 會將 調用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,將 Consumer 對於該消息對應的 TopicPartition 分區的本地進度設置成該消息的位置。
這樣,Consumer 在下次從 Kafka Broker 拉取消息的時候,又能重新拉取到這條消費失敗的消息,並且是第一條。
同時,Spring-Kafka 使用 FailedRecordTracker 對每個 Topic 的每個 TopicPartition 消費失敗次數進行計數,這樣相當於對該 TopicPartition 的第一條消費失敗的消息的消費失敗次數進行計數。
另外,在 FailedRecordTracker 中,會調用 BackOff 來進行計算,該消息的下一次重新消費的時間,通過 Thread#sleep(...) 方法,實現重新消費的時間間隔。
SeekToCurrentErrorHandler 是只針對消息的單條消費失敗的消費重試處理。如果想要有消息的批量消費失敗的消費重試處理,可以使用 SeekToCurrentBatchErrorHandler 。配置方式如下
@Bean @Primary public BatchErrorHandler kafkaBatchErrorHandler() { // 創建 SeekToCurrentBatchErrorHandler 對象 SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler(); // 創建 FixedBackOff 對象 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); batchErrorHandler.setBackOff(backOff); // 返回 return batchErrorHandler; }
SeekToCurrentBatchErrorHandler
暫時不支持死信隊列的機制。
生產者
@Component public class OrderProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; public ListenableFuture<SendResult<String, Object>> sendMessageAsync() { Order order = Order.builder().orderId(120000L).memberId(10002L).payType("WeChat").payTime(new Date()).build(); ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(ORDER_TOPIC, order); return result; } }
消費者
@Slf4j @Component public class CommentConsumer { private static final String COMMENT_GROUP = "comment-group"; @KafkaListener(topics = ORDER_TOPIC, groupId = COMMENT_GROUP) public void onMessage(Order order) { log.info("【評論】接受消息內容:{}", order); throw new RuntimeException("handle exception"); } }
進入死信隊列