需求
在生產環境中,會遇到最近消費的幾個小時數據異常,想重新按照時間消費。
例如,要求按照時間,消費前一天的數據。
關鍵字
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition); // 時間轉offset
實現代碼
package com.lzh.kafka; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.*; // 指定時間開始消費 public class CustomConsumer指定時間開始消費 { public static void main(String[] args) { // 0 配置 Properties properties = new Properties(); // 連接到服務器 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); // 添加groupid,必須 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); // 1 創建一個消費者對象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 2 訂閱主題 ArrayList<String> topics = new ArrayList<String>(); topics.add("Mytopic"); kafkaConsumer.subscribe(topics); // 注冊要消費的主題(可以消費多個主題) // 指定位置開始消費 Set<TopicPartition> assignment= new HashSet<>(); while (assignment.size() == 0) { kafkaConsumer.poll(Duration.ofSeconds(1)); // 獲取消費者分區分配信息(有了分區分配信息才能開始消費) assignment = kafkaConsumer.assignment(); } // 把時間轉換為對應的offset HashMap<TopicPartition, Long> partitionLongHashMap = new HashMap<>(); // 封裝對應的集合 for (TopicPartition topicPartition : assignment) { partitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1*24*3600*1000); // 一天前的時間System.currentTimeMillis() - 1*24*3600*100 } Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(partitionLongHashMap); // 遍歷所有分區,並指定 offset 從前一天開始消費 for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition); kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset()); } // 3 消費數據 // 一直獲取消費數據 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } } } }