一、什么是Netty?
Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见 https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。
二、Netty和Tomcat有什么区别?
Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过codec自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。
有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。
三、为什么Netty受欢迎?
如第一部分所述,netty是一款收到大公司青睐的框架,在我看来,netty能够受到青睐的原因有三:
- 并发高
- 传输快
- 封装好
四、Netty为什么并发高

Selector
。
当一个连接建立之后,他有两个步骤要做,第一步是接收完客户端发过来的全部数据,第二步是服务端处理完请求业务之后返回response给客户端。NIO和BIO的区别主要是在第一步。
在BIO中,等待客户端发数据这个过程是阻塞的,这样就造成了一个线程只能处理一个请求的情况,而机器能支持的最大线程数是有限的,这就是为什么BIO不能支持高并发的原因。
而NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断的去遍历所有的Socket,一旦有一个Socket建立完成,他会通知Thread,然后Thread处理完数据再返回给客户端—— 这个过程是不阻塞的,这样就能让一个Thread处理更多的请求了。
下面两张图是基于BIO的处理流程和netty的处理流程,辅助你理解两种方式的差别:

除了BIO和NIO之外,还有一些其他的IO模型,下面这张图就表示了五种IO模型的处理流程:

- BIO,同步阻塞IO,阻塞整个步骤,如果连接少,他的延迟是最低的,因为一个线程只处理一个连接,适用于少连接且延迟低的场景,比如说数据库连接。
- NIO,同步非阻塞IO,阻塞业务处理但不阻塞数据接收,适用于高并发且处理简单的场景,比如聊天软件。
- 多路复用IO,他的两个步骤处理是分开的,也就是说,一个连接可能他的数据接收是线程a完成的,数据处理是线程b完成的,他比BIO能处理更多请求。
- 信号驱动IO,这种IO模型主要用在嵌入式开发,不参与讨论。
- 异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。
五、Netty为什么传输快
Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

六、为什么说Netty封装好?
要说Netty为什么封装好,这种用文字是说不清的,直接上代码:
- 阻塞I/O
public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); //1 try { for (;;) { final Socket clientSocket = socket.accept(); //2 System.out.println("Accepted connection from " + clientSocket); new Thread(new Runnable() { //3 @Override public void run() { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4 out.flush(); clientSocket.close(); //5 } catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); //6 } } catch (IOException e) { e.printStackTrace(); } } }
- 非阻塞IO
public class PlainNioServer { public void serve(int port) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); //1 Selector selector = Selector.open(); //2 serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3 final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); for (;;) { try { selector.select(); //4 } catch (IOException ex) { ex.printStackTrace(); // handle exception break; } Set<SelectionKey> readyKeys = selector.selectedKeys(); //5 Iterator<SelectionKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { //6 ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); //7 System.out.println( "Accepted connection from " + client); } if (key.isWritable()) { //8 SocketChannel client = (SocketChannel)key.channel(); ByteBuffer buffer = (ByteBuffer)key.attachment(); while (buffer.hasRemaining()) { if (client.write(buffer) == 0) { //9 break; } } client.close(); //10 } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { // 在关闭时忽略 } } } } } }
- Netty
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleasableBuffer( Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"))); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //1 b.group(group) //2 .channel(OioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() {//3 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5 } }); } }); ChannelFuture f = b.bind().sync(); //6 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); //7 } } }
从代码量上来看,Netty就已经秒杀传统Socket编程了,但是这一部分博大精深,仅仅贴几个代码岂能说明问题,在这里给大家介绍一下Netty的一些重要概念,让大家更理解Netty。
-
Channel
数据传输流,与channel相关的概念有以下四个,上一张图让你了解netty里面的Channel。
Channel一览- Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
- ChannelHandler,核心处理业务就在这里,用于处理业务请求。
- ChannelHandlerContext,用于传输业务数据。
- ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
- ByteBuf
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:

他有三种使用模式:
- Heap Buffer 堆缓冲区
堆缓冲区是ByteBuf最常用的模式,他将数据存储在堆空间。 - Direct Buffer 直接缓冲区
直接缓冲区是ByteBuf的另外一种常用模式,他的内存分配都不发生在堆,jdk1.4引入的nio的ByteBuffer类允许jvm通过本地方法调用分配内存,这样做有两个好处- 通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
- DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.
- Composite Buffer 复合缓冲区
复合缓冲区相当于多个不同ByteBuf的视图,这是netty提供的,jdk不提供这样的功能。
除此之外,他还提供一大堆api方便你使用,在这里我就不一一列出了,具体参见ByteBuf字节缓存。
- Codec
Netty中的编码/解码器,通过他你能完成字节与pojo、pojo与pojo的相互转换,从而达到自定义协议的目的。
在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了。
七、实战
1、引入相关依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.pubing</groupId> <artifactId>helloNetty</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency> </dependencies> </project>
2、服务端
package com.zhouzhiyao.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; //实现客户端发送请求,服务器端会返回Hello Netty public class HelloNettyServer { public static void main(String[] args) throws InterruptedException { /** * 定义一对线程组(两个线程池) * */ //主线程组,用于接收客户端的链接,但不做任何处理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //定义从线程组,主线程组会把任务转给从线程组进行处理 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** * 服务启动类,任务分配自动处理 * */ ServerBootstrap serverBootstrap = new ServerBootstrap(); //需要去针对一个之前的线程模型(上面定义的是主从线程) serverBootstrap.group(bossGroup, workerGroup) //设置NIO的双向通道 .channel(NioServerSocketChannel.class) //子处理器,用于处理workerGroup /** * 设置chanel初始化器 * 每一个chanel由多个handler共同组成管道(pipeline) */ .childHandler(new HelloNettyServerInitializer()); /** * 启动 * */ //绑定端口,并设置为同步方式,是一个异步的chanel ChannelFuture future = serverBootstrap.bind(8888).sync(); /** * 关闭 */ //获取某个客户端所对应的chanel,关闭并设置同步方式 future.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); }finally { //使用一种优雅的方式进行关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
3、建立初始化器HelloNettyServerInitializer
package com.zhouzhiyao.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; /** * 这是一个初始化器,chanel注册后会执行里面相应的初始化方法(也就是将handler逐个进行添加) * * @author phubing * */ public class HelloNettyServerInitializer extends ChannelInitializer<SocketChannel>{ //对chanel进行初始化 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //通过socketChannel去获得对应的管道 ChannelPipeline channelPipeline = socketChannel.pipeline(); /** * pipeline中会有很多handler类(也称之拦截器类) * 获得pipeline之后,可以直接.add,添加不管是自己开发的handler还是netty提供的handler * */ //一般来讲添加到last即可,添加了一个handler并且取名为HttpServerCodec //当请求到达服务端,要做解码,响应到客户端做编码 channelPipeline.addLast("HttpServerCodec", new HttpServerCodec()); //添加自定义的CustomHandler这个handler,返回Hello Netty channelPipeline.addLast("customHandler", new CustomHandler()); } }
4、建立自定义处理器CustomHandler
package com.zhouzhiyao.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.util.CharsetUtil; /** * 创建自定义助手类 * @author phubing * */ //处理的请求是:客户端向服务端发起送数据,先把数据放在缓冲区,服务器端再从缓冲区读取,类似于[ 入栈, 入境 ] public class CustomHandler extends SimpleChannelInboundHandler<HttpObject>{//Http请求,所以使用HttpObject @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注册"); super.channelRegistered(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel注册"); super.channelUnregistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel活跃状态"); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客户端与服务端断开连接之后"); super.channelInactive(ctx); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channel读取数据完毕"); super.channelReadComplete(ctx); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("用户事件触发"); super.userEventTriggered(ctx, evt); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { System.out.println("channel可写事件更改"); super.channelWritabilityChanged(ctx); } @Override //channel发生异常,若不关闭,随着异常channel的逐渐增多,性能也就随之下降 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("捕获channel异常"); super.exceptionCaught(ctx, cause); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("助手类添加"); super.handlerAdded(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("助手类移除"); super.handlerRemoved(ctx); } /** * ChannelHandlerContext:上下文对象 * * */ @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { //获取当前channel Channel currentChannel = ctx.channel(); //判断msg是否为一个HttpRequest的请求类型 if(msg instanceof HttpRequest) { //客户端远程地址 System.out.println(currentChannel.remoteAddress()); /** * * 未加判断类型,控制台打印的远端地址如下: * /0:0:0:0:0:0:0:1:5501 /0:0:0:0:0:0:0:1:5501 /0:0:0:0:0:0:0:1:5502 /0:0:0:0:0:0:0:1:5502 /0:0:0:0:0:0:0:1:5503 /0:0:0:0:0:0:0:1:5503 * * 原因是接收的MSG没有进行类型判断 * * * 增加了判断,为何还会打印两次? * /0:0:0:0:0:0:0:1:5605 /0:0:0:0:0:0:0:1:5605 * * 打开浏览器的network会发现,客户端对服务端进行了两次请求: * 1、第一次是所需的 * 2、第二次是一个icon * 因为没有加路由(相当于Springmvc中的requestMapping),只要发起请求,就都会到handler中去 * */ /** * 在Linux中也可以通过CURL 本机Ip:端口号 发送请求(只打印一次,干净的请求) */ //定义发送的消息(不是直接发送,而是要把数据拷贝到缓冲区,通过缓冲区) //Unpooed:是一个专门用于拷贝Buffer的深拷贝,可以有一个或多个 //CharsetUtil.UTF_8:Netty提供 ByteBuf content = Unpooled.copiedBuffer("Hello Netty", CharsetUtil.UTF_8); //构建一个HttpResponse,响应客户端 FullHttpResponse response = /** * params1:针对Http的版本号 * params2:状态(响应成功或失败) * params3:内容 */ //HttpVersion.HTTP_1_1:默认开启keep-alive new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); //设置当前内容长度、类型等 response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); //readableBytes:可读长度 response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //通过长下文对象,把响应刷到客户端 ctx.writeAndFlush(response); } } }
5、客户端
public class NettyClient extends SimpleChannelInboundHandler<Response> { private final String ip; private final int port; private Response response; public NettyClient(String ip, int port) { this.ip = ip; this.port = port; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception { this.response = response; } public Response client(Request request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { // 创建并初始化 Netty 客户端 Bootstrap 对象 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new RpcDecoder(Response.class)); pipeline.addLast(new RpcEncoder(Request.class)); pipeline.addLast(NettyClient.this); } }); bootstrap.option(ChannelOption.TCP_NODELAY, true); // String[] discover = new Discover().discover("/yanzhenyidai/com.yanzhenyidai.server").split(":"); // 连接 RPC 服务器 ChannelFuture future = bootstrap.connect(ip, port).sync(); // 写入 RPC 请求数据并关闭连接 Channel channel = future.channel(); channel.writeAndFlush(request).sync(); channel.closeFuture().sync(); return response; } finally { group.shutdownGracefully(); } }
6、心跳机制
服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法
ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new HeartBeatServerHandler()); } });
- 自定义处理类Handler继承ChannlInboundHandlerAdapter,实现其userEventTriggered()方法,在出现超时事件时会被触发,包括读空闲超时或者写空闲超时;
class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { private int lossConnectCount = 0; @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("已经5秒未收到客户端的消息了!"); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.READER_IDLE){ lossConnectCount++; if (lossConnectCount>2){ System.out.println("关闭这个不活跃通道!"); ctx.channel().close(); } } }else { super.userEventTriggered(ctx,evt); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { lossConnectCount = 0; System.out.println("client says: "+msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;
设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;
Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS)); socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new HeartBeatClientHandler()); } });
- 自定义处理类Handler继承ChannlInboundHandlerAdapter,实现自定义userEventTrigger()方法,如果出现超时时间就会被触发,包括读空闲超时或者写空闲超时;
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { System.out.println("客户端循环心跳监测发送: "+new Date()); if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.WRITER_IDLE){ if (curTime<beatTime){ curTime++; ctx.writeAndFlush("biubiu"); } } } }
7、客户端重连
/** * 连接服务端 and 重连 */ protected void doConnect() { if (channel != null && channel.isActive()){ return; } ChannelFuture connect = bootstrap.connect("127.0.0.1", 8081); //实现监听通道连接的方法 connect.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()){ channel = channelFuture.channel(); System.out.println("连接服务端成功"); }else{ System.out.println("每隔2s重连...."); channelFuture.channel().eventLoop().schedule(new Runnable() { @Override public void run() { doConnect(); } },2,TimeUnit.SECONDS); } } }); }