摘要:Kafka網絡模塊之Server端,介紹Server端啟動、接收請求和處理請求的過程。
本文分享自華為雲社區《Kafka網絡模塊-Server端》,原文作者:中間件小哥 。
SocketServer 是 Kafka server 端用於處理請求的模塊,在 Kafka 啟動過程創建、初始化、啟動。
SocketServer啟動過程:

- 按照 endpoint 順序初始化 Acceptor,每個 endpoint 對應一個 Acceptor,為每個 Acceptor 創建 Processor(數量由 num.network.threads 配置項決定),並啟動 Acceptor,Acceptor 啟動后會通過 selector 監聽連接,並將新建立的連接交給 Processor 處理(輪詢選擇 Processor)
- 啟動所有 Processor
Acceptor啟動、監聽連接過程:

- Acceptor啟動后,會創建一個 serverSocketChannel,監聽在該 acceptor 對應的 endpoint 上,並在 selector 上注冊 OP_ACCEPT,然后進入死循環,每次循環,通過 selector 獲取就緒的 key(即前面注冊的 serverSocketChannel),表明有連接來到,然后通過 accept() 創建一個和該連接對應的 socketChannel,然后從該 acceptor 負責的 processors 中輪詢選擇一個,將該 socketChannel 交給選擇的 processor 處理,即將連接交給 processor。
- Acceptor 將連接交給 processor 處理,是將 socketChannel 加入 processor 的連接隊列 newConnection 中,processor 在 run 方法中會不斷地從中獲取並處理。
- Processor 從 newConnection 獲取到 socketChannel 后,在 selector 上注冊 OP_READ,並創建對應的 KafkaChannel。
Server端接收請求、處理的過程:
- Processor 收到 OP_READ 的事件就緒后,檢查並嘗試完成 SSL 握手和 SASL 校驗(此時不一定握手完成,所以在 Processor 收到 OP_READ 的事件就緒后,要先檢查並確保握手已經完成,SSL/SASL相關參考 9.4 節)
- SSL 握手和 SASL 校驗完成后,從 channel 中讀取數據,構造 NetworkReceive 對象,並入隊 stagedReceives
- 取出 stagedReceives 隊首元素(移除),加入 completedReceives
- 將 completedReceives 中的元素取出(不移除),構造 Request 對象,加入 requestQueue,移除對 OP_READ 的事件注冊,並將對應的 KafkaChannel 置為 MUTED,再置為 MUTED_AND_RESPONSE_PENDING
- KafkaRequestHandler 從 requestQueue 取出元素(移除),並交給 KafkaApi 模塊處理請求
- KafkaApi 處理完請求后,將響應放入對應的 processor 的 responseQueue 和 inflightResponses 中,並喚醒其 selector
- Processor 從 responseQueue 中取出響應(移除),若響應是需要發回給客戶端的,則將響應的 send 賦值給 KafkaChannel,並注冊 OP_WRITE 事件
- 當 channel 寫就緒后,將 send 寫入 channel 的寫 buffer,當 send 寫完后,移除對 OP_WRITE 事件的注冊,並將 send 加入 completedSends
- 從 inflightResponses 中移除對應的響應,執行響應回調,將 KafkaChannel 置為 MUTED,再從 MUTED 置為 NOT_MUTED,並重新添加 OP_READ 事件注冊
