import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; /** * @Description kafka消費者 */ public class KafkaConsumerTest { /** * 獨立消費者 * 消費者可以為自己分配分區,不需要訂閱主題,不會發生再均衡,沒有群組概念 * 弊端:如果主題新增了分區,消費者不會受到通知,所以,要么周期性的調用consumer.partitionsFor()方法來檢查是否有新分區加入,要么在添加新分區后重啟應用程序 */ public static void singleConsumer(){ consumer = new KafkaConsumer<String, String>(Properties); List<PartitionInfo> partitionInfos = consumer.partitionsFor("test_topic"); List<TopicPartition> partitions = new ArrayList<>(); if (null != partitionInfos){ for (PartitionInfo partitionInfo : partitionInfos) { //添加需要的partition到集合中 partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition())); } //手動為消費者指定partition列表 consumer.assign(partitions); try { while (true){ //消費者持續對kafka進行輪訓,否則會被認為已經死亡,它的分區會被移交給群組里的其他消費者。 //傳給poll方法的是一個超時時間,用於控制poll()方法的阻塞時間(在消費者的緩沖區里沒有可用數據時會發生阻塞) //如果該參數被設為0,poll會立即返回,否則它會在指定的毫秒數內一直等待broker返回數據 //poll方法返回一個記錄列表。每條記錄包含了記錄所屬主題的信息、記錄所在分區的信息、記錄在分區里的偏移量,以及記錄的鍵值對。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); System.out.println("==== data get ===="); for (ConsumerRecord<String, String> record : records) { System.out.println(String.format("topic=%s, partition=%s, offset=%d, key=%s, value=%s", record.topic(), record.partition(), record.offset(), record.key(), record.value())); } } } catch(Exception e){ e.printStackTrace(); } finally { //退出應用前使用close方法關閉消費者。 //網絡連接和socket也會隨之關閉,並立即觸發一次再均衡,而不是等待群組協調器發現它不在發送心跳並認定它已死亡,因為那樣需要更長的時間,導致整個群組在一段時間內無法讀取消息。 consumer.close(); } } } }