Netty實戰入門詳解——讓你徹底記住什么是Netty(看不懂你來找我)


 

一、Netty 簡介

Netty 是基於 Java NIO 的異步事件驅動的網絡應用框架,使用 Netty 可以快速開發網絡應用,Netty 提供了高層次的抽象來簡化 TCP 和 UDP 服務器的編程,但是你仍然可以使用底層的 API。

Netty 的內部實現是很復雜的,但是 Netty 提供了簡單易用的API從網絡處理代碼中解耦業務邏輯。Netty 是完全基於 NIO 實現的,所以整個 Netty 都是異步的。

Netty 是最流行的 NIO 框架,它已經得到成百上千的商業、商用項目驗證,許多框架和開源組件的底層 rpc 都是使用的 Netty,如 Dubbo、Elasticsearch 等等。下面是官網給出的一些 Netty 的特性:

設計方面

  • 對各種傳輸協議提供統一的 API(使用阻塞和非阻塞套接字時候使用的是同一個 API,只是需要設置的參數不一樣)。
  • 基於一個靈活、可擴展的事件模型來實現關注點清晰分離。
  • 高度可定制的線程模型——單線程、一個或多個線程池。
  • 真正的無數據報套接字(UDP)的支持(since 3.1)。

易用性

  • 完善的 Javadoc 文檔和示例代碼。
  • 不需要額外的依賴,JDK 5 (Netty 3.x) 或者 JDK 6 (Netty 4.x) 已經足夠。

性能

  • 更好的吞吐量,更低的等待延遲。
  • 更少的資源消耗。
  • 最小化不必要的內存拷貝。

安全性

  • 完整的 SSL/TLS 和 StartTLS 支持

對於初學者,上面的特性我們在腦中有個簡單了解和印象即可, 下面開始我們的實戰部分。

二、一個簡單 Http 服務器

開始前說明下我這里使用的開發環境是 IDEA+Gradle+Netty4,當然你使用 Eclipse 和 Maven 都是可以的,然后在 Gradle 的 build 文件中添加依賴 compile 'io.netty:netty-all:4.1.26.Final',這樣就可以編寫我們的 Netty 程序了,正如在前面介紹 Netty 特性中提到的,Netty 不需要額外的依賴。

第一個示例我們使用 Netty 編寫一個 Http 服務器的程序,啟動服務我們在瀏覽器輸入網址來訪問我們的服務,便會得到服務端的響應。功能很簡單,下面我們看看具體怎么做?

首先編寫服務啟動類

public class HttpServer {
    public static void main(String[] args) {
        //構造兩個線程組
        EventLoopGroup bossrGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //服務端啟動輔助類
            ServerBootstrap bootstrap = new ServerBootstrap();
 
            bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new HttpServerInitializer());
 
            ChannelFuture future = bootstrap.bind(8080).sync();
            //等待服務端口關閉
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 優雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

 

在編寫 Netty 程序時,一開始都會生成 NioEventLoopGroup 的兩個實例,分別是 bossGroup 和 workerGroup,也可以稱為 parentGroup 和 childGroup,為什么創建這兩個實例,作用是什么?可以這么理解,bossGroup 和 workerGroup 是兩個線程池, 它們默認線程數為 CPU 核心數乘以 2,bossGroup 用於接收客戶端傳過來的請求,接收到請求后將后續操作交由 workerGroup 處理。

在這里我向大家推薦一個架構學習交流群。交流學習群號:747981058 里面會分享一些資深架構師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高並發、高性能、分布式、微服務架構的原理,JVM性能優化、分布式架構等這些成為架構師必備的知識體系。

接下來我們生成了一個服務啟動輔助類的實例 bootstrap,boostrap 用來為 Netty 程序的啟動組裝配置一些必須要組件,例如上面的創建的兩個線程組。channel 方法用於指定服務器端監聽套接字通道 NioServerSocketChannel,其內部管理了一個 Java NIO 中的ServerSocketChannel實例。

channelHandler 方法用於設置業務職責鏈,責任鏈是我們下面要編寫的,責任鏈具體是什么,它其實就是由一個個的 ChannelHandler 串聯而成,形成的鏈式結構。正是這一個個的 ChannelHandler 幫我們完成了要處理的事情。

接着我們調用了 bootstrap 的 bind 方法將服務綁定到 8080 端口上,bind 方法內部會執行端口綁定等一系列操,使得前面的配置都各就各位各司其職,sync 方法用於阻塞當前 Thread,一直到端口綁定操作完成。接下來一句是應用程序將會阻塞等待直到服務器的 Channel 關閉。

啟動類的編寫大體就是這樣了,下面要編寫的就是上面提到的責任鏈了。如何構建一個鏈,在 Netty 中很簡單,不需要我們做太多,代碼如下:

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel sc) throws Exception {
        ChannelPipeline pipeline = sc.pipeline();
        //處理http消息的編解碼
        pipeline.addLast("httpServerCodec", new HttpServerCodec());
        //添加自定義的ChannelHandler
        pipeline.addLast("httpServerHandler", new HttpServerHandler());
    }
}

 

我們自定義一個類 HttpServerInitializer 繼承 ChannelInitializer 並實現其中的 initChannel方法。

ChannelInitializer 繼承 ChannelInboundHandlerAdapter,用於初始化 Channel 的 ChannelPipeline。通過 initChannel 方法參數 sc 得到 ChannelPipeline 的一個實例。

當一個新的連接被接受時, 一個新的 Channel 將被創建,同時它會被自動地分配到它專屬的 ChannelPipeline。

ChannelPipeline 提供了 ChannelHandler 鏈的容器,推薦讀者仔細自己看看 ChannelPipeline 的 Javadoc,文章后面也會繼續說明 ChannelPipeline 的內容。

Netty 是一個高性能網絡通信框架,同時它也是比較底層的框架,想要 Netty 支持 Http(超文本傳輸協議),必須要給它提供相應的編解碼器。

所以我們這里使用 Netty 自帶的 Http 編解碼組件 HttpServerCodec 對通信數據進行編解碼,HttpServerCodec 是 HttpRequestDecoder 和 HttpResponseEncoder 的組合,因為在處理 Http 請求時這兩個類是經常使用的,所以 Netty 直接將他們合並在一起更加方便使用。所以對於上面的代碼:

pipeline.addLast("httpServerCodec", new HttpServerCodec())

 

我們替換成如下兩行也是可以的。

pipeline.addLast("httpResponseEndcoder", new HttpResponseEncoder());
pipeline.addLast("HttpRequestDecoder", new HttpRequestDecoder());

 

通過 addLast 方法將一個一個的 ChannelHandler 添加到責任鏈上並給它們取個名稱(不取也可以,Netty 會給它個默認名稱),這樣就形成了鏈式結構。在請求進來或者響應出去時都會經過鏈上這些 ChannelHandler 的處理。

最后再向鏈上加入我們自定義的 ChannelHandler 組件,處理自定義的業務邏輯。下面就是我們自定義的 ChannelHandler。

public class HttpServerChannelHandler0 extends SimpleChannelInboundHandler<HttpObject> {
    private HttpRequest request;
 
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        if (msg instanceof HttpRequest) {
            request = (HttpRequest) msg;
            request.method();
            String uri = request.uri();
            System.out.println("Uri:" + uri);
        }
        if (msg instanceof HttpContent) {
 
            HttpContent content = (HttpContent) msg;
            ByteBuf buf = content.content();
            System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
 
            ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
            response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
 
            ctx.writeAndFlush(response);
 
        }
    }
}

 

至此一個簡單的 Http 服務器就完成了。首先我們來看看效果怎樣,我們運行 HttpServer 中的 main 方法。讓后使用 Postman 這個工具來測試下,使用 post 請求方式(也可以 get,但沒有請求體),並一個 json 格式數據作為請求體發送給服務端,服務端返回給我們一個hello world字符串。

 

服務端控制台打印如下:

 

對於自定義的 ChannelHandler, 一般會繼承 Netty 提供的SimpleChannelInboundHandler類,並且對於 Http 請求我們可以給它設置泛型參數為 HttpOjbect 類,然后覆寫 channelRead0 方法,在 channelRead0 方法中編寫我們的業務邏輯代碼,此方法會在接收到服務器數據后被系統調用。

Netty 的設計中把 Http 請求分為了 HttpRequest 和 HttpContent 兩個部分,HttpRequest 主要包含請求頭、請求方法等信息,HttpContent 主要包含請求體的信息。

所以上面的代碼我們分兩塊來處理。在 HttpContent 部分,首先輸出客戶端傳過來的字符,然后通過 Unpooled 提供的靜態輔助方法來創建未池化的 ByteBuf 實例, Java NIO 提供了 ByteBuffer 作為它的字節容器,Netty 的 ByteBuffer 替代品是 ByteBuf。

接着構建一個 FullHttpResponse 的實例,並為它設置一些響應參數,最后通過 writeAndFlush 方法將它寫回給客戶端。

上面這樣獲取請求和消息體則相當不方便,Netty 又提供了另一個類 FullHttpRequest,FullHttpRequest 包含請求的所有信息,它是一個接口,直接或者間接繼承了 HttpRequest 和 HttpContent,它的實現類是 DefalutFullHttpRequest。

因此我們可以修改自定義的 ChannelHandler 如下:

public class HttpServerChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
 
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
 
        ctx.channel().remoteAddress();
 
        FullHttpRequest request = msg;
 
        System.out.println("請求方法名稱:" + request.method().name());
 
        System.out.println("uri:" + request.uri());
        ByteBuf buf = request.content();
        System.out.print(buf.toString(CharsetUtil.UTF_8));
 
 
        ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", CharsetUtil.UTF_8);
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
        response.headers().add(HttpHeaderNames.CONTENT_TYPE, "text/plain");
        response.headers().add(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
 
        ctx.writeAndFlush(response);
    }
}

 

這樣修改就可以了嗎,如果你去啟動程序運行看看,是會拋異常的。前面說過 Netty 是一個很底層的框架,對於將請求合並為一個 FullRequest 是需要代碼實現的,然而這里我們並不需要我們自己動手去實現,Netty 為我們提供了一個 HttpObjectAggregator 類,這個 ChannelHandler作用就是將請求轉換為單一的 FullHttpReques。

所以在我們的 ChannelPipeline 中添加一個 HttpObjectAggregator 的實例即可。

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel sc) {
        ChannelPipeline pipeline = sc.pipeline();
        //處理http消息的編解碼
        pipeline.addLast("httpServerCodec", new HttpServerCodec());
        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
        //添加自定義的ChannelHandler
        pipeline.addLast("httpServerHandler", new HttpServerChannelHandler0());
    }
}

 

啟動程序運行,一切都順暢了,好了,這個簡單 Http 的例子就 OK 了。

在這里我向大家推薦一個架構學習交流群。交流學習群號:747981058 里面會分享一些資深架構師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高並發、高性能、分布式、微服務架構的原理,JVM性能優化、分布式架構等這些成為架構師必備的知識體系。

三、編寫 Netty 客戶端

上面的兩個示例中我們都是以 Netty 做為服務端,接下來看看如何編寫 Netty 客戶端,以第一個 Http 服務的例子為基礎,編寫一個訪問 Http 服務的客戶端。

public class HttpClient {

   public static void main(String[] args) throws Exception {
       String host = "127.0.0.1";
       int port = 8080;

       EventLoopGroup group = new NioEventLoopGroup();

       try {
           Bootstrap b = new Bootstrap();
           b.group(group)
           .channel(NioSocketChannel.class)
           .handler(new ChannelInitializer<SocketChannel>() {
               @Override
               public void initChannel(SocketChannel ch) throws Exception {
                   ChannelPipeline pipeline = ch.pipeline();
                   pipeline.addLast(new HttpClientCodec());
                   pipeline.addLast(new HttpObjectAggregator(65536));
                   pipeline.addLast(new HttpClientHandler());
               }
           });

           // 啟動客戶端.
           ChannelFuture f = b.connect(host, port).sync();
           f.channel().closeFuture().sync();

       } finally {
           group.shutdownGracefully();
       }
   }
}

 

客戶端啟動類編寫基本和服務端類似,在客戶端我們只用到了一個線程池,服務端使用了兩個,因為服務端要處理 n 條連接,而客戶端相對來說只處理一條,因此一個線程池足以。

然后服務端啟動輔助類使用的是 ServerBootstrap,而客戶端換成了 Bootstrap。通過 Bootstrap 組織一些必要的組件,為了方便,在 handler 方法中我們使用匿名內部類的方式來構建 ChannelPipeline 鏈容器。最后通過 connect 方法連接服務端。

接着編寫 HttpClientHandler 類。

public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        URI uri = new URI("http://127.0.0.1:8080");
        String msg = "Are you ok?";
        FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));
 
        // 構建http請求
//        request.headers().set(HttpHeaderNames.HOST, "127.0.0.1");
//        request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
        // 發送http請求
        ctx.channel().writeAndFlush(request);
    }
 
    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
 
        FullHttpResponse response = msg;
        response.headers().get(HttpHeaderNames.CONTENT_TYPE);
        ByteBuf buf = response.content();
        System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
 
    }
}

 

在 HttpClientHandler 類中,我們覆寫了 channelActive 方法,當連接建立時,此方法會被調用,我們在方法中構建了一個 FullHttpRequest 對象,並且通過 writeAndFlush 方法將請求發送出去。

channelRead0 方法用於處理服務端返回給我們的響應,打印服務端返回給客戶端的信息。至此,Netty 客戶端的編寫就完成了,我們先開啟服務端,然后開啟客戶端就可以看到效果了。

希望通過前面介紹的幾個例子能讓大家基本知道如何編寫 Netty 客戶端和服務端,下面我們來說說 Netty 程序為什么是這樣編寫的,這也是 Netty 中最為重要的一部分知識,可以讓你在編寫 netty 程序時做到心中有數。

在這里我向大家推薦一個架構學習交流群。交流學習群號:747981058 里面會分享一些資深架構師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高並發、高性能、分布式、微服務架構的原理,JVM性能優化、分布式架構等這些成為架構師必備的知識體系。

四、Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext 之間的關系

在編寫 Netty 程序時,經常跟我們打交道的是上面這幾個對象,這也是 Netty 中幾個重要的對象,下面我們來看看它們之間有什么樣的關系。

Netty 中的 Channel 是框架自己定義的一個通道接口,Netty 實現的客戶端 NIO 套接字通道是 NioSocketChannel,提供的服務器端 NIO 套接字通道是 NioServerSocketChannel。

當服務端和客戶端建立一個新的連接時, 一個新的 Channel 將被創建,同時它會被自動地分配到它專屬的 ChannelPipeline。

ChannelPipeline 是一個攔截流經 Channel 的入站和出站事件的 ChannelHandler 實例鏈,並定義了用於在該鏈上傳播入站和出站事件流的 API。那么就很容易看出這些 ChannelHandler 之間的交互是組成一個應用程序數據和事件處理邏輯的核心。

 

上圖描述了 IO 事件如何被一個 ChannelPipeline 的 ChannelHandler 處理的。

ChannelHandler分為 ChannelInBoundHandler 和 ChannelOutboundHandler 兩種,如果一個入站 IO 事件被觸發,這個事件會從第一個開始依次通過 ChannelPipeline中的 ChannelInBoundHandler,先添加的先執行。

若是一個出站 I/O 事件,則會從最后一個開始依次通過 ChannelPipeline 中的 ChannelOutboundHandler,后添加的先執行,然后通過調用在 ChannelHandlerContext 中定義的事件傳播方法傳遞給最近的 ChannelHandler。

在 ChannelPipeline 傳播事件時,它會測試 ChannelPipeline 中的下一個 ChannelHandler 的類型是否和事件的運動方向相匹配。

如果某個ChannelHandler不能處理則會跳過,並將事件傳遞到下一個ChannelHandler,直到它找到和該事件所期望的方向相匹配的為止。

假設我們創建下面這樣一個 pipeline:

ChannelPipeline p = ...;
p.addLast("1", new InboundHandlerA());
p.addLast("2", new InboundHandlerB());
p.addLast("3", new OutboundHandlerA());
p.addLast("4", new OutboundHandlerB());
p.addLast("5", new InboundOutboundHandlerX());
 

 

在上面示例代碼中,inbound 開頭的 handler 意味着它是一個ChannelInBoundHandler。outbound 開頭的 handler 意味着它是一個 ChannelOutboundHandler。

當一個事件進入 inbound 時 handler 的順序是 1,2,3,4,5;當一個事件進入 outbound 時,handler 的順序是 5,4,3,2,1。在這個最高准則下,ChannelPipeline 跳過特定 ChannelHandler 的處理:

  • 3,4 沒有實現 ChannelInboundHandler,因而一個 inbound 事件的處理順序是 1,2,5。
  • 1,2 沒有實現 ChannelOutBoundhandler,因而一個 outbound 事件的處理順序是 5,4,3。
  • 5 同時實現了 ChannelInboundHandler 和 channelOutBoundHandler,所以它同時可以處理 inbound 和 outbound 事件。

ChannelHandler 可以通過添加、刪除或者替換其他的 ChannelHandler 來實時地修改 ChannelPipeline 的布局。

(它也可以將它自己從 ChannelPipeline 中移除。)這是 ChannelHandler 最重要的能力之一。

ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之間的關聯,每當有 ChannelHandler 添加到 ChannelPipeline 中時,都會創建 ChannelHandlerContext。

ChannelHandlerContext 的主要功能是管理它所關聯的 ChannelHandler 和在同一個 ChannelPipeline 中的其他 ChannelHandler 之間的交互。事件從一個 ChannelHandler 到下一個 ChannelHandler 的移動是由 ChannelHandlerContext 上的調用完成的。

 

但是有些時候不希望總是從 ChannelPipeline 的第一個 ChannelHandler 開始事件,我們希望從一個特定的 ChannelHandler 開始處理。

你必須引用於此 ChannelHandler 的前一個 ChannelHandler 關聯的 ChannelHandlerContext,利用它調用與自身關聯的 ChannelHandler 的下一個 ChannelHandler。

如下:

ChannelHandlerContext ctx = ...;   // 獲得 ChannelHandlerContext引用
// write()將會把緩沖區發送到下一個ChannelHandler  
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
 
//流經整個pipeline
ctx.channel().write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));

 

如果我們想有一些事件流全部通過 ChannelPipeline,有兩個不同的方法可以做到:

  • 調用 Channel 的方法
  • 調用 ChannelPipeline 的方法
    這兩個方法都可以讓事件流全部通過 ChannelPipeline,無論從頭部還是尾部開始,因為它主要依賴於事件的性質。如果是一個 “ 入站 ” 事件,它開始於頭部;若是一個 “ 出站 ” 事件,則開始於尾部。

那為什么你可能會需要在 ChannelPipeline 某個特定的位置開始傳遞事件呢?

  • 減少因為讓事件穿過那些對它不感興趣的 ChannelHandler 而帶來的開銷
  • 避免事件被那些可能對它感興趣的 ChannlHandler 處理

五、Netty 線程模型

在這里我向大家推薦一個架構學習交流群。交流學習群號:747981058 里面會分享一些資深架構師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高並發、高性能、分布式、微服務架構的原理,JVM性能優化、分布式架構等這些成為架構師必備的知識體系。

在前面的示例中我們程序一開始都會生成兩個 NioEventLoopGroup 的實例,為什么需要這兩個實例呢?這兩個實例可以說是 Netty 程序的源頭,其背后是由 Netty 線程模型決定的。

Netty 線程模型是典型的 Reactor 模型結構,其中常用的 Reactor 線程模型有三種,分別為:Reactor 單線程模型、Reactor 多線程模型和主從 Reactor 多線程模型。

而在 Netty 的線程模型並非固定不變,通過在啟動輔助類中創建不同的 EventLoopGroup 實例並通過適當的參數配置,就可以支持上述三種 Reactor 線程模型。

Reactor 線程模型

Reactor 單線程模型

Reactor 單線程模型指的是所有的 IO 操作都在同一個 NIO 線程上面完成。作為 NIO 服務端接收客戶端的 TCP 連接,作為 NIO 客戶端向服務端發起 TCP 連接,讀取通信對端的請求或向通信對端發送消息請求或者應答消息。

由於 Reactor 模式使用的是異步非阻塞 IO,所有的 IO 操作都不會導致阻塞,理論上一個線程可以獨立處理所有 IO 相關的操作。

 

Netty 使用單線程模型的的方式如下:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class)
...

 

在實例化 NioEventLoopGroup 時,構造器參數是 1,表示 NioEventLoopGroup 的線程池大小是 1。然后接着我們調用 b.group(bossGroup) 設置了服務器端的 EventLoopGroup,因此 bossGroup和 workerGroup 就是同一個 NioEventLoopGroup 了。

Reactor 多線程模型

對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,需要對該模型進行改進,演進為 Reactor 多線程模型。

Rector 多線程模型與單線程模型最大的區別就是有一組 NIO 線程處理 IO 操作。

在該模型中有專門一個 NIO 線程 -Acceptor 線程用於監聽服務端,接收客戶端的 TCP 連接請求;而 1 個 NIO 線程可以同時處理N條鏈路,但是 1 個鏈路只對應 1 個 NIO 線程,防止發生並發操作問題。

網絡 IO 操作-讀、寫等由一個 NIO 線程池負責,線程池可以采用標准的 JDK 線程池實現,它包含一個任務隊列和 N 個可用的線程,由這些 NIO 線程負責消息的讀取、解碼、編碼和發送。

 

 

Netty 中實現多線程模型的方式如下:

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

 

bossGroup 中只有一個線程,而 workerGroup 中的線程是 CPU 核心數乘以 2,那么就對應 Recator 的多線程模型。

主從 Reactor 多線程模型

在並發極高的情況單獨一個 Acceptor 線程可能會存在性能不足問題,為了解決性能問題,產生主從 Reactor 多線程模型。

主從 Reactor 線程模型的特點是:服務端用於接收客戶端連接的不再是 1 個單獨的 NIO 線程,而是一個獨立的 NIO 線程池。

Acceptor 接收到客戶端 TCP 連接請求處理完成后,將新創建的 SocketChannel 注冊到 IO 線程池(sub reactor 線程池)的某個 IO 線程上,由它負責 SocketChannel 的讀寫和編解碼工作。

Acceptor 線程池僅僅只用於客戶端的登陸、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端 subReactor 線程池的 IO 線程上,由 IO 線程負責后續的 IO 操作。

 

根據前面所講的兩個線程模型,很容想到 Netty 實現多線程的方式如下:

EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 ...

 

但是,在 Netty 的服務器端的 acceptor 階段,沒有使用到多線程, 因此上面的主從多線程模型在 Netty 的實現是有誤的。

服務器端的 ServerSocketChannel 只綁定到了 bossGroup 中的一個線程,因此在調用 Java NIO 的 Selector.select 處理客戶端的連接請求時,實際上是在一個線程中的,所以對只有一個服務的應用來說,bossGroup 設置多個線程是沒有什么作用的,反而還會造成資源浪費。

至於 Netty 中的 bossGroup 為什么使用線程池,我在 stackoverflow 找到一個對於此問題的討論 。

the creator of Netty says multiple boss threads are useful if we share NioEventLoopGroup between different server bootstraps

EventLoopGroup 和 EventLoop

當系統在運行過程中,如果頻繁的進行線程上下文切換,會帶來額外的性能損耗。多線程並發執行某個業務流程,業務開發者還需要時刻對線程安全保持警惕,哪些數據可能會被並發修改,如何保護?這不僅降低了開發效率,也會帶來額外的性能損耗。

為了解決上述問題,Netty采用了串行化設計理念,從消息的讀取、編碼以及后續 ChannelHandler 的執行,始終都由 IO 線程 EventLoop 負責,這就意外着整個流程不會進行線程上下文的切換,數據也不會面臨被並發修改的風險。

EventLoopGroup 是一組 EventLoop 的抽象,一個 EventLoopGroup 當中會包含一個或多個 EventLoop,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 里面按照一定規則獲取其中一個 EventLoop 來處理任務。

在 Netty 服務器端編程中我們需要 BossEventLoopGroup 和 WorkerEventLoopGroup 兩個 EventLoopGroup 來進行工作。

BossEventLoopGroup 通常是一個單線程的 EventLoop,EventLoop 維護着一個注冊了 ServerSocketChannel 的 Selector 實例,EventLoop 的實現涵蓋 IO 事件的分離,和分發(Dispatcher),EventLoop 的實現充當 Reactor 模式中的分發(Dispatcher)的角色。

所以通常可以將 BossEventLoopGroup 的線程數參數為 1。

BossEventLoop 只負責處理連接,故開銷非常小,連接到來,馬上按照策略將 SocketChannel 轉發給 WorkerEventLoopGroup,WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop 來將這 個SocketChannel 注冊到其維護的 Selector 並對其后續的 IO 事件進行處理。

ChannelPipeline 中的每一個 ChannelHandler 都是通過它的 EventLoop(I/O 線程)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個線程,因為這會對整體的 I/O 處理產生嚴重的負面影響。但有時可能需要與那些使用阻塞 API 的遺留代碼進行交互。

對於這種情況, ChannelPipeline 有一些接受一個 EventExecutorGroup 的 add() 方法。如果一個事件被傳遞給一個自定義的 EventExecutorGroup, DefaultEventExecutorGroup 的默認實現。

就是在把 ChannelHanders 添加到 ChannelPipeline 的時候,指定一個 EventExecutorGroup,ChannelHandler 中所有的方法都將會在這個指定的 EventExecutorGroup 中運行。

static final EventExecutor group = new DefaultEventExecutorGroup(16);
...
ChannelPipeline p = ch.pipeline();
pipeline.addLast(group, "handler", new MyChannelHandler());
 

 

最后小結一下:(如果你還沒明白,可以看一下群里面的視頻解析)
  • NioEventLoopGroup 實際上就是個線程池,一個 EventLoopGroup 包含一個或者多個 EventLoop;
  • 一個 EventLoop 在它的生命周期內只和一個 Thread 綁定;
  • 所有有 EnventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理;
  • 一個 Channel 在它的生命周期內只注冊於一個 EventLoop;
  • 每一個 EventLoop 負責處理一個或多個 Channel;


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM