Netty與網絡編程


Netty什么?

Netty項目是一個提供異步事件驅動網絡應用框架和快速開發可維護的高性能高擴展性服務端和客戶端協議工具集的成果。
換句話說,Netty是一個NIO客戶端服務端框架,它使得快速而簡單的開發像服務端客戶端協議的網絡應用成為了可能。它它極大的簡化並流線化了如TCP和UDP套接字服務器開發的網絡編程。
“快速且簡便”不意味着目標應用將容忍維護性和性能上的問題。Netty在吸取了大量協議實現(如FTP,SMTP,HTTP以及各種二進制,基於文本的傳統協議)的經驗上進行了精心的設計。由此,Netty成功找到了一個無需折衷妥協而讓開發、性能、穩定性和靈活性相互協調的方法。

1、回顧

a) http協議是我們接觸最多的,定義的API都是基於Http協議的,Http協議屬於應用層協議。傳輸層協議,TCP / UDP協議,TCP是通過三次握手,保障通信的可信,也就是我們常說的長連接。Netty是對TCP協議的封裝。

     

b) JAVA BIO / NIO / AIO 

Java1.4版本引入NIO概念,實現了對“多路復用IO”的支持,Java1.7版本引入AIO概念。AIO是最晚提出的,理應是更先進的技術,但是並沒有大規模的在商業領域應用。Unix提供了五種參考網絡模型,在linux領域,並沒有異步IO網絡模型的成熟方案(linux發行版內核不支持AIO,需要自己安裝擴展包)。JAVA的新版本的AIO,還是用采用多路復用IO實現。在Windows平台通過IOCP協議實現,可參考 IOCP淺析

2、Netty 編程基礎

現在從最簡單的Echo程序入手,逐步深入地分享利用Netty如何編程。

Netty Server Exemple

EventLoopGroup group = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
 
try {
   ServerBootstrap b = new ServerBootstrap();
   b.group(group, workGroup)
         .channel(NioServerSocketChannel.class)
         .localAddress(new InetSocketAddress(port))
         .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
               ch.pipeline().addLast(new EchoServerHandler());
            }
         });
 
   ChannelFuture f = b.bind().sync();
   System.out.println(EchoServer.class.getName() + " started and listen on " + f.channel().localAddress());
   f.channel().closeFuture().sync();
} finally {
   group.shutdownGracefully().sync();
}

Netty Server Handler Exemple

public class EchoServerHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
   @Override
   public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
      ByteBuf in = (ByteBuf) msg;
      System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
      ctx.write(Unpooled.copiedBuffer("Response from server. You have input \"" + in.toString(CharsetUtil.UTF_8) + "\"!", CharsetUtil.UTF_8));
      ctx.flush();
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      cause.printStackTrace();
      ctx.close();
   }
 

Netty Client Exemple

EventLoopGroup group = new NioEventLoopGroup();
 
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .remoteAddress(new InetSocketAddress(host, port))
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch)
             throws Exception {
             ch.pipeline().addLast(
                     new EchoClientHandler());
         }
     });
 
    ChannelFuture f = b.connect().sync();
    if (f.channel().isActive()) {
        f.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Casper!", CharsetUtil.UTF_8));
    }
 
    Thread.sleep(1000);
 
} finally {
    group.shutdownGracefully().sync();
}

Netty Client Handler Exemple

public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
   @Override
   public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
      System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
   }
 
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
      cause.printStackTrace();
      ctx.close();
   }
 
}

3、BootStrap

一個Netty程序開始於Bootstrap類,Bootstrap類是Netty提供的一個可以通過簡單配置來設置或"引導"程序的一個很重要的類。啟動Sever端,需要初始化ServerBootStrap。定義兩個EventLoopGroup,分別處理連接請求和socket數據傳輸。Netty巧妙的把接收請求和處理請求,都抽象成ChannelHanndler處理,模式上更加統一。通過閱讀代碼,接收請求的處理如下:接到連接請求后,設置其初始化參數,然后注冊到childGroup處理。

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    final EventLoopGroup currentChildGroup = this.childGroup;
    final ChannelHandler currentChildHandler = this.childHandler;
 
    p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)});
                }
            });
        }
    }});
}
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    ServerBootstrapAcceptor(final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
    }
 
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel)msg;
        child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
        try {
             this.childGroup.register(child).addListener(new ChannelFutureListener() {
         });
         } catch (Throwable var8) {
         }
    }
}

4、EventLoopGroup

SingleThreadEventLoopGroup  一個線程處理所有的Channel

ThreadPerChannelLoopGroup  每個線程處理一個channel

MultiThreadEventLoopGroup 通過線程組處理channel

       NIOEventLoopGroup

       EpollEventLoopGroup       根據Selecter的不同實現,不同的處理策略。NIOEventLoopGroup,默認采用sellect方式,參考JAVA NIO實現。EpollEventLoopGroup只能在linux平台使用,更高效。

5、編程模式

宏觀上,ChannelPipline串聯起所有的ChannelHandler,在特定的時間節點,調用Handler的函數,完成處理流程。處理流程如下圖所示:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
 
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }
 
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }
 
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
 
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }
 
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
 
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }
 
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
 
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
 
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelPipline:內部包含一個ChannelHandlerContext的鏈表數據結構,每次從header item開始,調用context.invoke函數,開始處理流程。

ChannelHandlerContext:在Channelhandler中,調用context的fire函數;fire函數在pipline中查找需要執行的下一個context,調用context.invoke函數;調用channelhandler的函數。

ChannelHandler:channelChandler分為inbound和outbond,inbound處理接收消息,outbound處理傳出。如上所示,channelInboundHandler中定義了一系列的函數,一般情況下,編寫Netty程序,只需要在ChannelHandler中處理業務邏輯,編程模型相當簡單。

7、編碼與解碼

代碼實例中,channelHandler中讀到的是ByteBuf對象,其實就是byte數組,在程序內部,把byte數組轉換成了字符串。

問題1、半包問題,沒有讀到完整的數據,就進行了轉換,導致錯誤。

問題2、編解碼與業務柔和在一起,設計上不完美。

Netty提供了編碼器,解碼器以及編解碼器。

public class RpcChannelHandler extends SimpleChannelInboundHandler<RpcResponse> implements RpcChannel {
 
    // 讀取數據,讀的是對象
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
        String requestId = rpcResponse.getRequestId();
        if (rpcFutureMap.containsKey(requestId)) {
            RpcFuture rpcFuture = rpcFutureMap.get(requestId);
            rpcFutureMap.remove(requestId);
            rpcFuture.finish(rpcResponse);
        }
    }
 
    @Override
    public RpcFuture call(RpcRequest request, RpcCallback callback) {
        this.channel.writeAndFlush(request); // 寫數據,寫的也是對象
    }
}
 

編碼器

public class RpcEncoder extends MessageToByteEncoder {
 
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}

解碼器

public class RpcDecoder extends ByteToMessageDecoder {
 
 
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
 
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
 
        Object obj = SerializationUtil.deserialize(data, genericClass);
        out.add(obj);
    }
}

處理流程如下:

編碼解碼器類結構

編碼器是一個ChannelHandler,放在pipline中,作為處理請求消息的一個環節。處理傳入參數的是Decoder,負責把二進制的數據,轉換成程序可識別的數據結構,其實現了InboundChannelHandler接口;一般情況下,decoder要放在pip的header位置,即addFrist。處理傳出參數的是Encoder,負責把程序內部的數據結構,轉換成可在網絡傳輸的二進制數據;一般情況下,encoder需要放在pipline的最后處理。

分別實現Encoder和Decoder,可能是代碼放在兩個類里實現;Netty提供了Codec,在一個類里實現編碼和解碼。

Netty對協議的支持,都是通過提供編碼解碼器實現的。例如:http協議

       HttpRequestEncoder,將HttpRequest或HttpContent編碼成ByteBuf
       HttpRequestDecoder,將ByteBuf解碼成HttpRequest和HttpContent
       HttpResponseEncoder,將HttpResponse或HttpContent編碼成ByteBuf
       HttpResponseDecoder,將ByteBuf解碼成HttpResponse和HttpContent

8、Netty 為什么高效?

1)、線程模型

  • Reactor單線程模型;
    • 作為NIO服務端,接收客戶端的TCP連接;
    • 作為NIO客戶端,向服務端發起TCP連接;
    • 讀取通信對端的請求或者應答消息;
    • 向通信對端發送消息請求或者應答消息。
  • Reactor多線程模型;
    • 有專門一個NIO線程-Acceptor線程用於監聽服務端,接收客戶端的TCP連接請求;
    • 網絡IO操作-讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
    • 1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應1個NIO線程,防止發生並發操作問題。
  • 主從Reactor多線程模型。
    • 為了解決1個服務端監聽線程無法有效處理所有客戶端連接的性能不足問題,有一組NIO線程處理服務氣短的監聽。

      如上的幾種模式,在ServerBootStrap.goup初始化時設置,我們的例子,采用的是主從Reactor多線程模型。

 

2)、Zero Copy

  • Netty的接收和發送ByteBuffer采用DIRECT BUFFERS,使用堆外直接內存進行Socket讀寫,不需要進行字節緩沖區的二次拷貝。如果使用傳統的堆內存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內存Buffer拷貝一份到直接內存中,然后才寫入Socket中。相比於堆外直接內存,消息在發送過程中多了一次緩沖區的內存拷貝。
    ByteBufAllocator 通過ioBuffer分配堆外內存:
    public ByteBuf ioBuffer() {
    return PlatformDependent.hasUnsafe() ? this.directBuffer(256) : this.heapBuffer(256);
    }
      
  • Netty提供了組合Buffer對象,可以聚合多個ByteBuffer對象,用戶可以像操作一個Buffer那樣方便的對組合Buffer進行操作,避免了傳統通過內存拷貝的方式將幾個小Buffer合並成一個大的Buffer。
    private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
    ......
    wasAdded = this.components.add(c);
    ......
    return var11;
    }
  • Netty的文件傳輸采用了transferTo方法,它可以直接將文件緩沖區的數據發送到目標Channel,避免了傳統通過循環write方式導致的內存拷貝問題。linux零拷貝,sendfile https://blog.csdn.net/caianye/article/details/7576198
    private native long transferTo0(FileDescriptor var1, long var2, long var4, FileDescriptor var6);

3)、Select VS Epoll

        多路復用IO有多種實現方式,其中,select/poll是所有操作系統都支持的方式,提出時間較早。JAVA NIO默認實現是用的Select,windows操作系統只支持Select。Epoll提出較晚,在linux系統提供,是目前比較成熟穩定的方案。

 

  • select/poll 函數監視的文件描述符分3類,分別是writefds(可寫)、readfds(可讀)、和exceptfds(異常)。調用后select函數會阻塞,直到有描述符就緒(有數據 可讀、可寫、或者有except),或者超時(timeout指定等待時間,如果立即返回設為null即可),函數返回。當select函數返回后,可以通過遍歷fdset,來找到就緒的描述符。
    select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

     
  • epoll是在2.6內核中提出的,是之前的select和poll的增強版本。首先,epoll使用一組函數來完成,而不是單獨的一個函數;其次,epoll把用戶關心的文件描述符上的事件放在內核里的一個事件表中,無須向select和poll那樣每次調用都要重復傳入文件描述符集合事件集。
    int epoll_create(int size);  // 該函數返回的文件描述符指定要訪問的內核事件表,是其他所有epoll系統調用的句柄,
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  //告訴操作系統需要關注哪些文件描述符
    int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout); // 等待事件發生

9、Netty能做什么?

1)、Rpc框架,代表 Dubbo

       Http VS TCP

      

  • 采用Http協議,增加了http協議的解碼編碼過程
  • http協議本身無狀態的,連接的復用不好;(可通過keep-alive解決一部分連接復用的問題)

      Tips: 

2)、服務代理,代表 NRedis-Proxy 參考: https://my.oschina.net/u/2608504/blog/787976

NRedis-Proxy 是一個Redis中間件服務,第一個Java 版本開源Redis中間件,無須修改業務應用程序任何代碼與配置,與業務解耦;以Spring為基礎開發自定義標簽,讓它可配置化,使其更加容易上手;以netty 作為通信傳輸工具,讓它具有高性能,高並發,可分布式擴展部署等特點,單片性能損耗約5%左右。

      這里寫圖片描述

3)、中間件Vert.x  參考 https://vertx.io/

clipse Vert.x is event driven and non blocking. This means your app can handle a lot of concurrency using a small number of kernel threads. Vert.x lets your app scale with minimal hardware.

參考文章

http://ifeve.com/netty-mina-in-depth-1/ Netty-Mina深入學習與對比(一)
http://ifeve.com/netty-mina-in-depth-2/ Netty-Mina深入學習與對比(二)
http://www.52im.net/thread-304-1-1.html 詳解快的打車架構設計及技術實踐
http://www.52im.net/thread-306-1-1.html Java新一代網絡編程模型AIO原理及Linux系統AIO介紹
https://www.ibm.com/developerworks/cn/java/j-lo-comet/ Servlet 3.0 實戰:異步 Servlet 與 Comet 風格應用程序
https://www.jianshu.com/p/fbe0430959e8 利用Vertx構建簡單的API 服務、分布式服務
http://www.52im.net/thread-400-1-1.html NIO框架詳解:Netty的高性能之道


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM