Netty 系列一(核心組件和實例).


一、概念

    早期的 Java API 只支持由本地系統套接字庫提供所謂的阻塞函數來支持網絡編程由於是阻塞 I/O ,要管理多個並發客戶端,需要為每個新的客戶端Socket 創建一個 Thread 。這將導致一系列的問題,第一,在任何時候都可能有大量的線程處於休眠狀態(不可能每時每刻都有對應的並發數);第二,需要為每個線程的調用棧都分配內存;第三,JVM 在線程的上下文切換所帶來的開銷會帶來麻煩。

    Java 在 2002 年引入了非阻塞 I/O,位於 JDK 1.4 的 java.nio 包中。class java.nio.channels.Selector 是Java 的非阻塞 I/O 實現的關鍵。它使用了事件通知以確定在一組非阻塞套接字中有哪些已經就緒能夠進行 I/O 相關的操作。因為可以在任何的時間檢查任意的讀操作或者寫操作的完成狀態,所以如圖 1-2 所示,一個單一的線程便可以處理多個並發的連接。

    

    盡管可以直接使用 Java NIO API,但是在高負載下可靠和高效地處理和調度 I/O 操作是一項繁瑣而且容易出錯的任務,最好還是留給高性能的網絡編程專家——Netty。

    Netty 是一款異步的事件驅動的網絡應用程序框架,支持快速的開發可維護的高性能的瞄向協議的服務端和客戶端。它駕馭了Java高級API的能力,並將其隱藏在一個易於使用的API之后。首先,它的基於 Java NIO 的異步的和事件驅動的實現,保證了高負載下應用程序性能的最大化和可伸縮性。其次, Netty 也包含了一組設計模式,將應用程序邏輯從網絡層解耦,簡化了開發過程, 同時也最大限度地提高了可測試性、模塊化以及代碼的可重用性。

 

     tips:面向對象的基本概念—> 用較簡單的抽象隱藏底層實現的復雜性。

二、核心組件

  • Channel

    Channel是Java NIO的一個基本構造。可以看作是傳入或傳出數據的載體。因此,它可以被打開或關閉,連接或者斷開連接。以下是常用的Channel:

-- EmbeddedChannel
-- LocalServerChannel
-- NioDatagramChannel
-- NioSctpChannel
-- NioSocketChannel

  • 回調

    當一個回調被觸發時,相應的事件可以被一個interface-ChannelHandler的實現處理。

  • Future

    Netty中所有的I/O操作都是異步的。因為一個操作可能不會立即返回,所以我們需要一種在之后的某個時間點確定其結果的方法。

    Future 和 回調 是相互補充的機制,提供了另一種在操作完成時通知應用程序的方式。這個對象可以看作是一個異步操作結果的占位符;它將在未來的某個時刻完成,並提供對其結果的訪問。

    Netty 提供了ChannelFuture,用於在執行異步操作的時候使用。每個Netty的出站I/O操作都會返回一個ChannelFuture。ChannelFuture能夠注冊一個或者多個ChannelFutureListener 實例。監聽器的回調方法operationComplete(),將會在對應的操作完成時被調用。

  • ChannelHandler

    Netty 的主要組件是ChannelHandler,它充當了所有處理入站和出站數據的應用程序邏輯的容器。

    Netty 使用不同的事件來通知我們狀態的改變或者是操作的狀態,每個事件都可以被分發給ChannelHandler類中某個用戶實現的方法。Netty提供了大量預定義的可以開箱即用的ChannelHandler實現,包括用於各種協議的ChannelHandler。

    現在,事件可以被分發給ChannelHandler類中某個用戶實現的方法。那么,如果 ChannelHandler 處理完成后不直接返回給客戶端,而是傳遞給下一個ChannelHandler 繼續處理呢?那么就要說到 ChannelPipeline !

    ChannelPipeline 提供了 ChannelHandler鏈 的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。使得事件流經 ChannelPipeline 是 ChannelHandler 的工作,它們是在應用程序的初始化或者引導階段被安裝的。這些對象接收事件、執行他們所實現的處理邏輯,並將數據傳遞給鏈中的下一個ChannelHandler:

1、一個ChannelInitializer的實現被注冊到了ServerBootstrap中。
2、當 ChannelInitializer.initChannel()方法被調用時, ChannelInitializer將在 ChannelPipeline 中安裝一組自定義的 ChannelHandler。
3、ChannelInitializer 將它自己從 ChannelPipeline 中移除。

  • EventLoop

    EventLoop 定義了Netty的核心抽象,用來處理連接的生命周期中所發生的事件,在內部,將會為每個Channel分配一個EventLoop。

    EventLoop本身只由一個線程驅動,其處理了一個Channel的所有I/O事件,並且在該EventLoop的整個生命周期內都不會改變。這個簡單而強大的設計消除了你可能有的在ChannelHandler實現中需要進行同步的任何顧慮。

 

    這里需要說到,EventLoop的管理是通過EventLoopGroup來實現的。還要一點要注意的是,客戶端引導類是 Bootstrap,只需要一個EventLoopGroup。服務端引導類是 ServerBootstrap,通常需要兩個 EventLoopGroup,一個用來接收客戶端連接,一個用來處理 I/O 事件(也可以只使用一個 EventLoopGroup,此時其將在兩個場景下共用同一個 EventLoopGroup)。

1、一個 EventLoopGroup 包含一個或者多個 EventLoop;
2、一個 EventLoop 在它的生命周期內只和一個 Thread 綁定;
3、所有由 EventLoop 處理的 I/O 事件都將在它專有的Thread 上被處理;
4、一個 Channel 在它的生命周期內只注冊於一個EventLoop;
5、NIO中,一個 EventLoop 分配給多個 Channel(面對多個Channel,一個 EventLoop 按照事件觸發,順序執行); OIO中,一個 EventLoop 分配給一個 Channel。

  tips:Netty 應用程序的一個一般准則:盡可能的重用 EventLoop,以減少線程創建所帶來的開銷。

  • Bootstrap 和 ServerBootstrap

    BootStarp 和 ServerBootstrap 被稱為引導類,指對應用程序進行配置,並使他運行起來的過程。Netty處理引導的方式是使你的應用程序和網絡層相隔離。

    BootStrap 是客戶端的引導類,Bootstrap 在調用 bind()(連接UDP)和 connect()(連接TCP)方法時,會新創建一個 Channel,僅創建一個單獨的、沒有父 Channel 的 Channel 來實現所有的網絡交換。

    ServerBootstrap 是服務端的引導類,ServerBootstarp 在調用 bind() 方法時會創建一個 ServerChannel 來接受來自客戶端的連接,並且該 ServerChannel 管理了多個子 Channel 用於同客戶端之間的通信。

三、實例

    所有的Netty服務端/客戶端都至少需要兩個部分:

1、至少一個ChannelHandler —— 該組件實現了對數據的處理。

2、引導 —— 這是配置服務器的啟動代碼。

    服務端:

public class EchoServer {

    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //1、創建EventLoopGroup以進行事件的處理,如接受新連接以及讀/寫數據
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //2、創建ServerBootstrap,引導和綁定服務器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(group, group)
                    //3、指定所使用的NIO傳輸Channel
                    .channel(NioServerSocketChannel.class)
                    //4、使用指定的端口設置套接字地址
                    .localAddress(new InetSocketAddress(port))
                    //5、添加一個 EchoServerHandler 到子 Channel的 ChannelPipeline
                    //當一個新的連接被接受時,一個新的子Channel將會被創建,而 ChannelInitializer 將會把一個你的EchoServerHandler 的實例添加到該 Channel 的 ChannelPipeline 中
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(serverHandler);
                        }
                    });
            //6、異步地綁定服務器,調用sync()方法阻塞等待直到綁定完成
            ChannelFuture channelFuture = bootstrap.bind().sync();
            System.out.println(EchoServer.class.getName() + "started and listening for connections on" + channelFuture.channel().localAddress());
            //7、獲取 Channel 的 CloseFuture,並且阻塞當前線程直到它完成
            channelFuture.channel().closeFuture().sync();

        } finally {
            //8、關閉 EventLoopGroup 釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(9999).start();
    }
}
@ChannelHandler.Sharable //標識一個Channel-Handler 可以被多個Channel安全的共享
public class EchoServerHandler extends ChannelHandlerAdapter {


    /**
     * 對於每個傳入的消息都要調用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8));
        //將接收到的消息寫給發送者,而不沖刷出站消息
        //ChannelHandlerContext 發送消息。導致消息向下一個ChannelHandler流動
        //Channel 發送消息將會導致消息從 ChannelPipeline的尾端開始流動
        ctx.write(in);
    }

    /**
     * 通知 ChannelHandlerAdapter 最后一次對channel-Read()的調用是當前批量讀取中的最后一條消息
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //暫存於ChannelOutboundBuffer中的消息,在下一次調用flush()或者writeAndFlush()方法時將會嘗試寫出到套接字
        //將這份暫存消息沖刷到遠程節點,並且關閉該Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在讀取操作期間,有異常拋出時會調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
EchoServerHandler.java

 

    客戶端:

public class EchoClient {

    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //創建Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            //指定 EventLoopGroup 以處理客戶端事件;適應於NIO的實現
            bootstrap.group(group)
                    //適用於NIO傳輸的Channel類型
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    //在創建Channel時,向ChannelPipeline中添加一個EchoClientHandler實例
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //連接到遠程節點,阻塞等待直到連接完成
            ChannelFuture channelFuture = bootstrap.connect().sync();
            //阻塞,直到Channel 關閉
            channelFuture.channel().closeFuture().sync();
        } finally {
            //關閉線程池並且釋放所有的資源
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();

        System.out.println("------------------------------------");

        new EchoClient("127.0.0.1", 9999).start();
    }


}
@ChannelHandler.Sharable //標記該類的實例可以被多個Channel共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * 當從服務器接收到一條消息時被調用
     *
     * @param ctx
     * @param msg ByteBuf (Netty 的字節容器) 作為一個面向流的協議,TCP 保證了字節數組將會按照服務器發送它們的順序接收
     * @throws Exception
     */
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client" + ctx.channel().remoteAddress() + "connected");
        System.out.println(msg.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在到服務器的連接已經建立之后將被調用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)  {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock!", CharsetUtil.UTF_8));
    }


    /**
     * 在處理過程中引發異常時被調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}
EchoClientHandler.java

四、結語

    帶着一陣迷糊就開始了Netty學習之旅,學到現在還是對Netty一堆專有名詞頭大!沒辦法,只好硬着頭皮學下去了,畢竟,熟讀唐詩三百首,不會作詩也會吟嘛!

    來總結下,一個Netty服務端處理客戶端連接的過程:

1、創建一個channel同該用戶端進行綁定;
2、channel從EventLoopGroup獲得一個EventLoop,並注冊到該EventLoop,channel生命周期內都和該EventLoop在一起(注冊時獲得selectionKey);
3、channel同用戶端進行網絡連接、關閉和讀寫,生成相對應的event(改變selectinKey信息),觸發eventloop調度線程進行執行;
4、ChannelPipeline 找到對應 ChannelHandler 方法處理用戶邏輯。

    我們項目中使用的 Netty 服務端啟動類:

public class NettyServer {

    public static final Logger logger = LoggerFactory.getLogger(NettyServer.class);

    private static Integer LISTENER_PORT = PropertiesLoader.getResourcesLoader().getInteger("nettyPort");



    private int port;
    EventLoopGroup boss = null;
    EventLoopGroup worker = null;
    ServerBootstrap serverBootstrap = null;

    public static NettyServer nettyServer = null;

    public static NettyServer getInstance() {
        if (nettyServer == null) {
            synchronized (NettyServer.class) {
                if (nettyServer == null) {
                    nettyServer = new NettyServer(LISTENER_PORT==null?9999:LISTENER_PORT);
                }
            }
        }
        return nettyServer;
    }

    /**
     * 構造函數
     *
     * @param port 端口
     */
    private NettyServer(int port) {
        this.port = port;

    }

    /**
     * 綁定
     *
     * @throws InterruptedException
     */
    public void init() throws InterruptedException {
        try {

            //創建兩個線程池
            //目前服務器CPU為單核8線程,調整線程為8
            boss = new NioEventLoopGroup(8);
            worker = new NioEventLoopGroup(8);

            serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);//兩個工作線程
            serverBootstrap.channel(NioServerSocketChannel.class);
            //重用緩沖區
            serverBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            serverBootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            //自動調整下一次緩沖區建立時分配的空間大小,避免內存的浪費
            serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
            //當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度,默認值50。
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            //用於啟用或關於Nagle算法。如果要求高實時性,有數據發送時就馬上發送,就將該選項設置為true關閉Nagle算法;如果要減少發送次數減少網絡交互,就設置為false等累積一定大小后再發送。默認為false。
            serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
            //是否啟用心跳保活機制
            serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            //支持tcp協議
            //bootstrap.childHandler(new TcpChannelInitializer());

            //支持webSocket協議
            serverBootstrap.childHandler(new WebSocketChannelInitializer());
            ChannelFuture f = serverBootstrap.bind(port).sync();
            if (f.isSuccess()) {
                logger.info("netty server start...");
            }
            //等到服務端監聽端口關閉
            f.channel().closeFuture().sync();
        } finally {
            //優雅釋放線程資源
            boss.shutdownGracefully().sync();
            worker.shutdownGracefully().sync();
        }
    }

    /**
     * 銷毀netty相關資源
     */
    public void destroy() {
        try {
            if (boss != null) {
                boss.shutdownGracefully();
            }
            if (worker != null) {
                worker.shutdownGracefully();
            }
            if (serverBootstrap != null) {
                serverBootstrap = null;
            }
        } catch (Exception e) {
            logger.error("netty close err:" + e.getMessage(), e);
        }
    }
}
NettyServer.java

 

tips: ServerBootstrap  中增加了一個方法childHandler(),它的目的是添加 ChannelHandler ;Bootstrap 中添加 ChannelHandler 用 handler() 方法。

 

參考資料:《Netty IN ACTION》

演示源代碼:https://github.com/JMCuixy/NettyDemo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM