Netty簡介及服務器客戶端簡單開發流程


什么是Netty

  • Netty是一個基於Java NIO的編寫客服端服務器的框架,是一個異步事件框架。
  • 官網https://netty.io/

為什么選擇Netty

由於JAVA NIO編寫服務器的過程過於復雜且不易掌控,所以我們選擇Netty框架進行開發。

  • 具有很高的的性能。
  • 且比NIO更容易編碼和維護。
  • 實踐者眾多,Elastic Search,dubbo,Akka,grpc等等
  • 社區很成熟。

Netty的常用組件

  • EventLoopGroup 相當於Reactor線程組,常用NioEventLoopGroup
  • Channel 連接到網絡套接字或能夠進行I/O操作(如讀、寫、連接和綁定)的組件的連接。常用NioServerSocketChannel,NioSocketChannel,SocketChannel
  • Bootstrap/ServerBootstrap 輔助類
  • ChannelPipeline 處理或截取通道的入站事件和出站操作的通道處理程序列表
  • ChannelHandler 處理I/O事件或攔截I/O操作,並將其轉發到其ChannelPipeline中的下一個處理程序。常用ChannelInboundHandlerAdapter和SimpleChannelInboundHandler,編碼器,解碼器。
  • ChannelFuture 異步操作使用。

Netty實現一個服務器

服務器代碼:

/**
 * @author monkjavaer
 * @date 2019/7/18 14:56
 */
public class NettyServer {
    private static Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);
    public static int PORT = 8080;
    public static void connect(){
        //配置兩個服務端的NIO線程組,一個用於接收客服端的鏈接,另一個用於進行SocketChannel的網絡讀寫。
        //NioEventLoopGroup是一個處理I/O操作的多線程事件循環
        //"boss":接收一個傳入連接
        EventLoopGroup boss = new NioEventLoopGroup();
        //"worker" : 當boss接收連接並把接收的連接注冊給worker,work就開始處理
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //ServerBootstrap是一個幫助類,可以設置服務器
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss,worker)
                    //NioServerSocketChannel用於實例化新通道來接收傳入的連接
                    .channel(NioServerSocketChannel.class)
                    //配置日志
                    .handler(new LoggingHandler(LogLevel.INFO))
                    //ChannelInitializer用於配置新通道
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //通過ChannelPipeline添加處理類ChannelHandler
                            //通常有很多處理類,可以將這個內部類new ChannelInitializer提為一個獨立的類
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyServerHandler());
                        }
                    })
                    //ChannelOption和ChannelConfig可以設置各種參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //option()用於接受傳入連接的NioServerSocketChannel,childOption()用於父ServerChannel接受的通道
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            //異步地綁定服務器;調用 sync()方法阻塞等待直到綁定完成
            ChannelFuture f = bootstrap.bind(PORT).sync();
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        NettyServer.connect();
    }
}
  • 首先創建兩個NioEventLoopGroup線程組,一個用於接收客服端的鏈接,另一個用於進行SocketChannel的網絡讀寫。
  • 創建一個服務器幫助類ServerBootstrap,將boss,worker兩個線程組加入
  • 通過ServerBootstrap流式設置服務器
  • 設置通道為NioServerSocketChannel
  • 通過ChannelInitializer設置自定義處理器NettyServerHandler,並將他加入ChannelPipeline,這里用內部類簡易實現,真實線上環境我們應該提取為相應的類。
  • 通過option和childOption設置TCP相關參數。
  • 異步地綁定服務器;調用 sync()方法阻塞等待直到綁定完成
  • 最后關閉相關資源

服務器處理類:

客戶端的處理類和服務器類似。

/**
 * ChannelHandler.Sharable 標注一個channel handler可以被多個channel安全地共享
 * ChannelInboundHandlerAdapter實現了ChannelInboundHandler
 * 回調事件處理類
 *
 * @author monkjavaer
 * @date 2019/7/18 15:36
 */
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);

    /**
     * 新的連接被建立時調用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("client {} connected.", ctx.channel().remoteAddress());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        //獲取緩沖區可讀字節數
        int readableBytes = byteBuf.readableBytes();
        byte[] bytes = new byte[readableBytes];
        byteBuf.readBytes(bytes);
        LOGGER.info("readableBytes is{},server received message:{}", readableBytes, new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
//        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
//                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("server exceptionCaught,{}",cause.getMessage());
        ctx.close();
    }
}
  • 自定義處理類繼承自ChannelInboundHandlerAdapter
  • 重寫我們需要的方法,channelActive(新的連接被建立時調用),channelRead(讀取數據),channelReadComplete(讀取最后的一條信息),exceptionCaught(發生異常時調用)

Netty實現一個客戶端

客戶端

/**
 * @author monkjavaer
 * @date 2019/7/18 17:17
 */
public class NettyClient {
    private static Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);
    public static String IP = "127.0.0.1";
    public static int PORT = 8080;

    public static void main(String[] args) {
        EventLoopGroup client = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(client)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new NettyClientHandler());
                        }
                    });

            ChannelFuture f = bootstrap.connect(IP,PORT).sync();
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            client.shutdownGracefully();
        }
    }
}
  • 客戶端這里只創建一個線程組即可
  • 幫助類這里使用Bootstrap
  • 設置通道為NioSocketChannel
  • 其他類容和服務器雷士
  • 和服務器建立連接

客戶端處理類

/**
 * @author monkjavaer
 * @date 2019/7/18 17:26
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static Logger LOGGER = LoggerFactory.getLogger(NettyServerHandler.class);

    /**
     * 新的連接被建立時調用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("server {} connected.", ctx.channel().remoteAddress());
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello server!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        //獲取緩沖區可讀字節數
        int readableBytes = byteBuf.readableBytes();
        byte[] bytes = new byte[readableBytes];
        byteBuf.readBytes(bytes);
        LOGGER.info("readableBytes is{},client received message:{}", readableBytes, new String(bytes, StandardCharsets.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("server exceptionCaught,{}",cause.getMessage());
        ctx.close();
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM