前言
一般情況下,我們都習慣使用kafka-topics.sh腳本來管理主題,但有些時候我們希望將主題管理類的功能集成到公司內部的系統中,打造集管理、監控、運維、告警為一體的生態平台,那么就需要以程序調用API的方式去實現。
Kafka社區於0.11版本正式推出了Java客戶端版的AdminClient,並不斷地在后續的版本中對它進行完善。
本文主要介紹KafkaAdminClient 的基本使用方式,以及采用這種調用API方式下的創建主題時的合法性驗證。
功能
鑒於社區還在不斷地完善 AdminClient 的功能,AdminClient 提供的功能有以下幾個大類。
- 主題管理:包括主題的創建、刪除和查詢。
- 權限管理:包括具體權限的配置與刪除。
- 配置參數管理:包括 Kafka 各種資源的參數設置、詳情查詢。所謂的 Kafka 資源,主要有 Broker、主題、用戶、Client-id 等。
- 副本日志管理:包括副本底層日志路徑的變更和詳情查詢。
- 分區管理:即創建額外的主題分區。
- 消息刪除:即刪除指定位移之前的分區消息。
- Delegation Token 管理:包括 Delegation Token 的創建、更新、過期和詳情查詢。
- 消費者組管理:包括消費者組的查詢、位移查詢和刪除。
- Preferred 領導者選舉:推選指定主題分區的 Preferred Broker 為領導者。
工作原理
AdminClient 是一個雙線程的設計:前端主線程和后端 I/O 線程。
前端線程負責將用戶要執行的操作轉換成對應的請求,然后再將請求發送到后端 I/O 線程的隊列中。
而后端 I/O 線程(kafka-admin-client-thread)從隊列中讀取相應的請求,然后發送到對應的 Broker 節點上,之后把執行結果保存起來,以便等待前端線程的獲取。
使用
如果你使用的是 Maven,需要增加以下依賴項:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.5</version>
</dependency>
構建AdminClient
/**
* 創建AdminClient客戶端對象
*/
public static AdminClient createAdminClientByProperties() {
Properties prop = new Properties();
// 配置Kafka服務的訪問地址及端口號
prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
// 創建AdminClient實例
return AdminClient.create(prop);
}
/**
* 創建AdminClient的第二種方式
*/
public static AdminClient createAdminClientByMap(){
Map<String, Object> conf = Maps.newHashMap();
// 配置Kafka服務的訪問地址及端口號
conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
// 創建AdminClient實例
return AdminClient.create(conf);
}
創建Topic實例
private static final String TOPIC_NAME = "test_topic";
/**
* 創建Topic實例
*/
public static void createTopic(){
AdminClient adminClient = AdminSample.adminClient();
//副本因子
Short re = 1;
NewTopic newTopic = new NewTopic(TOPIC_NAME,1,re);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
System.out.println("CreateTopicsResult : " + createTopicsResult);
adminClient.close();
}
查詢Topic列表
private static final String TOPIC_NAME = "test_topic";
/**
* 獲取topic列表
*/
public static void topicList() throws Exception {
AdminClient adminClient = adminClient();
//是否查看Internal選項
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
//ListTopicsResult listTopicsResult = adminClient.listTopics();
ListTopicsResult listTopicsResult = adminClient.listTopics(options);
Set<String> names = listTopicsResult.names().get();
//打印names
names.stream().forEach(System.out::println);
Collection<TopicListing> topicListings = listTopicsResult.listings().get();
//打印TopicListing
topicListings.stream().forEach((topicList) -> {
System.out.println(topicList.toString());
});
adminClient.close();
}
刪除topic
private static final String TOPIC_NAME = "test_topic";
/**
* 刪除topic
*/
public static void delTopic() throws Exception {
AdminClient adminClient = adminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
deleteTopicsResult.all().get();
}
描述topic
/**
* 獲取topic的描述信息
*/
public static void describeTopics(List<String> topics) throws Exception {
// 創建AdminClient客戶端對象
AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();
// 獲取Topic的描述信息
DescribeTopicsResult result = adminClient.describeTopics(topics);
// 解析描述信息結果, Map<String, TopicDescription> ==> topicName:topicDescription
Map<String, TopicDescription> topicDescriptionMap = result.all().get();
topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s \n", topicName, description));
// 關閉資源
adminClient.close();
}
查看 Topic 的配置信息
除了Kafka自身的配置項外,其內部的Topic也會有非常多的配置項,我們可以通過describeConfigs方法來獲取某個Topic中的配置項信息。代碼示例:
/**
* 獲取topic的配置信息
*/
public static void describeConfigTopics(List<String> topicNames) throws Exception {
// 創建AdminClient客戶端對象
AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
topicNames.forEach(topicName -> configResources.add(
// 指定要獲取的源
new ConfigResource(ConfigResource.Type.TOPIC, topicName)));
// 獲取topic的配置信息
DescribeConfigsResult result = adminClient.describeConfigs(configResources);
// 解析topic的配置信息
Map<ConfigResource, Config> resourceConfigMap = result.all().get();
resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s \n", configResource, config));
// 關閉資源
adminClient.close();
}
修改 Topic 的分區數量
在創建Topic時我們需要設定Partition的數量,但如果覺得初始設置的Partition數量太少了,那么就可以使用createPartitions方法來調整Topic的Partition數量,但是需要注意在Kafka中Partition只能增加不能減少。代碼示例:
/**
* 修改topic的分區數量
* 只能增加不能減少
*/
public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception {
// 創建AdminClient客戶端對象
AdminClient adminClient = BuildAdminClient.createAdminClientByMap();
// 構建修改分區的topic請求參數
Map<String, NewPartitions> newPartitions = Maps.newHashMap();
topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum)));
// 執行修改
CreatePartitionsResult result = adminClient.createPartitions(newPartitions);
// get方法是一個阻塞方法,一定要等到createPartitions完成之后才進行下一步操作
result.all().get();
// 關閉資源
adminClient.close();
}
社區於 0.11 版本正式推出了 Java 客戶端版的 AdminClient 工具,該工具提供了幾十種運維操作,而且它還在不斷地演進着。如果可以的話,你最好統一使用 AdminClient 來執行各種 Kafka 集群管理操作,摒棄掉連接 ZooKeeper 的那些工具。另外,建議時刻關注該工具的功能完善情況,畢竟,目前社區對 AdminClient 的變更頻率很高。