Kafka 核心 API ==> AdminClient


一、Kafka 核心 API

  上文中對 Kafka 做了一些簡單的介紹,那么在開發過程中我們如何去訪問 Kafka 呢?這就需要使用到本文將要介紹的Kafka客戶端API。下圖是官方文檔中的一個圖,形象的描述了能與 Kafka集成的客戶端類型

Kafka的五類客戶端API類型如下:

  • AdminClient API:允許管理和檢測Topic、broker以及其他Kafka實例,與Kafka自帶的腳本命令作用類似
  • Producer API:發布消息到1個或多個Topic,也就是生產者或者說發布方需要用到的API
  • Consumer API:訂閱1個或多個Topic,並處理產生的消息,也就是消費者或者說訂閱方需要用到的API
  • Stream API:高效地將輸入流轉換到輸出流,通常應用在一些流處理場景
  • Connector API:從一些源系統或應用程序拉取數據到Kafka,如上圖中的DB

本文中,我們將主要介紹 AdminClient API。

二、AdminClient API

顯然,操作AdminClient API的前提是需要創建一個 AdminClient 實例。代碼示例:

/**
 * 創建AdminClient客戶端對象
 */
public static AdminClient createAdminClientByProperties() {

  Properties prop = new Properties();

  // 配置Kafka服務的訪問地址及端口號
  prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092");

  // 創建AdminClient實例
  return AdminClient.create(prop);
}

/**
 * 創建AdminClient的第二種方式
 */
public static AdminClient createAdminClientByMap(){

  Map<String, Object> conf = Maps.newHashMap();

  // 配置Kafka服務的訪問地址及端口號
  conf.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.182.128:9092");

  // 創建AdminClient實例
  return AdminClient.create(conf);
}

創建了 AdminClient 的實例對象后,我們就可以通過它提供的方法操作 Kafka,常用的方法如下:

方法名稱 作用
createTopics 創建一個或多個Topic
listTopics  查詢Topic列表 
deleteTopics  刪除一個或多個Topic 
describeTopics  查詢Topic的描述信息 
describeConfigs  查詢Topic、Broker等的所有配置項信息 
alterConfigs  用於修改Topic、Broker等的配置項信息(該方法在新版本中被標記為已過期)
incrementalAlterConfigs  同樣也是用於修改Topic、Broker等的配置項信息,但功能更多、更靈活,用於代替alterConfigs 
createPartitions  用於調整Topic的Partition數量,只能增加不能減少或刪除,也就是說新設置的Partition數量必須大於等於之前的Partition數量 

Tips:

  • describeTopics describeConfigs 的意義主要是在監控上,很多用於監控Kafka的組件都會使用到這兩個API,因為通過這兩個API可以獲取到Topic自身和周邊的詳細信息

三、創建 Topic

使用createTopics方法可以創建Topic,傳入的參數也與kafka-topics.sh命令腳本的參數一樣。代碼示例: 

/**
 * 創建一個或多個topic
 *
 * @param topicNames topic名稱的集合
 */
public static void createTopic(List<String> topicNames) throws Exception {
  // 創建AdminClient客戶端對象
  AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

  List<NewTopic> topicList = Lists.newArrayList();
  /**
   * 定義topic信息
   * String name                topic名
   * int numPartitions          分區數
   * short replicationFactor    副本數,必須不能大於broker數量
   */
  topicNames.forEach(topicName -> topicList.add(
          new NewTopic(topicName, 1, Short.parseShort("1"))));

  // 創建topic
  CreateTopicsResult result = adminClient.createTopics(topicList);

  // get方法是一個阻塞方法,一定要等到createTopics完成之后才進行下一步操作
  result.all().get();

  // 打印新創建的topic名
  result.values().forEach((name, future) -> System.out.println("topicName:" + name));

  // 關閉資源
  adminClient.close();
}

四、刪除 Topic

deleteTopics 方法可以刪除一個或多個Topic,代碼示例:

/**
 * 刪除一個或多個topic
 *
 * @param topicNames topic名稱的集合
 */
public static void removeTopic(List<String> topicNames) throws Exception {

    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();

    // 刪除topic集合
    DeleteTopicsResult result = adminClient.deleteTopics(topicNames);

    // get方法是一個阻塞方法,一定要等到deleteTopics完成之后才進行下一步操作
    result.all().get();

    // 關閉資源
    adminClient.close();
}

五、查看 Topics 列表

listTopics 方法用於查詢Topic列表,通過傳入 ListTopicsOptions 參數可以設置一些可選項。代碼示例:

/**
 * 獲取所有的topic信息,包括Kafka內部的topic
 * 如:__consumer_offsets,internal=true
 */
public static void listTopicsWithOptions() throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();

    ListTopicsOptions options = new ListTopicsOptions();
    // 列出內部的Topic
    options.listInternal(true);

    // 列出所有的topic
    ListTopicsResult result = adminClient.listTopics(options);

    // 獲取所有topic的名字,返回的是一個Set集合
    Set<String> topicNames = result.names().get();

    // 打印所有topic的名字
    topicNames.forEach(System.out::println);

    // 獲取所有topic的信息,返回的是一個Collection集合
    // (name=hello-kafka, internal=false),internal代表是否為內部的topic
    Collection<TopicListing> topicListings = result.listings().get();

    // 打印所有topic的信息
    topicListings.forEach(System.out::println);

    // 關閉資源
    adminClient.close();
}

六、查看 Topic 的描述信息

一個 Topic 會有自身的描述信息,例如:partition 的數量,副本集的數量,是否為 internal 等等。AdminClient 中提供了 describeTopics 方法來查詢這些描述信息。代碼示例:

/**
 * 獲取topic的描述信息
 * 
 * topic name = a-topic, desc = (name=a-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null)
 * topic name = b-topic, desc = (name=b-topic, internal=false, partitions=(partition=0, leader=192.168.182.128:9092 (id: 0 rack: null), replicas=192.168.182.128:9092 (id: 0 rack: null), isr=192.168.182.128:9092 (id: 0 rack: null)), authorizedOperations=null)
 */
public static void describeTopics(List<String> topics) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByProperties();

    // 獲取Topic的描述信息
    DescribeTopicsResult result = adminClient.describeTopics(topics);

    // 解析描述信息結果, Map<String, TopicDescription> ==> topicName:topicDescription
    Map<String, TopicDescription> topicDescriptionMap = result.all().get();
    topicDescriptionMap.forEach((topicName, description) -> System.out.printf("topic name = %s, desc = %s \n", topicName, description));

    // 關閉資源
    adminClient.close();
}

七、查看 Topic 的配置信息

除了Kafka自身的配置項外,其內部的Topic也會有非常多的配置項,我們可以通過describeConfigs方法來獲取某個Topic中的配置項信息。代碼示例:

/**
 * 獲取topic的配置信息
 */
public static void describeConfigTopics(List<String> topicNames) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
    topicNames.forEach(topicName -> configResources.add(
            // 指定要獲取的源
            new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

    // 獲取topic的配置信息
    DescribeConfigsResult result = adminClient.describeConfigs(configResources);

    // 解析topic的配置信息
    Map<ConfigResource, Config> resourceConfigMap = result.all().get();
    resourceConfigMap.forEach((configResource, config) -> System.out.printf("topic config ConfigResource = %s, Config = %s \n", configResource, config));

    // 關閉資源
    adminClient.close();
}

八、修改 Topic 的分區數量

在創建Topic時我們需要設定Partition的數量,但如果覺得初始設置的Partition數量太少了,那么就可以使用 createPartitions方法來調整Topic的Partition數量,但是需要注意在Kafka中Partition只能增加不能減少。代碼示例:
/**
 * 修改topic的分區數量
 * 只能增加不能減少
 */
public static void updateTopicPartition(List<String> topicNames, Integer partitionNum) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    // 構建修改分區的topic請求參數
    Map<String, NewPartitions> newPartitions = Maps.newHashMap();
    topicNames.forEach(topicName -> newPartitions.put(topicName, NewPartitions.increaseTo(partitionNum)));

    // 執行修改
    CreatePartitionsResult result = adminClient.createPartitions(newPartitions);

    // get方法是一個阻塞方法,一定要等到createPartitions完成之后才進行下一步操作
    result.all().get();

    // 關閉資源
    adminClient.close();
}

 Tips:

  • Partition的索引從0開始,所以第一個partition=0,第二個partition=1

九、修改 Topic 配置信息

除了可以查看Topic的配置項信息外,AdminClient還提供了相關方法來修改Topic配置項的值。在早期版本中,使用alterConfigs方法來修改配置項。代碼示例:

/**
 * 修改topic的配置信息
 * 使用舊版api:alterConfigs
 */
public static void updateTopicConfigOld(List<String> topicNames) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
    // 指定要修改的ConfigResource類型及名稱
    topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

    // 建立修改的配置項,配置項以ConfigEntry形式存在
    Config config = new Config(Collections.singletonList(new ConfigEntry("preallocate", "true")));

    // 參數構造
    Map<ConfigResource, Config> configMap = Maps.newHashMap();
    configResources.forEach(configResource -> configMap.put(configResource, config));

    // 修改topic 配置,用的是老api,已經過時
    AlterConfigsResult result = adminClient.alterConfigs(configMap);

    // get方法是一個阻塞方法,一定要等到alterConfigs完成之后才進行下一步操作
    result.all().get();

    // 關閉資源
    adminClient.close();
}

執行以上代碼,成功將 topic 的配置項 preallocate 的值改為了 true。

在新版本中則是使用 incrementalAlterConfigs 方法來修改Topic的配置項,該方法使用起來相對於 alterConfigs 要略微復雜一些,但因此功能更多、更靈活。代碼示例:

/**
 * 修改topic的配置信息
 * 使用新版api:incrementalAlterConfigs
 */
public static void updateTopicConfigNew(List<String> topicNames) throws Exception {
    // 創建AdminClient客戶端對象
    AdminClient adminClient = BuildAdminClient.createAdminClientByMap();

    List<ConfigResource> configResources = Lists.newArrayListWithCapacity(64);
    // 指定要修改的ConfigResource類型及名稱
    topicNames.forEach(topicName -> configResources.add(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));

    // 配置項同樣以ConfigEntry形式存在,只不過增加了操作類型
    // 以及能夠支持操作多個配置項,相對來說功能更多、更靈活
    Collection<AlterConfigOp> configs = Lists.newArrayList(
            new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.5"), AlterConfigOp.OpType.SET),
            new AlterConfigOp(new ConfigEntry("unclean.leader.election.enable", "false"), AlterConfigOp.OpType.SET));
    // 參數構造
    Map<ConfigResource, Collection<AlterConfigOp>> configMaps = Maps.newHashMap();
    configResources.forEach(configResource -> configMaps.put(configResource, configs));

    // 下面這個是新api.但是有些麻煩
    // 在某些版本中,incrementalAlterConfigs方法可能會存在些問題,對單實例的Kafka支持得不是很好,會出現無法成功修改配置項的情況
    AlterConfigsResult result = adminClient.incrementalAlterConfigs(configMaps);

    // get方法是一個阻塞方法,一定要等到incrementalAlterConfigs完成之后才進行下一步操作
    result.all().get();

    // 關閉資源
    adminClient.close();
}
Tips:
  • 在某些版本中,incrementalAlterConfigs 方法可能會存在些問題,對單實例的 Kafka 支持得不是很好,會出現無法成功修改配置項的情況,此時就可以使用alterConfigs方法來代替。

執行以上代碼,修改了三個配置項的值:preallocate、min.cleanable.dirty.ratio 和 unclean.leader.election.enable。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM