對於 kafka 主題(topic)的管理(增刪改查),使用最多的便是kafka自帶的腳本。
創建主題
kafka提供了自帶的 kafka-topics
腳本,用來幫助用戶創建主題(topic)。
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
create 表明我們要創建主題,而 partitions 和 replication factor 分別設置了主題的分區數以及每個分區下的副本數。
這里為什么用的 --bootstrap-server
參數,而不是 --zookeeper
?
--zookeeper
參數是之前版本的用法,從kafka 2.2 版本開始,社區推薦使用 --bootstrap-server
參數替換 --zoookeeper
,並且顯式地將后者標記為 “已過期”,因此,如果你已經在使用 2.2 版本了,那么創建主題請指定 --bootstrap-server
參數。
推薦使用 --bootstrap-server
而非 --zookeeper
的原因主要有兩個。
- 使用 --zookeeper 會繞過 Kafka 的安全體系。這就是說,即使你為 Kafka 集群設置了安全認證,限制了主題的創建,如果你使用 --zookeeper 的命令,依然能成功創建任意主題,不受認證體系的約束。這顯然是 Kafka 集群的運維人員不希望看到的。
- 使用 --bootstrap-server 與集群進行交互,越來越成為使用 Kafka 的標准姿勢。換句話說,以后會有越來越少的命令和 API 需要與 ZooKeeper 進行連接。這樣,我們只需要一套連接信息,就能與 Kafka 進行全方位的交互,不用像以前一樣,必須同時維護 ZooKeeper 和 Broker 的連接信息。
查詢主題
創建好主題之后,Kafka 允許我們使用相同的腳本查詢主題。你可以使用下面的命令,查詢所有主題的列表。
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
如果要查詢單個主題的詳細數據,你可以使用下面的命令。
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
如果 describe 命令不指定具體的主題名稱,那么 Kafka 默認會返回所有 “可見” 主題的詳細數據給你。
這里的 “可見”,是指發起這個命令的用戶能夠看到的 Kafka 主題。這和前面說到主題創建時,使用 --zookeeper 和 --bootstrap-server 的區別是一樣的。如果指定了 --bootstrap-server,那么這條命令就會受到安全認證體系的約束,即對命令發起者進行權限驗證,然后返回它能看到的主題。否則,如果指定 --zookeeper 參數,那么默認會返回集群中所有的主題詳細數據。基於這些原因,我建議你最好統一使用 --bootstrap-server 連接參數。
修改主題
修改主題分區
其實就是增加分區,目前 Kafka 不允許減少某個主題的分區數。你可以使用 kafka-topics 腳本,結合 --alter 參數來增加某個主題的分區數,命令如下:
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分區數 >
這里要注意的是,你指定的分區數一定要比原有分區數大,否則 Kafka 會拋出 InvalidPartitionsException 異常。
修改主題級別參數
在主題創建之后,我們可以使用 kafka-configs 腳本修改對應的參數。
假設我們要設置主題級別參數 max.message.bytes,那么命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
也許你會覺得奇怪,為什么這個腳本就要指定 --zookeeper,而不是 --bootstrap-server 呢?其實,這個腳本也能指定 --bootstrap-server 參數,只是它是用來設置動態參數的。在專欄后面,我會詳細介紹什么是動態參數,以及動態參數都有哪些。現在,你只需要了解設置常規的主題級別參數,還是使用 --zookeeper。
變更副本數
使用自帶的 kafka-reassign-partitions 腳本,幫助我們增加主題的副本數。
假設kafka的內部主題 __consumer_offsets
只有 1 個副本,現在我們想要增加至 3 個副本。下面是操作:
- 創建一個 json 文件,顯式提供 50 個分區對應的副本數。注意,replicas 中的 3 台 Broker 排列順序不同,目的是將 Leader 副本均勻地分散在 Broker 上。該文件具體格式如下
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}
- 執行
kafka-reassign-patitions
腳本,命令如下:
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
除了修改內部主題,我們可能還想查看這些內部主題的消息內容。特別是對於 __consumer_offsets 而言,由於它保存了消費者組的位移數據,有時候直接查看該主題消息是很方便的事情。下面的命令可以幫助我們直接查看消費者組提交的位移數據。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
除了查看位移提交數據,我們還可以直接讀取該主題消息,查看消費者組的狀態信息。
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
對於內部主題 __transaction_state 而言,方法是相同的。你只需要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 即可。
修改主題限速
這里主要是指設置 Leader 副本和 Follower 副本使用的帶寬。有時候,我們想要讓某個主題的副本在執行副本同步機制時,不要消耗過多的帶寬。Kafka 提供了這樣的功能。我來舉個例子。假設我有個主題,名為 test,我想讓該主題各個分區的 Leader 副本和 Follower 副本在處理副本同步時,不得占用超過 100MBps 的帶寬。注意是大寫 B,即每秒不超過 100MB。那么,我們應該怎么設置呢?
要達到這個目的,我們必須先設置 Broker 端參數 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
這條命令結尾處的 --entity-name 就是 Broker ID。倘若該主題的副本分別在 0、1、2、3 多個 Broker 上,那么你還要依次為 Broker 1、2、3 執行這條命令。
設置好這個參數之后,我們還需要為該主題設置要限速的副本。在這個例子中,我們想要為所有副本都設置限速,因此統一使用通配符 * 來表示,命令如下:
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
主題分區遷移
同樣是使用 kafka-reassign-partitions 腳本,對主題各個分區的副本進行 “手術” 般的調整,比如把某些分區批量遷移到其他 Broker 上。
刪除主題
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
刪除主題的命令並不復雜,關鍵是刪除操作是異步的,執行完這條命令不代表主題立即就被刪除了。它僅僅是被標記成 “已刪除” 狀態而已。Kafka 會在后台默默地開啟主題刪除操作。因此,通常情況下,你都需要耐心地等待一段時間。
主題刪除失敗
當運行完上面的刪除命令后,很多人發現已刪除主題的分區數據依然 “躺在” 硬盤上,沒有被清除。這時該怎么辦呢?
實際上,造成主題刪除失敗的原因有很多,最常見的原因有兩個:
- 副本所在的 Broker 宕機了
- 待刪除主題的部分分區依然在執行遷移過程。
如果是因為前者,通常你重啟對應的 Broker 之后,刪除操作就能自動恢復;如果是因為后者,那就麻煩了,很可能兩個操作會相互干擾。
不管什么原因,一旦你碰到主題無法刪除的問題,可以采用這樣的方法:
-
手動刪除 ZooKeeper 節點 /admin/delete_topics 下以待刪除主題為名的 znode。
-
手動刪除該主題在磁盤上的分區目錄。
-
在 ZooKeeper 中執行 rmr /controller,觸發 Controller 重選舉,刷新 Controller 緩存。
在執行最后一步時,你一定要謹慎,因為它可能造成大面積的分區 Leader 重選舉。事實上,僅僅執行前兩步也是可以的,只是 Controller 緩存中沒有清空待刪除主題罷了,也不影響使用。
常見問題
__consumer_offsets 占用太多的磁盤
一旦你發現這個主題消耗了過多的磁盤空間,那么,你一定要顯式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前綴的線程狀態。通常情況下,這都是因為該線程掛掉了,無法及時清理此內部主題。倘若真是這個原因導致的,那我們就只能重啟相應的 Broker 了。另外,請你注意保留出錯日志,因為這通常都是 Bug 導致的,最好提交到社區看一下。