Jusfr 原創,轉載請注明來自博客園
TopicMetadataRequest/TopicMetadataResponse
前文簡單說過“Kafka是自描述的”,是指其broker、topic、partition 信息可以通過 TopicMetadata API 獲取。
TopicMetadataRequest 的內容非常簡單,是一個包含 TopicName 的數組,TopicMetadataResponse 則告訴使用者 Broker、Topic、Partition 的分布情況。
使用空數組可以獲取完整數據。
在 Chuye.Kafka 里,使用 Connection/Router 對應的發起一個請求:
var section = new KafkaConfigurationSection("jusfr.redis", 9092);
var demoTopics = new String[0];
var connection = new Router(section);
connection.TopicMetadata(demoTopics).Dump("Metadata");
Connection.TopicMetadata() 使用 TopicName 數組作為參數構造了一個 TopicMetadataRequest 實例,將其序列化,發送 KafkaConfigurationSection 指向的主機和端口,讀取響應再解析為 TopicMetadataResponse 對象,單機部署的 TopicMetadataResponse 可能有如下結構:
當 Kafka 服務的啟動參數auto.create.topics.enable
設置為true的時候,TopicMetadataRequest 傳遞的 TopicName 不存在時將被自動創建;
集群模式下 Topic 的自動創建復雜一些,Kafka 攜帶的 bin/kafka-topics.sh 提供了再多參數。
Zookeeper
- 如何使用程序查詢、刪除 Topic? 如何徹底刪除 Topic ?
- 如何在集群模式下管理 Topic
源碼閱讀得知,Kafka 對 TopicMetadataRequest 的響應是通過引用 Zookeeper 來完成的。Zookeeper 在 .Net 上的實現有 ZooKeeperNet, NuGet 上是3.4.6.2 版本。
Zookeeper 編程又是一大塊內容,這里只是略加提及。
ZooKeeper 的兩個方法最重要:GetChildren()
和 GetData()
,前者提供了路徑查詢,后者提供了節點數據獲取,可以使用以下代碼遞歸訪問:
void Main() {
ZooKeeper zk = new ZooKeeper("jusfr.mac", TimeSpan.FromSeconds(10), null);
var paths = zk.GetChildren("/", false).ToArray();
foreach (var path in paths) {
GetChildren(zk, "/" + path);
}
}
void GetChildren(ZooKeeper zk, String path) {
var data = zk.GetData(path, null, null);
var paths = zk.GetChildren(path, false).ToArray();
if (paths.Length > 0) {
foreach (var p in paths) {
GetChildren(zk, path + "/" + p);
}
}
}
在集群環境下部分響應示例
// /brokers/topics/demoTopic1
{"version":1,"partitions":{"0":[2]}}
// /brokers/topics/demoTopic1/partitions/0/state
{"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2]}
// /brokers/ids/1
{"jmx_port":-1,"timestamp":"1457431238732","endpoints":["PLAINTEXT://jusfr.kafka-1:9093"],"host":"jusfr.kafka-1","version":2,"port":9093}
路徑 /brokers/topics 存儲了topic 信息,/admin/delete_topics 存儲了被刪除的 topic,這只是一個標記,由於 Kafka 是基於文件系統的,你需要等待 Kafka 在某個時機真正移除它們。部分參考
由於 Kafka 通過 Zookeeper 返回元數據,故任何 Broker 節點都能應答 TopicMetadataRequest 並提供完整響應;
可以看到 demoTopic3 的 PartitionId=0 分區所在 Leader=1,即 Broker NodeId=1 的節點 jusfr.kafka-1:9093 ,PartitionId=1 分區所在 Leader=2,即 Broker NodeId=2 的節點 jusfr.kafka-2:9094。讀寫 demoTopic3 的分區0 需要連接到主機 jusfr.kafka-1、端口9093,讀寫 demoTopic3 的分區1 需要連接到主機 jusfr.kafka-2、端口9094,此過程我稱為 Broker route。錯誤的 Broker 訪問、不正確的 server.properties 配置可能觸發狀態碼為 UnknownTopicOrPartition 的響應。
Chuye.Kafka 的 Router 對象從 IRouter 定義,繼續自Connection,重寫了 Route 方法,內部便是 Partition-Broker 檢查邏輯。集群模式下涉及到 Zookeeper 編程,Chuye.Kafka 可能未能給予支持。
Jusfr 原創,轉載請注明來自博客園