netty的基本介紹


一、什么是netty?為什么要用netty

  netty是jboss提供的一個java開源框架,netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可用性的網絡服務器和客戶端程序。也就是說netty是一個基於nio的編程框架,使用netty可以快速的開發出一個網絡應用。

  由於java 自帶的nio api使用起來非常復雜,並且還可能出現 Epoll Bug,這使得我們使用原生的nio來進行網絡編程存在很大的難度且非常耗時。但是netty良好的設計可以使開發人員快速高效的進行網絡應用開發。

 

二、netty的功能特性和架構思想

    如下圖所示:netty的核心是支持零拷貝的bytebuf緩沖對象、通用通信api和可擴展的事件模型;它支持多種傳輸服務並且支持HTTP、Protobuf、二進制、文本、WebSocket 等一系列常見協議,也支持自定義協議。

  

  netty的模型是基於reactor多線程模型,其中mainReactor用於接收客戶端請求並轉發給subReactor。SubReactor負責通道的讀寫請求,非 IO 請求(具體邏輯處理)的任務則會直接寫入隊列,等待 worker threads 進行處理。

    

三、netty中的一些核心的概念

  1、bootstrap、serverBootstrap:bootstrap的意思是引導,其主要作用是配置整個netty程序,將各個組件整合起來。serverBootstrap是服務器端的引導類。bootstrap用於連接遠程主機它有一個EventLoopGroup ;serverBootstrap用於監聽本地端口有兩個EventLoopGroup。

  2、eventLoop:eventLoop維護了一個線程和任務隊列,支持異步提交執行任務。

  3、eventLoopGroup:eventLoopGroup 主要是管理eventLoop的生命周期,可以將其看作是一個線程池,其內部維護了一組eventLoop,每個eventLoop對應處理多個Channel,而一個Channel只能對應一個eventLoop。

  4、channelPipeLine:是一個包含channelHandler的list,用來設置channelHandler的執行順序。

  5、Channel:Channel代表一個實體(如一個硬件設備、一個文件、一個網絡套接字或者一個能夠執行一個或者多個不同的IO操作的程序組件)的開放鏈接,如讀操作和寫操作。

  6、Futrue、ChannelFuture :Future提供了另一種在操作完成時通知應用程序的方式。這個對象可以看作是一個異步操作結果的占位符;它將在未來的某個時刻完成,並提供對其結果的訪問。netty的每一個出站操作都會返回一個ChannelFuture。future上面可以注冊一個監聽器,當對應的事件發生后會出發該監聽器。

  7、ChannelInitializer:它是一個特殊的ChannelInboundHandler,當channel注冊到eventLoop上面時,對channel進行初始化

  8、ChannelHandler:用來處理業務邏輯的代碼,ChannelHandler是一個父接口,ChannelnboundHandler和ChannelOutboundHandler都繼承了該接口,它們分別用來處理入站和出站。

  9、ChannelHandlerContext:允許與其關聯的ChannelHandler與它相關聯的ChannlePipeline和其它ChannelHandler來進行交互。它可以通知相同ChannelPipeline中的下一個ChannelHandler,也可以對其所屬的ChannelPipeline進行動態修改。

 

四、netty中常用的自帶解碼器和編碼器(編解碼器名字對應的只列舉一個)

  DelimiterBasedFrameDecoder :分隔符解碼器,以設定的符號作為消息的結束解決粘包問題

  FixedLengthFrameDecoder :定長解碼器,作用於定長的消息

  LineBasedFrameDecoder  :按照每一行進行分割,也就是特殊的分隔符解碼器,它的分割符為\n或者\r\n。

  LengthFieldBasedFrameDecoder  :通過消息中設置的長度字段來進行粘包處理。該解碼器總共有5個參數

  LengthFieldBasedFrameDecoder(int maxFrameLength,  單個包的最大大小

                 int lengthFieldOffset,   定義長度的字段的相對包開始的偏移量

                 int lengthFieldLength,  定義長度字段所占字節數

                 int lengthAdjustment,  lengthAdjustment  = 數據長度字段之后剩下包的字節數 -  數據長度取值(也就是長度字段之后的所有非數據的其他信息)

                 int initialBytesToStrip)  從包頭開始,要忽略的字節數

 

  HttpRequestDecoder  :將字節解碼為HttpRequest、HttpContent和LastHttpContent消息

  HttpResponseDecoder  :將字節解碼為HttpResponse、HttpContent和LastHttpContent消息

  ReplayingDecoder  :一個特殊的ByteToMessageDecoder ,可以在阻塞的i/o模式下實現非阻塞的解碼。 ReplayingDecoder 和ByteToMessageDecoder 最大的不同就是ReplayingDecoder 允許你實現decode()和decodeLast()就像所有的字節已經接收到一樣,不需要判斷可用的字節

  

  Base64Decoder   :Base64編碼器

  StringDecoder  :將接收到的ByteBuf轉化為String

  ByteArrayDecoder  :將接收到的ByteBuf轉化為byte 數組

  DatagramPacketDecoder  :運用指定解碼器來對接收到的DatagramPacket進行解碼

  MsgpackDecoder  :用於Msgpack序列化的解碼器

  ProtobufDecoder  :用於Protobuf協議傳輸的解碼器

  HttpObjectAggregator :將http消息的多個部分聚合起來形成一個FullHttpRequest或者FullHttpResponse消息。

 

  

 

  LengthFieldPrepender  :將消息的長度添加到消息的前端的編碼器,一般是和LengthFieldBasedFrameDecoder搭配使用

 

  HttpServerCodec  :相當於HttpRequestDecoder和HttpResponseEncoder

  HttpClientCodec  :相當於HttpRequestEncoder和HttpResponseDecoder

  ChunkedWriteHandler  :在進行大文件傳輸的時候,一次將文件的全部內容映射到內存中,很有可能導致內存溢出,ChunkedWriteHandler可以解決大文件或者碼流傳輸過程中可能發生的內存溢出問題

  

五、netty的簡單使用

  

public class MyClient {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,4,0,4))
                        .addLast(new LengthFieldPrepender(4))
                        .addLast(new StringDecoder(CharsetUtil.UTF_8))
                        .addLast(new StringEncoder(CharsetUtil.UTF_8))
                        .addLast(new SimpleChannelInboundHandler<String>() {

                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                System.out.println(ctx.channel().remoteAddress()+": "+msg);
                                ctx.writeAndFlush("來自客戶端的信息");
                            }
                            
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                for(int i=0;i<10;i++){
                                    ctx.writeAndFlush("客戶端第"+i+"條消息");
                                }
                            }
                            
                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                    
                });
            
            ChannelFuture future = bootstrap.connect("localhost", 9999).sync();
            future.channel().closeFuture().sync();
            
        }finally{
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }

}

 

public class MyServer {

    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<Channel>() {

                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline()
                        .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4,0,4))
                        .addLast(new LengthFieldPrepender(4))
                        .addLast(new StringDecoder(CharsetUtil.UTF_8))
                        .addLast(new StringEncoder(CharsetUtil.UTF_8))
                        .addLast(new SimpleChannelInboundHandler<String>() {
                            
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                System.out.println(ctx.channel().remoteAddress()+":"+msg);
                                ctx.writeAndFlush("from server: "+UUID.randomUUID());
                            }
                            
                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                });
            ChannelFuture future = serverBootstrap.bind(9999).sync();
            future.channel().closeFuture().sync();
        }finally{
            bossGroup.shutdownGracefully().sync();
            workerGroup.shutdownGracefully().sync();
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM