從零開始實現簡單 RPC 框架 6:網絡通信之 Netty


網絡通信的開發,就涉及到一些開發框架:Java NIONettyMina 等等。
理論上來說,類似於序列化器,可以為其定義一套統一的接口,讓不同類型的框架實現,事實上,Dubbo 就是這么干的。
但是,作為一個簡單的 RPC 框架,ccx-rpc 就先不統一了,因為基本上網絡框架是不會換的,而且統一起來代碼量巨大。
ccx-rpc 選擇的網絡框架是 NettyNetty 是一款大名鼎鼎的異步事件驅動的網絡應用程序框架,支持快速地開發可維護的高性能的面向協議的服務器和客戶端。

Netty 在 JDK 自帶的 NIO 基礎之上進行了封裝,解決了 JDK 自身的一些問題,具備如下優點:

  • 入門簡單,使用方便,文檔齊全,無其他依賴,只依賴 JDK 就夠了。
  • 高性能,高吞吐,低延遲,資源消耗少。
  • 靈活的線程模型,支持阻塞和非阻塞的I/O 模型。
  • 代碼質量高,目前主流版本基本沒有 Bug。

下面我們先來介紹一下 Netty 的核心設計吧。

Netty 線程模型設計

服務收到請求之后,執行的邏輯大致有:編解碼、消息派發、業務處理以及返回響應。這些邏輯是放到一個線程串行執行,還是分配到不同線程中執行,會對程序的性能產生很大的影響。優秀的線程模型對一個高性能網絡庫來說是至關重要的。

Netty 采用了 Reactor 線程模型的設計。

什么是 Reactor

Wikipedia 的定義是:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

從上面的定義可以看出有幾個重點:

  1. 事件驅動
  2. 能處理一個或多個輸入源
  3. 多路復用、分發事件給對應的處理器

Reactor 線程模型有幾個角色:

  1. Reactor:負責響應事件,將事件分發給綁定了該事件的 Handler 處理;
  2. Handler:事件處理器,綁定了某類事件,負責對事件進行處理;
  3. Acceptor:Handler 的一種,綁定了連接事件。當客戶端發起連接請求時,Reactor 會將 accept 事件分發給 Acceptor 處理。

簡單來說,其核心原理是 Reactor 負責監聽事件,在監聽到事件之后,分發給相關線程的處理器進行處理。

為什么用 Reactor

我們先來看看傳統阻塞 I/O 模型的缺點

  1. 每個連接都需要獨立線程處理,當並發數大時,創建線程數多,占用資源
  2. 采用阻塞 I/O 模型,連接建立后,若當前線程沒有數據可讀,線程會阻塞在讀操作上,造成資源浪費

針對傳統阻塞 I/O 模型的兩個問題,可以采用如下的方案

  1. 基於池化思想,避免為每個連接創建線程,連接完成后將業務處理交給線程池處理
  2. 基於 I/O 復用模型,多個連接共用同一個阻塞對象,不用等待所有的連接。遍歷到有新數據可以處理時,操作系統會通知程序,線程跳出阻塞狀態,進行業務邏輯處理

Reactor 線程模型的思想就是 線程池I/O復用 的結合。

為了幫助你更好地了解 Netty 線程模型的設計理念,我們將從最基礎的單 Reactor 單線程模型開始介紹,然后逐步增加模型的復雜度,最終到 Netty 目前使用的非常成熟的線程模型設計。

1. 單 Reactor 單線程

Reactor 對象監聽客戶端請求事件,收到事件后通過進行分發。

  • 如果是建立連接事件,則由 Acceptor 通過 Accept 處理連接請求,然后創建一個 Handler 對象處理連接建立之后的業務請求。
  • 如果是讀寫事件,則 Reactor 會將事件分發對應的 Handler 來處理,由單線程調用 Handler 對象來完成讀取數據、業務處理、發送響應的完整流程。

具體情況如下圖所示:
單 Reactor 單線程

單 Reactor 單線程的優點就是:線程模型簡單,沒有引入多線程,自然也就沒有多線程並發和競爭的問題。

但其缺點也非常明顯,那就是性能瓶頸問題,一個線程只能跑在一個 CPU 上,能處理的連接數是有限的,無法完全發揮多核 CPU 的優勢。一旦某個業務邏輯耗時較長,這唯一的線程就會卡在上面,無法處理其他連接的請求,程序進入假死的狀態,可用性也就降低了。正是由於這種限制,一般只會在客戶端使用這種線程模型。

2. 單 Reactor 多線程

其流程跟 "單 Reactor 單線程" 的流程差不多,也是 Acceptor 處理連接事件,Handler 處理讀寫事件。
唯一的區別就是:Handler 處理請求的時候,使用的是 線程池 來處理。
單 Reactor 多線程

