Kafka的運維利器-AdminClient


前言

一般情況下,我們都習慣使用kafka-topics.sh腳本來管理主題,但有些時候我們希望將主題管理類的功能集成到公司內部的系統中,打造集管理、監控、運維、告警為一體的生態平台,那么就需要以程序調用API的方式去實現。

Kafka社區於0.11版本正式推出了Java客戶端版的AdminClient,並不斷地在后續的版本中對它進行完善。

本文主要介紹KafkaAdminClient 的基本使用方式,以及采用這種調用API方式下的創建主題時的合法性驗證。

功能

鑒於社區還在不斷地完善 AdminClient 的功能,AdminClient 提供的功能有以下幾個大類。

  • 主題管理:包括主題的創建、刪除和查詢。
  • 權限管理:包括具體權限的配置與刪除。
  • 配置參數管理:包括 Kafka 各種資源的參數設置、詳情查詢。所謂的 Kafka 資源,主要有 Broker、主題、用戶、Client-id 等。
  • 副本日志管理:包括副本底層日志路徑的變更和詳情查詢。
  • 分區管理:即創建額外的主題分區。
  • 消息刪除:即刪除指定位移之前的分區消息。
  • Delegation Token 管理:包括 Delegation Token 的創建、更新、過期和詳情查詢。
  • 消費者組管理:包括消費者組的查詢、位移查詢和刪除。
  • Preferred 領導者選舉:推選指定主題分區的 Preferred Broker 為領導者。

工作原理

AdminClient 是一個雙線程的設計:前端主線程和后端 I/O 線程。

前端線程負責將用戶要執行的操作轉換成對應的請求,然后再將請求發送到后端 I/O 線程的隊列中。

而后端 I/O 線程(kafka-admin-client-thread)從隊列中讀取相應的請求,然后發送到對應的 Broker 節點上,之后把執行結果保存起來,以便等待前端線程的獲取。

使用

如果你使用的是 Maven,需要增加以下依賴項:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.5</version>
</dependency>

構建AdminClient

/**
 * 創建AdminClient客戶端對象
 */
public static AdminClient createAdminClientByProperties() {

  Properties prop = new Properties();

  // 配置Kafka服務的訪問地址及端口號
  prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

  // 創建AdminClient實例
  return AdminClient.create(prop);
}

/**
 * 創建AdminClient的第二種方式
 */
public static AdminClient createAdminClientByMap(){

  Map<String, Object> conf = Maps.newHashMap();

  // 配置Kafka服務的訪問地址及端口號
  conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");

  // 創建AdminClient實例
  return AdminClient.create(conf);
}

創建Topic實例

private static final String TOPIC_NAME = "test_topic";

/**
 * 創建Topic實例
 */
public static void createTopic(){
    AdminClient adminClient = AdminSample.adminClient();
    //副本因子
    Short re = 1;
    NewTopic newTopic = new NewTopic(TOPIC_NAME,1,re);
    CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
    System.out.println("CreateTopicsResult : " + createTopicsResult);
    adminClient.close();
}

查詢Topic列表

private static final String TOPIC_NAME = "test_topic";

/**
 * 獲取topic列表
 */
public static void topicList() throws Exception {
    AdminClient adminClient = adminClient();

    //是否查看Internal選項
    ListTopicsOptions options = new ListTopicsOptions();
    options.listInternal(true);

    //ListTopicsResult listTopicsResult = adminClient.listTopics();
    ListTopicsResult listTopicsResult = adminClient.listTopics(options);
    Set<String> names = listTopicsResult.names().get();

    //打印names
    names.stream().forEach(System.out::println);

    Collection<TopicListing> topicListings = listTopicsResult.listings().get();
    //打印TopicListing
    topicListings.stream().forEach((topicList) -> {
        System.out.println(topicList.toString());
    });
    adminClient.close();
}

刪除topic

private static final String TOPIC_NAME = "test_topic";

/**
 * 刪除topic
 */
public static void delTopic() throws Exception {
    AdminClient adminClient = adminClient();
    DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
    deleteTopicsResult.all().get();
}

描述topic

/**
 * 獲取topic的描述信息
 */
public static void describeTopics(List<String> topics) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();

    // 獲取Topic的描述信息
    DescribeTopicsResult result = adminClient.describeTopics(topics);

    // 解析描述信息結果, Map<String, TopicDescription> ==> topicName:topicDescription
    Map<String, TopicDescription> topicDescriptionMap = result.all().get();
    topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s \n", topicName, description));

    // 關閉資源
    adminClient.close();
}

查看 Topic 的配置信息

除了Kafka自身的配置項外,其內部的Topic也會有非常多的配置項,我們可以通過describeConfigs方法來獲取某個Topic中的配置項信息。代碼示例:

/**
 * 獲取topic的配置信息
 */
public static void describeConfigTopics(List<String> topicNames) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
    topicNames.forEach(topicName -> configResources.add(
            // 指定要獲取的源
            new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

    // 獲取topic的配置信息
    DescribeConfigsResult result = adminClient.describeConfigs(configResources);

    // 解析topic的配置信息
    Map<ConfigResource, Config> resourceConfigMap = result.all().get();
    resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s \n", configResource, config));

    // 關閉資源
    adminClient.close();
}

修改 Topic 的分區數量

在創建Topic時我們需要設定Partition的數量,但如果覺得初始設置的Partition數量太少了,那么就可以使用createPartitions方法來調整Topic的Partition數量,但是需要注意在Kafka中Partition只能增加不能減少。代碼示例:

/**
 * 修改topic的分區數量
 * 只能增加不能減少
 */
public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    // 構建修改分區的topic請求參數
    Map<String, NewPartitions> newPartitions = Maps.newHashMap();
    topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum)));

    // 執行修改
    CreatePartitionsResult result = adminClient.createPartitions(newPartitions);

    // get方法是一個阻塞方法,一定要等到createPartitions完成之后才進行下一步操作
    result.all().get();

    // 關閉資源
    adminClient.close();
}

社區於 0.11 版本正式推出了 Java 客戶端版的 AdminClient 工具,該工具提供了幾十種運維操作,而且它還在不斷地演進着。如果可以的話,你最好統一使用 AdminClient 來執行各種 Kafka 集群管理操作,摒棄掉連接 ZooKeeper 的那些工具。另外,建議時刻關注該工具的功能完善情況,畢竟,目前社區對 AdminClient 的變更頻率很高。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM