Kafka API: TopicMetadata


Jusfr 原創,轉載請注明來自博客園

TopicMetadataRequest/TopicMetadataResponse

前文簡單說過“Kafka是自描述的”,是指其broker、topic、partition 信息可以通過 TopicMetadata API 獲取。

TopicMetadataRequest 的內容非常簡單,是一個包含 TopicName 的數組,TopicMetadataResponse 則告訴使用者 Broker、Topic、Partition 的分布情況。

使用空數組可以獲取完整數據。

在 Chuye.Kafka 里,使用 Connection/Router 對應的發起一個請求:

    var section = new KafkaConfigurationSection("jusfr.redis", 9092);
    var demoTopics = new String[0];
    var connection = new Router(section);    
    connection.TopicMetadata(demoTopics).Dump("Metadata");

Connection.TopicMetadata() 使用 TopicName 數組作為參數構造了一個 TopicMetadataRequest 實例,將其序列化,發送 KafkaConfigurationSection 指向的主機和端口,讀取響應再解析為 TopicMetadataResponse 對象,單機部署的 TopicMetadataResponse 可能有如下結構:

TopicMetadata

當 Kafka 服務的啟動參數auto.create.topics.enable設置為true的時候,TopicMetadataRequest 傳遞的 TopicName 不存在時將被自動創建;

集群模式下 Topic 的自動創建復雜一些,Kafka 攜帶的 bin/kafka-topics.sh 提供了再多參數。


Zookeeper

  • 如何使用程序查詢、刪除 Topic? 如何徹底刪除 Topic ?
  • 如何在集群模式下管理 Topic

源碼閱讀得知,Kafka 對 TopicMetadataRequest 的響應是通過引用 Zookeeper 來完成的。Zookeeper 在 .Net 上的實現有 ZooKeeperNet, NuGet 上是3.4.6.2 版本。

Zookeeper 編程又是一大塊內容,這里只是略加提及。

ZooKeeper 的兩個方法最重要:GetChildren()GetData(),前者提供了路徑查詢,后者提供了節點數據獲取,可以使用以下代碼遞歸訪問:

void Main() {
	ZooKeeper zk = new ZooKeeper("jusfr.mac", TimeSpan.FromSeconds(10), null);
	var paths = zk.GetChildren("/", false).ToArray();
	foreach (var path in paths) {
		GetChildren(zk, "/" + path);
	}
}

void GetChildren(ZooKeeper zk, String path) {
    var data = zk.GetData(path, null, null); 
	var paths = zk.GetChildren(path, false).ToArray();
	if (paths.Length > 0) {
		foreach (var p in paths) {
			GetChildren(zk, path + "/" + p);
		}
	}
}

在集群環境下部分響應示例

topics

// /brokers/topics/demoTopic1
{"version":1,"partitions":{"0":[2]}}


// /brokers/topics/demoTopic1/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]} 

// /brokers/ids/1
{"jmx_port":-1,"timestamp":"1457431238732","endpoints":["PLAINTEXT://jusfr.kafka-1:9093"],"host":"jusfr.kafka-1","version":2,"port":9093} 

路徑 /brokers/topics 存儲了topic 信息,/admin/delete_topics 存儲了被刪除的 topic,這只是一個標記,由於 Kafka 是基於文件系統的,你需要等待 Kafka 在某個時機真正移除它們。部分參考

delete_topics

由於 Kafka 通過 Zookeeper 返回元數據,故任何 Broker 節點都能應答 TopicMetadataRequest 並提供完整響應;

TopicMeatadata in cluster

可以看到 demoTopic3 的 PartitionId=0 分區所在 Leader=1,即 Broker NodeId=1 的節點 jusfr.kafka-1:9093 ,PartitionId=1 分區所在 Leader=2,即 Broker NodeId=2 的節點 jusfr.kafka-2:9094。讀寫 demoTopic3 的分區0 需要連接到主機 jusfr.kafka-1、端口9093,讀寫 demoTopic3 的分區1 需要連接到主機 jusfr.kafka-2、端口9094,此過程我稱為 Broker route。錯誤的 Broker 訪問、不正確的 server.properties 配置可能觸發狀態碼為 UnknownTopicOrPartition 的響應。

Chuye.Kafka 的 Router 對象從 IRouter 定義,繼續自Connection,重寫了 Route 方法,內部便是 Partition-Broker 檢查邏輯。集群模式下涉及到 Zookeeper 編程,Chuye.Kafka 可能未能給予支持。

Jusfr 原創,轉載請注明來自博客園


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM