Kafka官方提供了兩個腳本來管理topic,包括topic的增刪改查。其中kafka-topics.sh負責topic的創建與刪除;kafka-configs.sh腳本負責topic的修改和查詢,但很多用戶都更加傾向於使用程序API的方式對topic進行操作。
上一篇文章中提到了如何使用客戶端協議(client protocol)來創建topic,本文則使用服務器端的Java API對topic進行增刪改查。
開始之前,需要明確的是,下面的代碼需要引入kafka-core的依賴,以kafka 0.10.2版本為例:
Maven版本
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
</dependency>
Gradle版本
compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.10.2.0'
創建topic
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 創建一個單分區單副本名為t1的topic AdminUtils.createTopic(zkUtils, "t1", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$); zkUtils.close();
刪除topic
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 刪除topic 't1' AdminUtils.deleteTopic(zkUtils, "t1"); zkUtils.close();
比較遺憾地是,不管是創建topic還是刪除topic,目前Kafka實現的方式都是后台異步操作的,而且沒有提供任何回調機制或返回任何結果給用戶,所以用戶除了捕獲異常以及查詢topic狀態之外似乎並沒有特別好的辦法可以檢測操作是否成功。
查詢topic
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); // 獲取topic 'test'的topic屬性屬性 Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test"); // 查詢topic-level屬性 Iterator it = props.entrySet().iterator(); while(it.hasNext()){ Map.Entry entry=(Map.Entry)it.next(); Object key = entry.getKey(); Object value = entry.getValue(); System.out.println(key + " = " + value); } zkUtils.close();
修改topic
ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled()); Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test"); // 增加topic級別屬性 props.put("min.cleanable.dirty.ratio", "0.3"); // 刪除topic級別屬性 props.remove("max.message.bytes"); // 修改topic 'test'的屬性 AdminUtils.changeTopicConfig(zkUtils, "test", props); zkUtils.close();