所有的討論都是基於KIP-291展開的。抱歉,這又是一篇沒有圖的文字。
目前Kafka broker對所有發過來的請求都是一視同仁的,不會區別對待。不管是用於生產消費的PRODUCE和FETCH請求,還是controller端發送的LeaderAndIsr/StopReplica/UpdateMetadata請求,亦或是其他類型的請求也是一樣。通常我們這里把PRODUCE/FETCH請求稱為數據類請求;把controller發送的那3種請求稱為控制類請求或controller類請求——在源碼中前者被稱為data plane request,后者稱為controller plane request。
這種公平處理原則在很多場合下都是不合理的。為什么?簡單來說控制類請求具有直接令數據類請求失效的能力。舉個例子,如果我有個topic,單分區雙副本,其中broker0上保存leader副本,broker1上保存follower副本。當broker0上積壓了大量的PRODUCE請求時,此時用戶執行了重分區或preferred分區選舉將broker1變更成了leader,那么controller會向broker0發送LeaderAndIsr請求告訴它現在是一個follower了,而broker1上的follower已經停止向leader拉取數據(因為它要成為leader了)——此時一個比較尷尬的情形出現了:如果producer的acks設置的是all,那么這些在LeaderAndIsr請求之前積壓的PRODUCE請求就無法正常完成——要么一直緩存在purtagory中要么請求超時返回給client。設想一下,如果Kafka能夠及時地處理LeaderAndIsr請求,那么這些積壓的PRODUCE請求就能立即失敗(NOT_LEADER_FOR_PARTITION),馬上返回給client。Client不用等到 purgatory中的請求超時,降低了請求的處理時間。即使acks不是all,縱然積壓的PRODUCE請求寫入本地日志后成功返回,但處理過LeaderAndIsr請求后broker0上副本變為follower,還要執行截斷(truncation),因此在client看來這些消息就丟失了。
再舉一個例子,同樣是在積壓大量數據類請求的broker上,如果用戶刪除了topic,那么StopReplica請求無法及時處理,導致topic無法真正刪除,增加了刪除topic的延時。
最后還可以舉個例子說明對UpdateMetadata的影響。如果UpdateMetadata不能及時處理,broker上保存的就是過期的元數據,當client獲取到這些數據時,不管是producer還是consumer都可能無法正常工作,直到獲取到最新的元數據信息。
通過上面3個例子可以看出通常情況下我們希望controller類請求的處理優先級要高於數據類請求,這也是社區做KIP-291的初衷 。可喜的是Kafka 2.2正式實現了這個功能,下面我們來看看社區是怎么做的:
其實在KIP-291之前,我也思考過這個問題。當時我提出的想法是這樣的:在broker的KafkaRequestHandlerPool中實現一個優先級隊列,當controller類請求到達時,它能夠”搶占式“地排在處理隊列的最前部——這是很自然的想法,所以我本以為KIP-291也是這么實現的,但通篇看下來我尷尬地發現我這個解決思路記錄在“Rejected Alternatives"中。這個方案最大的問題在於它無法處理隊列已滿的情形,即當處理隊列已經無法容納任何新的請求時該如何支持優先處理controller類請求?縱然有優先級隊列也無法解決這個問題。
KIP-291是怎么解決的呢?很簡單,Kafka重新為controller類請求做了專屬的監聽器+請求隊列+acceptor+processor線程。監聽器通過Kafka的listeners和advertised.listeners設置,新的請求隊列則專門保存controller類請求,而acceptor和processor線程負責接收網絡發送過來的以及處理隊列中的controller類請求。我們一個一個說吧。
當前,用戶可以在listeners中指定多套監聽器,比如PLAINTEXT://kafka1:9092, SSL://kafka1:9093。你其實也可以自定義你的監聽器,比如INTERNAL://kafka1:9094。用戶可以指定broker端參數inter.broker.listener.name或security.inter.broker.protocol(兩個不能同時指定)來設定,同時你還需要在listener.security.protocol.map中指定這個自定義listener使用的安全協議,比如: listener.security.protocol.map=INTERNAL:PLAINTEXT。KIP-291復用了這個設計,如果你設置了inter.broker.listener.name或security.inter.broker.protocol,Kafka會默認使用這個listener專屬服務controller類請求。同時社區還引入了一個新的參數:control.plane.listener.name,用來專門讓你設置服務controller類請求的監聽器名稱。這個參數的優先級要高於前面那兩個參數,因此還是推薦用戶直接設置此參數,比如設置control.plane.listener.name=CONTROLLER,同時更新listener.security.protocol.map,增加CONTROLLER:PLAINTEXT匹配對(假設你用的是PLAINTEXT)。這就是為controller類請求創建監聽器的方法。
下面說請求隊列和acceptor、processor線程。 其實也不用細說,和現有的設計一模一樣,只是默認的隊列大小不再是500,而是20,默認的線程數不再是8而是2,因為我們假設controller類請求通常不應該有積壓。具體的實現原理有興趣的話直接讀KafkaRequestHandlerPool.scala、RequestChannel.scala和SocketServer.scala源碼吧。還需要修改的地方是controller代碼,特別是在ControllerChannelManager.scala中增加新的broker時一定要使用controller類請求專屬的監聽器。
除了以上這些,該KIP也引入了很多監控controller類請求處理的JMX指標,如隊列請求數、線程空閑程度等,這些和之前的指標都是一樣的,只是僅監控controller plane監聽器之用。再說一點,當前Kafka支持動態地調整請求處理線程數。在對請求進行區分處理后,我估計后續也要支持對controller類請求線程數的動態調整吧。
總體來說,將請求做區分處理后對於繁忙Kafka集群將能夠更迅速地處理控制類請求,表現為狀態的更新更加及時,集群不一致狀態窗口將會縮小,同時還提升了整體可用性。目前該KIP還只是對請求做兩類處理,也許日后會做一些更加細粒度的區分——比如Metadata請求是否也應該享有更高的優先級處理。
最后還想提一句,KIP-291是我認為近期社區改動影響比較大的兩個KIP之一。另一個則是KIP-392——還記得Kafka不能從follower副本讀數據的限制吧?這個KIP要打破這個限制!只是目前該KIP還在討論中,我們后面拭目以待吧。