大家好,我是 yes。
這是我的第三篇Kafka源碼分析文章,前兩篇講了日志段的讀寫和二分算法在kafka索引上的應用
今天來講講 Kafka Broker
端處理請求的全流程,剖析下底層的網絡通信是如何實現的、Reactor在kafka上的應用。
再說說社區為何在2.3版本將請求類型划分成兩大類,又是如何實現兩類請求處理的優先級。
叨叨
不過在進入今天主題之前我想先叨叨幾句,就源碼這個事兒,不同人有不同的看法。
有些人聽到源碼這兩個詞就被嚇到了,這么多代碼怎么看。奔進去就像無頭蒼蠅,一路斷點跟下來,跳來跳去,算了拜拜了您嘞。
而有些人覺得源碼有啥用,看了和沒看一樣,看了也用不上。
其實上面兩種想法我都有過,哈哈哈。那為什么我會開始看Kafka
源碼呢?
其實就是我有個同事在自學go
,然后想用go寫個消息隊列,在畫架構圖的時候就來問我,這消息隊列好像有點東西啊,消息收發,元數據管理,消息如何持久一堆問題過來,我直呼頂不住。
這市面上Kafka
、RocketMQ
都是現成的方案,於是乎我就看起了源碼。
所以促使我看源碼的初始動力,竟然是為了在同事前面裝逼!!
我是先看了RocketMQ
,因為畢竟是Java
寫的,而Kafka Broker
都是scala
寫的。
梳理了一波RocketMQ
之后,我又想看看Kafka
是怎么做的,於是乎我又看起了Kafka
。
在源碼分析之前我先總結性的說了說Kafka
底層的通信模型。應對面試官詢問Kafka
請求全過程已經夠了。
Reactor模式
在扯到Kafka
之前我們先來說說Reactor模式
,基本上只要是底層的高性能網絡通信就離不開Reactor模式
。像Netty、Redis都是使用Reactor模式
。
像我們以前剛學網絡編程的時候以下代碼可是非常的熟悉,新來一個請求,要么在當前線程直接處理了,要么新起一個線程處理。
在早期這樣的編程是沒問題的,但是隨着互聯網的快速發展,單線程處理不過來,也不能充分的利用計算機資源。
而每個請求都新起一個線程去處理,資源的要求就太高了,並且創建線程也是一個重操作。
說到這有人想到了,那搞個線程池不就完事了嘛,還要啥Reactor
。
池化技術確實能緩解資源的問題,但是池子是有限的,池子里的一個線程不還是得候着某個連接,等待指示嘛。現在的互聯網時代早已突破C10K
了。
因此引入的IO多路復用
,由一個線程來監視一堆連接,同步等待一個或多個IO事件的到來,然后將事件分發給對應的Handler
處理,這就叫Reactor模式
。
網絡通信模型的發展如下
單線程 => 多線程 => 線程池 => Reactor模型
Kafka所采用的Reactor模型
如下
Kafka Broker 網絡通信模型
簡單來說就是,Broker 中有個Acceptor(mainReactor)
監聽新連接的到來,與新連接建連之后輪詢選擇一個Processor(subReactor)
管理這個連接。
而Processor
會監聽其管理的連接,當事件到達之后,讀取封裝成Request
,並將Request
放入共享請求隊列中。
然后IO線程池不斷的從該隊列中取出請求,執行真正的處理。處理完之后將響應發送到對應的Processor
的響應隊列中,然后由Processor
將Response
返還給客戶端。
每個listener
只有一個Acceptor線程
,因為它只是作為新連接建連再分發,沒有過多的邏輯,很輕量,一個足矣。
Processor
在Kafka中稱之為網絡線程,默認網絡線程池有3個線程,對應的參數是num.network.threads
。並且可以根據實際的業務動態增減。
還有個 IO 線程池,即KafkaRequestHandlerPool
,執行真正的處理,對應的參數是num.io.threads
,默認值是 8。IO線程處理完之后會將Response
放入對應的Processor
中,由Processor
將響應返還給客戶端。
可以看到網絡線程和IO線程之間利用的經典的生產者 - 消費者模式,不論是用於處理Request的共享請求隊列,還是IO處理完返回的Response。
這樣的好處是什么?生產者和消費者之間解耦了,可以對生產者或者消費者做獨立的變更和擴展。並且可以平衡兩者的處理能力,例如消費不過來了,我多加些IO線程。
如果你看過其他中間件源碼,你會發現生產者-消費者模式真的是太常見了,所以面試題經常會有手寫一波生產者-消費者。
源碼級別剖析網絡通信模型
Kafka 網絡通信組件主要由兩大部分構成:SocketServer 和 KafkaRequestHandlerPool。
SocketServer
可以看出SocketServer
旗下管理着,Acceptor 線程
、Processor 線程
和 RequestChannel
等對象。
data-plane
和control-plane
稍后再做分析,先看看RequestChannel
是什么。
RequestChannel
關鍵的屬性和方法都已經在下面代碼中注釋了,可以看出這個對象主要就是管理Processor
和作為傳輸Request
和Response
的中轉站。
Acceptor
接下來我們再看看Acceptor
可以看到它繼承了AbstractServerThread
,接下來再看看它run些啥
再來看看accept(key)
做了啥
很簡單,標准selector
的處理,獲取准備就緒事件,調用serverSocketChannel.accept()
得到socketChannel
,將socketChannel
交給通過輪詢選擇出來的Processor
,之后由它來處理IO事件。
Processor
接下來我們再看看Processor
,相對而言比Acceptor
復雜一些。
先來看看三個關鍵的成員
再來看看主要的處理邏輯。
可以看到Processor
主要是將底層讀事件IO數據封裝成Request
存入隊列中,然后將IO線程塞入的Response
,返還給客戶端,並處理Response
的回調邏輯。
KafkaRequestHandlerPool
IO線程池,實際處理請求的線程。
再來看看IO線程都干了些啥
很簡單,核心就是不斷的從requestChannel
拿請求,然后調用handle處理請求。
handle
方法是位於KafkaApis
類中,可以理解為通過switch
,根據請求頭里面不同的apikey
調用不同的handle
來處理請求。
我們再舉例看下較為簡單的處理LIST_OFFSETS
的過程,即handleListOffsetRequest
,來完成一個請求的閉環。
我用紅色箭頭標示了調用鏈。表明處理完請求之后是塞給對應的Processor
的。
最后再來個更詳細的總覽圖,把源碼分析到的類基本上都對應的加上去了。
請求處理優先級
上面提到的data-plane
和control-plane
是時候揭開面紗了。這兩個對應的就是數據類請求和控制類請求。
為什么需要分兩類請求呢?直接在請求里面用key標明請求是要讀寫數據啊還是更新元數據不就行了嗎?
簡單點的說比如我們想刪除某個topic,我們肯定是想這個topic馬上被刪除的,而此時producer還一直往這個topic寫數據,那這個情況可能是我們的刪除請求排在第N個...等前面的寫入請求處理好了才輪到刪除的請求。實際上前面哪些往這個topic寫入的請求都是沒用的,平白的消耗資源。
再或者說進行Preferred Leader
選舉時候,producer
將ack
設置為all
時候,老leader
還在等着follower
寫完數據向他報告呢,誰知follower
已經成為了新leader
,而通知它leader已經變更的請求由於被一堆數據類型請求堵着呢,老leader
就傻傻的在等着,直到超時。
就是為了解決這種情況,社區將請求分為兩類。
那如何讓控制類的請求優先被處理?優先隊列?
社區采取的是兩套Listener
,即數據類型一個listener
,控制類一個listener
。
對應的就是我們上面講的網絡通信模型,在kafka中有兩套! kafka通過兩套監聽變相的實現了請求優先級,畢竟數據類型請求肯定很多,控制類肯定少,這樣看來控制類肯定比大部分數據類型先被處理!
迂回戰術啊。
控制類的和數據類區別就在於,就一個Porcessor線程
,並且請求隊列寫死的長度為20。
最后
看源碼主要就是得耐心,耐心跟下去。然后再跳出來看。你會發現不過如此,哈哈哈。
我是yes,一個在互聯網摸爬滾打且莫得感情的工具人。