Netty框架


 

學習Netty框架,三連問:

  什么是Netty框架?

  為什么要用Netty框架?

  怎么用Netty框架?

 

什么是Netty框架?

  Netty 是一個廣受歡迎的異步事件驅動的Java開源網絡應用程序框架,用於快速開發可維護的高性能協議服務器和客戶端。

  Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序。

  Netty 是一個基於 NIO 的網絡編程框架,使用 Netty 可以幫助你快速、簡單的開發出一個網絡應用,相當於簡化和流程化了 NIO 的開發過程。

  作為當前最流行的 NIO 框架,Netty 在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,知名的 Elasticsearch 、Dubbo 框架內部都采用了 Netty。

 

為什么要用Netty框架?

因為Netty 對 JDK 自帶的 NIO 的 API 進行了封裝,解決了JDK 原生 NIO 程序的問題。

  JDK 原生 NIO 程序的問題:

    JDK 原生也有一套網絡應用程序 API,但是存在一系列問題,主要如下:

      1)NIO 的類庫和 API 繁雜,使用麻煩:你需要熟練掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

      2)需要具備其他的額外技能做鋪墊:例如熟悉 Java 多線程編程,因為 NIO 編程涉及到 Reactor 模式,你必須對多線程和網路編程非常熟悉,才能編寫出高質量的 NIO 程序。

      3)可靠性能力補齊,開發工作量和難度都非常大:例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等等。NIO 編程的特點是功能開發相對容易,但是可靠性能力補齊工作量和難度都非常大。

      4)JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。官方聲稱在 JDK 1.6 版本的 update 18 修復了該問題,但是直到 JDK 1.7 版本該問題仍舊存在,只不過該 Bug 發生概率降低了一些而已,它並沒有被根本解決。

  Netty的主要特點有:

    1)設計優雅:適用於各種傳輸類型的統一 API 阻塞和非阻塞 Socket;基於靈活且可擴展的事件模型,可以清晰地分離關注點;高度可定制的線程模型 - 單線程,一個或多個線程池;真正的無連接數據報套接字支持(自 3.1 起)。

    2)使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其他依賴項,JDK 5(Netty 3.x)或 6(Netty 4.x)就足夠了。

    3)高性能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的內存復制。

    4)安全:完整的 SSL/TLS 和 StartTLS 支持。

    5)社區活躍、不斷更新:社區活躍,版本迭代周期短,發現的 Bug 可以被及時修復,同時,更多的新功能會被加入。

Netty 常見的使用場景如下:

  1)互聯網行業:在分布式系統中,各個節點之間需要遠程服務調用,高性能的 RPC 框架必不可少,Netty 作為異步高性能的通信框架,往往作為基礎通信組件被這些 RPC 框架使用。典型的應用有:阿里分布式服務框架 Dubbo 的 RPC 框架使用 Dubbo 協議進行節點間通信,Dubbo 協議默認使用 Netty 作為基礎通信組件,用於實現各進程節點之間的內部通信。

  2)游戲行業:無論是手游服務端還是大型的網絡游戲,Java 語言得到了越來越廣泛的應用。Netty 作為高性能的基礎通信組件,它本身提供了 TCP/UDP 和 HTTP 協議棧。

非常方便定制和開發私有協議棧,賬號登錄服務器,地圖服務器之間可以方便的通過 Netty 進行高性能的通信。

  3)大數據領域:經典的 Hadoop 的高性能通信和序列化組件 Avro 的 RPC 框架,默認采用 Netty 進行跨界點通信,它的 Netty Service 基於 Netty 框架二次封裝實現。

有興趣的讀者可以了解一下目前有哪些開源項目使用了 Netty的Related Projects

 

怎么用?(簡單入門)

  可參考學習   netty 官方API: http://netty.io/4.1/api/index.html

 

 配置 pom.xml

1         <dependency>
2             <groupId>io.netty</groupId>
3             <artifactId>netty-all</artifactId>
4             <version>4.1.31.Final</version>
5         </dependency>

 

 配置 ServerConnection.java 

package com.example.demo.net;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.example.demo.handler.ServerMsgHandler;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ServerConnection {
    
    Logger logger = Logger.getLogger(ServerConnection.class);
    private final int port ;
    private EventLoopGroup bossGroup ;
    private EventLoopGroup workerGroup ;
    
    public ServerConnection(int port) {
        this.port = port ;
    }
    
    /***
     * NioEventLoopGroup 是用來處理I/O操作的多線程事件循環器,
     * Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。 在這個例子中我們實現了一個服務端的應用,
     * 因此會有2個NioEventLoopGroup會被使用。 第一個經常被叫做‘boss’,用來接收進來的連接。
     * 第二個經常被叫做‘worker’,用來處理已經被接收的連接, 一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
     * 如何知道多少個線程已經被使用,如何映射到已經創建的Channels上都需要依賴於EventLoopGroup的實現,
     * 並且可以通過構造函數來配置他們的關系。
     */
    public void run() {
        System.out.println("啟動服務端Netty連接");
        bossGroup = new NioEventLoopGroup() ;
        workerGroup = new NioEventLoopGroup() ;
        /**
         * ServerBootstrap 是一個服務端啟動NIO服務的輔助啟動類 , 可以在這個服務中直接使用Channel
         */
        ServerBootstrap bootstrap = new ServerBootstrap() ;
        /**
         * 這一步是必須的,如果沒有設置group將會報java.lang.IllegalStateException: group not set異常
         */
        bootstrap = bootstrap.group(bossGroup, workerGroup) ;
        /***
         * ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連接
         * 這里告訴Channel如何獲取新的連接.
         */
        bootstrap = bootstrap.channel(NioServerSocketChannel.class) ;
        /***
         * 綁定端口
         */
        bootstrap = bootstrap.localAddress(new InetSocketAddress(port)) ;
        /***
         * 你可以設置這里指定的通道實現的配置參數。 我們正在寫一個TCP/IP的服務端,
         * 因此我們被允許設置socket的參數選項比如tcpNoDelay和keepAlive。
         * 請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此可以對ChannelOptions的有一個大概的認識。
         */
        bootstrap = bootstrap.option(ChannelOption.SO_BACKLOG, 128) ;
        /***
         * option()是提供給NioServerSocketChannel用來接收進來的連接。
         * childOption()是提供給由父管道ServerChannel接收到的連接,
         * 在這個例子中也是NioServerSocketChannel。
         */
        bootstrap = bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) ;
        /***
         * 這里的事件處理類經常會被用來處理一個最近的已經接收的Channel。 ChannelInitializer是一個特殊的處理類,
         * 目的是幫助使用者配置一個新的Channel。
         * 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel
         * 或者其對應的ChannelPipeline來實現你的網絡程序。 當你的程序變的復雜時,可能你會增加更多的處理類到pipline上,
         * 然后提取這些匿名類到最頂層的類上。
         */
        bootstrap = bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline() ;
                    pipeline.addLast("decoder", new StringDecoder()) ;
                    pipeline.addLast("encoder", new StringEncoder()) ;
                    pipeline.addLast("handler", new ServerMsgHandler()) ;
                }
            }) ;
        bootstrap.bind().addListener(new ChannelFutureListener() {            
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if ( future.isSuccess() ) {
                    System.out.println("服務端開始監聽") ;
//                    logger.info("服務端開始監聽") ;
                }else {
                    logger.error("服務端無法使用監聽端口",future.cause()) ;
                }
            }
        }) ;
    }
    
    public void shutdown() {
        logger.info("關閉 Server 端口");
        bossGroup.shutdownGracefully() ;
        workerGroup.shutdownGracefully() ;
    }

}
View Code

 

配置 ServerMsgHandler.java 

package com.example.demo.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ServerMsgHandler extends ChannelInboundHandlerAdapter {

    /**
     * 這里我們覆蓋了chanelRead()事件處理方法。 每當從客戶端收到新的數據時, 這個方法會在收到消息時被調用,
     * 這個例子中,收到的消息的類型是ByteBuf
     * 
     * @param ctx
     *            通道處理的上下文信息
     * @param msg
     *            接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
        System.out.println("服務端接收的消息:"+msg.toString()) ;
        //向客戶端發送消息
        String str = msg.toString() ;
        if ( "高性能NIO框架——Netty".equals(str) ) {
            ctx.writeAndFlush( "客戶端 , 你好!") ;
        }
//        ctx.writeAndFlush(msg.toString()+"你好!") ;
    }
    
    /***
     * 這個方法會在發生異常時觸發
     * 
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**
         * exceptionCaught() 事件處理方法是當出現 Throwable 對象才會被調用,即當 Netty 由於 IO
         * 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來 並且把關聯的 channel
         * 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不 同的實現,比如你可能想在關閉連接之前發送一個錯誤碼的響應消息。
         */
        // 出現異常就關閉
        cause.printStackTrace() ;
        ctx.close() ;
    }
    
}
View Code

 

 

配置 ClientConnection.java

package com.example.demo.net;

import java.net.InetSocketAddress;

import org.apache.log4j.Logger;

import com.example.demo.handler.ClientMsgHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class ClientConnection {

    Logger logger = Logger.getLogger(ClientConnection.class);
    private final int port ;
    private EventLoopGroup bossGroup ;
    private EventLoopGroup workerGroup ;
    
    private Channel channel ;
    
    public ClientConnection(int port) {
        this.port = port ;
    }    
    
    public Channel getChannel() {
        return this.channel ;
    }
    
    /***
     * NioEventLoopGroup 是用來處理I/O操作的多線程事件循環器,
     * Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。 在這個例子中我們實現了一個服務端的應用,
     * 因此會有2個NioEventLoopGroup會被使用。
     * 第一個經常被叫做‘boss’,用來接收進來的連接。
     * 第二個經常被叫做‘worker’,用來處理已經被接收的連接, 一旦‘boss’接收到連接,就會把連接信息注冊到‘worker’上。
     * 如何知道多少個線程已經被使用,如何映射到已經創建的Channels上都需要依賴於EventLoopGroup的實現,
     * 並且可以通過構造函數來配置他們的關系。
     * @throws InterruptedException 
     */
    public void run() {
        System.out.println("啟動客戶端Netty連接");
        bossGroup = new NioEventLoopGroup() ;
        workerGroup = new NioEventLoopGroup() ;
        /**
         * Bootstrap 是客戶端一個啟動NIO服務的輔助啟動類 , 可以在這個服務中直接使用Channel
         */
        Bootstrap bootstrap = new Bootstrap() ;
//        ServerBootstrap bootstrap = new ServerBootstrap() ;
        /**
         * 這一步是必須的,如果沒有設置group將會報java.lang.IllegalStateException: group not set異常
         */
//        bootstrap.group(bossGroup, workerGroup)
        bootstrap.group(bossGroup)
        /***
         * ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連接
         * 這里告訴Channel如何獲取新的連接.
         */
//            .channel(NioServerSocketChannel.class)
            .channel(NioSocketChannel.class)
            /***
             * 綁定端口,等價於bootstrap.bind("127.0.0.1", port) ,若下面用了,就要把這個注釋掉,不然會報錯
             */
//            .localAddress(new InetSocketAddress(port))
            .remoteAddress("127.0.0.1", port)
            /***
             * 你可以設置這里指定的通道實現的配置參數。 我們正在寫一個TCP/IP的服務端,
             * 因此我們被允許設置socket的參數選項比如tcpNoDelay和keepAlive。
             * 請參考ChannelOption和詳細的ChannelConfig實現的接口文檔以此可以對ChannelOptions的有一個大概的認識。
             */
//            .option(ChannelOption.SO_BACKLOG, 128)
            /***
             * option()是提供給NioServerSocketChannel用來接收進來的連接。
             * childOption()是提供給由父管道ServerChannel接收到的連接,
             * 在這個例子中也是NioServerSocketChannel。
             */
//            .childOption(ChannelOption.SO_KEEPALIVE, true)
            /***
             * 這里的事件處理類經常會被用來處理一個最近的已經接收的Channel。 ChannelInitializer是一個特殊的處理類,
             * 目的是幫助使用者配置一個新的Channel。
             * 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel
             * 或者其對應的ChannelPipeline來實現你的網絡程序。 當你的程序變的復雜時,可能你會增加更多的處理類到pipline上,
             * 然后提取這些匿名類到最頂層的類上。
             */
//            .childHandler(new ChannelInitializer<SocketChannel>() {
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline() ;
                    pipeline.addLast("decoder", new StringDecoder()) ;
                    pipeline.addLast("encoder", new StringEncoder()) ;
//                    pipeline.addLast("handler", new ClientMsgHandler()) ;
                    pipeline.addLast(new ClientMsgHandler()) ;
                }
            }) ;
/*        bootstrap.bind().addListener(new ChannelFutureListener() {            
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if ( future.isSuccess() ) {
                    System.out.println("客戶端開始監聽") ;
                    logger.info("客戶端開始監聽") ;
                }else {
                    logger.error("客戶端無法使用監聽端口",future.cause()) ;
                }
            }
        }) ;    */
        
/*        //綁定端口,開始接收進來的連接
        ChannelFuture cFuture;
        try {
//            cFuture = bootstrap.connect(host, port).sync();            
//            cFuture = bootstrap.bind("127.0.0.1", port).sync();
            cFuture = bootstrap.bind().sync() ;
            //在這里拿到這個channel,是為了 等下 測試消息發送 用的
            channel = cFuture.channel();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }        */
                    
        ChannelFuture cf;
        try {
            cf = bootstrap.connect().sync();
            channel = cf.channel();
            channel.writeAndFlush("ClientConnection客戶端已成功啟動!");
            
            cf.addListener(new ChannelFutureListener() {            
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if ( future.isSuccess() ) {
                        System.out.println("客戶端開始監聽") ;
//                        logger.info("客戶端開始監聽") ;
                    }else {
//                        logger.error("客戶端無法使用監聽端口",future.cause()) ;
                    }
                }
            }) ;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
    }
    
    public void shutdown() {
        System.out.println("關閉 Client 端口");
        logger.info("關閉 Client 端口");
        bossGroup.shutdownGracefully() ;
        workerGroup.shutdownGracefully() ;
    }
    
}
View Code

 

配置 ClientMsgHandler.java

package com.example.demo.handler;

import org.apache.log4j.Logger;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientMsgHandler extends ChannelInboundHandlerAdapter {

    Logger logger = Logger.getLogger(ClientMsgHandler.class) ;
    
    /**
     * 這里我們覆蓋了chanelRead()事件處理方法。 每當從客戶端收到新的數據時, 這個方法會在收到消息時被調用,
     * 這個例子中,收到的消息的類型是ByteBuf
     * 
     * @param ctx
     *            通道處理的上下文信息
     * @param msg
     *            接收的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        
//        super.channelRead(ctx, msg);
        System.out.println("客戶端接收的消息:"+msg.toString()) ;
        //向服務端發送消息
        ctx.writeAndFlush("服務端 , 你好!") ;
    }
    
    /***
     * 這個方法會在發生異常時觸發
     * 
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**
         * exceptionCaught() 事件處理方法是當出現 Throwable 對象才會被調用,即當 Netty 由於 IO
         * 錯誤或者處理器在處理事件時拋出的異常時。在大部分情況下,捕獲的異常應該被記錄下來 並且把關聯的 channel
         * 給關閉掉。然而這個方法的處理方式會在遇到不同異常的情況下有不 同的實現,比如你可能想在關閉連接之前發送一個錯誤碼的響應消息。
         */
        // 出現異常就關閉
        cause.printStackTrace() ;
        ctx.close() ;
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        super.channelActive(ctx);
//        logger.info("client channel active");
        System.out.println("client channel active");
    }
    
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client channel inactive");
        ctx.close() ;
    }
    
}
View Code

 

 

編寫測試類 DemoTest.java 

 1 package com.example.demo.netty;
 2 
 3 import org.junit.runner.RunWith;
 4 import org.springframework.boot.test.context.SpringBootTest;
 5 import org.springframework.test.context.ActiveProfiles;
 6 import org.springframework.test.context.junit4.SpringRunner;
 7 
 8 import com.example.demo.DemoApplicationTests;
 9 import com.example.demo.net.ClientConnection;
10 import com.example.demo.net.ServerConnection;
11 
12 import io.netty.channel.Channel;
13 
14 @RunWith(SpringRunner.class)
15 @SpringBootTest(classes = DemoApplicationTests.class)
16 @ActiveProfiles("test")
17 public class DemoTest {
18 
19     public static void main(String[] args) {
20         int port = 2222 ;
21         Thread serverThread = new Thread( new Runnable() {            
22             @Override
23             public void run() {
24                 new ServerConnection(port).run() ;
25             }
26         } ) ;
27         serverThread.start() ;
28         ClientConnection clientConnection = new ClientConnection(port) ;
29 //        Thread clientThread = new Thread( new Runnable() {            
30 //            @Override
31 //            public void run() {
32 //                new ClientConnection(port).run() ;
33 //            }
34 //        } ) ;
35 //        clientThread.start() ;
36         
37         clientConnection.run();
38         Channel channel = clientConnection.getChannel() ;
39         channel.writeAndFlush("高性能NIO框架——Netty");
40         
41 //        new Thread( ()->{
42 //            new ServerConnection(port) ;
43 //        } ).start() ;        
44 //        new Thread( ()->{
45 //            new ClientConnection(port) ;
46 //        } ).start() ;
47         
48     }
49 }
View Code

 

服務端與客戶端的區別:

  1. 在客戶端只創建了一個NioEventLoopGroup實例,因為客戶端並不需要使用I/O多路復用模型,需要有一個Reactor來接受請求。只需要單純的讀寫數據即可

  2. 在客戶端只需要創建一個Bootstrap對象,它是客戶端輔助啟動類,功能類似於ServerBootstrap。

 

 

共同學習,共同進步,若有補充,歡迎指出,謝謝!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM