關鍵字
kafkaConsumer.seek(topicPartition,100); // reposition this partition so consumption starts at offset 100
實現代碼
package com.lzh.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/**
 * Kafka consumer demo: subscribe to a topic, wait for the group coordinator to
 * assign partitions, then seek every assigned partition to a fixed offset and
 * print each record consumed from that point on.
 */
public class CustomConsumer指定offset位置開始消費 {

    /** Broker list used to bootstrap the connection to the cluster. */
    private static final String BOOTSTRAP_SERVERS = "bigdata01:9092,bigdata02:9092";

    /** Consumer group id; required because the consumer uses subscribe(). */
    private static final String GROUP_ID = "test";

    /** Topic to consume from. */
    private static final String TOPIC = "Mytopic";

    /** Offset every assigned partition is positioned at before consuming. */
    private static final long START_OFFSET = 100L;

    /** Maximum time a single poll() call blocks waiting for data. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    /**
     * Entry point. Runs until the process is killed or poll() throws.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // try-with-resources guarantees the consumer is closed (sockets released)
        // if polling fails with an exception.
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(buildProperties())) {
            // Register the topics to consume (more than one may be added).
            List<String> topics = new ArrayList<>();
            topics.add(TOPIC);
            kafkaConsumer.subscribe(topics);

            // seek() is only valid for partitions currently assigned to this
            // consumer, so the assignment has to be known first.
            seekAll(kafkaConsumer, awaitAssignment(kafkaConsumer), START_OFFSET);

            // Consume forever, printing every record received.
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(POLL_TIMEOUT);
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }

    /** Builds the consumer configuration: brokers, String deserializers and group id. */
    private static Properties buildProperties() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        return properties;
    }

    /**
     * Polls until the consumer reports a non-empty partition assignment.
     * NOTE: this loops indefinitely if no partition is ever assigned to this consumer.
     */
    private static Set<TopicPartition> awaitAssignment(KafkaConsumer<String, String> kafkaConsumer) {
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.isEmpty()) {
            // poll() drives the group join; any records it returns here are
            // intentionally ignored because the position is reset right after.
            kafkaConsumer.poll(POLL_TIMEOUT);
            assignment = kafkaConsumer.assignment();
        }
        return assignment;
    }

    /** Positions every given partition at {@code offset}. */
    private static void seekAll(KafkaConsumer<String, String> kafkaConsumer,
                                Set<TopicPartition> partitions,
                                long offset) {
        for (TopicPartition topicPartition : partitions) {
            kafkaConsumer.seek(topicPartition, offset);
        }
    }
}
