概述
現在介紹學習一下kafka的請求處理模塊,請求處理模塊就是網絡請求處理和api處理,這是kafka無論是對客戶端還是集群內部都是非常重要的模塊。現在我們對他進行源碼深入探討。當我們說到 Kafka 服務器端,也就是 Broker 的時候,往往會說它承擔着消息持久化的功能,但本質上,它其實就是一個不斷接收外部請求、處理請求,然后發送處理結果的 Java 進程。
kafka請求隊列
高效地保存排隊中的請求,是確保 Broker 高處理性能的關鍵。既然這樣,那你一定很想知道,Broker 上的請求隊列是怎么實現的呢?接下來,我們就一起看下 Broker 底層請求對象的建模和請求隊列的實現原理,以及 Broker請求處理方面的核心監控指標。目前,Broker 與 Clients 進行交互主要是基於Request/Response 機制,所以,我們很有必要學習一下源碼是如何建模或定義 Request 和 Response 的。
請求(Request)
我們先來看一下 RequestChannel 源碼中的 Request 定義代碼。
1 sealed trait BaseRequest 2 case object ShutdownRequest extends BaseRequest 3 4 class Request(val processor: Int, 5 val context: RequestContext, 6 val startTimeNanos: Long, 7 memoryPool: MemoryPool, 8 @volatile private var buffer: ByteBuffer, 9 metrics: RequestChannel.Metrics) extends BaseRequest { 10 ...... 11 }
Request 則是真正定義各類 Clients 端或 Broker 端請求的實現類。它定義的屬性包括 processor、context、startTimeNanos、memoryPool、buffer 和 metrics。下面我們一一來看。
processorprocessor 是 Processor 線程的序號,即這個請求是由哪個 Processor 線程接收處理的。Broker 端參數 num.network.threads 控制了 Broker 每個監聽器上創建的 Processor 線程數。假設你的 listeners 配置為 PLAINTEXT://localhost:9092,SSL://localhost:9093,那么,在默認情況下,Broker 啟動時會創建 6 個 Processor 線程,每 3 個為一組,分別給 listeners 參數中設置的兩個監聽器使用,每組的序號分別是 0、1、2。
contextcontext 是用來標識請求上下文信息的。Kafka 源碼中定義了 RequestContext 類,顧名思義,它保存了有關 Request 的所有上下文信息。RequestContext 類定義在 clients 工程中,
startTimeNanosstartTimeNanos 記錄了 Request 對象被創建的時間,主要用於各種時間統計指標的計算。請求對象中的很多 JMX 指標,特別是時間類的統計指標,都需要使用 startTimeNanos 字段。你要注意的是,它是以納秒為單位的時間戳信息,可以實現非常細粒度的時間統計精度。
memoryPoolmemoryPool 表示源碼定義的一個非阻塞式的內存緩沖區,主要作用是避免 Request 對象無限使用內存。當前,該內存緩沖區的接口類和實現類,分別是 MemoryPool 和 SimpleMemoryPool。你可以重點關注下 SimpleMemoryPool 的 tryAllocate 方法,看看它是怎么為 Request 對象分配內存的。
bufferbuffer 是真正保存 Request 對象內容的字節緩沖區。Request 發送方必須按照 Kafka RPC 協議規定的格式向該緩沖區寫入字節,否則將拋出 InvalidRequestException 異常。這個邏輯主要是由 RequestContext 的 parseRequest 方法實現的。
metricsmetrics 是 Request 相關的各種監控指標的一個管理類。它里面構建了一個 Map,封裝了所有的請求 JMX 指標。除了上面這些重要的字段屬性之外,Request 類中的大部分代碼都是與監控指標相關的,后面我們再詳細說。
響應(Response)
說完了 Request 代碼,我們再來說下 Response。Kafka 為 Response 定義了 1 個抽象父類和 5 個具體子類。Okay,現在,我們看下 Response 相關的代碼部分。
1 abstract class Response(val request: Request) { 2 locally { 3 val nowNs = Time.SYSTEM.nanoseconds 4 request.responseCompleteTimeNanos = nowNs 5 if (request.apiLocalCompleteTimeNanos == -1L) 6 request.apiLocalCompleteTimeNanos = nowNs 7 } 8 def processor: Int = request.processor 9 def responseString: Option[String] = Some("") 10 def onComplete: Option[Send => Unit] = None 11 override def toString: String 12 }
這個抽象基類只有一個屬性字段:request。這就是說,每個 Response 對象都要保存它對應的 Request 對象。我在前面說過,onComplete 方法是調用指定回調邏輯的地方。SendResponse 類就是復寫(Override)了這個方法,如下所示:
1 class SendResponse(request: Request, 2 val responseSend: Send, 3 val responseAsString: Option[String], 4 val onCompleteCallback: Option[Send => Unit]) 5 extends Response(request) { 6 ...... 7 override def onComplete: Option[Send => Unit] = onCompleteCallback 8 }
這里的 SendResponse 類繼承了 Response 父類,並重新定義了 onComplete 方法。復寫的邏輯很簡單,就是指定輸入參數 onCompleteCallback。
RequestChannel
RequestChannel,顧名思義,就是傳輸 Request/Response 的通道。有了 Request 和 Response 的基礎,下面我們可以學習 RequestChannel 類的實現了。我們先看下 RequestChannel 類的定義和重要的字段屬性。
1 class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup { 2 import RequestChannel._ 3 val metrics = new RequestChannel.Metrics 4 private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) 5 private val processors = new ConcurrentHashMap[Int, Processor]() 6 val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric) 7 val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) 8 9 ...... 10 }
RequestChannel 類實現了 KafkaMetricsGroup trait,后者封裝了許多實用的指標監控方法,比如,newGauge 方法用於創建數值型的監控指標,newHistogram 方法用於創建直方圖型的監控指標。就 RequestChannel 類本身的主體功能而言,它定義了最核心的 3 個屬性:requestQueue、queueSize 和 processors。下面我分別解釋下它們的含義。
每個 RequestChannel 對象實例創建時,會定義一個隊列來保存 Broker 接收到的各類請求,這個隊列被稱為請求隊列或 Request 隊列。Kafka 使用 Java 提供的阻塞隊列 ArrayBlockingQueue 實現這個請求隊列,並利用它天然提供的線程安全性來保證多個線程能夠並發安全高效地訪問請求隊列。在代碼中,這個隊列由變量requestQueue定義。而字段 queueSize 就是 Request 隊列的最大長度。
當 Broker 啟動時,SocketServer 組件會創建 RequestChannel 對象,並把 Broker 端參數 queued.max.requests 賦值給 queueSize。因此,在默認情況下,每個 RequestChannel 上的隊列長度是 500。字段 processors 封裝的是 RequestChannel 下轄的 Processor 線程池。每個 Processor 線程負責具體的請求處理邏輯。下面我詳細說說 Processor 的管理。
Processor 管理
上面代碼中的第4行創建了一個 Processor 線程池——當然,它是用 Java 的 ConcurrentHashMap 數據結構去保存的。Map 中的 Key 就是前面我們說的 processor 序號,而 Value 則對應具體的 Processor 線程對象。這個線程池的存在告訴了我們一個事實:當前 Kafka Broker 端所有網絡線程都是在 RequestChannel 中維護的。既然創建了線程池,代碼中必然要有管理線程池的操作。RequestChannel 中的 addProcessor 和 removeProcessor 方法就是做這些事的。
1 def addProcessor(processor: Processor): Unit = { 2 // 添加Processor到Processor線程池 3 if (processors.putIfAbsent(processor.id, processor) != null) 4 warn(s"Unexpected processor with processorId ${processor.id}") 5 newGauge(responseQueueSizeMetricName, 6 () => processor.responseQueueSize, 7 // 為給定Processor對象創建對應的監控指標 8 Map(ProcessorMetricTag -> processor.id.toString)) 9 } 10 11 def removeProcessor(processorId: Int): Unit = { 12 processors.remove(processorId) // 從Processor線程池中移除給定Processor線程 13 removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString)) // 移除對應Processor的監控指標 14 }
代碼很簡單,基本上就是調用 ConcurrentHashMap 的 putIfAbsent 和 remove 方法分別實現增加和移除線程。每當 Broker 啟動時,它都會調用 addProcessor 方法,向 RequestChannel 對象添加 num.network.threads 個 Processor 線程。如果查詢 Kafka 官方文檔的話,你就會發現,num.network.threads 這個參數的更新模式(Update Mode)是 Cluster-wide。這就說明,Kafka 允許你動態地修改此參數值。比如,Broker 啟動時指定 num.network.threads 為 8,之后你通過 kafka-configs 命令將其修改為 3。顯然,這個操作會減少 Processor 線程池中的線程數量。在這個場景下,removeProcessor 方法會被調用。
處理 Request 和 Response
除了 Processor 的管理之外,RequestChannel 的另一個重要功能,是處理 Request 和 Response,具體表現為收發 Request 和發送 Response。比如,收發 Request 的方法有 sendRequest 和 receiveRequest:
1 def sendRequest(request: RequestChannel.Request): Unit = { 2 requestQueue.put(request) 3 } 4 def receiveRequest(timeout: Long): RequestChannel.BaseRequest = 5 requestQueue.poll(timeout, TimeUnit.MILLISECONDS) 6 def receiveRequest(): RequestChannel.BaseRequest = 7 requestQueue.take()
所謂的發送 Request,僅僅是將 Request 對象放置在 Request 隊列中而已,而接收 Request 則是從隊列中取出 Request。整個流程構成了一個迷你版的“生產者 - 消費者”模式,然后依靠 ArrayBlockingQueue 的線程安全性來確保整個過程的線程安全。
對於 Response 而言,則沒有所謂的接收 Response,只有發送 Response,即 sendResponse 方法。sendResponse 是啥意思呢?其實就是把 Response 對象發送出去,也就是將 Response 添加到 Response 隊列的過程。
kafka使用NIO通信
在深入學習 Kafka 各個網絡組件之前,我們先從整體上看一下完整的網絡通信層架構,如下圖所示:
可以看出,Kafka 網絡通信組件主要由兩大部分構成:SocketServer 和 KafkaRequestHandlerPool。SocketServer 組件是核心,主要實現了 Reactor 模式,用於處理外部多個 Clients(這里的 Clients 指的是廣義的 Clients,可能包含 Producer、Consumer 或其他 Broker)的並發請求,並負責將處理結果封裝進 Response 中,返還給 Clients。KafkaRequestHandlerPool 組件就是我們常說的 I/O 線程池,里面定義了若干個 I/O 線程,用於執行真實的請求處理邏輯。兩者的交互點在於 SocketServer 中定義的 RequestChannel 對象和 Processor 線程。對了,我所說的線程,在代碼中本質上都是 Runnable 類型,不管是 Acceptor 類、Processor 類。
我們要重點關注一下 SocketServer 組件。這個組件是 Kafka 網絡通信層中最重要的子模塊。它下轄的 Acceptor 線程、Processor 線程和 RequestChannel 等對象,都是實施網絡通信的重要組成部分。現在講解一下最重要的部分。Acceptor 線程、Processor 線程。
Acceptor 線程
經典的 Reactor 模式有個 Dispatcher 的角色,接收外部請求並分發給下面的實際處理線程。在 Kafka 中,這個 Dispatcher 就是 Acceptor 線程。
Acceptor 線程接收 5 個參數,其中比較重要的有 3 個。
endPoint。它就是你定義的 Kafka Broker 連接信息,比如 PLAINTEXT://localhost:9092。Acceptor 需要用到 endPoint 包含的主機名和端口信息創建 Server Socket。
sendBufferSize。它設置的是 SocketOptions 的 SO_SNDBUF,即用於設置出站(Outbound)網絡 I/O 的底層緩沖區大小。該值默認是 Broker 端參數 socket.send.buffer.bytes 的值,即 100KB。
recvBufferSize。它設置的是 SocketOptions 的 SO_RCVBUF,即用於設置入站(Inbound)網絡 I/O 的底層緩沖區大小。該值默認是 Broker 端參數 socket.receive.buffer.bytes 的值,即 100KB。
如果在你的生產環境中,Clients 與 Broker 的通信網絡延遲很大(比如 RTT>10ms),那么我建議你調大控制緩沖區大小的兩個參數,也就是 sendBufferSize 和 recvBufferSize。通常來說,默認值 100KB 太小了。
除了類定義的字段,Acceptor 線程還有兩個非常關鍵的自定義屬性。
nioSelector:是 Java NIO 庫的 Selector 對象實例,也是后續所有網絡通信組件實現 Java NIO 機制的基礎。
processors:網絡 Processor 線程池。Acceptor 線程在初始化時,需要創建對應的網絡 Processor 線程池。可見,Processor 線程是在 Acceptor 線程中管理和維護的。
Acceptor 類邏輯的重頭戲其實是 run 方法,它是處理 Reactor 模式中分發邏輯的主要實現方法。下面我使用注釋的方式給出 run 方法的大體運行邏輯,如下所示:
1 def run(): Unit = { 2 //注冊OP_ACCEPT事件 3 serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) 4 // 等待Acceptor線程啟動完成 5 startupComplete() 6 try { 7 // 當前使用的Processor序號,從0開始,最大值是num.network.threads - 1 8 var currentProcessorIndex = 0 9 while (isRunning) { 10 try { 11 // 每500毫秒獲取一次就緒I/O事件 12 val ready = nioSelector.select(500) 13 if (ready > 0) { // 如果有I/O事件准備就緒 14 val keys = nioSelector.selectedKeys() 15 val iter = keys.iterator() 16 while (iter.hasNext && isRunning) { 17 try { 18 val key = iter.next 19 iter.remove() 20 if (key.isAcceptable) { 21 // 調用accept方法創建Socket連接 22 accept(key).foreach { socketChannel => 23 var retriesLeft = synchronized(processors.length) 24 var processor: Processor = null 25 do { 26 retriesLeft -= 1 27 // 指定由哪個Processor線程進行處理 28 processor = synchronized { 29 currentProcessorIndex = currentProcessorIndex % processors.length 30 processors(currentProcessorIndex) 31 } 32 // 更新Processor線程序號 33 currentProcessorIndex += 1 34 } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) // Processor是否接受了該連接 35 } 36 } else 37 throw new IllegalStateException("Unrecognized key state for acceptor thread.") 38 } catch { 39 case e: Throwable => error("Error while accepting connection", e) 40 } 41 } 42 } 43 } 44 catch { 45 case e: ControlThrowable => throw e 46 case e: Throwable => error("Error occurred", e) 47 } 48 } 49 } finally { // 執行各種資源關閉邏輯 50 debug("Closing server socket and selector.") 51 CoreUtils.swallow(serverChannel.close(), this, Level.ERROR) 52 CoreUtils.swallow(nioSelector.close(), this, Level.ERROR) 53 shutdownComplete() 54 } 55 }
基本上,Acceptor 線程使用 Java NIO 的 Selector + SocketChannel 的方式循環地輪詢准備就緒的 I/O 事件。這里的 I/O 事件,主要是指網絡連接創建事件,即代碼中的 SelectionKey.OP_ACCEPT。一旦接收到外部連接請求,Acceptor 就會指定一個 Processor 線程,並將該請求交由它,讓它創建真正的網絡連接。
Processor 線程
如果說 Acceptor 是做入站連接處理的,那么,Processor 代碼則是真正創建連接以及分發請求的地方。顯然,它要做的事情遠比 Acceptor 要多得多。processor線程的run方法如下:
1 override def run(): Unit = { 2 startupComplete() // 等待Processor線程啟動完成 3 try { 4 while (isRunning) { 5 try { 6 configureNewConnections() // 創建新連接 7 // register any new responses for writing 8 processNewResponses() // 發送Response,並將Response放入到inflightResponses臨時隊列 9 poll() // 執行NIO poll,獲取對應SocketChannel上准備就緒的I/O操作 10 processCompletedReceives() // 將接收到的Request放入Request隊列 11 processCompletedSends() // 為臨時Response隊列中的Response執行回調邏輯 12 processDisconnected() // 處理因發送失敗而導致的連接斷開 13 closeExcessConnections() // 關閉超過配額限制部分的連接 14 } catch { 15 case e: Throwable => processException("Processor got uncaught exception.", e) 16 } 17 } 18 } finally { // 關閉底層資源 19 debug(s"Closing selector - processor $id") 20 CoreUtils.swallow(closeAll(), this, Level.ERROR) 21 shutdownComplete() 22 } 23 }
每個 Processor 線程在創建時都會創建 3 個隊列。注意,這里的隊列是廣義的隊列,其底層使用的數據結構可能是阻塞隊列,也可能是一個 Map 對象而已,如下所示:
1 private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize) 2 private val inflightResponses = mutable.Map[String, RequestChannel.Response]() 3 private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()
隊列一:newConnections
它保存的是要創建的新連接信息,具體來說,就是 SocketChannel 對象。這是一個默認上限是 20 的隊列,而且,目前代碼中硬編碼了隊列的長度,因此,你無法變更這個隊列的長度。每當 Processor 線程接收新的連接請求時,都會將對應的 SocketChannel 放入這個隊列。后面在創建連接時(也就是調用 configureNewConnections 時),就從該隊列中取出 SocketChannel,然后注冊新的連接。
隊列二:inflightResponses
嚴格來說,這是一個臨時 Response 隊列。當 Processor 線程將 Response 返還給 Request 發送方之后,還要將 Response 放入這個臨時隊列。為什么需要這個臨時隊列呢?這是因為,有些 Response 回調邏輯要在 Response 被發送回發送方之后,才能執行,因此需要暫存在一個臨時隊列里面。這就是 inflightResponses 存在的意義。
隊列三:responseQueue
看名字我們就可以知道,這是 Response 隊列,而不是 Request 隊列。這告訴了我們一個事實:每個 Processor 線程都會維護自己的 Response 隊列,而不是像網上的某些文章說的,Response 隊列是線程共享的或是保存在 RequestChannel 中的。Response 隊列里面保存着需要被返還給發送方的所有 Response 對象。
請求要分優先級
在閱讀 SocketServer 代碼、深入學習請求優先級實現機制之前,我們要先掌握一些基本概念,這是我們理解后面內容的基礎。
1.Data plane 和 Control plane社區將 Kafka 請求類型划分為兩大類:數據類請求和控制類請求。Data plane 和 Control plane 的字面意思是數據面和控制面,各自對應數據類請求和控制類請求,也就是說 Data plane 負責處理數據類請求,Control plane 負責處理控制類請求。目前,Controller 與 Broker 交互的請求類型有 3 種:LeaderAndIsrRequest、StopReplicaRequest 和 UpdateMetadataRequest。這 3 類請求屬於控制類請求,通常應該被賦予高優先級。像我們熟知的 PRODUCE 和 FETCH 請求,就是典型的數據類請求。對這兩大類請求區分處理,是 SocketServer 源碼實現的核心邏輯。
2. 監聽器(Listener)目前,源碼區分數據類請求和控制類請求不同處理方式的主要途徑,就是通過監聽器。也就是說,創建多組監聽器分別來執行數據類和控制類請求的處理代碼。在 Kafka 中,Broker 端參數 listeners 和 advertised.listeners 就是用來配置監聽器的。在源碼中,監聽器使用 EndPoint 類來定義。
每個 EndPoint 對象定義了 4 個屬性,我們分別來看下。
host:Broker 主機名。
port:Broker 端口號。
listenerName:監聽器名字。目前預定義的名稱包括 PLAINTEXT、SSL、SASL_PLAINTEXT 和 SASL_SSL。Kafka 允許你自定義其他監聽器名稱,比如 CONTROLLER、INTERNAL 等。
securityProtocol:監聽器使用的安全協議。Kafka 支持 4 種安全協議,分別是 PLAINTEXT、SSL、SASL_PLAINTEXT 和 SASL_SSL。
這里簡單提一下,Broker 端參數 listener.security.protocol.map 用於指定不同名字的監聽器都使用哪種安全協議。我舉個例子,如果 Broker 端相應參數配置如下:
1 listener.security.protocol.map=CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:SSL 2 listeners=CONTROLLER://192.1.1.8:9091,INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093
那么,這就表示,Kafka 配置了 3 套監聽器,名字分別是 CONTROLLER、INTERNAL 和 EXTERNAL,使用的安全協議分別是 PLAINTEXT、PLAINTEXT 和 SSL。有了這些基礎知識,接下來,我們就可以看一下 SocketServer 是如何實現 Data plane 與 Control plane 的分離的。當然,在此之前,我們要先了解下 SocketServer 的定義。
Data plane 和 Control plane 注釋下面分別定義了一組變量,即 Processor 線程池、Acceptor 線程池和 RequestChannel 實例。
創建 Data plane 所需資源
SocketServer 的 createDataPlaneAcceptorsAndProcessors 方法負責為 Data plane 創建所需資源。我們看下它的實現:
1 private def createDataPlaneAcceptorsAndProcessors( 2 dataProcessorsPerListener: Int, endpoints: Seq[EndPoint]): Unit = { 3 // 遍歷監聽器集合 4 endpoints.foreach { endpoint => 5 // 將監聽器納入到連接配額管理之下 6 connectionQuotas.addListener(config, endpoint.listenerName) 7 // 為監聽器創建對應的Acceptor線程 8 val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix) 9 // 為監聽器創建多個Processor線程。具體數目由num.network.threads決定 10 addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener) 11 // 將<監聽器,Acceptor線程>對保存起來統一管理 12 dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) 13 info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") 14 } 15 }
createDataPlaneAcceptorsAndProcessors 方法會遍歷你配置的所有監聽器,然后為每個監聽器執行下面的邏輯。
初始化該監聽器對應的最大連接數計數器。后續這些計數器將被用來確保沒有配額超限的情形發生。
為該監聽器創建 Acceptor 線程,也就是調用 Acceptor 類的構造函數,生成對應的 Acceptor 線程實例。
創建 Processor 線程池。對於 Data plane 而言,線程池的數量由 Broker 端參數 num.network.threads 決定。
將 < 監聽器,Acceptor 線程 > 對加入到 Acceptor 線程池統一管理。
創建 Control plane 所需資源
前面說過了,基於控制類請求的負載遠遠小於數據類請求負載的假設,Control plane 的配套資源只有 1 個 Acceptor 線程 + 1 個 Processor 線程 + 1 個深度是 20 的請求隊列而已。和 Data plane 相比,這些配置稍顯寒酸,不過在大部分情況下,應該是夠用了。SocketServer 提供了 createControlPlaneAcceptorAndProcessor 方法,用於為 Control plane 創建所需資源,源碼如下:
1 private def createControlPlaneAcceptorAndProcessor( 2 endpointOpt: Option[EndPoint]): Unit = { 3 // 如果為Control plane配置了監聽器 4 endpointOpt.foreach { endpoint => 5 // 將監聽器納入到連接配額管理之下 6 connectionQuotas.addListener(config, endpoint.listenerName) 7 // 為監聽器創建對應的Acceptor線程 8 val controlPlaneAcceptor = createAcceptor(endpoint, ControlPlaneMetricPrefix) 9 // 為監聽器創建對應的Processor線程 10 val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool) 11 controlPlaneAcceptorOpt = Some(controlPlaneAcceptor) 12 controlPlaneProcessorOpt = Some(controlPlaneProcessor) 13 val listenerProcessors = new ArrayBuffer[Processor]() 14 listenerProcessors += controlPlaneProcessor 15 // 將Processor線程添加到控制類請求專屬RequestChannel中 16 // 即添加到RequestChannel實例保存的Processor線程池中 17 controlPlaneRequestChannelOpt.foreach( 18 _.addProcessor(controlPlaneProcessor)) 19 nextProcessorId += 1 20 // 把Processor對象也添加到Acceptor線程管理的Processor線程池中 21 controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlaneThreadPrefix) 22 info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}") 23 } 24 }
總體流程和 createDataPlaneAcceptorsAndProcessors 非常類似,只是方法開頭需要判斷是否配置了用於 Control plane 的監聽器。目前,Kafka 規定只能有 1 套監聽器用於 Control plane,而不能像 Data plane 那樣可以配置多套監聽器。如果認真看的話,你會發現,上面兩張圖中都沒有提到啟動 Acceptor 和 Processor 線程。那這些線程到底是在什么時候啟動呢?實際上,Processor 和 Acceptor 線程是在啟動 SocketServer 組件之后啟動的,具體代碼在 KafkaServer.scala 文件的 startup 方法中,如下所示:
1 // KafkaServer.scala 2 def startup(): Unit = { 3 try { 4 info("starting") 5 ...... 6 // 創建SocketServer組件 7 socketServer = new SocketServer(config, metrics, time, credentialProvider) 8 // 啟動SocketServer,但不啟動Processor線程 9 socketServer.startup(startProcessingRequests = false) 10 ...... 11 // 啟動Data plane和Control plane的所有線程 12 socketServer.startProcessingRequests(authorizerFutures) 13 ...... 14 } catch { 15 ...... 16 } 17 }
1 def startProcessingRequests(authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit = { 2 info("Starting socket server acceptors and processors") 3 this.synchronized { 4 if (!startedProcessingRequests) { 5 // 啟動處理控制類請求的Processor和Acceptor線程 6 startControlPlaneProcessorAndAcceptor(authorizerFutures) 7 // 啟動處理數據類請求的Processor和Acceptor線程 8 startDataPlaneProcessorsAndAcceptors(authorizerFutures) 9 startedProcessingRequests = true 10 } else { 11 info("Socket server acceptors and processors already started") 12 } 13 } 14 info("Started socket server acceptors and processors") 15 }
請求處理全流程
要知道,Kafka 官網可沒有告訴我們,什么是網絡線程和 I/O 線程。如果不明白“請求是被網絡線程接收並放入請求隊列的”這件事,我們就很可能犯這樣的錯誤——當請求隊列快滿了的時候,我們會以為是網絡線程處理能力不夠,進而盲目地增加 num.network.threads 值,但最終效果很可能是適得其反的。我相信,在今天的課程結束之后,你就會知道,碰到這種情況的時候,我們更應該增加的是 num.io.threads 的值。num.io.threads 參數表征的就是 I/O 線程池的大小。所謂的 I/O 線程池,即 KafkaRequestHandlerPool,也稱請求處理線程池。這節課我會先講解 KafkaRequestHandlerPool 源碼,再具體解析請求處理全流程的代碼。
KafkaRequestHandlerPool
KafkaRequestHandlerPool 是真正處理 Kafka 請求的地方。切記,Kafka 中處理請求的類不是 SocketServer,也不是 RequestChannel,而是 KafkaRequestHandlerPool。
1 // 關鍵字段說明 2 // id: I/O線程序號 3 // brokerId:所在Broker序號,即broker.id值 4 // totalHandlerThreads:I/O線程池大小 5 // requestChannel:請求處理通道 6 // apis:KafkaApis類,用於真正實現請求處理邏輯的類 7 class KafkaRequestHandler( 8 id: Int, 9 brokerId: Int, 10 val aggregateIdleMeter: Meter, 11 val totalHandlerThreads: AtomicInteger, 12 val requestChannel: RequestChannel, 13 apis: KafkaApis, 14 time: Time) extends Runnable with Logging { 15 ...... 16 }
KafkaRequestHandler 是一個 Runnable 對象,因此,你可以把它當成是一個線程。每個 KafkaRequestHandler 實例,都有 4 個關鍵的屬性。
id:請求處理線程的序號,類似於 Processor 線程的 ID 序號,僅僅用於標識這是線程池中的第幾個線程。
brokerId:Broker 序號,用於標識這是哪個 Broker 上的請求處理線程。
requestChannel:SocketServer 中的請求通道對象。KafkaRequestHandler 對象為什么要定義這個字段呢?我們說過,它是負責處理請求的類,那請求保存在什么地方呢?實際上,請求恰恰是保存在 RequestChannel 中的請求隊列中,因此,Kafka 在構造 KafkaRequestHandler 實例時,必須關聯 SocketServer 組件中的 RequestChannel 實例,也就是說,要讓 I/O 線程能夠找到請求被保存的地方。
apis:這是一個 KafkaApis 類。如果說 KafkaRequestHandler 是真正處理請求的,那么,KafkaApis 類就是真正執行請求處理邏輯的地方。在第 10 節課,我會具體講解 KafkaApis 的代碼。目前,你需要知道的是,它有個 handle 方法,用於執行請求處理邏輯。
run 方法的主要運行邏輯。它的所有執行邏輯都在 while 循環之下,因此,只要標志線程關閉狀態的 stopped 為 false,run 方法將一直循環執行 while 下的語句。第 1 步是從請求隊列中獲取下一個待處理的請求,同時更新一些相關的統計指標。如果本次循環沒取到,那么本輪循環結束,進入到下一輪。如果是 ShutdownRequest 請求,則說明該 Broker 發起了關閉操作。而 Broker 關閉時會調用 KafkaRequestHandler 的 shutdown 方法,進而調用 initiateShutdown 方法,以及 RequestChannel 的 sendShutdownRequest 方法,而后者就是將 ShutdownRequest 寫入到請求隊列。一旦從請求隊列中獲取到 ShutdownRequest,run 方法代碼會調用 shutdownComplete 的 countDown 方法,正式完成對 KafkaRequestHandler 線程的關閉操作。你看看 KafkaRequestHandlerPool 的 shutdown 方法代碼,就能明白這是怎么回事了。
1 def shutdown(): Unit = synchronized { 2 info("shutting down") 3 for (handler <- runnables) 4 handler.initiateShutdown() // 調用initiateShutdown方法發起關閉 5 for (handler <- runnables) 6 // 調用awaitShutdown方法等待關閉完成 7 // run方法一旦調用countDown方法,這里將解除等待狀態 8 handler.awaitShutdown() 9 info("shut down completely") 10 }
KafkaRequestHandlerPool從上面的分析來看,KafkaRequestHandler 邏輯大體上還是比較簡單的。下面我們來看下 KafkaRequestHandlerPool 線程池的實現。它是管理 I/O 線程池的,實現邏輯也不復雜。它的 shutdown 方法前面我講過了,這里我們重點學習下,它是如何創建這些線程的,以及創建它們的時機。首先看它的定義:
1 // 關鍵字段說明 2 // brokerId:所屬Broker的序號,即broker.id值 3 // requestChannel:SocketServer組件下的RequestChannel對象 4 // api:KafkaApis類,實際請求處理邏輯類 5 // numThreads:I/O線程池初始大小 6 class KafkaRequestHandlerPool( 7 val brokerId: Int, 8 val requestChannel: RequestChannel, 9 val apis: KafkaApis, 10 time: Time, 11 numThreads: Int, 12 requestHandlerAvgIdleMetricName: String, 13 logAndThreadNamePrefix : String) 14 extends Logging with KafkaMetricsGroup { 15 // I/O線程池大小 16 private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) 17 // I/O線程池 18 val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) 19 ...... 20 }
KafkaRequestHandlerPool 對象定義了 7 個屬性,其中比較關鍵的有 4 個,我分別來解釋下。
brokerId:和 KafkaRequestHandler 中的一樣,保存 Broker 的序號。
requestChannel:SocketServer 的請求處理通道,它下轄的請求隊列為所有 I/O 線程所共享。requestChannel 字段也是 KafkaRequestHandler 類的一個重要屬性。
apis:KafkaApis 實例,執行實際的請求處理邏輯。它同時也是 KafkaRequestHandler 類的一個重要屬性。
numThreads:線程池中的初始線程數量。它是 Broker 端參數 num.io.threads 的值。目前,Kafka 支持動態修改 I/O 線程池的大小,因此,這里的 numThreads 是初始線程數,調整后的 I/O 線程池的實際大小可以和 numThreads 不一致。
全處理流程
比較熟悉的圖形如下圖:
第 1 步:Clients 或其他 Broker 發送請求給 Acceptor 線程我在第 7 節課講過,Acceptor 線程實時接收來自外部的發送請求。一旦接收到了之后,就會創建對應的 Socket 通道。可以看到,Acceptor 線程通過調用 accept 方法,創建對應的 SocketChannel,然后將該 Channel 實例傳給 assignNewConnection 方法,等待 Processor 線程將該 Socket 連接請求,放入到它維護的待處理連接隊列中。后續 Processor 線程的 run 方法會不斷地從該隊列中取出這些 Socket 連接請求,然后創建對應的 Socket 連接。assignNewConnection 方法的主要作用是,將這個新建的 SocketChannel 對象存入 Processors 線程的 newConnections 隊列中。之后,Processor 線程會不斷輪詢這個隊列中的待處理 Channel(可以參考第 7 講的 configureNewConnections 方法),並向這些 Channel 注冊基於 Java NIO 的 Selector,用於真正的請求獲取和響應發送 I/O 操作。嚴格來說,Acceptor 線程處理的這一步並非真正意義上的獲取請求,僅僅是 Acceptor 線程為后續 Processor 線程獲取請求鋪路而已,也就是把需要用到的 Socket 通道創建出來,傳給下面的 Processor 線程使用。
第 2 & 3 步:Processor 線程處理請求,並放入請求隊列一旦 Processor 線程成功地向 SocketChannel 注冊了 Selector,Clients 端或其他 Broker 端發送的請求就能通過該 SocketChannel 被獲取到,具體的方法是 Processor 的 processCompleteReceives。因為代碼很多,我進行了精簡,只保留了最關鍵的邏輯。該方法會將 Selector 獲取到的所有 Receive 對象轉換成對應的 Request 對象,然后將這些 Request 實例放置到請求隊列中,就像上圖中第 2、3 步展示的那樣。所謂的 Processor 線程處理請求,就是指它從底層 I/O 獲取到發送數據,將其轉換成 Request 對象實例,並最終添加到請求隊列的過程。
第 4 步:I/O 線程處理請求所謂的 I/O 線程,就是我們開頭提到的 KafkaRequestHandler 線程,它的處理邏輯就在 KafkaRequestHandler 類的 run 方法中。
第 5 步:KafkaRequestHandler 線程將 Response 放入 Processor 線程的 Response 隊列這一步的工作由 KafkaApis 類完成。當然,這依然是由 KafkaRequestHandler 線程來完成的。KafkaApis.scala 中有個 sendResponse 方法,將 Request 的處理結果 Response 發送出去。本質上,它就是調用了 RequestChannel 的 sendResponse 方法。
第 6 步:Processor 線程發送 Response 給 Request 發送方最后一步是,Processor 線程取出 Response 隊列中的 Response,返還給 Request 發送方。具體代碼位於 Processor 線程的 processNewResponses 方法中。
KafkaApis
KafkaApis 類的定義代碼如下:
1 class KafkaApis( 2 val requestChannel: RequestChannel, // 請求通道 3 val replicaManager: ReplicaManager, // 副本管理器 4 val adminManager: AdminManager, // 主題、分區、配置等方面的管理器 5 val groupCoordinator: GroupCoordinator, // 消費者組協調器組件 6 val txnCoordinator: TransactionCoordinator, // 事務管理器組件 7 val controller: KafkaController, // 控制器組件 8 val zkClient: KafkaZkClient, // ZooKeeper客戶端程序,Kafka依賴於該類實現與ZooKeeper交互 9 val brokerId: Int, // broker.id參數值 10 val config: KafkaConfig, // Kafka配置類 11 val metadataCache: MetadataCache, // 元數據緩存類 12 val metrics: Metrics, 13 val authorizer: Option[Authorizer], 14 val quotas: QuotaManagers, // 配額管理器組件 15 val fetchManager: FetchManager, 16 brokerTopicStats: BrokerTopicStats, 17 val clusterId: String, 18 time: Time, 19 val tokenManager: DelegationTokenManager) extends Logging { 20 type FetchResponseStats = Map[TopicPartition, RecordConversionStats] 21 this.logIdent = "[KafkaApi-%d] ".format(brokerId) 22 val adminZkClient = new AdminZkClient(zkClient) 23 private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId) 24 ...... 25 }
KafkaApis 是 Broker 端所有功能的入口,同時關聯了超多的 Kafka 組件。它絕對是你學習源碼的第一入口。面對龐大的源碼工程,如果你不知道從何下手,那就先從 KafkaApis.scala 這個文件開始吧。
handle 方法封裝了所有 RPC 請求的具體處理邏輯。每當社區新增 RPC 協議時,增加對應的 handle×××Request 方法和 case 分支都是首要的。
sendResponse 系列方法負責發送 Response 給請求發送方。發送 Response 的邏輯是將 Response 對象放置在 Processor 線程的 Response 隊列中,然后交由 Processor 線程實現網絡發送。
authorize 方法是請求處理前權限校驗層的主要邏輯實現。你可以查看一下官方文檔,了解一下當前都有哪些權限,然后對照着具體的方法,找出每類 RPC 協議都要求 Clients 端具備什么權限。
總結
以后關於kafka系列的總結大部分來自Geek Time的課件,大家可以自行關鍵字搜索。