offset提交: Consumer消費數據后需要向kafka匯報消費的數據在partition offset位置
offset提交方式:自動提交、手動提交
1.自動提交
kafka一poll就自動提交offset
默認情況或者將props.put("enable.auto.commit", "true") --自動提交
自動提交流程:
consumer提交上次poll消息的偏移量——>consumer從kafka拉取新的消息
通過設置props.put("auto.commit.interval.ms", "2000")來決定自動提交間隔
自動提交優缺點:
優點:速度快
缺點:容易造成數據丟失。原因:消息poll下來后(還沒有消費)直接提交offset,速度很快,可能出現消費失敗。
自動提交示例代碼:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "2000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); //kafka一poll就提交offset了 for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
2.手動提交 commitAsync()
手動提交一般放到代碼最后,這樣上邊程序都執行完后執行offset手動提交可以避免數據丟失問題
properties.put("enable.auto.commit", "false"); --offset手動提交
手動提交可以細分為同步提交(commitSync)交和異步提交(commitAsync),同步/異步提交都會將本次poll的一批數據最高的偏移量提交。不同點
同步提交:提交時阻塞當前線程,一直等到提交成功,並且會自動失敗重試
異步提交:提交完成后不需要等待broker返回ack直接往下走
手動提交必要性:
很多時候並不是說拉取到消息就算消費完成,而是需要將消息寫入數據庫、寫入本地緩存,或者是更加復雜的業務處理。在這些場景下,所有的業務處理完成才能認為消息被成功消費,手動的提交方式可以讓開發人員根據程序的邏輯在合適的地方進行位移提交
指定時間、分區 offset開始消費
指定時間
List<PartitionInfo> partitions=consumer.partitionsFor("topicName");
long fetchDataTime=new Date().getTime()-60*1000*100;
Map<TopicPartition,Long> timeToSearch=new HashMap<>();
List<TopicPartition> topicPartitions=new ArrayList<>();
for(PartitionInfo partitionInfo:partitions){
topicPartitions.add(new TopicPartition("TopicName",partitionInfo.partition()));
timeToSearch.put(new TopicPartition("TopicName",partitionInfo.partition()),fetchDataTime);
}
consumer.assign(Arrays.asList(new TopicPartition("TopicName",0));
consumer.offsetsForTimes(timeToSearch);
指定offset
int offset=10;
consumer.assign(Arrays.asList(new TopicPartition("TopicName",0))); //指定topic和partition=0
consumer.seekToBeginning(Arrays.asList(new TopicPartition("TopicName",0)));
consumer.seek(new TopicPartition("TopicName",0),offset);
參考:
指定時間消費
https://blog.csdn.net/lixinkuan328/article/details/120730555?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-120730555-blog-120139470.pc_relevant_3mothn_strategy_and_data_recovery&spm=1001.2101.3001.4242.1&utm_relevant_index=3
指定offset消費
https://blog.csdn.net/weixin_57128596/article/details/127355594