本文着重介紹幾個常用的topic命令行命令,包括listTopic,createTopic,deleteTopic和describeTopic等。由於alterTopic並不是很常用,本文中就不涉及了。另外本文的代碼分析是基於kafka_2.10-0.8.2.1的(雖然截圖是Kafka 0.8.1的^_^ )
一. list topic 顯示所有topic

1. 從zookeeper的/brokers/topics節點下獲取所有topic封裝成topic集合
2. 遍歷該集合,查看每個topic是否是待刪除topic——即在/admin/delete_topics下是否存在同名節點。如果是,打印topic已經被標記為刪除;否則直接打印topic名稱
二、create topic 創建topic
1. 從命令行中獲取要創建的topic名稱
2. 解析命令行指定的topic配置(如果存在的話),配置都是x=a的格式
3. 若指定了replica-assignment參數表明用戶想要自己分配分區副本與broker的映射——通常都不這么做,如果不提供該參數Kafka幫你做這件事情
4. 檢查必要的參數是否已指定,包括:zookeeper, replication-factor,partition和topic
5. 獲取/brokers/ids下所有broker並按照broker id進行升序排序
6. 在broker上分配各個分區的副本映射 (沒有指定replica-assignment參數,這也是默認的情況)
7. 檢查topic名字合法性、自定義配置的合法性,並且要保證每個分區都必須有相同的副本數
8. 若zookeeper上已有對應的路徑存在,直接拋出異常表示該topic已經存在
9. 確保某個分區的多個副本不會被分配到同一個broker
10. 若提供了自定義的配置,更新zookeeper的/config/topics/[topic]節點的數據
11. 創建/brokers/topics/[topic]節點,並將分區副本分配映射數據寫入該節點
三、delete topic 刪除topic
1. 獲取待刪除的topic,如果沒有指定--topic就是刪除所有的topic
2. 對於每個要刪除的topic,在zookeeper上的/admin/delete_topics下創建對應的子節點。kafka目前的刪除topic邏輯只是在Zookeeper上標記而已,會有專門的線程負責監聽該路徑下的變更並負責更新zookeeper上其他節點上的數據,但底層的日志文件目前還是需要手動刪除。
四、describe topic 顯示topic詳細信息
1. 上圖可見,如果指定了--topic就是只顯示給定topic的信息,否則顯示所有topic的詳細信息。
2. 如果指定了under-replicated-partitions,那么就顯示那些副本數量不足的分區(ISR size < AR.size)
3. 如果指定了unavailable-partitions,那么就顯示那些leader副本已不可用的分區
4. 從zookeeper上獲取當前所有可用的broker
5. 遍歷每個要describe的topic,
6. 獲取這個topic的分區副本分配信息,若該信息不存在說明topic不存在
7. 否則將分配信息按照分區號進行排序
10. 如果沒有指定步驟2中的參數也沒有指定步驟3中的參數,那么顯示分區數信息、副本系數信息以及配置信息
11. 默認情況下還會顯示各個分區的信息
12. 從zookeeper中獲取每個分區的ISR、Leader、AR信息並顯示
