關於怎么獲取kafka指定位置offset消息(轉)


1.在kafka中如果不設置消費的信息的話,一個消息只能被一個group.id消費一次,而新加如的group.id則會被“消費管理”記錄,並指定從當前記錄的消息位置開始向后消費。如果有段時間消費者關閉了,並有發送者發送消息那么下次這個消費者啟動時也會接收到,但是我們如果想要從這個topic的第一條消息消費呢?

public class SimpleConsumerPerSonIndex2 {
public static void main(String[] args) throws Exception {
 
 
      //Kafka consumer configuration settings
      String topicName = "mypartition001";
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "partitiontest112");
      props.put("enable.auto.commit", "true"); 
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      
      //要發送自定義對象,需要指定對象的反序列化類
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
      hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0));
      consumer.commitSync(hashMaps);
      consumer.subscribe(Arrays.asList(topicName));
      while (true) {
      ConsumerRecords<String, Object> records = consumer.poll(100);
         for (ConsumerRecord<String, Object> record : records){
        System.out.println(record.toString());
         }
      }
      
   }
}

 

 

首先我們在consumer.subscribe(Arrays.asList(topicName));訂閱一個topic之前要設置從這個topic的offset為0的地方獲取。
注意:這樣的方法要保證這個group.id是新加入,如果是以前存在的,那么會拋異常。


2.如果以前就存在的groupid想要獲取指定的topic的offset為0開始之后的消息:

public class SimpleConsumerPerSonIndex2 {
public static void main(String[] args) throws Exception {
 
 
      //Kafka consumer configuration settings
      String topicName = "mypartition001";
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "partitiontest002");
      props.put("enable.auto.commit", "true"); 
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      
      //要發送自定義對象,需要指定對象的反序列化類
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");
    KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
// consumer.subscribe(Arrays.asList(topicName));
      consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
      consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset
//       consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset
     
      while (true) {
      ConsumerRecords<String, Object> records = consumer.poll(100);
         for (ConsumerRecord<String, Object> record : records){
        System.out.println(record.toString());
         }
      }
      
   }
}

 

 

使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));來分配topic和partition,
 而consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));指定從這個topic和partition的開始位置獲取。

 

 


3.存在的groupid獲取指定的topic任意的offset

上面的代碼放開 consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset
並注釋 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset;
其中consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示從這個topic的partition中的offset為10的開始獲取消息。

 

 

需要注意的是 consumer.assign()是不會被消費者的組管理功能管理的,他相對於是一個臨時的,不會改變當前group.id的offset,比如:
在使用consumer.subscribe(Arrays.asList(topicName));時offset為20,如果通過2和3,已經獲取了最新的消息offset是最新的,
在下次通過 consumer.subscribe(Arrays.asList(topicName));來獲取消息時offset還是20.還是會獲取20以后的消息。
其實在2、3的結果截圖中我們也可以發現沒有1中結果圖的joining group的日志輸出,表示沒有加入到group中。

 

 

————————————————
版權聲明:本文為CSDN博主「也是右移」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/u014104286/article/details/77103541/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM