Kafka Controller 是 Kafka 的核心組件,在前面的文章中,已經詳細講述過 Controller 部分的內容。在過去的幾年根據大家在生產環境中應用的反饋,Controller 也積累了一些比較大的問題,而針對這些問題的修復,代碼的改動量都是非常大的,無疑是一次重構,因此,社區准備在新版的系統里對 Controller 做一些相應的優化(0.11.0及以后的版本),相應的設計方案見:Kafka Controller Redesign,本文的內容就是結合這篇文章做一個簡單的總結。
Controller 功能
在一個 Kafka 中,Controller 要處理的事情總結如下表所示:
| 功能 | 詳情 | |
|---|---|---|
| cluster metadata updates | producer 或 consumer 可以通過 MetadataRequest 請求從集群任何一台 broker 上查詢到某個 Partition 的 metadata 信息,如果一個 Partition 的 leader 或 isr 等信息變化,Controller 會廣播到集群的所有 broker 上,這樣每台 Broker 都會有該 Partition 的最新 Metadata 信息 | |
| topic creation | 用戶可以通過多種方式創建一個 topic,最終的結果都是在 zk 的 /brokers/topics目錄下新建一個 topic 節點信息,controller 通過監控這個目錄來判斷是否有新的 topic 需要創建 |
|
| topic deletion | Controller 通過監控 zk 的 /admin/delete_topics 節點來觸發 topic 刪除操作 |
|
| partition reassignment | Controller 通過監控 zk 的 /admin/reassign_partitions 節點來觸發 Partition 的副本遷移操作 |
|
| preferred replica leader election | Controller 通過監控 zk 的 /admin/preferred_replica_election 節點來觸發最優 leader 選舉操作,該操作的目的選舉 Partition 的第一個 replica 作為 leader |
|
| topic partition expansion | Controller 通過監控 zk 的 /brokers/topics/<topic> 數據內容的變化,來觸發 Topic 的 Partition 擴容操作 |
|
| broker join | Controller 通過監控 zk 的 /brokers/ids 目錄變化,就會知道哪些 broker 是最新加入的,進而觸發 broker 的上線操作 |
|
| broker failure | 同樣,Controller 通過監控 zk 的 /brokers/ids 目錄變化,就會知道哪些 broker 掉線了,進而觸發 broker 的下線操作 |
|
| controlled shutdown | Controller 通過處理 ControlledShudownRequest 請求來優雅地關閉一個 broker 節點,主動關閉與直接 kill 的區別,它可以減少 Partition 的不可用時間,因為一個 broker 的 zk 臨時節點消失是需要一定時間的 | |
| controller leader election | 集群中所有 broker 會監聽 zk 的 /controller 節點,如果該節點消失,所有的 broker 都回去搶占 controller 節點,搶占成功的,就成了最新的 controller |
Controller 目前存在的問題
之所以要重新設計 Controller,是因為現在的 Controller 積累了一些比較難解決的問題,這些問題解決起來,代碼改動量都是巨大的,甚至需要改變 controller 部門的設計,基本就跟重構差不多了,下面我們先來了看一下 controller 之前(主要是 0.11.0 之前的版本)存在的一些問題。
目前遇到的比較大的問題有以下幾個:
- Partition 級別同步 zk 寫;
- sequential per-partition controller-to-broker requests;
- Controller 復雜的並發語義;
- 代碼組織混亂;
- 控制類請求與數據類請求未分離;
- Controller 給 broker 的請求中沒有 broker 的 generation信息;
- ZkClient 阻礙 Client 的狀態管理。
Partition 級別同步 zk 寫
zookeeper 的同步寫意味着在下次寫之前需要等待前面整個過程的結束,而且由於它們都是 partition 粒度的(一個 Partition 一個 Partition 的去執行寫操作),對於 Partition 非常多的集群來說,需要等待的時間會更長,Controller 通常會在下面這兩個地方做 Partition 級別 zookeeper 同步寫操作:
- PartitionStateMachine 在進行觸發 leader 選舉(partition 目的狀態是 OnlinePartition),將會觸發上面的操作;
- ReplicaStateMachine 更新 LeaderAndIsr 信息到 zk(replica 狀態轉變為 OfflineReplica),這種情況也觸發這種情況,它既阻礙了 Controller 進程,也有可能會 zk 造成壓力。
sequential per-partition controller-to-broker requests
Controller 在向 Broker 發送請求,有些情況下也是 Partition 粒度去發送的,效率非常低,比如在 Controller 處理 broker shutdown 請求時,這里是按 Partition 級別處理,每處理一個 Partition 都會執行 Partition、Replica 狀態變化以及 Metadata 更新,並且調用 sendRequestsToBrokers() 向 broker 發送請求,這樣的話,效率將變得非常低。
Controller 復雜的並發語義
Controller 需要在多個線程之間共享狀態信息,這些線程有:
- IO threads handling controlled shutdown requests
- The ZkClient org.I0Itec.zkclient.ZkEventThread processing zookeeper callbacks sequentially;
- The TopicDeletionManager kafka.controller.DeleteTopicsThread;
- Per-broker RequestSendThread within ControllerChannelManager.
所有這些線程都需要訪問或修改狀態信息(ControllerContext),現在它們是通過 ControllerContext 的 controllerLock(排它鎖)實現的,Controller 的並發變得虛弱無力。
代碼組織混亂
KafkaController 部分的代碼組織(KafkaController、PartitionStateMachine 和 ReplicaStateMachine)不是很清晰,比如,下面的問題就很難回答:
- where and when does zookeeper get updated?
- where and when does a controller-to-broker request get formed?
- what impact does a failing zookeeper update or controller-to-broker request have on the cluster state?
這也導致了這部分很多開發者不敢輕易去改動。
控制類請求與數據類請求未分離
現在 broker 收到的請求,有來自 client、broker 和 controller 的請求,這些請求都會被放到同一個 requestQueue 中,它們有着同樣的優先級,所以來自 client 的請求很可能會影響來自 controller 請求的處理(如果是 leader 變動的請求,ack 設置的不是 all,這種情況有可能會導致數據丟失)。
Controller 給 broker 的請求中沒有 broker 的 generation信息
這里的 Broker generation 代表着一個標識,每當它重新加入集群時,這個標識都會變化。如果 Controller 的請求沒有這個信息的話,可能會導致一個重啟的 Broker 收到之前的請求,讓 Broker 進入到一個錯誤的狀態。
比如,Broker 收到之前的 StopReplica 請求,可能會導致副本同步線程退出。
ZkClient 阻礙 Client 的狀態管理
這里的狀態管理指的是當 Client 發生重連或會話過期時,Client 可以監控這種狀態變化,並做出一些處理,因為開源版的 ZKClient 在處理 notification 時,是線性處理的,一些 notification 會被先放到 ZkEventThread’s queue 中,這樣會導致一些最新的 notification 不能及時被處理,特別是與 zk 連接斷開重連的情況。
Controller 改進方案
關於上述問題,Kafka 提出了一些改進方案,有些已經在最新版的系統中實現,有的還在規划中。
使用異步的 zk api
Zookeeper 的 client 提供三種執行請求的方式:
- 同步調用,意味着下次請求需要等待當前當前請求的完成;
- 異步調用,意味着不需要等待當前請求的完成就可以開始下次請求的執行,並且我們可以通過回調機制去處理請求返回的結果;
- 單請求的 batch 調用,意味着 batch 內的所有請求都會在一次事務處理中完成,這里需要關注的是 zookeeper 的 server 對單請求的大小是有限制的(jute.maxbuffer)。
文章中給出了三種請求的測試結果,Kafka 最后選取的是異步處理機制,因為對於單請求處理,異步處理更加簡潔,並且相比於同步處理還可以保持一個更好的寫性能。
improve controller-to-broker request batching
這個在設計文檔還是 TODO 狀態,具體的方案還沒確定,不過基本可以猜測一下,因為目的是提高 batch 發送能力,那么只能是在調用對每個 broker 的 RequestSenderThread 線程發送請求之前,做一下檢測,而不是來一個請求立馬就發送,這是一個性能與時間的權衡,如果不是立馬發送請求,那么可能會帶來 broker 短時 metadata 信息的不一致,這個不一致時間不同的應用場景要求是不一樣的。
單線程的事件處理模型
采用單線程的時間處理模型將極大簡化 Controller 的並發實現,只允許這個線程訪問和修改 Controller 的本地狀態信息,因此在 Controller 部分也就不需要到處加鎖來保證線程安全了。
目前 1.1.0 的實現中,Controller 使用了一個 ControllerEventThread 線程來處理所有的 event,目前可以支持13種不同類型事件:
- Idle:代表當前 ControllerEventThread 處理空閑狀態;
- ControllerChange:Controller 切換處理;
- BrokerChange:Broker 變動處理,broker 可能有上線或掉線;
- TopicChange:Topic 新增處理;
- TopicDeletion:Topic 刪除處理;
- PartitionReassignment:Partition 副本遷移處理;
- AutoLeaderBalance:自動 rebalance 處理;
- ManualLeaderBalance:最優 leader 選舉處理,這里叫做手動 rebalance,手動去切流量;
- ControlledShutdown:優雅關閉 broker;
- IsrChange:Isr 變動處理;
- LeaderAndIsrResponseReceived;
- LogDirChange:Broker 某個目錄失敗后的處理(比如磁盤壞掉等);
- ControllerShutdown:ControllerEventThread 處理這個事件時,會關閉當前線程。
重構集群狀態管理
這部分的改動,目前社區也沒有一個很好的解決思路,重構這部分的目的是希望 Partition、Replica 的狀態管理變得更清晰一些,讓我們從代碼中可以清楚地明白狀態是在什么時間、什么地方、什么條件下被觸發的。這個優化其實是跟上面那個有很大關聯,采用單線程的事件處理模型,可以讓狀態管理也變得更清晰。
prioritize controller requests
我們想要把控制類請求與數據類請求分開,提高 controller 請求的優先級,這樣的話即使 Broker 中請求有堆積,Broker 也會優先處理控制類的請求。
這部分的優化可以在網絡層的 RequestChannel 中做,RequestChannel 可以根據請求的 id 信息把請求分為正常的和優先的,如果請求是 UpdateMetadataRequest、LeaderAndIsrRequest 或者 StopReplicaRequest,那么這個請求的優先級應該提高。實現方案有以下兩種:
- 在請求隊列中增加一個優先級隊列,優先級高的請求放到 the prioritized request queue 中,優先級低的放到普通請求隊列中,但是無論使用一個定時拉取(poll)還是2個定時拉取,都會帶來其他的問題,要么是增大普通請求的處理延遲,要么是增大了優先級高請求的延遲;
- 直接使用優先級隊列代替現在的普通隊列,設計上更傾向與這一種。
目前這部分在1.1.0中還未實現。
Controller 發送請求中添加 broker 的 generation 信息
generation 信息是用來標識當前 broker 加入集群 epoch 信息,每當 broker 重新加入集群中,該 broker.id 對應的 generation 都應該變化(要求遞增),目前有兩種實現方案:
- 為 broker 分配的一個全局唯一的 id,由 controller 廣播給其他 broker;
- 直接使用 zookeeper 的 zxid 信息(broker.id 注冊時的 zxid)。
直接使用原生的 Zookeeper client
Client 端的狀態管理意味着當 Client 端發生狀態變化(像連接中斷或回話超時)時,我們有能力做一些操作。其中,zookeeper client 有效的狀態(目前的 client 比下面又多了幾種狀態,這里先不深入)是:
- NOT_CONNECTED: the initial state of the client;
- CONNECTING: the client is establishing a connection to zookeeper;
- CONNECTED: the client has established a connection and session to zookeeper;
- CLOSED: the session has closed or expired。
有效的狀態轉移是:
- NOT_CONNECTED > CONNECTING
- CONNECTING > CONNECTED
- CONNECTING > CLOSED
- CONNECTED > CONNECTING
- CONNECTED > CLOSED
最開始的設想是直接使用原生 Client 的異步調用方式,這樣的話依然可以通過回調方法監控到狀態的變化(像連接中斷或回話超時),同樣,在每次事件處理時,可以通過檢查狀態信息來監控到 Client 狀態的變化,及時做一些處理。
當一個 Client 接收到連接中斷的 notification(Client 狀態變成了 CONNECTING 狀態),它意味着 Client 不能再從 zookeeper 接收到任何 notification 了。如果斷開連接,對於 Controller 而言,無論它現在正在做什么它都應該先暫停,因為可能集群的 Controller 已經切換到其他機器上了,只是它還沒接收到通知,它如果還在工作,可能會導致集群狀態不一致。當連接斷開后,Client 可以重新建立連接(re-establish,狀態變為 CONNECTED)或者會話過期(狀態變為 CLOSED,會話過期是由 zookeeper Server 來決定的)。如果變成了 CONNECTED 狀態,Controller 應該重新開始這些暫停的操作,而如果狀態變成了 CLOSED 狀態,舊的 Controller 就會知道它不再是 controller,應該丟棄掉這些任務。
