Kafka 消費者API


消費者api,自動提交offset

public class MyConsumer {

    public static void main(String[] args) {

        Properties props = new Properties();
        //連接的集群
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //開啟自動提交(消費偏移量)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        //自動提交的延遲
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
        //KV的反序列化類
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc");

        //消費者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //訂閱主題
        kafkaConsumer.subscribe(Collections.singletonList("first"));

        while (true){
            //獲取數據
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //解析數據
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
            }
        }

    }
}

 

手動提交offset,同步提交

public class ConsumerOffsetSync {
    public static void main(String[] args) {

        Properties props = new Properties();
        //連接的集群
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        //關閉自動提交(消費偏移量)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        //KV的反序列化類
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        //消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG,"gc1");

        //重置offset。
        //earliest:從頭開始消費,觸發的條件1,換組;條件2:保留的offset指向的數據已經不存在
        //latest:默認值,消費最新的數據。
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        //消費者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        //訂閱主題
        kafkaConsumer.subscribe(Collections.singletonList("first"));

        while (true){
            //獲取數據
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //解析數據
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key()+"-"+consumerRecord.value());
            }

            //同步提交,當前線程會阻塞直到 offset 提交成功
 kafkaConsumer.commitSync();
        }

    }
}

 

手動提交offset,異步提交

//異步提交
kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("Commit failed for" +
                offsets);
    }
});

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM