如何讓kafka consumer從指定的位置開始消費


 

 

String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs)) {
    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Move to the desired start offset 
            consumer.seek(tp, 100L);
        }
    });
    boolean run = true;
    long lastOffset = 200L;
    while (run) {
        ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
        for (ConsumerRecord<String, String> record : crs) {
            System.out.println(record);
            if (record.offset() == lastOffset) {
                // Reached the end offsey, stop consuming
                run = false;
                break;
            }
        }
    }
}

 

 

參考1

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM