kafka 重放 重播 從某個時間點或者offset開始消費


轉自: https://www.jianshu.com/p/932663e9a226

 

consumer.subscribe(topicA);
consumer.poll(100);//正常訂閱topic和poll消息

Set<TopicPartition> assignments = consumer.assignment();//獲取consumer所分配的分區信息
Map<TopicPartition, Long> query = new HashMap<>();//構造offsetsForTimes參數,通過時間戳找到offset
for (TopicPartition topicPartition : assignments) {
    System.out.println(topicPartition);
    query.put(topicPartition, 1550804131000L);
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : result.entrySet()) {
    System.out.println(entry);
    consumer.seek(entry.getKey(), entry.getValue().offset());//每個topic的partition都seek到執行的offset
}
  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM