1、netty 是什么
2、netty 架构设计
2.1、线程模型
2.2、传统阻塞 I/O 服务模型
2.3、Reactor 模式
2.4、单 Reactor 单线程
2.5、单 Reactor 多线程
2.6、主从 Reactor 多线程
2.7、Netty工作原理架构图
3、Netty 编程之 helloworld
4、自定义 ChannelInboundHandlerAdapter 收发消息
5、任务队列 taskQueue 和 scheduledTaskQueue
6、Netty 异步模型
7、Netty 入门案例--HTTP 服务
1、netty 是什么 <--返回目录
Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
下面是我总结的使用 Netty 不使用 JDK 原生 NIO 的原因
- 使用 JDK 自带的 NIO 需要了解太多的概念,编程复杂,一不小心 bug 横飞
- Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty 可以直接从 NIO 模型变身为 IO 模型
- Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
- Netty 解决了 JDK 的很多包括空轮询在内的 bug
- Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理
- 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
- Netty 社区活跃,遇到问题随时邮件列表或者 issue
- Netty 已经历各大 rpc 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
2、netty 架构设计 <--返回目录
不同的线程模式,对程序的性能有很大影响,为了搞清 netty 线程模式,我们来系统分析下各个线程模式,最后看看 netty 线程模型有什么优越性。
2.1、线程模型 <--返回目录
目前存在的线程模型有:
- 传统阻塞 I/O 服务模型
- Reactor 模式(反应器模式、分发者模式 Dispatcher、通知者模式 Notifier)
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
netty 线程模式:netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模式有多个 Reactor
2.2、传统阻塞 I/O 服务模型 <--返回目录
模型特点:
- 采用阻塞 IO 模式 获取输入的数据
- 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回
问题分析:
- 当并发数很大,就会创建大量的线程,占用很大系统资源
- 连接创建后,如果当前线程暂时没有数据可读,该线程回阻塞在 read 操作,造成线程资源的浪费
2.3、Reactor 模式 <--返回目录
针对传统阻塞 IO 服务模型的 2 个缺点,解决方案:
- 基于 IO 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
- 基于线程池复用线程资源:不必为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
IO 复用结合线程池,就是 Reactor 模式基本设计思想。
2.4、单 Reactor 单线程 <--返回目录
方案说明:
- select 是前面 IO 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求
- Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
- 如果是建立连接请求事件,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 handler 对象处理连接完成后的后续业务处理
- 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 handler 来响应
- handler 会完成 read -> 业务处理 -> send 的完整业务流程
服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接、读写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑。
2.5、单 Reactor 多线程 <--返回目录
方案说明:
- Reactor 对象通过 select 监控客户端请求事件,收到事件后,通过 dispatch 进行分发
- 如果建立连接请求,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 handler 对象处理完成连接后的各种事件
- 如果不是连接请求,则由 reactor 分发调用连接对象对应的 handler 来处理
- handler 之负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
- worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
- handler 收到响应后,通过 send 将结果返回给 cliet
缺点:reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。
2.6、主从 Reactor 多线程 <--返回目录
主 Reactor 负责连接事件;子 Reactor 负责监听读写事件
2.7、Netty工作原理架构图 <--返回目录
1)Netty 抽象出两组线程池,BossGroup 专门负责接受客户端的连接,WorkerGroup 专门负责网络的读写
2)BossGroup 和 WorkGroup 类型都是 NioEventLoopGroup
3)NioEventLoopGroup 相当于一个事件循环组,这个组含有多个事件循环,每个事件循环时 NioEventLoop
4)NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯
5)NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
6)每个 Boss NioEventLoop 循环执行的步骤
- 轮询 accept 事件
- 处理 accept 事件,与 client 建立连接,生成 NiobiumSocketChannel,并将其注册到 worker NioEveltLoop 上的 Selector
- 处理任务队列的任务,即 runAllTasks
7) 每个 Worker NioEventLoop 循环执行的步骤
- 轮询 read/write 事件
- 处理 read/write 事件,在对应 NioSocketChannel 处理
- 处理任务队列的任务,即 runAllTasks
3、Netty 编程之 helloworld <--返回目录
pom 引入依赖

<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.22.Final</version> </dependency>
NettyServer
package com.oy; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; public class NettyServer { public static void main(String[] args) throws Exception { // 1.创建 BossGroup 和 workerGroup NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); // 2.创建服务器端的启动对象 ServerBootstrap serverBootstrap = new ServerBootstrap(); // 3.链式编程,配置参数 serverBootstrap .group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持获得连接状态 .childHandler(new ChannelInitializer<NioSocketChannel>() { // 给 WorkerGroup 的 EventLoop 对应的管道设置处理器 protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }); // 4.绑定端口,运行服务器 ChannelFuture future = serverBootstrap.bind(8000).sync(); System.out.println("server started and listen " + 8000); // 5.对关闭通道进行监听 future.channel().closeFuture().sync(); } }
NettyClient
package com.oy; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringEncoder; import java.util.Date; public class NettyClient { public static void main(String[] args) throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); NioEventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }); Channel channel = bootstrap.connect("127.0.0.1", 8000).channel(); while (true) { channel.writeAndFlush(new Date() + ": hello world11111111!"); Thread.sleep(2000); } } }
4、自定义 ChannelInboundHandlerAdapter 收发消息 <--返回目录
NettyServer
package com.oy; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; public class NettyServer { private int port; public static void main(String[] args) { new NettyServer(8080).start(); } public NettyServer(int port) { this.port = port; } public void start() { /** * 创建两个EventLoopGroup,即两个线程池,boss线程池用于接收客户端的连接,一个线程监听一个端口,一般只会监听一个端口所以只需一个线程 * work池用于处理网络连接数据读写或者后续的业务处理(可指定另外的线程处理业务,work完成数据读写) */ EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { /** * 实例化一个服务端启动类, * group()指定线程组 * channel()指定用于接收客户端连接的类,对应java.nio.ServerSocketChannel * childHandler()设置编码解码及处理连接的类 */ ServerBootstrap server = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() //.addLast("decoder", new StringDecoder()) //.addLast("encoder", new StringEncoder()) .addLast(new NettyServerHandler()); } }); // 绑定端口 ChannelFuture future = server.bind().sync(); System.out.println("server started and listen " + port); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } public static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldServerHandler active"); } /** * 读取客户端发送的数据 * ChannelHandlerContext ctx: 上下文对象,含有管道 pipeline,通道 channel,连接地址 * Object msg: 客户端发送的数据 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead..."); // 读取客户端发送的数据 ByteBuf buf = (ByteBuf) msg; System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8)); //System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString()); // 返回消息 //ctx.write("server write, 收到消息" + msg); //ctx.flush(); } /** * 数据读取完毕 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8)); } /** * 处理异常,关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } } }
NettyClient
package com.oy; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class NettyClient { private static final String HOST = "127.0.0.1"; private static final int PORT= 8080; public static void main(String[] args){ new NettyClient().start(HOST, PORT); } public void start(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() //.addLast("decoder", new StringDecoder()) //.addLast("encoder", new StringEncoder()) .addLast(new NettyClientHandler()); } }); ChannelFuture future = client.connect(host, port).sync(); //future.channel().writeAndFlush("Hello Netty Server ,I am a netty client"); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 通道就绪触发该方法 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldClientHandler Active"); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", CharsetUtil.UTF_8)); } /** * 当通道有读取事件时触发该方法 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("收到服务器响应: " + buf.toString(CharsetUtil.UTF_8)); } } }
5、任务队列 taskQueue 和 scheduledTaskQueue <--返回目录
任务队列中的 Task 有三种典型使用场景
- 用户程序自定义的普通任务
- 用户自定义定时任务
- 非当前 Reactor 线程调用 Channel 的各种方法。例如在推送系统的业务线程里,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中被异步消费
public static class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("HelloWorldServerHandler active"); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead..."); // 读取客户端发送的数据 ByteBuf buf = (ByteBuf) msg; System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8)); // 模拟业务处理耗时 //Thread.sleep(5 * 1000); //ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端1\n", CharsetUtil.UTF_8)); // 用户自定义的任务,任务添加到 taskQueue 中 ctx.channel().eventLoop().execute(new Runnable() { public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端1\n", CharsetUtil.UTF_8)); } catch (Exception e) { e.printStackTrace(); } } }); // 用户自定义定义任务, 任务添加到 scheduledTaskQueue 中 ctx.channel().eventLoop().schedule(new Runnable() { public void run() { try { Thread.sleep(5 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端 shedule\n", CharsetUtil.UTF_8)); } catch (Exception e) { e.printStackTrace(); } } },5 , TimeUnit.SECONDS); } /** * 数据读取完毕 */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端2", CharsetUtil.UTF_8)); } /** * 处理异常,关闭通道 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
6、Netty 异步模型 <--返回目录
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 IO 操作时异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
Netty 的异步模型是建立在 Future 和 callback 之上的,callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适,那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 取监控方法 fun 的处理过程(即 Future-Listener 机制)。
// 绑定端口 final ChannelFuture future = server.bind(8080).sync(); future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (future.isDone()) { System.out.println("监听端口 8080 成功"); } else { System.out.println("监听端口 8080 失败"); } } });
7、Netty 入门案例--HTTP 服务 <--返回目录
HttpServer

package com.oy.http; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class HttpServer { public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap server = new ServerBootstrap() .group(boss, work) .channel(NioServerSocketChannel.class) //.localAddress(new InetSocketAddress(port)) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new MyChannelInitializer()); // 绑定端口 ChannelFuture future = server.bind(8080).sync(); System.out.println("server started and listen " + 8080); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } }
MyChannelInitializer

package com.oy.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { /* 向管道加入处理器 */ ChannelPipeline pipeline = socketChannel.pipeline(); // HttpServerCodec: netty 提供的处理 http 的编-解码器 pipeline.addLast("MyHttpServerCodec", new HttpServerCodec()); // 添加自定义的处理器 pipeline.addLast("MyHttpServerHandler", new MyHttpServerHandler()); } }
MyHttpServerHandler

package com.oy.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; public class MyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { System.out.println("==================================="); System.out.println("msg 类型: " + msg.getClass().getName()); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); // 判断 msg 是否是 http request 请求 if (msg instanceof HttpRequest) { HttpRequest request = (HttpRequest) msg; URI uri = new URI(request.uri()); System.out.println("请求 uri: " + uri.getPath()); if ("/favicon.ico".equals(uri.getPath())) { System.out.println("请求 favicon.icon,不做响应"); return; } // 回复信息给浏览器 ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8); // 构造一个 http 的响应,即 http response DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); ctx.writeAndFlush(response); } } }
参考:
1)netty 官网:https://netty.io/
3)掘金小册:Netty 入门与实战:仿写微信 IM 即时通讯系统