手動提交offset
手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。
相同點:都會將本次提交的一批數據最高的偏移量提交
不同點:
同步提交:阻塞當前線程,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);
異步提交:沒有失敗重試機制,故有可能提交失敗。
• commitSync(同步提交):必須等待offset提交完畢,再去消費下一批數據。
• commitAsync(異步提交):發送完提交offset請求后,就開始消費下一批數據。
關鍵字
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 是否開啟自動提交offset
kafkaConsumer.commitAsync(); // 異步提交
kafkaConsumer.commitSync(); // 同步提交
實現代碼
package com.lzh.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.ArrayList; import java.util.Properties; // kafka消費者手動提交offset public class CustomConsumer手動同步提交offset { public static void main(String[] args) { // 0 配置 Properties properties = new Properties(); // 連接到服務器 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092"); // 反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); // 添加消費者組groupid,必須 properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); /* 自動提交配置 // 是否自動提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); // 提交 offset 的時間周期 1000ms,默認 5s properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); */ // 是否手動提交 offset properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); // 1 創建一個消費者對象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties); // 2 訂閱主題 ArrayList<String> topics = new ArrayList<String>(); topics.add("Mytopic"); kafkaConsumer.subscribe(topics); // 注冊要消費的主題(可以消費多個主題) // 3 消費數據 // 一直獲取消費數據 while (true){ ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord); } // 手動提交offset // kafkaConsumer.commitAsync(); // 異步提交 kafkaConsumer.commitSync(); // 同步提交 } } }