消費端消費重試和死信隊列


       當消息消費失敗的時候,Spring-Kafka 會通過消費重試機制,重新投遞該消息給 Consumer ,讓 Consumer 重新消費消息 。默認情況下,Spring-Kafka 達到配置的重試次數時,【每條消息的失敗重試時間,由配置的時間隔決定】Consumer 如果依然消費失敗 ,那么該消息就會進入到死信隊列。

    Spring-Kafka 封裝消費重試和死信隊列, 將正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。

      在應用中可以對死信隊列中的消息進行監控重發,來使得消費者實例再次進行消費,消費端需要做冪等性的處理。

配置類

  寫一個配置類,用於處理消費異常 ErrorHandler

@Slf4j
@Configuration
public class KafkaConfiguration {

    @Bean
    @Primary
    public ErrorHandler kafkaErrorHandler(KafkaTemplate<?, ?> template) {
        log.info("kafkaErrorHandler begin to handle");
        DeadLetterPublishingRecoverer recover = new DeadLetterPublishingRecoverer(template);
        /**
         * 創建 FixedBackOff 對象   設置重試間隔 10秒 次數為 3次
         */
        FixedBackOff fixedBackOff = new FixedBackOff(10 * 1000L, 3);
        /**
         * 創建 SeekToCurrentErrorHandler 對象
         */
        return new SeekToCurrentErrorHandler(recover, fixedBackOff);
    }

}

       Spring-Kafka 通過實現自定義的 SeekToCurrentErrorHandler ,當 Consumer 消費消息異常的時候,進行攔截處理:

  重試小於最大次數時,重新投遞該消息給 Consumer
  重試到達最大次數時,如果Consumer 還是消費失敗時,該消息就會發送到死信隊列。 死信隊列的 命名規則為: 原有 Topic + .DLT 后綴 = 其死信隊列的 Topic

 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(template);

       創建 DeadLetterPublishingRecoverer 對象,它負責實現,在重試到達最大次數時,Consumer 還是消費失敗時,該消息就會發送到死信隊列。

BackOff backOff = new FixedBackOff(10 * 1000L, 3L);

  也可以選擇 BackOff 的另一個子類 ExponentialBackOff 實現,提供指數遞增的間隔時間

new SeekToCurrentErrorHandler(recoverer, backOff);

  創建 SeekToCurrentErrorHandler 對象,負責處理異常,串聯整個消費重試的整個過程。

SeekToCurrentErrorHandler
  在消息消費失敗時,SeekToCurrentErrorHandler 會將 調用 Kafka Consumer 的 seek(TopicPartition partition, long offset) 方法,將 Consumer 對於該消息對應的 TopicPartition 分區的本地進度設置成該消息的位置。

  這樣,Consumer 在下次從 Kafka Broker 拉取消息的時候,又能重新拉取到這條消費失敗的消息,並且是第一條。

  同時,Spring-Kafka 使用 FailedRecordTracker 對每個 Topic 的每個 TopicPartition 消費失敗次數進行計數,這樣相當於對該 TopicPartition 的第一條消費失敗的消息的消費失敗次數進行計數。

  另外,在 FailedRecordTracker 中,會調用 BackOff 來進行計算,該消息的下一次重新消費的時間,通過 Thread#sleep(...) 方法,實現重新消費的時間間隔。
  SeekToCurrentErrorHandler 是只針對消息的單條消費失敗的消費重試處理。如果想要有消息的批量消費失敗的消費重試處理,可以使用 SeekToCurrentBatchErrorHandler 。配置方式如下

@Bean
@Primary
public BatchErrorHandler kafkaBatchErrorHandler() {
    // 創建 SeekToCurrentBatchErrorHandler 對象
    SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler();
    // 創建 FixedBackOff 對象
    BackOff backOff = new FixedBackOff(10 * 1000L, 3L);
    batchErrorHandler.setBackOff(backOff);
    // 返回
    return batchErrorHandler;
}

  SeekToCurrentBatchErrorHandler 暫時不支持死信隊列的機制。

生產者

@Component
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public ListenableFuture<SendResult<String, Object>> sendMessageAsync() {
        Order order = Order.builder().orderId(120000L).memberId(10002L).payType("WeChat").payTime(new Date()).build();
        ListenableFuture<SendResult<String, Object>> result = kafkaTemplate.send(ORDER_TOPIC, order);
        return result;
    }
}

消費者

@Slf4j
@Component
public class CommentConsumer {

    private static final String COMMENT_GROUP = "comment-group";

    @KafkaListener(topics = ORDER_TOPIC, groupId = COMMENT_GROUP)
    public void onMessage(Order order) {
        log.info("【評論】接受消息內容:{}", order);

        throw new RuntimeException("handle exception");
    }
}

進入死信隊列

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM