Kafka處理請求的全流程分析


大家好,我是 yes。

這是我的第三篇Kafka源碼分析文章,前兩篇講了日志段的讀寫二分算法在kafka索引上的應用

今天來講講 Kafka Broker端處理請求的全流程,剖析下底層的網絡通信是如何實現的、Reactor在kafka上的應用。

再說說社區為何在2.3版本將請求類型划分成兩大類,又是如何實現兩類請求處理的優先級。

叨叨

不過在進入今天主題之前我想先叨叨幾句,就源碼這個事兒,不同人有不同的看法。

有些人聽到源碼這兩個詞就被嚇到了,這么多代碼怎么看。奔進去就像無頭蒼蠅,一路斷點跟下來,跳來跳去,算了拜拜了您嘞。

而有些人覺得源碼有啥用,看了和沒看一樣,看了也用不上。

其實上面兩種想法我都有過,哈哈哈。那為什么我會開始看Kafka源碼呢?

其實就是我有個同事在自學go,然后想用go寫個消息隊列,在畫架構圖的時候就來問我,這消息隊列好像有點東西啊,消息收發,元數據管理,消息如何持久一堆問題過來,我直呼頂不住。

這市面上KafkaRocketMQ都是現成的方案,於是乎我就看起了源碼。

所以促使我看源碼的初始動力,竟然是為了在同事前面裝逼!!

我是先看了RocketMQ,因為畢竟是Java寫的,而Kafka Broker都是scala寫的。

梳理了一波RocketMQ之后,我又想看看Kafka是怎么做的,於是乎我又看起了Kafka

在源碼分析之前我先總結性的說了說Kafka底層的通信模型。應對面試官詢問Kafka請求全過程已經夠了。

Reactor模式

在扯到Kafka之前我們先來說說Reactor模式,基本上只要是底層的高性能網絡通信就離不開Reactor模式。像Netty、Redis都是使用Reactor模式

像我們以前剛學網絡編程的時候以下代碼可是非常的熟悉,新來一個請求,要么在當前線程直接處理了,要么新起一個線程處理。

在早期這樣的編程是沒問題的,但是隨着互聯網的快速發展,單線程處理不過來,也不能充分的利用計算機資源。

而每個請求都新起一個線程去處理,資源的要求就太高了,並且創建線程也是一個重操作。

說到這有人想到了,那搞個線程池不就完事了嘛,還要啥Reactor

池化技術確實能緩解資源的問題,但是池子是有限的,池子里的一個線程不還是得候着某個連接,等待指示嘛。現在的互聯網時代早已突破C10K了。

因此引入的IO多路復用,由一個線程來監視一堆連接,同步等待一個或多個IO事件的到來,然后將事件分發給對應的Handler處理,這就叫Reactor模式

網絡通信模型的發展如下

單線程 => 多線程 => 線程池 => Reactor模型

Kafka所采用的Reactor模型如下
圖來自Doug Lea大神的 Scalable IO in Java

Kafka Broker 網絡通信模型

簡單來說就是,Broker 中有個Acceptor(mainReactor)監聽新連接的到來,與新連接建連之后輪詢選擇一個Processor(subReactor)管理這個連接。

Processor會監聽其管理的連接,當事件到達之后,讀取封裝成Request,並將Request放入共享請求隊列中。

然后IO線程池不斷的從該隊列中取出請求,執行真正的處理。處理完之后將響應發送到對應的Processor的響應隊列中,然后由ProcessorResponse返還給客戶端。

每個listener只有一個Acceptor線程,因為它只是作為新連接建連再分發,沒有過多的邏輯,很輕量,一個足矣。

Processor 在Kafka中稱之為網絡線程,默認網絡線程池有3個線程,對應的參數是num.network.threads。並且可以根據實際的業務動態增減。

還有個 IO 線程池,即KafkaRequestHandlerPool,執行真正的處理,對應的參數是num.io.threads,默認值是 8。IO線程處理完之后會將Response放入對應的Processor中,由Processor將響應返還給客戶端。

可以看到網絡線程和IO線程之間利用的經典的生產者 - 消費者模式,不論是用於處理Request的共享請求隊列,還是IO處理完返回的Response。

這樣的好處是什么?生產者和消費者之間解耦了,可以對生產者或者消費者做獨立的變更和擴展。並且可以平衡兩者的處理能力,例如消費不過來了,我多加些IO線程。

如果你看過其他中間件源碼,你會發現生產者-消費者模式真的是太常見了,所以面試題經常會有手寫一波生產者-消費者。

源碼級別剖析網絡通信模型

Kafka 網絡通信組件主要由兩大部分構成:SocketServerKafkaRequestHandlerPool

SocketServer


可以看出SocketServer旗下管理着,Acceptor 線程Processor 線程RequestChannel 等對象。

data-planecontrol-plane稍后再做分析,先看看RequestChannel是什么。

RequestChannel


關鍵的屬性和方法都已經在下面代碼中注釋了,可以看出這個對象主要就是管理Processor作為傳輸RequestResponse的中轉站

Acceptor

接下來我們再看看Acceptor

可以看到它繼承了AbstractServerThread,接下來再看看它run些啥


再來看看accept(key) 做了啥

很簡單,標准selector的處理,獲取准備就緒事件,調用serverSocketChannel.accept()得到socketChannel,將socketChannel交給通過輪詢選擇出來的Processor,之后由它來處理IO事件。

Processor

接下來我們再看看Processor,相對而言比Acceptor 復雜一些。

先來看看三個關鍵的成員

再來看看主要的處理邏輯。

可以看到Processor主要是將底層讀事件IO數據封裝成Request存入隊列中,然后將IO線程塞入的Response,返還給客戶端,並處理Response 的回調邏輯。

KafkaRequestHandlerPool

IO線程池,實際處理請求的線程。

再來看看IO線程都干了些啥

很簡單,核心就是不斷的從requestChannel拿請求,然后調用handle處理請求。

handle方法是位於KafkaApis類中,可以理解為通過switch,根據請求頭里面不同的apikey調用不同的handle來處理請求。

我們再舉例看下較為簡單的處理LIST_OFFSETS的過程,即handleListOffsetRequest,來完成一個請求的閉環。

我用紅色箭頭標示了調用鏈。表明處理完請求之后是塞給對應的Processor的。

最后再來個更詳細的總覽圖,把源碼分析到的類基本上都對應的加上去了。

請求處理優先級

上面提到的data-planecontrol-plane是時候揭開面紗了。這兩個對應的就是數據類請求和控制類請求。

為什么需要分兩類請求呢?直接在請求里面用key標明請求是要讀寫數據啊還是更新元數據不就行了嗎?

簡單點的說比如我們想刪除某個topic,我們肯定是想這個topic馬上被刪除的,而此時producer還一直往這個topic寫數據,那這個情況可能是我們的刪除請求排在第N個...等前面的寫入請求處理好了才輪到刪除的請求。實際上前面哪些往這個topic寫入的請求都是沒用的,平白的消耗資源。

再或者說進行Preferred Leader選舉時候,producerack設置為all時候,老leader還在等着follower寫完數據向他報告呢,誰知follower已經成為了新leader,而通知它leader已經變更的請求由於被一堆數據類型請求堵着呢,老leader就傻傻的在等着,直到超時。

就是為了解決這種情況,社區將請求分為兩類。

那如何讓控制類的請求優先被處理?優先隊列?

社區采取的是兩套Listener,即數據類型一個listener,控制類一個listener

對應的就是我們上面講的網絡通信模型,在kafka中有兩套! kafka通過兩套監聽變相的實現了請求優先級,畢竟數據類型請求肯定很多,控制類肯定少,這樣看來控制類肯定比大部分數據類型先被處理!

迂回戰術啊。

控制類的和數據類區別就在於,就一個Porcessor線程,並且請求隊列寫死的長度為20。

最后

看源碼主要就是得耐心,耐心跟下去。然后再跳出來看。你會發現不過如此,哈哈哈。

我是yes,一個在互聯網摸爬滾打且莫得感情的工具人。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM