1.在kafka中如果不設置消費的信息的話,一個消息只能被一個group.id消費一次,而新加如的group.id則會被“消費管理”記錄,並指定從當前記錄的消息位置開始向后消費。如果有段時間消費者關閉了,並有發送者發送消息那么下次這個消費者啟動時也會接收到,但是我們如果想要從這個topic的第一條消息消費呢?
public class SimpleConsumerPerSonIndex2 { public static void main(String[] args) throws Exception { //Kafka consumer configuration settings String topicName = "mypartition001"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "partitiontest112"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //要發送自定義對象,需要指定對象的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka"); KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props); Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>(); hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0)); consumer.commitSync(hashMaps); consumer.subscribe(Arrays.asList(topicName)); while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); for (ConsumerRecord<String, Object> record : records){ System.out.println(record.toString()); } } } }
首先我們在consumer.subscribe(Arrays.asList(topicName));訂閱一個topic之前要設置從這個topic的offset為0的地方獲取。
注意:這樣的方法要保證這個group.id是新加入,如果是以前存在的,那么會拋異常。
2.如果以前就存在的groupid想要獲取指定的topic的offset為0開始之后的消息:
public class SimpleConsumerPerSonIndex2 { public static void main(String[] args) throws Exception { //Kafka consumer configuration settings String topicName = "mypartition001"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "partitiontest002"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //要發送自定義對象,需要指定對象的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka"); KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props); // consumer.subscribe(Arrays.asList(topicName)); consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset // consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); for (ConsumerRecord<String, Object> record : records){ System.out.println(record.toString()); } } } }
使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));來分配topic和partition,
而consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));指定從這個topic和partition的開始位置獲取。
3.存在的groupid獲取指定的topic任意的offset
上面的代碼放開 consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset
並注釋 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset;
其中consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示從這個topic的partition中的offset為10的開始獲取消息。
需要注意的是 consumer.assign()是不會被消費者的組管理功能管理的,他相對於是一個臨時的,不會改變當前group.id的offset,比如:
在使用consumer.subscribe(Arrays.asList(topicName));時offset為20,如果通過2和3,已經獲取了最新的消息offset是最新的,
在下次通過 consumer.subscribe(Arrays.asList(topicName));來獲取消息時offset還是20.還是會獲取20以后的消息。
其實在2、3的結果截圖中我們也可以發現沒有1中結果圖的joining group的日志輸出,表示沒有加入到group中。
————————————————
版權聲明:本文為CSDN博主「也是右移」的原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/u014104286/article/details/77103541/