kafka 主題管理


對於 kafka 主題(topic)的管理(增刪改查),使用最多的便是kafka自帶的腳本。

創建主題

kafka提供了自帶的 kafka-topics 腳本,用來幫助用戶創建主題(topic)。

bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name  --partitions 1 --replication-factor 1

create 表明我們要創建主題,而 partitions 和 replication factor 分別設置了主題的分區數以及每個分區下的副本數。

這里為什么用的 --bootstrap-server 參數,而不是 --zookeeper ?

--zookeeper 參數是之前版本的用法,從kafka 2.2 版本開始,社區推薦使用 --bootstrap-server 參數替換 --zoookeeper ,並且顯式地將后者標記為 “已過期”,因此,如果你已經在使用 2.2 版本了,那么創建主題請指定 --bootstrap-server 參數。

推薦使用 --bootstrap-server 而非 --zookeeper 的原因主要有兩個。

  1. 使用 --zookeeper 會繞過 Kafka 的安全體系。這就是說,即使你為 Kafka 集群設置了安全認證,限制了主題的創建,如果你使用 --zookeeper 的命令,依然能成功創建任意主題,不受認證體系的約束。這顯然是 Kafka 集群的運維人員不希望看到的。
  2. 使用 --bootstrap-server 與集群進行交互,越來越成為使用 Kafka 的標准姿勢。換句話說,以后會有越來越少的命令和 API 需要與 ZooKeeper 進行連接。這樣,我們只需要一套連接信息,就能與 Kafka 進行全方位的交互,不用像以前一樣,必須同時維護 ZooKeeper 和 Broker 的連接信息。

查詢主題

創建好主題之后,Kafka 允許我們使用相同的腳本查詢主題。你可以使用下面的命令,查詢所有主題的列表。

bin/kafka-topics.sh --bootstrap-server broker_host:port --list

如果要查詢單個主題的詳細數據,你可以使用下面的命令。

bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>

如果 describe 命令不指定具體的主題名稱,那么 Kafka 默認會返回所有 “可見” 主題的詳細數據給你。

這里的 “可見”,是指發起這個命令的用戶能夠看到的 Kafka 主題。這和前面說到主題創建時,使用 --zookeeper 和 --bootstrap-server 的區別是一樣的。如果指定了 --bootstrap-server,那么這條命令就會受到安全認證體系的約束,即對命令發起者進行權限驗證,然后返回它能看到的主題。否則,如果指定 --zookeeper 參數,那么默認會返回集群中所有的主題詳細數據。基於這些原因,我建議你最好統一使用 --bootstrap-server 連接參數。

修改主題

修改主題分區

其實就是增加分區,目前 Kafka 不允許減少某個主題的分區數。你可以使用 kafka-topics 腳本,結合 --alter 參數來增加某個主題的分區數,命令如下:

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions < 新分區數 >

這里要注意的是,你指定的分區數一定要比原有分區數大,否則 Kafka 會拋出 InvalidPartitionsException 異常。

修改主題級別參數

在主題創建之后,我們可以使用 kafka-configs 腳本修改對應的參數。

假設我們要設置主題級別參數 max.message.bytes,那么命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760

也許你會覺得奇怪,為什么這個腳本就要指定 --zookeeper,而不是 --bootstrap-server 呢?其實,這個腳本也能指定 --bootstrap-server 參數,只是它是用來設置動態參數的。在專欄后面,我會詳細介紹什么是動態參數,以及動態參數都有哪些。現在,你只需要了解設置常規的主題級別參數,還是使用 --zookeeper。

變更副本數

使用自帶的 kafka-reassign-partitions 腳本,幫助我們增加主題的副本數。

假設kafka的內部主題 __consumer_offsets 只有 1 個副本,現在我們想要增加至 3 個副本。下面是操作:

  1. 創建一個 json 文件,顯式提供 50 個分區對應的副本數。注意,replicas 中的 3 台 Broker 排列順序不同,目的是將 Leader 副本均勻地分散在 Broker 上。該文件具體格式如下
{"version":1, "partitions":[
 {"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]}, 
  {"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
  {"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
  {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
  ...
  {"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]}
]}
  1. 執行 kafka-reassign-patitions 腳本,命令如下:
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute

除了修改內部主題,我們可能還想查看這些內部主題的消息內容。特別是對於 __consumer_offsets 而言,由於它保存了消費者組的位移數據,有時候直接查看該主題消息是很方便的事情。下面的命令可以幫助我們直接查看消費者組提交的位移數據。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

除了查看位移提交數據,我們還可以直接讀取該主題消息,查看消費者組的狀態信息。

bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning

對於內部主題 __transaction_state 而言,方法是相同的。你只需要指定 kafka.coordinator.transaction.TransactionLog$TransactionLogMessageFormatter 即可。

修改主題限速

這里主要是指設置 Leader 副本和 Follower 副本使用的帶寬。有時候,我們想要讓某個主題的副本在執行副本同步機制時,不要消耗過多的帶寬。Kafka 提供了這樣的功能。我來舉個例子。假設我有個主題,名為 test,我想讓該主題各個分區的 Leader 副本和 Follower 副本在處理副本同步時,不得占用超過 100MBps 的帶寬。注意是大寫 B,即每秒不超過 100MB。那么,我們應該怎么設置呢?

要達到這個目的,我們必須先設置 Broker 端參數 leader.replication.throttled.rate 和 follower.replication.throttled.rate,命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0

這條命令結尾處的 --entity-name 就是 Broker ID。倘若該主題的副本分別在 0、1、2、3 多個 Broker 上,那么你還要依次為 Broker 1、2、3 執行這條命令。

設置好這個參數之后,我們還需要為該主題設置要限速的副本。在這個例子中,我們想要為所有副本都設置限速,因此統一使用通配符 * 來表示,命令如下:

bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test

主題分區遷移

同樣是使用 kafka-reassign-partitions 腳本,對主題各個分區的副本進行 “手術” 般的調整,比如把某些分區批量遷移到其他 Broker 上。

刪除主題

bin/kafka-topics.sh --bootstrap-server broker_host:port --delete  --topic <topic_name>

刪除主題的命令並不復雜,關鍵是刪除操作是異步的,執行完這條命令不代表主題立即就被刪除了。它僅僅是被標記成 “已刪除” 狀態而已。Kafka 會在后台默默地開啟主題刪除操作。因此,通常情況下,你都需要耐心地等待一段時間。

主題刪除失敗

當運行完上面的刪除命令后,很多人發現已刪除主題的分區數據依然 “躺在” 硬盤上,沒有被清除。這時該怎么辦呢?

實際上,造成主題刪除失敗的原因有很多,最常見的原因有兩個:

  • 副本所在的 Broker 宕機了
  • 待刪除主題的部分分區依然在執行遷移過程。

如果是因為前者,通常你重啟對應的 Broker 之后,刪除操作就能自動恢復;如果是因為后者,那就麻煩了,很可能兩個操作會相互干擾。

不管什么原因,一旦你碰到主題無法刪除的問題,可以采用這樣的方法:

  1. 手動刪除 ZooKeeper 節點 /admin/delete_topics 下以待刪除主題為名的 znode。

  2. 手動刪除該主題在磁盤上的分區目錄。

  3. 在 ZooKeeper 中執行 rmr /controller,觸發 Controller 重選舉,刷新 Controller 緩存。

在執行最后一步時,你一定要謹慎,因為它可能造成大面積的分區 Leader 重選舉。事實上,僅僅執行前兩步也是可以的,只是 Controller 緩存中沒有清空待刪除主題罷了,也不影響使用。

常見問題

__consumer_offsets 占用太多的磁盤

一旦你發現這個主題消耗了過多的磁盤空間,那么,你一定要顯式地用 jstack 命令查看一下 kafka-log-cleaner-thread 前綴的線程狀態。通常情況下,這都是因為該線程掛掉了,無法及時清理此內部主題。倘若真是這個原因導致的,那我們就只能重啟相應的 Broker 了。另外,請你注意保留出錯日志,因為這通常都是 Bug 導致的,最好提交到社區看一下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM