引用博客來自李志濤:https://www.cnblogs.com/lizherui/p/12271285.html
問題導讀
Apache Kafka在全球包括世界500強及互聯網公司在內廣泛使用,得益於它強大的功能和不斷完善的生態。作者帶領團隊及從事Kafka一線工作有幾年時間了,所思所想及實踐點滴順便記錄下來,通過分享形式希望能夠幫助到其他人。使用過Kafka朋友都知道,Kafka集群會因場景和數據規模的不同,會經常調整參數優化集群性能,為了不頻繁重啟服務而支持配置生效,Kafka社區開發了動態配置特性,Kafka動態配置是一個比較高頻好用的功能,下面我們就來一探究竟。
- 動態配置是如何設計的?
- 動態配置優先級是怎樣的?
- Broker初始化是如何讀取配置的?
- 動態配置支持哪些特性功能?
- 動態配置如何使用呢?
前言介紹
Kafka初始開源的幾個版本,當broker初始化啟動時,所有配置信息只能從server.properties靜態文件讀取,以后不再發生任何更改,隨着Kafka逐步迭代,在線業務對穩定性和個性化要求越來越突出,需要能支持在線修改功能動態生效的需求應運而生。例如:按照topic維度清理數據,依據clientid限流,根據用戶名稱進行訪問權限控制等等。目前Kafka最新版本支持以下幾類動態配置。

發展歷程

動態配置文件發展歷程:
kafka在0.8.0對topic的管理功能分布在三個shell中,它們分別是kafka-list-topic.sh、kafka-create-topic.sh、kafka-delete-topic.sh、kafka-add-partitions.sh,后來社區考慮到topic管理功能過於分散,到了0.8.1版本有關topic所有功能收斂到kafka-topics.sh中。0.8.0中只有topic的創建、刪除和列表及添加分區功能,到了0.8.1開始支持topics動態配置了。
0.9.0.0開始支持client(producer和consumer)客戶端配額限流支持,確保不因為某個或少數幾個topic的客戶端占滿了broker帶寬資源和磁盤IO資源,影響其他客戶端的正常讀寫,導致集群內主從同步也受到影響。這個功能對確保系統SLA大有好處,通過服務降級,保證寫/生產不受影響,降低或暫停讀/消費流量更容易解決系統資源瓶頸。
0.9.0版本動態配置與topic管理分離,為了保持向下兼容kafka-topics.sh依然包含操作topic動態配置功能,新增kafka-configs.sh支持clients和topics動態配置功能,所以kafka-topics.sh和kafka-configs.sh任意一個都可以修改topic動態配置
0.10.1.0版本新增支持users和brokers動態配置功能,user動態配置用於訪問資源的權限控制,提升集群的訪問和數據安全性,例如:用戶對讀/寫/創建/刪除等操作和API、topic、group資源訪問控制。broker動態配置,在不用重啟及影響服務運行情況下,broker級別功能實現動態生效,例如:副本注冊復制速率、磁盤內掛載點間數據遷移速率、網絡請求的線程數、處理請求的I/O線程數等等全局參數等等。
0.10.1.0~2.3.1版本都支持topics、clients、users、brokers四類型動態配置的11種粒度配置對象,只是配置模塊和屬性字段有增減與調整。
動態配置設計原理

用戶使用kafka-configs.sh腳本,根據格式和參數規范要求,ConfigCommand類進行相關邏輯處理、json格式和內容校驗,生成notification json,寫入到序列化持久節點上,zk路徑為xxx/config/changes/config_change_seqNo,節點名稱為config_change_seqNo,其中seqNo從1開始的自增序號。kafka集群中所有broker通過監聽zk上xxx/config/changes的children變化,每次獲得比當前內存中last_seqNo大的seqNo的json內容,從中讀取entity_type/entity_name相對路徑,由此判斷如何從xxx/config/topics|clients|users|brokers四種類型中讀取哪個配置路徑。同一個Broker在操作過程中任何時刻只能串行讀寫一種類型的配置,多種配置需要串行操作。
各個角色的作用:
kafka-config.sh: 負責寫dynamic config和notification,寫順序上圖有先后(圓圈數字)標識。
broker:負責監聽xxx/config/changes子節點變化和讀取entity_type/entity_name路徑節點上內容
zk:負責存儲notification和dynamic config及下發配置給相應的broker
notification json內容:
V1 0.10.0.1及以前版本有效
{ "version": 1, "entity_type": "topics", "entity_name": "finalTest" }
V2 0.10.1.0~2.3.1 當前最新版本都有效
{
"version": 2,
"entity_path": "topics/finalTest"
}
以上不管是version 1還是version2,本質上沒有變化。都是通過entity_type/entity_name獲得entity_path的zk相對路徑,全路徑為xxx/config/entityType/entityName,具體請看如下詳圖

entity_type=topics | clients | users | brokers
entity_name=topicName | clientId | userId | (brokerId | <default>)
當entity_type為brokers時,brokerId為broker編號與自己的server.properties對應,只對某個broker生效。“<default>”指對所有broker生效。而entity_type為topics | clients | users對所有broker都生效。通過以上entity_type/entity_name六種組合成六個zk相對路徑。
topics和clients組合原理一樣,但users和brokers卻略有不同,他們各自有2個組合,除了普通組合還有復合組合,兩種類型組合在一起,例如users有users與clients組合,zk路徑為users/<user>/clients/<clientId>;brokers動態配置非常實用,不需要重啟就能動態更改任意數量brokers配置,更改所有brokers為xxx/brokers/<default>
四類動態配置11種zk相對路徑,根據11種zk相對路徑可以讀取11種粒度配置對象dynamic config。
<default>說明:某種類型下所有作用域生效,例如xxx/clients/<default>和xxx/brokers/<default>就是集群內所有All clients和集群內所有All brokers配置都會生效,其他同理。
dynamic config內容示例:
entity_type/entity_name=topics/<topic_name>=topics/finalTest

{ "version": 1, "config": { "retention.bytes": "102400000", "flush.ms": "5000", "cleanup.policy": "compact", "flush.messages", ... } }
entity_type/entity_name=clients/<clientId>=clients/camusall

{ "version": 1, "config": { "producer_byte_rate": "20971520", "consumer_byte_rate": "20971520" } }
entity_type/entity_name=brokers/all brokers=brokers/<default>

{ "version": 1, "config": { "leader.replication.throttled.rate": "5000", "follower.replication.throttled.rate": "60000", "replica.alter.log.dirs.io.max.bytes.per.second": "5000", "log.retention.hours": "24", "log.flush.interval.messages": "5000", "min.insync.replicas": "2", ... } }
entity_type/entity_name=brokers/brokerId 與all brokers的配置內容完全一樣,只是影響的作用域范圍不同而已,此處省略。
寫配置格式校驗
如果寫入配置不進行規范校驗,broker就會讀取處理過程中,就會卡住或阻塞,影響服務運行穩定性。所以配置校驗至關重要,校驗規則如下:
- 配置參數格式必須合法,否則報錯不予接收,並拋出錯誤信息
- 輸入配置項進行校驗,輸入參數必須是kafka包含的配置項,否則過濾掉
config_change_seqNo生成規則

kafka-configs.sh腳本每成功執行一次,在zk上就創建一個新的seqNode節點(即/xxx/config/changes/config_change_seqNo),seqNode是zk的持久順序節點(PERSISTENT_SEQUENTIAL),它的組成是seqNode = seqNodePrefix + seqNodeSuffix,config_change_固定為seqNode的前綴,seqNodeSuffix = seqNo為seqNode的后綴,seqNo是10位數字的序列號,這個序列號后綴是自增的,由zk服務端自動生成和維護,每次事務請求成功就加1,它與MySQL的自增id原理一樣。
config_change_seqno清除規則

集群經過長期運行積累,xxx/config/changes下會留存大量歷史節點,如果不及時清理,會有以下影響:
- 大量無用的seqNode進行傳輸,會增加網絡帶寬負擔
- 占用zk服務端內存及存儲資源
- Kafka會做大量無效判斷和計算
綜上所述,因此必須要及時清除無用的seqNode集合,清除公式步驟如下:
- 當broker監聽到notification變化回調時,記錄系統時間。
- 獲取xxx/config/changes下所有子節點,讀取每個seqNode的創建時間
- 系統時間減去seqNode創建時間,如果時間差值大於過期時間,即changeExpirationMs,就會被刪除
- changeExpirationMs默認為15分鍾,可由broker配置。
config_change_seqno判斷處理


前面提到每當觸發回調處理,seqNode節點創建時間過期15分鍾會被刪除,刪除條件是觸發才會被執行,如果長時間不創建就可能有少數幾個seqNode一直保留。如果短時間內(15分鍾內)創建大量seqNode,又不會立即被刪除,只有等到下次觸發達到條件才行,那怎么判斷哪些會被處理呢?broker緩存中維護一個變量lastExecutedSeqNo,它負責保存執行歷史中seqNode最大順序號,所以每當觸發回調獲取seqNodeSet列表時,都能輕易判斷出哪些需要處理計算,也會同步更新lastExecutedSeqNo。
notification作用
- 通知broker有新的動態配置產生,讀取相應的動態配置
- 不用監聽大量四種類型配置下子節點,每個broker只需要監聽一個notification節點即可,高效且性能也高
- 大大減少broker監聽數量,如果像controller監聽/xxx/partitions/[0-N]/state一樣,監聽數量就是四種類型配置zk路徑乘以broker數量了
靜態與動態的配置及優先級


靜態與動態區別。靜態配置是Broker內置程序的默認配置和靜態配置文件server.properties,broker啟動前可以任意修改,啟動后不可修改。動態配置是broker啟動運行后,可以在線更新生效,偷偷說一句離線也可以改,就是不生效而已。
配置優先級。以上4個圖包含4類型配置既有動態也有靜態,那優先級如何呢?動態配置優先級高於靜態配置。如上圖1、2、4,環越小優先級越高,對於動態配置來說,修改配置作用域范圍越小優先級越高,反之亦然。優先級最高的,會逐級覆蓋相同配置項。
當broker啟動時。讀取順序依次為broker內置默認配置,broker靜態配置文件,動態配置。當配置項相同時,高優先級覆蓋次優先的,其他依次類推
圖示說明:上圖1、2、3、4中,其中圖1、2、4中環數字表示配置優先級關系,數字從1~5表示優先級從高到低。圖3為兩級關聯。Users和Clients組合實現配置管理,這兩者組合用於客戶端配額限流,Users與Clients就像兩級目錄一樣,一個User可以包含一個、多個clientId或所有clientId。圖4中既有優先級關系也有配置參數包含關系,topics類型配置是brokers類型配置的子集,brokers除了包含topic配置外還有DynamicThreadPool、DynamicListenerConfig、DynamicConnectionQuota、LogCleaner配置。
如何使用動態配置
- 使用腳本kafka-configs.sh和kafka-topics.sh,kafka-configs.sh支持四種類型,kafka-topics.sh僅支持topics類型
- 使用Apache-kafka官方提供的java版本客戶端API調用
- 直接寫zk實現,具體遵循如上寫notification和dynamic_config規范
如想更深入了解 kafka-configs.sh用法 請查看詳情
總結
- Kafka配置參數分為靜態配置和動態配置,靜態分為內置默認與外置用戶配置,用戶配置優先於內置配置
- 動態配置為4類型11個zk相對路徑,即11種粒度配置對象生效,同類型作用域范圍越小優先級越高
- 動態配置優先級比靜態配置優先級高,動態配置中Users與Clients既可以組合配置,也可以單獨配置
-
從設計原理中了解config_change_seqNo生成規則
-
上文中理解了寫Notification的作用,從而知曉什么場景會適合創建zk持久順序節點(PERSISTENT_SEQUENTIAL)
注意事項:以上配置解析,是基於Kafka-2.3.1版本分析
參考資料
Release Notes - Kafka - Version 0.8.1:https://archive.apache.org/dist/kafka/0.8.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.1.0:https://archive.apache.org/dist/kafka/0.10.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.0:https://archive.apache.org/dist/kafka/0.10.2.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 0.10.2.1:https://archive.apache.org/dist/kafka/0.10.2.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.0:https://archive.apache.org/dist/kafka/1.1.0/RELEASE_NOTES.html
Release Notes - Kafka - Version 1.1.1:https://archive.apache.org/dist/kafka/1.1.1/RELEASE_NOTES.html
Release Notes - Kafka - Version 2.0.0:https://archive.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
KIP-21 - Dynamic Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration
KIP-226 - Dynamic Broker Configuration:https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
Make DynamicConfigManager to use the ZkNodeChangeNotificationListener introduced as part of KAFKA-2211:https://issues.apache.org/jira/browse/KAFKA-2547
KIP-257 - Configurable Quota Management:https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management
KIP-73 Replication Quotas:https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas
