Netty 源碼分析系列(一)Netty 概述


前言

關於Netty的學習,最近看了不少有關視頻和書籍,也收獲不少,希望把我知道的分享給你們,一起加油,一起成長。前面我們對 Java IOBIONIOAIO進行了分析,相關文章鏈接如下:

深入分析 Java IO (一)概述

深入分析 Java IO (二)BIO

深入分析 Java IO (三)NIO

深入分析 Java IO (四)AIO

本篇文章我們就開始對 Netty來進行深入分析,首先我們來了解一下 JAVA NIOAIO的不足之處。

Java原生API之痛

雖然JAVA NIOJAVA AIO框架提供了多路復用IO/異步IO的支持,但是並沒有提供上層“信息格式”的良好封裝。用這些API實現一款真正的網絡應用則並非易事。

JAVA NIOJAVA AIO並沒有提供斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流等的處理,這些都需要開發者自己來補齊相關的工作。

AIO在實踐中,並沒有比NIO更好。AIO在不同的平台有不同的實現,windows系統下使用的是一種異步IO技術:IOCP;Linux下由於沒有這種異步 IO 技術,所以使用的是epoll 對異步 IO 進行模擬。所以 AIO 在 Linux 下的性能並不理想。AIO 也沒有提供對 UDP 的支持。

綜上,在實際的大型互聯網項目中,Java 原生的 API 應用並不廣泛,取而代之的是一款第三方Java 框架,這就是Netty

Netty的優勢

Netty 提供 異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

非阻塞 I/O

Netty 是基於 Java NIO API 實現的網絡應用框架,使用它可以快速簡單的開發網絡應用程序,如服務器和客戶端程序。Netty 大大簡化了網絡程序開發的過程,如 TCP 和 UDP 的 Socket 服務的開發。

由於是基於 NIO 的 API,因此,Netty 可以提供非阻塞的 I/O操作,極大的提升了性能。同時,Netty 內部封裝了 Java NIO API 的復雜性,並提供了線程池的處理,使得開發 NIO 的應用變得極其簡單。

豐富的協議

Netty 提供了簡單、易用的 API ,但這並不意味着應用程序會有難維護和性能低的問題。Netty 是一個精心設計的框架,它從許多協議的實現中吸收了很多的經驗,如 FTP 、SMTP、 HTTP、許多二進制和基於文本的傳統協議。

Netty 支持豐富的網絡協議,如TCPUDPHTTPHTTP/2WebSocketSSL/TLS等,這些協議實現開箱即用,因此,Netty 開發者能夠在不失靈活的前提下來實現開發的簡易性、高性能和穩定性。

異步和事件驅動

Netty 是異步事件驅動的框架,該框架體現為所有的I/O操作都是異步的,所有的I/O調用會立即返回,並不保證調用成功與否,但是調用會返回ChannelFuture。Netty 會通過 ChannelFuture通知調用是成功了還是失敗了,抑或是取消了。

同時,Netty 是基於事件驅動的,調用者並不能立即獲得結果,而是通過事件監聽機制,用戶可以方便地主動獲取或者通過通知機制獲得I/O操作的結果。

Future對象剛剛創建時,處於非完成狀態,調用者可以通過返回的ChannelFuture來獲取操作執行的狀態,再通過注冊監聽函數來執行完成后的操作,常見有如下操作:

  • 通過isDone方法來判斷當前操作是否完成。
  • 通過isSuccess方法來判斷已完成的當前操作是否成功。
  • 通過getCause方法來獲取已完成的當前操作失敗的原因。
  • 通過isCancelled方法來判斷已完成的當前操作是否被取消。
  • 通過addListener方法來注冊監聽器,當操作已完成(isDone方法返回完成),將會通知指定的監聽器;如果future對象已完成,則理解通知指定的監聽器。

例如:下面的代碼中綁定端口是異步操作,當綁定操作處理完,將會調用相應的監聽器處理邏輯。

serverBootstrap.bind(port).addListener(future -> {
    if(future.isSuccess()){
        System.out.println("端口綁定成功!");
    }else {
        System.out.println("端口綁定失敗!");
    }
});

相比傳統的阻塞 I/O,Netty 異步處理的好處是不會造成線程阻塞,線程在 I/O操作期間可以執行其他的程序,在高並發情形下會更穩定並擁有更高的吞吐量。

精心設計的API

Netty 從開始就為用戶提供了體驗最好的API及實現設計。

例如,在用戶數較小的時候可能會選擇傳統的阻塞API,畢竟與 Java NIO 相比使用阻塞 API 將會更加容易一些。然而,當業務量呈指數增長並且服務器需要同時處理成千上萬的客戶連接,便會遇到問題。這種情況下可能會嘗試使用 Java NIO,但是復雜的 NIO Selector 編程接口又會耗費大量的時間並最終會阻礙快速開發。

Netty 提供了一個叫作 channel的統一的異步I/O編程接口,這個編程接口抽象了所有點對點的通信操作。也就是說,如果應用是基於Netty 的某一種傳輸實現,那么同樣的,應用也可以運行在 Netty 的另一種傳輸實現上。Channel常見的子接口有:

image-20210804105936809

豐富的緩沖實現

Netty 使用自建的緩存 API,而不是使用 Java NIO 的 ByteBuffer 來表示一個連續的字節序列。與 ByteBuffer 相比,這種方式擁有明顯的優勢。

Netty 使用新的緩沖類型 ByteBuf ,並且被設計為可從底層解決 ByteBuffer 問題,同時還滿足日常網絡應用開發需要的緩沖類型。

Netty 重要有以下特性:

  • 允許使用自定義的緩沖類型。
  • 復合緩沖類型中內置透明的零拷貝實現。
  • 開箱即用動態緩沖類型,具有像 StringBuffer 一樣的動態緩沖能力。
  • 不再需要調用flip()方法。
  • 正常情況下具有比ByteBuffer更快的響應速度。

高效的網絡傳輸

Java 原生的序列化主要存在以下幾個弊端:

  • 無法跨語言。

  • 序列化后碼流太大。

  • 序列化后性能太低。

業界有非常多的框架用於解決上述問題,如 Google ProtobufJBoss MarshallingFacebook Thrift等。針對這些框架,Netty 都提供了相應的包將這些框架集成到應用中。同時,Netty 本身也提供了眾多的編解碼工具,方便開發者使用。開發者可以基於 Netty 來開發高效的網絡傳輸應用,例如:高性能的消息中間件 Apache RocketMQ、高性能RPC框架Apache Dubbo等。

Netty 核心概念

Netty功能特性圖

從上述的架構圖可以看出,Netty 主要由三大塊組成:

  • 核心組件
  • 傳輸服務
  • 協議

核心組件

核心組件包括:事件模型、字節緩沖區和通信API

事件模型

Netty 是基於異步事件驅動的,該框架體現為所有的I/O操作都是異步的,調用者並不能立即獲得結果,而是通過事件監聽機制,用戶可以方便地主動獲取或者通過通知機制獲得I/O操作的結果。

Netty 將所有的事件按照它們與入站或出站數據流的相關性進行了分類。

可能由入站數據或者相關的狀態更改而觸發的事件包括以下幾項:

  • 連接已被激活或者連接失活。
  • 數據讀取。
  • 用戶事件。
  • 錯誤事件。

出站事件是未來將會觸發的某個動作的操作結果,包括以下動作:

  • 打開或者關閉到遠程節點的連接。
  • 將數據寫到或者沖刷到套接字。

每個事件都可以被分發到ChannelHandler類中的某個用戶實現的方法。

字節緩沖區

Netty 使用了區別於Java ByteBuffer 的新的緩沖類型ByteBuf,ByteBuf提供了豐富的特性。

通信API

Netty 的通信API都被抽象到Channel里,以統一的異步I/O編程接口來滿足所有點對點的通信操作。

傳輸服務

Netty 內置了一些開箱即用的傳輸服務。因為並不是它們所有的傳輸都支持每一種協議,所以必須選擇一個和應用程序所使用的協議相兼容的傳輸。以下是Netty提供的所有的傳輸。

NIO

io.netty.channel.socket.nio包用於支持NIO。該包下面的實現是使用java.nio.channels包作為基礎(基於選擇器的方式)。

epoll

io.netty.channel.epoll包用於支持由 JNI 驅動的 epoll 和 非阻塞 IO。

需要注意的是,這個epoll傳輸只能在 Linux 上獲得支持。epoll同時提供多種特性,如:SO_REUSEPORT 等,比 NIO傳輸更快,而且是完全非阻塞的。

OIO

io.netty.channel.socket.oio包用於支持使用java.net包作為基礎的阻塞I/O

本地

io.netty.channel.local包用於支持在 VM 內部通過管道進行通信的本地傳輸。

內嵌

io.netty.channel.embedded包作為內嵌傳輸,允許使用ChannelHandler而又不需要一個真正的基於網絡的傳輸。

協議支持

Netty 支持豐富的網絡協議,如TCPUDPHTTPHTTP/2WebSocketSSL/TLS等,這些協議實現開箱即用,因此,Netty 開發者能夠在不失靈活的前提下來實現開發的簡易性、高性能和穩定性。

Netty簡單應用

引入Maven依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.49.Final</version>
</dependency>

服務端的管道處理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //讀取數據實際(這里我們可以讀取客戶端發送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文對象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客戶端發送的數據 默認Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx =" + ctx);
        Channel channel = ctx.channel();
        //將 msg 轉成一個 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + channel.remoteAddress());
    }


    //數據讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //writeAndFlush 是 write + flush
        //將數據寫入到緩存,並刷新
        //一般講,我們對這個發送的數據進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("公司最近賬戶沒啥錢,再等幾天吧!", CharsetUtil.UTF_8));
    }

    //處理異常, 一般是需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

NettyServerHandler繼承自ChannelInboundHandlerAdapter,這個類實現了ChannelInboundHandler接口。ChannelInboundHandler提供了許多事件處理的接口方法。

這里覆蓋了channelRead()事件處理方法。每當從客戶端收到新的數據時,這個方法會在收到消息時被調用。

channelReadComplete()事件處理方法是數據讀取完畢時被調用,通過調用ChannelHandlerContextwriteAndFlush()方法,把消息寫入管道,並最終發送給客戶端。

exceptionCaught()事件處理方法是,當出現Throwable對象時才會被調用。

服務端主程序

public class NettyServer {

    public static void main(String[] args) throws Exception {
        //創建BossGroup 和 WorkerGroup
        //說明
        //1. 創建兩個線程組 bossGroup 和 workerGroup
        //2. bossGroup 只是處理連接請求 , 真正的和客戶端業務處理,會交給 workerGroup完成
        //3. 兩個都是無限循環
        //4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數
        //   默認實際 cpu核數 * 2
        //
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
        try {
            //創建服務器端的啟動對象,配置參數
            ServerBootstrap bootstrap = new ServerBootstrap();
            //使用鏈式編程來進行設置
            bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    .channel(NioServerSocketChannel.class) //bossGroup使用NioSocketChannel 作為服務器的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列得到連接個數 option主要是針對boss線程組,
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態 child主要是針對worker線程組
                    .childHandler(new ChannelInitializer<SocketChannel>() {//workerGroup使用 SocketChannel創建一個通道初始化對象																														(匿名對象)
                        //給pipeline 設置處理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //可以使用一個集合管理 SocketChannel, 再推送消息時,可以將業務加入到各個channel 對應的 NIOEventLoop 的 									taskQueue 或者 scheduleTaskQueue
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); // 給我們的workerGroup 的 EventLoop 對應的管道設置處理器

            System.out.println(".....服務器 is ready...");
            //綁定一個端口並且同步, 生成了一個 ChannelFuture 對象
            //啟動服務器(並綁定端口)
            ChannelFuture cf = bootstrap.bind(7788).sync();
            //給cf 注冊監聽器,監控我們關心的事件
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("服務已啟動,端口號為7788...");
                    } else {
                        System.out.println("服務啟動失敗...");
                    }
                }
            });
            //對關閉通道進行監聽
            cf.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

NioEventLoopGroup是用來處理I/O操作的多線程事件循環器。Netty 提供了許多不同的EventLoopGroup的實現來處理不同的傳輸。

上面的服務端應用中,有兩個NioEventLoopGroup被使用。第一個叫作bossGroup,用來接收進來的連接。第二個叫作workerGroup,用來處理已經被接收的連接,一旦 bossGroup接收連接,就會把連接的信息注冊到workerGroup上。

ServerBootstrap是一個NIO服務的引導啟動類。可以在這個服務中直接使用Channel

  • group方法用於 設置EventLoopGroup
  • 通過Channel方法,可以指定新連接進來的Channel類型為NioServerSocketChannel類。
  • childHandler用於指定ChannelHandler,也就是前面實現的NettyServerHandler
  • 可以通過option設置指定的Channel來實現NioServerSocketChannel的配置參數。
  • childOption主要設置SocketChannel的子Channel的選項。
  • bind用於綁定端口啟動服務。

客戶端管道處理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //當通道就緒就會觸發該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client ctx =" + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("老板,工資什么時候發給我啊?", CharsetUtil.UTF_8));
    }

    //當通道有讀取事件時,會觸發
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服務器回復的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服務器的地址: "+ ctx.channel().remoteAddress());
    }

    //處理異常, 一般是需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

channelRead方法中將接收到的消息轉化為字符串,方便在控制台上打印出來。

channelRead接收到的消息類型為ByteBufByteBuf提供了轉為字符串的方便方法。

客戶端主程序

public class NettyClient {

    public static void main(String[] args) throws Exception {
        //客戶端需要一個事件循環組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建客戶端啟動對象
            //注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //設置相關參數
            bootstrap.group(group) //設置線程組
                    .channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
                        }
                    });
            System.out.println("客戶端 ok..");
            //啟動客戶端去連接服務器端
            //關於 ChannelFuture 要分析,涉及到netty的異步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7788).sync();
            //給關閉通道進行監聽
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

客戶端只需要一個NioEventLoopGroup就可以了。

測試運行

分別啟動服務器 NettyServer 和客戶端 NettyClient程序

服務端控制台輸出內容:

.....服務器 is ready...
服務已啟動,端口號為7788...
server ctx =ChannelHandlerContext(NettyServerHandler#0, [id: 0xa1b2233c, L:/127.0.0.1:7788 - R:/127.0.0.1:63239])
客戶端發送消息是:老板,工資什么時候發給我啊?
客戶端地址:/127.0.0.1:63239

客戶端控制台輸出內容:

客戶端 ok..
client ctx =ChannelHandlerContext(NettyClientHandler#0, [id: 0x21d6f98e, L:/127.0.0.1:63239 - R:/127.0.0.1:7788])
服務器回復的消息:公司最近賬戶沒啥錢,再等幾天吧!
服務器的地址: /127.0.0.1:7788

至此,一個簡單的基於Netty開發的服務端和客戶端就完成了。

總結

本篇文章主要講解了 Netty 產生的背景、特點、核心組件及如何快速開啟第一個 Netty 應用。

后面我們會分析Netty架構設計ChannelChannelHandler、字節緩沖區ByteBuf線程模型編解碼引導程序等方面的知識。

結尾

我是一個正在被打擊還在努力前進的碼農。如果文章對你有幫助,記得點贊、關注喲,謝謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM