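Many users need to monitor, in real time, the total disk space that a topic's partitions occupy on a broker. Kafka does not ship a script that reports this figure directly.
If you still need it, one approach is to read each partition's current log size from the JMX metrics and add up the values for all partitions by hand. The other is to use the DescribeLogDirsRequest introduced in Kafka 1.0.0. This post shows how to get the total size of a given topic on a given broker through the Java API. The code is as follows: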
package huxihx;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicDiskSizeSummary {

    private static AdminClient admin;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String brokers = "localhost:9092";
        initialize(brokers);
        try {
            long t2OnBroker1 = getTopicDiskSizeForSomeBroker("t2", 1);
            long t1OnBroker0 = getTopicDiskSizeForSomeBroker("t1", 0);
            System.out.println(t2OnBroker1);
            System.out.println(t1OnBroker0);
        } finally {
            shutdown();
        }
    }

    // Returns the total size in bytes of all replicas of the topic hosted on the given broker.
    public static long getTopicDiskSizeForSomeBroker(String topic, int brokerID)
            throws ExecutionException, InterruptedException {
        long sum = 0;
        DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
        // Broker ID -> (log directory path -> info about that directory).
        // Note: on kafka-clients 2.7 and later, all() is deprecated in favor of allDescriptions().
        Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> tmp = ret.all().get();
        for (Map.Entry<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> entry : tmp.entrySet()) {
            Map<String, DescribeLogDirsResponse.LogDirInfo> tmp1 = entry.getValue();
            for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry1 : tmp1.entrySet()) {
                DescribeLogDirsResponse.LogDirInfo info = entry1.getValue();
                Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfoMap = info.replicaInfos;
                // Add up only the replicas that belong to the requested topic.
                for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicas : replicaInfoMap.entrySet()) {
                    if (topic.equals(replicas.getKey().topic())) {
                        sum += replicas.getValue().size;
                    }
                }
            }
        }
        return sum;
    }

    private static void initialize(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        admin = AdminClient.create(props);
    }

    private static void shutdown() {
        if (admin != null) {
            admin.close();
        }
    }
}
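The key method is AdminClient.describeLogDirs(). It returns a DescribeLogDirsResult instance that wraps the log size of every partition under each log.dirs path on the given broker. Adding those sizes together gives the topic's total disk usage.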
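The same summation extends naturally to a per-topic breakdown for a whole broker. The sketch below illustrates only the aggregation step and runs without a Kafka cluster: ReplicaSize, sizePerTopic, sampleData, and the directory paths are hypothetical stand-ins for the nested structure that describeLogDirs() returns, not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopicSizeAggregationSketch {

    /** Hypothetical stand-in for one replica reported under a log directory. */
    public static final class ReplicaSize {
        final String topic;
        final int partition;
        final long sizeBytes;

        public ReplicaSize(String topic, int partition, long sizeBytes) {
            this.topic = topic;
            this.partition = partition;
            this.sizeBytes = sizeBytes;
        }
    }

    /** Sums replica sizes per topic across every log directory of one broker. */
    public static Map<String, Long> sizePerTopic(Map<String, List<ReplicaSize>> replicasByLogDir) {
        Map<String, Long> totals = new TreeMap<>();
        for (List<ReplicaSize> replicas : replicasByLogDir.values()) {
            for (ReplicaSize r : replicas) {
                // merge() inserts the size on first sight of a topic, otherwise adds to the running total.
                totals.merge(r.topic, r.sizeBytes, Long::sum);
            }
        }
        return totals;
    }

    /** Made-up sample data: two log directories holding replicas of topics t1 and t2. */
    public static Map<String, List<ReplicaSize>> sampleData() {
        Map<String, List<ReplicaSize>> data = new LinkedHashMap<>();

        List<ReplicaSize> dir1 = new ArrayList<>();
        dir1.add(new ReplicaSize("t1", 0, 100L));
        dir1.add(new ReplicaSize("t2", 0, 50L));
        data.put("/data/kafka-logs-1", dir1);

        List<ReplicaSize> dir2 = new ArrayList<>();
        dir2.add(new ReplicaSize("t1", 1, 200L));
        data.put("/data/kafka-logs-2", dir2);

        return data;
    }

    public static void main(String[] args) {
        // prints {t1=300, t2=50}
        System.out.println(sizePerTopic(sampleData()));
    }
}
```

In a real program the inner loop would walk the replicaInfos map of each LogDirInfo, as the listing above does, and key the totals by TopicPartition.topic().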