很明顯,單 Reactor 多線程的模型可以充分利用多核 CPU 的處理能力,提高整個系統的吞吐量,但引入多線程模型就要考慮線程並發、數據共享等問題。
在這個模型中,只有一個線程來處理 Reactor 監聽到的所有 I/O 事件,其中就包括連接建立事件以及讀寫事件,當連接數不斷增大的時候,這個唯一的 Reactor 線程也會遇到瓶頸。

3. 主從 Reactor 多線程

為了解決單 Reactor 多線程模型中的問題,我們可以引入多個 Reactor。

  • Reactor 主線程接收建立連接事件,然后給 Acceptor 處理。網絡連接建立之后,主 Reactor 會將連接分配給 子 Reactor 進行后續監聽。
  • 子 Reactor 分配到連接之后,負責監聽該連接上的讀寫事件。讀寫事件到來時分發給 Worker 線程池的 Handler 處理。
    主從 Reactor 多線程

4. Netty 線程模型

Netty 同時支持上述幾種線程模式,Netty 針對服務器端的設計是在主從 Reactor 多線程模型的基礎上進行的修改,如下圖所示:
Netty 線程模型
Netty 抽象出兩組線程池:BossGroup 專門用於接收客戶端的連接,WorkerGroup 專門用於網絡的讀寫。

BossGroup 里的線程 會監聽連接事件,與客戶端建立網絡連接后,生成相應的 NioSocketChannel 對象,表示一條網絡連接。之后會將 NioSocketChannel 注冊到 WorkerGroup 中某個線程上。

WorkerGroup 里的線程會監聽對應連接上的讀寫事件,當監聽到讀寫事件的時候,會通過 Pipeline 添加的多個處理器進行處理,每個處理器中都可以包含一定的邏輯,例如編解碼、心跳、業務邏輯等。

Netty 的核心組件

介紹完 Netty 優秀的線程模型設計,接下來開始介紹 Netty 的核心組件。

1. EventLoopGroup / EventLoop

在前面介紹 Netty 線程模型的時候,提到 BossGroup 和 WorkerGroup,他們就是 EventLoopGroup,一個 EventLoopGroup 當中會包含一個或多個 EventLoop,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 里面按照一定規則獲取其中一個 EventLoop 來處理任務。EventLoop 從表面上看是一個不斷循環的線程

EventLoop 最常用的實現類是:NioEventLoop,一個 NioEventLoop 包含了一個 Selector 對象, 可以支持多個 Channel 注冊在其上,該 NioEventLoop 可以同時服務多個 Channel,每個 Channel 只能與一個 NioEventLoop 綁定,這樣就實現了線程與 Channel 之間的關聯。

EventLoop 並不是一個純粹的 I/O 線程,它除了負責 I/O 的讀寫之外,還兼顧處理以下兩類任務:

  • 系統任務:通過調用 NioEventLoop 的 execute(Runnable task) 方法實現,Netty 有很多系統任務,當 I/O 線程和用戶線程同時操作網絡資源時,為了防止並發操作導致的鎖競爭,將用戶線程的操作封裝成任務放入消息隊列中,由 I/O 線程負責執行,這樣就實現了局部無鎖化。
  • 定時任務:通過調用 NioEventLoop 的 schedule(Runnable command, long delay, TimeUnit unit) 方法實現。

2. Channel

Channel 是 Netty 對網絡連接的抽象,核心功能是執行網絡 I/O 操作,是服務端和客戶端進行 I/O 數據交互的媒介。

工作流程:

  1. 當客戶端連接成功,將新建一個 Channel 於該客戶端進行綁定
  2. Channel 從 EventLoopGroup 獲得一個 EventLoop,並注冊到該 EventLoop,Channel 生命周期內都和該 EventLoop 在一起(注冊時獲得selectionKey)
  3. Channel 與客戶端進行網絡連接、關閉和讀寫,生成相對應的 event(改變selectinKey信息),觸發 EventLoop 調度線程進行執行
  4. 如果是讀事件,執行線程調度 Pipeline 來處理邏輯

3. Pipeline

上面介紹 Channel 的時候提到,如果是讀事件,則通過 Pipeline 來處理。一個 Channel 對應一個 Pipeline,一個 Pipeline 由多個 Handler 串成一個有序的鏈表,一個 Handler 處理完,調用 next 獲得下一個 Handler 進行處理。Pipeline
上圖黃色部分即為 Handler,一個 Handler 可以是 Inbound、OutBound。處理入站事件時,Handler 按照正向順序執行。處理出站事件時,Handler 按照反向順序執行。

常規 Pipeline 的 Handler 注冊代碼如下:

new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        // 30 秒之內沒有收到客戶端請求的話就關閉連接
        p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
        // 編解碼器
        p.addLast(new NettyEncoder());
        p.addLast(new NettyDecoder());
        // RPC 消息處理器
        p.addLast(serviceHandlerGroup, new NettyServerHandler());
    }
}

4. ByteBuf

在進行跨進程遠程交互的時候,我們需要以字節的形式發送和接收數據,發送端和接收端都需要一個高效的數據容器來緩存字節數據,ByteBuf 就扮演了這樣一個數據容器的角色。

ByteBuf 類似於一個字節數組,其中維護了一個讀索引(readerIndex)和一個寫索引(writerIndex),分別用來控制對 ByteBuf 中數據的讀寫操作。還有一個capacity 用來記錄緩沖區的總長度,當寫數據超過 capacity 時,ByteBuf 會自動擴容,直到 capacity 達到 maxCapacity

ByteBuf 的結構如下:
ButeBuf 結構
Netty 中主要分為以下三大類 ByteBuf:

  • Heap Buffer(堆緩沖區):這是最常用的一種 ByteBuf,它將數據存儲在 JVM 的堆空間,其底層實現是在 JVM 堆內分配一個數組,實現數據的存儲。堆緩沖區可以快速分配,當不使用時也可以由 GC 輕松釋放。
    創建代碼:Unpooled.buffer() 或者 ctx.alloc().buffer()
  • Direct Buffer(直接緩沖區):直接緩沖區會使用堆外內存存儲數據,不會占用 JVM 堆的空間,使用時應該考慮應用程序要使用的最大內存容量以及如何及時釋放。直接緩沖區在使用 Socket 傳遞數據時性能很好,當然,它也是有缺點的,因為沒有了 JVM GC 的管理,在分配內存空間和釋放內存時,比堆緩沖區更復雜,Netty 主要使用內存池來解決這樣的問題。
    創建代碼:Unpooled.directBuffer() 或者 ctx.alloc().directBuffer()
  • Composite Buffer(復合緩沖區):我們可以創建多個不同的 ByteBuf,然后提供一個這些 ByteBuf 組合的視圖,也就是 CompositeByteBuf。它就像一個列表,可以動態添加和刪除其中的 ByteBuf。
    例如:一條消息由 header 和 body 兩部分組成,將 header 和 body 組裝成一條消息發送出去,可能 body 相同,只是 header 不同,使用CompositeByteBuf 就不用每次都重新分配一個新的緩沖區。
    創建代碼:Unpooled.compositeBuffer() 或者 ctx.alloc().compositeBuffer()

Netty 使用 ByteBuf 對象作為數據容器,進行 I/O 讀寫操作,其實 Netty 的內存管理也是圍繞着 ByteBuf 對象高效地分配和釋放。從內存管理角度來看,ByteBuf 可分為 Unpooled 和 Pooled 兩類。

  • Unpooled:是指非池化的內存管理方式。每次分配時直接調用系統 API 向操作系統申請 ByteBuf,在使用完成之后,通過系統調用進行釋放。Unpooled 將內存管理完全交給系統,不做任何特殊處理,使用起來比較方便,對於申請和釋放操作不頻繁、操作成本比較低的 ByteBuf 來說,是比較好的選擇。
    使用示例:Unpooled.buffer()
  • Pooled:是指池化的內存管理方式。該方式會預先申請一塊大內存形成內存池,在需要申請 ByteBuf 空間的時候,會將內存池中一部分合理的空間封裝成 ByteBuf 給服務使用,使用完成后回收到內存池中。前面提到 DirectByteBuf 底層使用的堆外內存管理比較復雜,池化技術很好地解決了這一問題。
    池化分配器 ByteBufAllocator 需要從 ChannelHandlerContext 中獲取實例:ByteBufAllocator byteBufAllocator = ctx.alloc(),然后再生成 ByteBuf 對象:byteBufAllocator.buffer()`

最后,我們來總結一下 ByteBuf 的優點:

  • 通過內置的復合緩沖區類型實現了透明的零拷貝。
  • 容量可以按需增長。
  • 在讀和寫這兩種模式之間切換不需要調用 ByteBuffer 的 flip()方法。
  • 讀和寫使用了不同的索引。
  • 支持引用計數。
  • 支持池化。

總結

上面我們介紹了 Netty 優秀的線程模型和核心組件,Netty 優秀的設計還有很多,感興趣的讀者可以再去深入了解,以上介紹的已經夠寫一個 RPC 框架了。
接下來,我們就要講到網絡通信的核心實現了,敬請期待!

ccx-rpc 代碼已經開源
Github:https://github.com/chenchuxin/ccx-rpc
Gitee:https://gitee.com/imccx/ccx-rpc


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM