消費者api,自動提交offset
public class MyConsumer { public static void main(String[] args) { Properties props = new Properties(); //連接的集群 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //開啟自動提交(消費偏移量) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //自動提交的延遲 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000); //KV的反序列化類 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //消費者組 props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc"); //消費者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); //訂閱主題 kafkaConsumer.subscribe(Collections.singletonList("first")); while (true){ //獲取數據 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); //解析數據 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()+"-"+consumerRecord.value()); } } } }
手動提交offset,同步提交
public class ConsumerOffsetSync { public static void main(String[] args) { Properties props = new Properties(); //連接的集群 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092"); //關閉自動提交(消費偏移量) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false); //KV的反序列化類 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //消費者組 props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc1"); //重置offset。 //earliest:從頭開始消費,觸發的條件1,換組;條件2:保留的offset指向的數據已經不存在 //latest:默認值,消費最新的數據。 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); //消費者 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); //訂閱主題 kafkaConsumer.subscribe(Collections.singletonList("first")); while (true){ //獲取數據 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000); //解析數據 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println(consumerRecord.key()+"-"+consumerRecord.value()); } //同步提交,當前線程會阻塞直到 offset 提交成功 kafkaConsumer.commitSync(); } } }
手動提交offset,異步提交
//異步提交 kafkaConsumer.commitAsync((offsets, exception) -> { if (exception != null) { System.err.println("Commit failed for" + offsets); } });