背景
早期京麥搭建 HTTP 和 TCP 長連接功能主要用於消息通知的推送,並未應用於 API 網關。隨着逐步對 NIO 的深入學習和對 Netty 框架的了解,以及對系統通信穩定能力越來越高的要求,開始有了采用 NIO 技術應用網關實現 API 請求調用的想法,最終在 2016 年實現,並完全支撐業務化運行。
由於諸多的改進,包括 TCP 長連接容器、Protobuf 的序列化、服務泛化調用框架等等,性能比 HTTP 網關提升 10 倍以上,穩定性也遠遠高於 HTTP 網關。
架構
基於 Netty 構建京麥 TCP 網關的長連接容器,作為網關接入層提供服務 API 請求調用。
一、網絡結構
客戶端通過域名 + 端口訪問 TCP 網關,域名不同的運營商對應不同的 VIP,VIP 發布在 LVS 上,LVS 將請求轉發給后端的 HAProxy,再由 HAProxy 把請求轉發給后端的 Netty 的 IP+Port。
LVS 轉發給后端的 HAProxy,請求經過 LVS,但是響應是 HAProxy 直接反饋給客戶端的,這也就是 LVS 的 DR 模式。

二、TCP 網關長連接容器架構
TCP 網關的核心組件是 Netty,而 Netty 的 NIO 模型是 Reactor 反應堆模型(Reactor 相當於有分發功能的多路復用器 Selector)。每一個連接對應一個 Channel(多路指多個 Channel,復用指多個連接復用了一個線程或少量線程,在 Netty 指 EventLoop),一個 Channel 對應唯一的 ChannelPipeline,多個 Handler 串行的加入到 Pipeline 中,每個 Handler 關聯唯一的 ChannelHandlerContext。
TCP 網關長連接容器的 Handler 就是放在 Pipeline 的中。我們知道 TCP 屬於 OSI 的傳輸層,所以建立 Session 管理機制構建會話層來提供應用層服務,可以極大的降低系統復雜度。
所以,每一個 Channel 對應一個 Connection,一個 Connection 又對應一個 Session,Session 由 Session Manager 管理,Session 與 Connection 是一一對應的,Connection 保存着 ChannelHandlerContext(ChannelHanderContext 可以找到 Channel),Session 通過心跳機制來保持 Channel 的 Active 狀態。
每一次 Session 的會話請求(ChannelRead)都是通過 Proxy 代理機制調用 Service 層,數據請求完畢后通過寫入 ChannelHandlerConext 再傳送到 Channel 中。數據下行主動推送也是如此,通過 Session Manager 找到 Active 的 Session,輪詢寫入 Session 中的 ChannelHandlerContext,就可以實現廣播或點對點的數據推送邏輯。

Netty 的應用實踐
京麥 TCP 網關使用 Netty Channel 進行數據通信,使用 Protobuf 進行序列化和反序列化,每個請求都將被封裝成 Byte 二進制字節流,在整個生命周期中,Channel 保持長連接,而不是每次調用都重新創建 Channel,達到鏈接的復用。
一、TCP 網關 Netty Server 的 IO 模型
- 創建 ServerBootstrap,設定 BossGroup 與 WorkerGroup 線程池。
- bind 指定的 port,開始偵聽和接受客戶端鏈接。(如果系統只有一個服務端 port 需要監聽,則 BossGroup 線程組線程數設置為 1。)
- 在 ChannelPipeline 注冊 childHandler,用來處理客戶端鏈接中的請求幀。
二、TCP 網關的線程模型
TCP 網關使用 Netty 的線程池,共三組線程池,分別為 BossGroup、WorkerGroup 和 ExecutorGroup。其中,BossGroup 用於接收客戶端的 TCP 連接,WorkerGroup 用於處理 I/O、執行系統 Task 和定時任務,ExecutorGroup 用於處理網關業務加解密、限流、路由,及將請求轉發給后端的抓取服務等業務操作。

NioEventLoop 是 Netty 的 Reactor 線程,其角色:
- Boss Group:作為服務端 Acceptor 線程,用於 accept 客戶端鏈接,並轉發給 WorkerGroup 中的線程。
- Worker Group:作為 IO 線程,負責 IO 的讀寫,從 SocketChannel 中讀取報文或向 SocketChannel 寫入報文。
- Task Queue/Delay Task Queue:作為定時任務線程,執行定時任務,例如鏈路空閑檢測和發送心跳消息等。
三、TCP 網關執行時序圖

其中步驟一至步驟九是 Netty 服務端的創建時序,步驟十至步驟十三是 TCP 網關容器創建的時序。
- 步驟一:創建 ServerBootstrap 實例,ServerBootstrap 是 Netty 服務端的啟動輔助類。
- 步驟二:設置並綁定 Reactor 線程池,EventLoopGroup 是 Netty 的 Reactor 線程池,EventLoop 負責所有注冊到本線程的 Channel。
- 步驟三:設置並綁定服務器 Channel,Netty Server 需要創建 NioServerSocketChannel 對象。
- 步驟四:TCP 鏈接建立時創建 ChannelPipeline,ChannelPipeline 本質上是一個負責和執行 ChannelHandler 的職責鏈。
- 步驟五:添加並設置 ChannelHandler,ChannelHandler 串行的加入 ChannelPipeline 中。
- 步驟六:綁定監聽端口並啟動服務端,將 NioServerSocketChannel 注冊到 Selector 上。
- 步驟七:Selector 輪訓,由 EventLoop 負責調度和執行 Selector 輪詢操作。
- 步驟八:執行網絡請求事件通知,輪詢准備就緒的 Channel,由 EventLoop 執行 ChannelPipeline。
- 步驟九:執行 Netty 系統和業務 ChannelHandler,依次調度並執行 ChannelPipeline 的 ChannelHandler。
- 步驟十:通過 Proxy 代理調用后端服務,ChannelRead 事件后,通過發射調度后端 Service。
- 步驟十一:創建 Session,Session 與 Connection 是相互依賴關系。
- 步驟十二:創建 Connection,Connection 保存 ChannelHandlerContext。
- 步驟十三:添加 SessionListener,SessionListener 監聽 SessionCreate 和 SessionDestory 等事件。
四、TCP 網關源碼分析
1. Session 管理
Session 是客戶端與服務端建立的一次會話鏈接,會話信息中保存着 SessionId、連接創建時間、上次訪問事件,以及 Connection 和 SessionListener,在 Connection 中保存了 Netty 的 ChannelHandlerContext 上下文信息。Session 會話信息會保存在 SessionManager 內存管理器中。

創建 Session 的源碼
通過源碼分析,如果 Session 已經存在銷毀 Session,但是這個需要特別注意,創建 Session 一定不要創建那些斷線重連的 Channel,否則會出現 Channel 被誤銷毀的問題。因為如果在已經建立 Connection(1) 的 Channel 上,再建立 Connection(2),進入 session.close 方法會將 cxt 關閉,Connection(1) 和 Connection(2) 的 Channel 都將會被關閉。在斷線之后再建立連接 Connection(3),由於 Session 是有一定延遲,Connection(3) 和 Connection(1/2) 不是同一個,但 Channel 可能是同一個。
所以,如何處理是否是斷線重練的 Channel,具體的方法是在 Channel 中存入 SessionId,每次事件請求判斷 Channel 中是否存在 SessionId,如果 Channel 中存在 SessionId 則判斷為斷線重連的 Channel。

2. 心跳
心跳是用來檢測保持連接的客戶端是否還存活着,客戶端每間隔一段時間就會發送一次心跳包上傳到服務端,服務端收到心跳之后更新 Session 的最后訪問時間。在服務端長連接會話檢測通過輪詢 Session 集合判斷最后訪問時間是否過期,如果過期則關閉 Session 和 Connection,包括將其從內存中刪除,同時注銷 Channel 等。

通過源碼分析,在每個 Session 創建成功之后,都會在 Session 中添加 TcpHeartbeatListener 這個心跳檢測的監聽,TcpHeartbeatListener 是一個實現了 SessionListener 接口的守護線程,通過定時休眠輪詢 Sessions 檢查是否存在過期的 Session,如果輪訓出過期的 Session,則關閉 Session。

同時,注意到 session.connect 方法,在 connect 方法中會對 Session 添加的 Listeners 進行添加時間,它會循環調用所有 Listner 的 sessionCreated 事件,其中 TcpHeartbeatListener 也是在這個過程中被喚起。

3. 數據上行
數據上行特指從客戶端發送數據到服務端,數據從 ChannelHander 的 channelRead 方法獲取數據。數據包括創建會話、發送心跳、數據請求等。這里注意的是,channelRead 的數據包括客戶端主動請求服務端的數據,以及服務端下行通知客戶端的返回數據,所以在處理 object 數據時,通過數據標識區分是請求 - 應答,還是通知 - 回復。

4. 數據下行
數據下行通過 MQ 廣播機制到所有服務器,所有服務器收到消息后,獲取當前服務器所持有的所有 Session 會話,進行數據廣播下行通知。如果是點對點的數據推送下行,數據也是先廣播到所有服務器,每天服務器判斷推送的端是否是當前服務器持有的會話,如果判斷消息數據中的信息是在當前服務,則進行推送,否則拋棄。

通過源碼分析,數據下行則通過 NotifyProxy 的方式發送數據,需要注意的是 Netty 是 NIO,如果下行通知需要獲取返回值,則要將異步轉同步,所以 NotifyFuture 是實現 java.util.concurrent.Future 的方法,通過設置超時時間,在 channelRead 獲取到上行數據之后,通過 seq 來關聯 NotifyFuture 的方法。

下行的數據通過 TcpConnector 的 send 方法發送,send 方式則是通過 ChannelHandlerContext 的 writeAndFlush 方法寫入 Channel,並實現數據下行,這里需要注意的是,之前有另一種寫法就是 cf.await,通過阻塞的方式來判斷寫入是否成功,這種寫法偶發出現 BlockingOperationException 的異常。


使用阻塞獲取返回值的寫法
關於 BlockingOperationException 的問題我在 StackOverflow 進行提問,非常幸運的得到了 Norman Maurer(Netty 的核心貢獻者之一)的解答。
最終結論大致分析出,在執行 write 方法時,Netty 會判斷 current thread 是否就是分給該 Channe 的 EventLoop,如果是則行線程執行 IO 操作,否則提交 executor 等待分配。當執行 await 方法時,會從 executor 里 fetch 出執行線程,這里就需要 checkDeadLock,判斷執行線程和 current threads 是否時同一個線程,如果是就檢測為死鎖拋出異常 BlockingOperationException。
總結
本篇文章粗淺地向大家介紹了京麥 TCP 網關中使用 Netty 實現長連接容器的架構,對涉及 TCP 長連接容器搭建的關鍵點一一進行了闡述,以及對源碼進行簡單地分析。在京麥發展過程里 Netty 還有很多的實踐應用,例如 Netty4.11+HTTP2 實現 APNs 的消息推送等等。
參考:

