Getting a Topic's Disk Usage with the Java API (Kafka 1.0.0)


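Many users need to monitor, in real time, the total disk space that all partitions of a given topic occupy on a broker. Kafka does not provide a ready-made script for collecting this statistic.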


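There are still two ways to get it. One is to monitor the JMX metric that reports each partition's current total log size and add up the values for all partitions yourself. The other is to use the DescribeLogDirsRequest introduced in Kafka 1.0.0. This article shows how to use the Java API to get the total disk space a topic occupies on a given broker. The code is as follows: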


package huxihx;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicDiskSizeSummary {

    private static AdminClient admin;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String brokers = "localhost:9092";
        initialize(brokers);
        try {
            // Disk usage in bytes of topic "t2" on broker 1 and of topic "t1" on broker 0
            long t2SizeOnBroker1 = getTopicDiskSizeForSomeBroker("t2", 1);
            long t1SizeOnBroker0 = getTopicDiskSizeForSomeBroker("t1", 0);
            System.out.println(t2SizeOnBroker1);
            System.out.println(t1SizeOnBroker0);
        } finally {
            shutdown();
        }
    }

    /**
     * Returns the total size in bytes of all replicas of {@code topic} stored on broker
     * {@code brokerID}, summed over every directory in that broker's log.dirs.
     */
    public static long getTopicDiskSizeForSomeBroker(String topic, int brokerID)
            throws ExecutionException, InterruptedException {
        long sum = 0;
        DescribeLogDirsResult ret = admin.describeLogDirs(Collections.singletonList(brokerID));
        // brokerId -> log dir path -> LogDirInfo
        Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> logDirsByBroker = ret.all().get();
        for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirs : logDirsByBroker.values()) {
            for (DescribeLogDirsResponse.LogDirInfo info : logDirs.values()) {
                // Each log dir lists the partition replicas it hosts, with their sizes
                for (Map.Entry<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replica : info.replicaInfos.entrySet()) {
                    if (topic.equals(replica.getKey().topic())) {
                        sum += replica.getValue().size; // replica size in bytes
                    }
                }
            }
        }
        return sum;
    }

    private static void initialize(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        admin = AdminClient.create(props);
    }

    private static void shutdown() {
        if (admin != null) {
            admin.close();
        }
    }
}

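The key method is AdminClient.describeLogDirs(). It returns a DescribeLogDirsResult instance that contains, for the given broker, the log size of every partition under each path configured in log.dirs. Adding up the sizes of the partitions that belong to the topic gives the topic's disk usage on that broker.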
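Because describeLogDirs() accepts a collection of broker IDs, one call can also cover several brokers. The sketch below shows only the aggregation step, producing one total per broker. To stay runnable without the Kafka client, it uses plain JDK maps as a hypothetical stand-in for the Map<Integer, Map<String, LogDirInfo>> returned by all().get(): broker ID -> log dir path -> "topic-partition" key -> size in bytes. The class, method names and sample values are illustrative; with the real API you would iterate info.replicaInfos and read ReplicaInfo.size, as the code above does.

```java
import java.util.Map;
import java.util.TreeMap;

public class TopicSizeAggregation {

    // Input is a hypothetical stand-in for describeLogDirs(...).all().get():
    // brokerId -> log dir path -> "topic-partition" -> replica size in bytes.
    // Returns, per broker, the total bytes held by replicas of `topic`.
    public static Map<Integer, Long> sizePerBroker(
            Map<Integer, Map<String, Map<String, Long>>> logDirsByBroker, String topic) {
        Map<Integer, Long> totals = new TreeMap<>(); // sorted by broker ID
        for (Map.Entry<Integer, Map<String, Map<String, Long>>> broker : logDirsByBroker.entrySet()) {
            long sum = 0;
            for (Map<String, Long> replicas : broker.getValue().values()) { // one map per log dir
                for (Map.Entry<String, Long> replica : replicas.entrySet()) {
                    // Exact match, so partitions of "t10" are not counted for "t1"
                    if (topic.equals(topicOf(replica.getKey()))) {
                        sum += replica.getValue();
                    }
                }
            }
            totals.put(broker.getKey(), sum);
        }
        return totals;
    }

    // "my-topic-3" -> "my-topic": the partition number follows the last '-',
    // the same format TopicPartition.toString() uses.
    public static String topicOf(String topicPartition) {
        return topicPartition.substring(0, topicPartition.lastIndexOf('-'));
    }

    public static void main(String[] args) {
        // Made-up sample values for two brokers
        Map<Integer, Map<String, Map<String, Long>>> sample = Map.of(
                0, Map.of("/data1", Map.of("t1-0", 100L, "t10-0", 50L),
                          "/data2", Map.of("t1-1", 200L)),
                1, Map.of("/data1", Map.of("t1-0", 100L, "t2-0", 70L)));
        System.out.println(sizePerBroker(sample, "t1")); // prints {0=300, 1=100}
    }
}
```

Summing the per-broker values gives the topic's footprint across the whole cluster, which counts every replica, so it grows with the replication factor.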

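For comparison, the JMX approach mentioned at the beginning can be sketched with the JDK's javax.management API. This assumes the broker exposes remote JMX (for example, started with the JMX_PORT environment variable; port 9999 below is hypothetical) and that each partition's log size is published as a gauge named kafka.log:type=Log,name=Size,topic=<topic>,partition=<n> with a Value attribute in bytes. Check the exact MBean names on your Kafka version; the class and method names here are illustrative.

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class TopicSizeViaJmx {

    // Pattern matching the per-partition log size gauges of one topic, e.g.
    // kafka.log:type=Log,name=Size,topic=t1,partition=0 (assumed MBean naming).
    public static ObjectName sizePattern(String topic) {
        try {
            return new ObjectName("kafka.log:type=Log,name=Size,topic=" + topic + ",partition=*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Cannot build MBean pattern for topic " + topic, e);
        }
    }

    // Adds up the "Value" attribute (bytes) of every matching gauge on one broker.
    public static long topicSizeOnBroker(MBeanServerConnection conn, String topic) throws Exception {
        long sum = 0;
        Set<ObjectName> names = conn.queryNames(sizePattern(topic), null);
        for (ObjectName name : names) {
            sum += ((Number) conn.getAttribute(name, "Value")).longValue();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical endpoint: a broker started with JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            System.out.println(topicSizeOnBroker(connector.getMBeanServerConnection(), "t1"));
        }
    }
}
```

Each JMX connection covers only one broker, so this route needs one connection per broker, whereas describeLogDirs() goes through the AdminClient's bootstrap connection.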
