kafka-獨立消費者,可以消費指定partition的方式


import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/**
 * @Description kafka消費者
 */
public class KafkaConsumerTest {
    
    /**
     * 獨立消費者
     * 消費者可以為自己分配分區,不需要訂閱主題,不會發生再均衡,沒有群組概念
     * 弊端:如果主題新增了分區,消費者不會受到通知,所以,要么周期性的調用consumer.partitionsFor()方法來檢查是否有新分區加入,要么在添加新分區后重啟應用程序
     */
    public static void singleConsumer(){
        consumer = new KafkaConsumer<String, String>(Properties);
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("test_topic");
        List<TopicPartition> partitions = new ArrayList<>();

        if (null != partitionInfos){
            for (PartitionInfo partitionInfo : partitionInfos) {
               //添加需要的partition到集合中
                partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            //手動為消費者指定partition列表
            consumer.assign(partitions);
            try {
                while (true){
                    //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。
                    //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞)
                    //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據
                    //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                    System.out.println("==== data get ====");
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    }
                }
            } catch(Exception e){
                e.printStackTrace();
            } finally {
                //退出應用前使用close方法關閉消費者。
                //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致整個群組在一段時間內無法讀取消息。
                consumer.close();
            }
        }
    }

}  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM