任務隊列中的Task有3種典型使用場景
-
用戶程序自定義的普通任務
-
NettyServerHandler代碼有改動:
package com.ronnie.netty.sample; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.util.CharsetUtil; /** * 1. 自定義一個Handler需要繼承 netty 規定好的某個 HandlerAdapter(適配器模式) * 2. 這時我們自定義一個Handler, 才能稱為一個handler */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 讀取數據事件(這里我們可以讀取客戶端發送的消息) * ChannelHandlerContext ctx: 上下文對象, 含有管道 pipeline, 通道 channel, 地址 address * Object msg: 就是客戶端發送的數據 默認Object * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { /* 比如這里我們有一個非常耗時的業務 -> 異步執行 -> 提交該channel對應的 NioEventLoop 的 taskQueue中 */ // 解決方案1: 用戶程序自定義的普通任務 ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, atme ", CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("Exception occurs: " + e.getMessage()); } } }); ctx.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, yang ", CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("Exception occurs: " + e.getMessage()); } } }); // Thread.sleep(10 * 1000); // ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, atme ", CharsetUtil.UTF_8)); System.out.println("go on ..."); // System.out.println("The server is reading thread: " + Thread.currentThread().getName()); // System.out.println("server ctx = " + ctx); // System.out.println("Check the relationship between channel and pipeline"); // Channel channel = ctx.channel(); // ChannelPipeline pipeline = ctx.pipeline(); // 本質是一個雙向鏈表, 涉及到出棧入棧問題 // // 將 msg轉成一個 ByteBuf(是netty提供的, 不是NIO的 ByteBuffer, 性能更高) // ByteBuf buf = (ByteBuf) msg; // System.out.println("The message that client send: " + buf.toString(CharsetUtil.UTF_8)); // System.out.println("The address of client: " + ctx.channel().remoteAddress()); } /** * 數據讀取完畢 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // write + flush, 將數據寫入到緩沖並刷新 // 一般來說, 我們對發送的數據進行編碼 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, dear client, Kappa", CharsetUtil.UTF_8)); } /** * 處理異常, 一般需要關閉通道 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
-
打上斷點,debug啟動
-
左鍵點擊ctx
-
pipeline -> channel -> eventLoop -> taskQueue
-
可以看到兩個線程任務存入了任務隊列中
-
-
-
用戶自定義定時任務
-
在NettyServerHandler中之前添加的任務線程代碼之下, 打印go on之前添加以下代碼:
// 用戶自定義定時任務 -> 該任務是提交到 scheduleTaskQueue中的 ctx.channel().eventLoop().schedule(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, yyf ", CharsetUtil.UTF_8)); } catch (InterruptedException e) { System.out.println("Exception occurs: " + e.getMessage()); } } }, 5, TimeUnit.SECONDS);
-
打上斷點, debug啟動
-
左鍵點擊ctx
-
pipeline -> channel -> eventLoop -> taskQueue
-
你會發現taskQueue中只有2個線程任務
-
我們剛剛寫的那個任務在scheduledTaskQueue中(pipeline -> channel -> eventLoop -> scheduledTaskQueue)
-
-
-
-
非當前Reactor 線程調用Channel的各種方法
- 例如在推送系統的業務線程中, 根據用戶的標識, 找到對應的Channel引用, 然后調用 Write 類方法向該用戶推送消息, 就會進入到這種場景。最終的Write會提交到任務隊列中后被異步消費。
Netty 方案再說明
- Netty 抽象出兩組線程池, BossGroup 專門負責接收客戶端連接, WorkerGroup 專門負責網絡讀寫操作。
- NioEventLoop表示一個不斷循環執行處理任務的線程, 每個 NioEventLoop都有一個selector, 用於監聽綁定在其上的socket網絡通道。
- NioEventLoop內部采用串行化設計, 從消息的讀取 -> 解碼 -> 處理 -> 編碼 -> 發送, 始終由 IO 線程 NioEventLoop 負責
- NioEventLoopGroup下包含多個NioEventLoop
- 每個NioEventLoop 中包含有一個Selector, 一個 taskQueue
- 每個NioEventLoop 中的 Selector 上可以注冊監聽多個 NioChannel
- 每個NioChannel 只會綁定在唯一的NioEventLoop上
- 每個NioChannel 都綁定有一個自己的 ChannelPipline