Netty是什么:
- 異步事件驅動框架,用於快速開發高i性能服務端和客戶端
- 封裝了JDK底層BIO和NIO模型,提供高度可用的API
- 自帶編碼解碼器解決拆包粘包問題,用戶只用關心業務邏輯
- 精心設計的Reactor線程模型支持高並發海量連接
- 自帶協議棧,無需用戶關心
Netty具有如下特性:
- 設計:統一的API,支持多種傳輸類型,阻塞和非阻塞的,簡單而強大的線程模型,真正的無連接數據報套接字支持,鏈接邏輯組件以支持復用。
- 易於使用:詳實的 Javadoc 和大量的示例集不需要超過JdK 1.6+的依賴。
- 性能:擁有比 Java 的核心 API 更高的吞吐量以及更低的延遲,得益於池化和復用,擁有更低的資源消耗,最少的內存復制。
- 健壯性:不會因為慢速、快速或者超載的連接而導致 OutOfMemoryError ,消除在高速網絡中 NIO 應用程序常見的不公平讀/寫比率。
- 安全性:完整的 SSL/TLS 以及 StartTLs 支持,可用於受限環境下,如 Applet 和 OSGI。
- 社區驅動:發布快速而且頻繁。
Netty核心組件:
為了后期更好地理解和進一步深入 Netty,有必要總體認識一下 Netty 所用到的核心組件以及他們在整個 Netty 架構中是如何協調工作的。Nettty 有如下幾個核心組件:
- Bootstrap 和 ServerBootstrap
- Channel
- ChannelHandler
- ChannelPipeline
- EventLoop
- ChannelFuture
1.Bootstrap或者ServerBootstrap,一個Netty應用通常由一個Bootstrap開始,它主要作用是配置整個Netty程序,串聯起各個組件。
2.Channel:Channel 是 Netty 網絡操作抽象類,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,還包括了 Netty 框架相關的一些功能,如獲取該 Channel的 EventLoop。在傳統的網絡編程中,作為核心類的 Socket ,它對程序員來說並不是那么友好,直接使用其成本還是稍微高了點。而Netty 的 Channel 則提供的一系列的 API :它大大降低了直接與 Socket 進行操作的復雜性。而相對於原生 NIO 的 Channel,Netty 的 Channel 具有如下優勢:
-
在 Channel 接口層,采用 Facade 模式進行統一封裝,將網絡 I/O 操作、網絡 I/O 相關聯的其他操作封裝起來,統一對外提供。
-
Channel 接口的定義盡量大而全,為 SocketChannel 和 ServerSocketChannel 提供統一的視圖,由不同子類實現不同的功能,公共功能在抽象父類中實現,最大程度地實現功能和接口的重用。
-
具體實現采用聚合而非包含的方式,將相關的功能類聚合在 Channel 中,有 Channel 統一負責和調度,功能實現更加靈活。
Channel 與 socket 的關系:
在 Netty 中 Channel 有兩種,對應客戶端套接字通道NioSocketChannel,內部管理java.nio.channels.SocketChannel 套接字,對應服務器端監聽套接字通道NioServerSocketChannel,其內部管理自己的 java.nio.channels.ServerSocketChannel 套接字。也就是 Channel 是對 socket 的裝飾或者門面,其封裝了對socket 的原子操作。
3.ChannelHandler:ChannelHandler 為 Netty 中最核心的組件,它充當了所有處理入站和出站數據的應用程序邏輯的容器。ChannelHandler 主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數據接收、異常、數據轉換等。ChannelHandler 有兩個核心子類 ChannelInboundHandler 和 ChannelOutboundHandler,其中ChannelInboundHandler 用於接收、處理入站數據和事件,而 ChannelOutboundHandler 則相反。
4.ChannelPipeline:ChannelPipeline 為 ChannelHandler 鏈提供了一個容器並定義了用於沿着鏈傳播入站和出站事件流的 API。一個數據或者事件可能會被多個 Handler 處理,在這個過程中,數據或者事件經流 ChannelPipeline,由 ChannelHandler 處理。在這個處理過程中,一個 ChannelHandler 接收數據后處理完成后交給下一個 ChannelHandler,或者什么都不做直接交給下一個 ChannelHandler。

當一個數據流進入 ChannlePipeline 時,它會從 ChannelPipeline 頭部開始傳給第一個 ChannelInboundHandler ,當第一個處理完后再傳給下一個,一直傳遞到管道的尾部。與之相對應的是,當數據被寫出時,它會從管道的尾部開始,先經過管道尾部的 “最后” 一個ChannelOutboundHandler,當它處理完成后會傳遞給前一個ChannelOutboundHandler 。當 ChannelHandler 被添加到 ChannelPipeline 時,它將會被分配一個 ChannelHandlerContext,它代表了 ChannelHandler 和 ChannelPipeline之間的綁定。其中 ChannelHandler 添加到 ChannelPipeline 過程如下:
-
一個 ChannelInitializer 的實現被注冊到了 ServerBootStrap中
-
當 ChannelInitializer.initChannel() 方法被調用時,ChannelInitializer 將在 ChannelPipeline 中安裝一組自定義的 ChannelHandler
-
ChannelInitializer 將它自己從 ChannelPipeline 中移除
5.EventLoop:Netty 基於事件驅動模型,使用不同的事件來通知我們狀態的改變或者操作狀態的改變。它定義了在整個連接的生命周期里當有事件發生的時候處理的核心抽象。Channel 為Netty 網絡操作抽象類,EventLoop 主要是為Channel 處理 I/O 操作,兩者配合參與 I/O 操作。下圖是Channel、EventLoop、Thread、EventLoopGroup之間的關系(摘自《Netty In Action》):

- 一個 EventLoopGroup(Boos線程池,work線程池的分組概念) 包含一個或多個 EventLoop。
- 一個 EventLoop 在它的生命周期內只能與一個Thread綁定。
- 所有有 EnventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理。
- 一個 Channel 在它的生命周期內只能注冊與一個 EventLoop。
- 一個 EventLoop 可被分配至一個或多個 Channel 。
當一個連接到達時,Netty 就會注冊一個 Channel,然后從 EventLoopGroup 中分配一個 EventLoop 綁定到這個Channel上,在該Channel的整個生命周期中都是有這個綁定的 EventLoop 來服務的。
6.ChannelFuture:Netty 為異步非阻塞,即所有的 I/O 操作都為異步的,因此,我們不能立刻得知消息是否已經被處理了。Netty 提供了 ChannelFuture 接口,通過該接口的 addListener() 方法注冊一個 ChannelFutureListener,當操作執行成功或者失敗時,監聽就會自動觸發返回結果。
通過了解相應的組件,接下去先簡單看一下Netty的基本使用,同樣的市服務端與客戶端的交互。
public class NettyServer { private static final String IP = "127.0.0.1"; private static final int port = 6666; private static final int BIZGROUPSIZE = Runtime.getRuntime().availableProcessors() * 2; private static final int BIZTHREADSIZE = 100; //創建兩個EventLoopGroup對象,創建boss線程組 ⽤於服務端接受客戶端的連接 private static final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZGROUPSIZE); //創建 worker 線程組 ⽤於進⾏ SocketChannel 的數據讀寫 private static final EventLoopGroup workGroup = new NioEventLoopGroup(BIZTHREADSIZE); public static void start() throws Exception { //啟動類初始化 ServerBootstrap serverBootstrap = initServerBootstrap(); // 綁定端⼝,並同步等待成功,即啟動服務端 ChannelFuture channelFuture = serverBootstrap.bind(IP, port).sync(); //成功綁定到端口之后,給channel增加一個 管道關閉的監聽器並同步阻塞,直到channel關閉,線程才會往下執行,結束進程。 channelFuture.channel().closeFuture().sync(); System.out.println("server start"); } private static ServerBootstrap initServerBootstrap() { //一個Netty應用通常由一個Bootstrap開始 ServerBootstrap serverBootstrap = new ServerBootstrap(); //添加兩個組,設置使⽤的EventLoopGroup serverBootstrap.group(bossGroup,workGroup) //初始化 channel,設置要被實例化的為 NioServerSocketChannel 類 .channel(NioServerSocketChannel.class) //初始化channelHandler,設置連⼊服務端的 Client 的 SocketChannel 的處理器 .childHandler(new ChannelInitializer<Channel>() { //我們再來設置下相應的過濾條件。 這⾥需要繼承Netty中ChannelInitializer 類, //然后重寫 initChannel 該⽅法,進⾏添加相應的設置,傳輸協議設置,以及相應的業務實現類 @Override protected void initChannel(Channel ch) throws Exception { //配置pipeline相關屬性 ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4)); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); // 相關處理 Handler pipeline.addLast(new TcpServerHandler()); } }); return serverBootstrap; } protected static void shutdown(){ workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } public static void main(String[] args) throws Exception { System.out.println("啟動Server..."); NettyServer.start(); } }
服務相關的設置的代碼寫完之后,我們再來編寫主要的業務代碼。 使⽤Netty編寫 [業務層 ]的代碼,我們需要繼承 ChannelInboundHandlerAdapter 或 SimpleChannelInboundHandler 類,在這⾥說下它們兩的區別吧。
繼承 SimpleChannelInboundHandler 類之后,會在接收到數據后會⾃動 release 掉數據占⽤的 Bytebuffer 資源。並且繼承該類需要指定數據格式。
⽽繼承ChannelInboundHandlerAdapter 則不會⾃動釋放,需要⼿動調⽤ReferenceCountUtil.release() 等⽅法進⾏釋放。繼承該類不需要指定數據格式。 所以在這⾥,個⼈推薦服務端繼承 ChannelInboundHandlerAdapter ,⼿動進⾏釋放,防⽌數據未處理完就⾃動釋放了。⽽且服務端可能有多個客戶端進⾏連接,並且每⼀個客戶端請求的數據格式都不⼀致,這時便可以進⾏相應的處理。
客戶端根據情況可以繼承 SimpleChannelInboundHandler 類。好處是直接指定好傳輸的數據格式,就不需要再進⾏格式的轉換了。
TcpServerHandler :
public class TcpServerHandler extends ChannelInboundHandlerAdapter { //建⽴連接時,發送⼀條慶祝消息 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("chanelActive>>>>>>>"); } //業務邏輯處理 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server receive message:" + msg); ctx.channel().writeAndFlush("accept message "+ msg); ctx.close(); } //異常相關處理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("get server exception :"+cause.getMessage()); } }
客戶端:客戶端過濾其這塊基本和服務端⼀致。不過需要注意的是,傳輸協議、編碼和解碼應該⼀致.
public class NettyClient implements Runnable { @Override public void run() { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group); bootstrap.channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("handler", new MyClient()); } }); for (int i=0;i<10;i++){ ChannelFuture f = bootstrap.connect("127.0.0.1",6666).sync(); f.channel().writeAndFlush("hello service !" + Thread.currentThread().getName()+ ":---->"+i); f.channel().closeFuture().sync(); } }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } public static void main(String[] args) { for (int i = 0;i < 3 ;i++ ){ new Thread(new NettyClient(),">>> this thread "+i).start(); } } }
MyClient :這⾥有個注解, 該注解 Sharable 主要是為了多個handler可以被多個channel安全地共享,也就是保證線程安全。
public class MyClient extends ChannelInboundHandlerAdapter { //@Sharable @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("client receieve message: "+msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("get client exception :"+cause.getMessage()); } }
啟動服務器,客戶端即可看到演示效果。
Netty線程模型:
了解了Netty 基礎服務的構建,我們對Netty服務有了一定的認識,最后來一張線程模型圖(Reactor主從多線程模型):
Netty實現簡易版Tomcat:
之前我們通過手寫springMvc,實現了自己的Mvc的調用流程,其中最本質的東西是通過Servlert,將我們對應的Controller對應的請求路徑及controller給映射緩存起來,Tomcat我們稱之為Servlet容器,所以我們將自己實現的Servlet交由其管理是理所當然的,既然現在我們自己有映射關系,同時現在也有了Netty這么強大的通信框架,也了解了他的基本使用,那么我們如何將其與我們的程序關聯起來,實現自己 容器呢?
在手寫之前,我們需要明白的是在這個過程中非常重要的幾個對象,Servlet是必不可少的,Request,Response,另外一個就是我們的容器本身,我們按照我們的思路,就是通過Netty對外暴露一個端口,同時在啟動的時候初始化映射關系,在有請求進來的時候調用對應的Servlet進行業務處理,最后進行響應。
主類:
//Netty就是一個同時支持多協議的網絡通信框架 public class WuzzTomcat { //打開Tomcat源碼,全局搜索ServerSocket private int port = 8080; private Map<String, WuzzServlet> servletMapping = new HashMap<String, WuzzServlet>(); private Properties webxml = new Properties(); private void init() { //加載web.xml文件,同時初始化 ServletMapping對象 try { String WEB_INF = this.getClass().getResource("/").getPath(); FileInputStream fis = new FileInputStream(WEB_INF + "web.properties");
//加載配置文件 webxml.load(fis); for (Object k : webxml.keySet()) { String key = k.toString(); if (key.endsWith(".url")) { String servletName = key.replaceAll("\\.url$", ""); String url = webxml.getProperty(key); String className = webxml.getProperty(servletName + ".className"); WuzzServlet obj = (WuzzServlet) Class.forName(className).newInstance(); servletMapping.put(url, obj); } } } catch (Exception e) { e.printStackTrace(); } } public void start() { init(); //Netty封裝了NIO,Reactor模型,Boss,worker // Boss線程 EventLoopGroup bossGroup = new NioEventLoopGroup(); // Worker線程 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // Netty服務 //ServetBootstrap ServerSocketChannel ServerBootstrap server = new ServerBootstrap(); // 鏈路式編程 server.group(bossGroup, workerGroup) // 主線程處理類,看到這樣的寫法,底層就是用反射 .channel(NioServerSocketChannel.class) // 子線程處理類 , Handler .childHandler(new ChannelInitializer<SocketChannel>() { // 客戶端初始化處理 protected void initChannel(SocketChannel client) throws Exception { // 無鎖化串行編程 //Netty對HTTP協議的封裝,順序有要求 // HttpResponseEncoder 編碼器 client.pipeline().addLast(new HttpResponseEncoder()); // HttpRequestDecoder 解碼器 client.pipeline().addLast(new HttpRequestDecoder()); // 業務邏輯處理 client.pipeline().addLast(new WuzzTomcatHandler()); } }) // 針對主線程的配置 分配線程最大數量 128 .option(ChannelOption.SO_BACKLOG, 128) // 針對子線程的配置 保持長連接 .childOption(ChannelOption.SO_KEEPALIVE, true); // 啟動服務器 ChannelFuture f = server.bind(port).sync(); System.out.println("Wuzz Tomcat 已啟動,監聽的端口是:" + port); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 關閉線程池 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }public static void main(String[] args) { new WuzzTomcat().start(); } }
配置文件:web.properties主要是模仿web工程中webxml中對Servlet的映射關系的配置:
servlet.one.url=/firstServlet.do servlet.one.className=com.wuzz.demo.netty.tomcat.servlet.FirstServlet servlet.two.url=/secondServlet.do servlet.two.className=com.wuzz.demo.netty.tomcat.servlet.SecondServlet
所以我們這里需要定義自己的Servlet,這里主要定義自己一個抽象的Servlet類,采用模板方法的模式來編寫代碼。:
public abstract class WuzzServlet { public void service(WuzzRequest request, WuzzResponse response) throws Exception{ //由service方法來決定,是調用doGet或者調用doPost if("GET".equalsIgnoreCase(request.getMethod())){ doGet(request, response); }else{ doPost(request, response); } } public abstract void doGet(WuzzRequest request, WuzzResponse response) throws Exception; public abstract void doPost(WuzzRequest request, WuzzResponse response) throws Exception; }
public class FirstServlet extends WuzzServlet { @Override public void doGet(WuzzRequest request, WuzzResponse response) throws Exception { this.doPost(request, response); } @Override public void doPost(WuzzRequest request, WuzzResponse response) throws Exception { response.write("This is First Serlvet"); } }
public class SecondServlet extends WuzzServlet { @Override public void doGet(WuzzRequest request, WuzzResponse response) throws Exception { this.doPost(request, response); } @Override public void doPost(WuzzRequest request, WuzzResponse response) throws Exception { response.write("This is Second Serlvet"); } }
到目前為止,從初始化工作到接受請求的流程已經都可以了,那么現在就是處理這個請求的過程,那么這里需要定義Request ,Response.在Netty中進行響應的類是需要繼承 ChannelInboundHandlerAdapter 或 SimpleChannelInboundHandler 類,我們采用前者,那么我們就可以定義出這樣的兩個類:
Request:
public class WuzzRequest { private ChannelHandlerContext ctx; private HttpRequest req; public WuzzRequest(ChannelHandlerContext ctx, HttpRequest req) { this.ctx = ctx; this.req = req; } public String getUrl() { return req.uri(); } public String getMethod() { return req.method().name(); } }
Response:
public class WuzzResponse { //SocketChannel的封裝 private ChannelHandlerContext ctx; private HttpRequest req; public WuzzResponse(ChannelHandlerContext ctx, HttpRequest req) { this.ctx = ctx; this.req = req; } public void write(String out) throws Exception { try { if (out == null || out.length() == 0) { return; } // 設置 http協議及請求頭信息 FullHttpResponse response = new DefaultFullHttpResponse( // 設置http版本為1.1 HttpVersion.HTTP_1_1, // 設置響應狀態碼 HttpResponseStatus.OK, // 將輸出值寫出 編碼為UTF-8 Unpooled.wrappedBuffer(out.getBytes("UTF-8"))); response.headers().set("Content-Type", "text/html;"); // 當前是否支持長連接 // if (HttpUtil.isKeepAlive(r)) { // // 設置連接內容為長連接 // response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE); // } ctx.write(response); } finally { ctx.flush(); ctx.close(); } } }
最后我們需要定義自己的業務處理類,這里為了方便,我們直接在主類中新建一個內部類來處理:
//業務處理handler public class WuzzTomcatHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof HttpRequest) { HttpRequest req = (HttpRequest) msg; // 轉交給我們自己的request實現 WuzzRequest request = new WuzzRequest(ctx, req); // 轉交給我們自己的response實現 WuzzResponse response = new WuzzResponse(ctx, req); // 實際業務處理 String url = request.getUrl(); if (servletMapping.containsKey(url)) { servletMapping.get(url).service(request, response); } else { response.write("404 - Not Found"); } } } //異常處理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { } }
這樣子就完成了我們整個容器的編寫,啟動容器,通過 http://localhost:8080/firstServlet.do 訪問可以看到拿到響應: