能與原理介紹
在Kafka官網中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為准):
- 創建Topic:createTopics(Collection<NewTopic> newTopics)
- 刪除Topic:deleteTopics(Collection<String> topics)
- 羅列所有Topic:listTopics()
- 查詢Topic:describeTopics(Collection<String> topicNames)
- 查詢集群信息:describeCluster()
- 查詢ACL信息:describeAcls(AclBindingFilter filter)
- 創建ACL信息:createAcls(Collection<AclBinding> acls)
- 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
- 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)
- 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
- 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
- 查詢節點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)
- 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
- 增加分區:createPartitions(Map<String, NewPartitions> newPartitions)
- 其內部原理是使用Kafka自定義的一套二進制協議來實現,詳細可以參見Kafka協議。主要實現步驟:
客戶端根據方法的調用創建相應的協議請求,比如創建Topic的createTopics方法,其內部就是發送CreateTopicRequest請求。
客戶端發送請求至Kafka Broker。
Kafka Broker處理相應的請求並回執,比如與CreateTopicRequest對應的是CreateTopicResponse。
客戶端接收相應的回執並進行解析處理。
和協議有關的請求和回執的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執類的兩個基本父類。
@Component public class KafkaConfig{ // 配置Kafka public Properties getProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); /* props.put("retries", 2); // 重試次數 props.put("batch.size", 16384); // 批量發送大小 props.put("buffer.memory", 33554432); // 緩存大小,根據本機內存大小配置 props.put("linger.ms", 1000); // 發送頻率,滿足任務一個條件發送*/ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } }
@RestController public class KafkaTopicManager { @Autowired private KafkaConfig kafkaConfig; @GetMapping("createTopic") public void createTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); NewTopic newTopic = new NewTopic("test1",4, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); adminClient.createTopics(newTopicList); adminClient.close(); } @GetMapping("deleteTopic") public void deleteTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); adminClient.deleteTopics(Arrays.asList("test1")); adminClient.close(); } @GetMapping("listAllTopic") public void listAllTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); try { names.get().forEach((k)->{ System.out.println(k); }); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } adminClient.close(); } @GetMapping("getTopic") public void getTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test")); Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values(); if(values.isEmpty()){ System.out.println("找不到描述信息"); }else{ for (KafkaFuture<TopicDescription> value : values) { System.out.println(value); } } adminClient.close(); } }
