Kafka——SpringBoot整合(消費者位移的提交)


消費者位移的提交方式以及提交時機需要根據不同的業務場景進行選擇,可以看之前的博客kafka消費者相關。
這里只做應用相關,更多的使用場景,該怎么用、何時用要看前面的博客了解原理。

參考博客:https://blog.csdn.net/yy756127197/article/details/103895810

自動提交偏移量

	// 自動提交偏移量
        // 如果設置成true,偏移量由auto.commit.interval.ms控制自動提交的頻率
        // 如果設置成false,不需要定時的提交offset,可以自己控制offset,當消息認為已消費過了,這個時候再去提交它們的偏移量。
        // 這個很有用的,當消費的消息結合了一些處理邏輯,這個消息就不應該認為是已經消費的,直到它完成了整個處理。
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自動提交的頻率
        configProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

手動提交偏移量

主要步驟:
1.消費者配置 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, “false”);
2.消費者配置ack模式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
3.消費者手動提交 consumer.commitSync();

ConsumerConfig

@Configuration
@EnableKafka
public class ManualConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.topic.manual}")
    private String topic;

    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-group");
        // 手動提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
        // ack模式,詳細見下文注釋
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }

    /**
     * AckMode針對ENABLE_AUTO_COMMIT_CONFIG=false時生效,有以下幾種:
     *
     * RECORD
     * 每處理一條commit一次
     *
     * BATCH(默認)
     * 每次poll的時候批量提交一次,頻率取決於每次poll的調用頻率
     *
     * TIME
     * 每次間隔ackTime的時間去commit(跟auto commit interval有什么區別呢?)
     *
     * COUNT
     * 累積達到ackCount次的ack去commit
     *
     * COUNT_TIME
     * ackTime或ackCount哪個條件先滿足,就commit
     *
     * MANUAL
     * listener負責ack,但是背后也是批量上去
     *
     * MANUAL_IMMEDIATE
     * listner負責ack,每調用一次,就立即commit
     *
     */

}

Consumer

@Component
@Slf4j
public class ManualConsumer {

    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void receive(@Payload String message,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        Consumer consumer,
                        Acknowledgment ack) {
        System.out.println(String.format("From partition %d : %s", partition, message));
        // 同步提交
        consumer.commitSync();

        // ack這種方式提交也可以
        // ack.acknowledge();
    }

    /**
     * commitSync和commitAsync組合使用
     * <p>
     * 手工提交異步 consumer.commitAsync();
     * 手工同步提交 consumer.commitSync()
     * <p>
     * commitSync()方法提交最后一個偏移量。在成功提交或碰到無怯恢復的錯誤之前,
     * commitSync()會一直重試,但是commitAsync()不會。
     * <p>
     * 一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,
     * 那么后續的提交總會有成功的。但如果這是發生在關閉消費者或再均衡前的最后一次提交,就要確保能夠提交成功。
     * 因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。
     */
//    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void manual(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       Consumer consumer,
                       Acknowledgment ack) {
        try {
            System.out.println(String.format("From partition %d : %s", partition, message));
            // 同步提交
            consumer.commitSync();
        } catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }

    }


    /**
     * 手動提交,指定偏移量
     *
     * @param record
     * @param consumer
     */
//    @KafkaListener(topics = "${kafka.topic.manual}", containerFactory = "manualKafkaListenerContainerFactory")
    public void offset(ConsumerRecord record, Consumer consumer) {
        System.out.println(String.format("From partition %d : %s", record.partition(), record.value()));

        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
        currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(currentOffset);
    }
    
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM