Kafka Java API操作topic


Kafka官方提供了兩個腳本來管理topic,包括topic的增刪改查。其中kafka-topics.sh負責topic的創建與刪除;kafka-configs.sh腳本負責topic的修改和查詢,但很多用戶都更加傾向於使用程序API的方式對topic進行操作。
 
上一篇文章中提到了如何使用客戶端協議(client protocol)來創建topic,本文則使用服務器端的Java API對topic進行增刪改查。 開始之前,需要明確的是,下面的代碼需要引入kafka-core的依賴,以kafka 0.10.2版本為例:
Maven版本
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.0</version>
</dependency>
 
Gradle版本
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.10.2.0'
 
創建topic
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 創建一個單分區單副本名為t1的topic
AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
zkUtils.close();

刪除topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 刪除topic 't1'
AdminUtils.deleteTopic(zkUtils, "t1");
zkUtils.close();

比較遺憾地是,不管是創建topic還是刪除topic,目前Kafka實現的方式都是后台異步操作的,而且沒有提供任何回調機制或返回任何結果給用戶,所以用戶除了捕獲異常以及查詢topic狀態之外似乎並沒有特別好的辦法可以檢測操作是否成功。

查詢topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
// 獲取topic 'test'的topic屬性屬性
Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");
// 查詢topic-level屬性
Iterator it = props.entrySet().iterator();
while(it.hasNext()){
    Map.Entry entry=(Map.Entry)it.next();
    Object key = entry.getKey();
    Object value = entry.getValue();
    System.out.println(key + " = " + value);
}
zkUtils.close();

修改topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());
Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");
// 增加topic級別屬性
props.put("min.cleanable.dirty.ratio", "0.3");
// 刪除topic級別屬性
props.remove("max.message.bytes");
// 修改topic 'test'的屬性
AdminUtils.changeTopicConfig(zkUtils, "test", props);
zkUtils.close();


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM