Kafka獲取訂閱某topic的所有consumer group【客戶端版】


之前寫過如何用服務器端的API代碼來獲取訂閱某topic的所有consumer group,參見這里。使用服務器端的API需要用到kafka.admin.AdminClient類,但是這個類在0.11.0.0版本已經被標記為不推薦使用了,故目前最合適的方式還是通過客戶端API:org.apache.kafka.clients.admin.AdminClient。今天碰到有人問這個問題,我就嘗試寫了一個。使用之前你需要引入kafka client包依賴(以2.2.0版本為例)

Maven:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>

Gradle:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.2.0'

 

下面是代碼:

 1 private static List<String> getGroupsForTopic(String brokerServers, String topic)  2             throws ExecutionException, InterruptedException, TimeoutException {  3         Properties props = new Properties();  4  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers);  5 
 6         try (AdminClient client = AdminClient.create(props)) {  7             List<String> allGroups = client.listConsumerGroups()  8  .valid()  9                     .get(10, TimeUnit.SECONDS) 10  .stream() 11  .map(ConsumerGroupListing::groupId) 12  .collect(Collectors.toList()); 13 
14             Map<String, ConsumerGroupDescription> allGroupDetails =
15                     client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS); 16 
17             final List<String> filteredGroups = new ArrayList<>(); 18             allGroupDetails.entrySet().forEach(entry -> { 19                 String groupId = entry.getKey(); 20                 ConsumerGroupDescription description = entry.getValue(); 21                 boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment) 22  .map(MemberAssignment::topicPartitions) 23                         .map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet())) 24                         .anyMatch(tps -> tps.contains(topic)); 25                 if (topicSubscribed) 26  filteredGroups.add(groupId); 27  }); 28             return filteredGroups; 29  } 30     }

我會假設你的集群中沒有配置安全認證和授權機制或者發起此AdminClient的用戶是合法用戶且有CLUSTER以及GROUP的DESCRIBE權限。

另外值得注意的是,上面這個函數無法獲取非運行中的consumer group,即雖然一個group訂閱了某topic,但是若它所有的consumer成員都關閉的話這個函數是不會返回該group的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM