Many users need to manage a Kafka cluster programmatically through an API. Before 0.11, Kafka's server-side code (the kafka_2.** artifact) provided an AdminClient and an AdminUtils class that covered some cluster-management operations, but the community website offered no documentation for either class, so users had to read the source code and test cases to work out how to use them. Going through the client API instead (the kafka-clients artifact) meant constructing each request by hand, opening a socket connection to a specific broker, and sending the request yourself, which was just as tedious. Kafka 0.11 therefore introduced a client-side AdminClient. Note that although it shares its name with the old server-side AdminClient, this tool belongs to the client, so your program needs the kafka-clients dependency; with Gradle, for example:

    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
The tool provides the following operations:
- Create topics
- List all topics
- Describe a single topic
- Delete topics
- Alter configs (for both BROKER and TOPIC resources)
- Describe a resource's configs
- Create ACLs
- Describe ACLs
- Delete ACLs
- Describe the entire cluster
The class is used the same way as the other Java clients: there is no Zookeeper connection; you simply supply a list of brokers in the cluster. It is also thread safe, so one instance can safely be shared across multiple threads. Its implementation mechanism is exactly the same as the approach described in the earlier post《Java API方式調用Kafka各種協議》(calling Kafka's various protocols through the Java API): it builds the various Kafka requests in the background and sends them itself, except that AdminClient now handles all of those details, so users no longer have to write the low-level plumbing code themselves.
Below is a test class that exercises every operation above except the three ACL calls; a separate sketch of those follows the listing:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class AdminClientTest {

    private static final String TEST_TOPIC = "test-topic";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
        // AdminClient is AutoCloseable, so try-with-resources closes it when we are done
        try (AdminClient client = AdminClient.create(props)) {
            describeCluster(client);
            createTopics(client);
            listAllTopics(client);
            describeTopics(client);
            alterConfigs(client);
            describeConfig(client);
            deleteTopics(client);
        }
    }

    /**
     * Describe the cluster: its id, the current controller, and all nodes.
     */
    public static void describeCluster(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeClusterResult ret = client.describeCluster();
        System.out.println(String.format("Cluster id: %s, controller: %s", ret.clusterId().get(), ret.controller().get()));
        System.out.println("Current cluster nodes info: ");
        for (Node node : ret.nodes().get()) {
            System.out.println(node);
        }
    }

    /**
     * Describe the config of the test topic.
     */
    public static void describeConfig(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeConfigsResult ret = client.describeConfigs(
                Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, TEST_TOPIC)));
        Map<ConfigResource, Config> configs = ret.all().get();
        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
            ConfigResource key = entry.getKey();
            Config value = entry.getValue();
            System.out.println(String.format("Resource type: %s, resource name: %s", key.type(), key.name()));
            for (ConfigEntry each : value.entries()) {
                System.out.println(each.name() + " = " + each.value());
            }
        }
    }

    /**
     * Alter the config of the test topic: switch cleanup.policy to compaction.
     */
    public static void alterConfigs(AdminClient client) throws ExecutionException, InterruptedException {
        Config topicConfig = new Config(Arrays.asList(new ConfigEntry("cleanup.policy", "compact")));
        client.alterConfigs(Collections.singletonMap(
                new ConfigResource(ConfigResource.Type.TOPIC, TEST_TOPIC), topicConfig)).all().get();
    }

    /**
     * Delete the given topic.
     */
    public static void deleteTopics(AdminClient client) throws ExecutionException, InterruptedException {
        KafkaFuture<Void> futures = client.deleteTopics(Arrays.asList(TEST_TOPIC)).all();
        futures.get();
    }

    /**
     * Describe the given topics, including the internal __consumer_offsets topic.
     */
    public static void describeTopics(AdminClient client) throws ExecutionException, InterruptedException {
        DescribeTopicsResult ret = client.describeTopics(Arrays.asList(TEST_TOPIC, "__consumer_offsets"));
        Map<String, TopicDescription> topics = ret.all().get();
        for (Map.Entry<String, TopicDescription> entry : topics.entrySet()) {
            System.out.println(entry.getKey() + " ===> " + entry.getValue());
        }
    }

    /**
     * Create a sample topic with 3 partitions and a replication factor of 3.
     */
    public static void createTopics(AdminClient client) throws ExecutionException, InterruptedException {
        NewTopic newTopic = new NewTopic(TEST_TOPIC, 3, (short) 3);
        CreateTopicsResult ret = client.createTopics(Arrays.asList(newTopic));
        ret.all().get();
    }

    /**
     * Print all topics in the cluster.
     */
    public static void listAllTopics(AdminClient client) throws ExecutionException, InterruptedException {
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true); // include internal topics such as __consumer_offsets
        ListTopicsResult topics = client.listTopics(options);
        Set<String> topicNames = topics.names().get();
        System.out.println("Current topics in this cluster: " + topicNames);
    }
}
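As promised, here is a minimal, untested sketch of the three ACL operations against the 0.11 API. It assumes the brokers have an authorizer enabled (the ACL calls fail otherwise); the principal User:alice, the host wildcard, and the topic name are illustrative values only:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.Resource;
import org.apache.kafka.common.resource.ResourceType;

import java.util.Collections;

public class AclSample {

    // Illustrative method; assumes an authorizer is configured on the brokers
    public static void runAclSample(AdminClient client) throws Exception {
        // Create an ACL: allow User:alice to read test-topic from any host
        AclBinding binding = new AclBinding(
                new Resource(ResourceType.TOPIC, "test-topic"),
                new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));
        client.createAcls(Collections.singleton(binding)).all().get();

        // Describe ACLs: AclBindingFilter.ANY matches every binding in the cluster
        for (AclBinding acl : client.describeAcls(AclBindingFilter.ANY).values().get()) {
            System.out.println(acl);
        }

        // Delete ACLs matching a filter; binding.toFilter() matches exactly the binding created above
        client.deleteAcls(Collections.singleton(binding.toFilter())).all().get();
    }
}

All three calls return futures just like the other operations, so the same get()-based synchronous pattern applies.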
One last note: because this class works by sending requests asynchronously and then waiting for the results, every returned result is wrapped in a KafkaFuture, which implements Java's Future interface. Being a Future, it lets the caller decide whether to receive the result asynchronously or block for it. The example above mostly calls KafkaFuture.get(), i.e. it waits synchronously.
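If blocking the calling thread is undesirable, a callback can be chained onto the future instead. Below is a minimal sketch using KafkaFuture.thenApply; the printTopicsAsync method name is just for illustration:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;

import java.util.Set;

public class AsyncSample {

    public static void printTopicsAsync(AdminClient client) {
        KafkaFuture<Set<String>> names = client.listTopics().names();
        // thenApply registers a callback that runs when the future completes,
        // so the calling thread is not blocked
        names.thenApply(new KafkaFuture.Function<Set<String>, Void>() {
            @Override
            public Void apply(Set<String> topicNames) {
                System.out.println("Topics (from callback): " + topicNames);
                return null;
            }
        });
    }
}

A bounded synchronous wait is also possible via KafkaFuture.get(timeout, unit), which throws TimeoutException if the result does not arrive in time.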
