Netty 中隊列的使用


任務隊列中的Task有3種典型使用場景

  1. 用戶程序自定義的普通任務

    • 此前代碼: 參考https://www.cnblogs.com/ronnieyuan/p/12016712.html

    • NettyServerHandler代碼有改動:

      package com.ronnie.netty.sample;
      
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.Channel;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      import io.netty.channel.ChannelPipeline;
      import io.netty.util.CharsetUtil;
      
      /**
       *  1. 自定義一個Handler需要繼承 netty 規定好的某個 HandlerAdapter(適配器模式)
       *  2. 這時我們自定義一個Handler, 才能稱為一個handler
       */
      public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      
          /**
           *  讀取數據事件(這里我們可以讀取客戶端發送的消息)
           *  ChannelHandlerContext ctx: 上下文對象, 含有管道 pipeline, 通道 channel, 地址 address
           *  Object msg: 就是客戶端發送的數據 默認Object
           * @param ctx
           * @param msg
           * @throws Exception
           */
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      
              /* 比如這里我們有一個非常耗時的業務 -> 異步執行 -> 提交該channel對應的
                 NioEventLoop 的 taskQueue中
               */
      
                // 解決方案1: 用戶程序自定義的普通任務
                ctx.channel().eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Thread.sleep(10 * 1000);
                            ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, atme ", CharsetUtil.UTF_8));
                        } catch (InterruptedException e) {
                            System.out.println("Exception occurs: " + e.getMessage());
                        }
                    }
                });
              ctx.channel().eventLoop().execute(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          Thread.sleep(20 * 1000);
                          ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, yang ", CharsetUtil.UTF_8));
                      } catch (InterruptedException e) {
                          System.out.println("Exception occurs: " + e.getMessage());
                      }
                  }
              });
      
      //          Thread.sleep(10 * 1000);
      //          ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, atme ", CharsetUtil.UTF_8));
      
                System.out.println("go on ...");
      //        System.out.println("The server is reading thread: " + Thread.currentThread().getName());
      //        System.out.println("server ctx = " + ctx);
      //        System.out.println("Check the relationship between channel and pipeline");
      //        Channel channel = ctx.channel();
      //        ChannelPipeline pipeline = ctx.pipeline(); // 本質是一個雙向鏈表, 涉及到出棧入棧問題
      //        // 將 msg轉成一個 ByteBuf(是netty提供的, 不是NIO的 ByteBuffer, 性能更高)
      //        ByteBuf buf = (ByteBuf) msg;
      //        System.out.println("The message that client send: " + buf.toString(CharsetUtil.UTF_8));
      //        System.out.println("The address of client: " + ctx.channel().remoteAddress());
          }
      
          /**
           *  數據讀取完畢
           * @param ctx
           * @throws Exception
           */
          @Override
          public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
      
              // write + flush, 將數據寫入到緩沖並刷新
              // 一般來說, 我們對發送的數據進行編碼
              ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, dear client, Kappa", CharsetUtil.UTF_8));
      
          }
      
          /**
           *  處理異常, 一般需要關閉通道
           * @param ctx
           * @param cause
           * @throws Exception
           */
          @Override
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
              ctx.close();
          }
      }
      
      
    • 打上斷點,debug啟動

      • 左鍵點擊ctx

        1575963937101

        • pipeline -> channel -> eventLoop -> taskQueue

          1575964061975

        • 可以看到兩個線程任務存入了任務隊列中

  2. 用戶自定義定時任務

    • 在NettyServerHandler中之前添加的任務線程代碼之下, 打印go on之前添加以下代碼:

              // 用戶自定義定時任務 -> 該任務是提交到 scheduleTaskQueue中的
              ctx.channel().eventLoop().schedule(new Runnable() {
                  @Override
                  public void run() {
                      try {
                          Thread.sleep(20 * 1000);
                          ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, client, yyf ", CharsetUtil.UTF_8));
                      } catch (InterruptedException e) {
                          System.out.println("Exception occurs: " + e.getMessage());
                      }
                  }
              }, 5, TimeUnit.SECONDS);
      
    • 打上斷點, debug啟動

      • 左鍵點擊ctx

        • pipeline -> channel -> eventLoop -> taskQueue

        • 你會發現taskQueue中只有2個線程任務

        • 我們剛剛寫的那個任務在scheduledTaskQueue中(pipeline -> channel -> eventLoop -> scheduledTaskQueue)

          1575965083834

  3. 非當前Reactor 線程調用Channel的各種方法

    • 例如在推送系統的業務線程中, 根據用戶的標識, 找到對應的Channel引用, 然后調用 Write 類方法向該用戶推送消息, 就會進入到這種場景。最終的Write會提交到任務隊列中后被異步消費。

Netty 方案再說明

  1. Netty 抽象出兩組線程池, BossGroup 專門負責接收客戶端連接, WorkerGroup 專門負責網絡讀寫操作。
  2. NioEventLoop表示一個不斷循環執行處理任務的線程, 每個 NioEventLoop都有一個selector, 用於監聽綁定在其上的socket網絡通道。
  3. NioEventLoop內部采用串行化設計, 從消息的讀取 -> 解碼 -> 處理 -> 編碼 -> 發送, 始終由 IO 線程 NioEventLoop 負責
    • NioEventLoopGroup下包含多個NioEventLoop
    • 每個NioEventLoop 中包含有一個Selector, 一個 taskQueue
    • 每個NioEventLoop 中的 Selector 上可以注冊監聽多個 NioChannel
    • 每個NioChannel 只會綁定在唯一的NioEventLoop上
    • 每個NioChannel 都綁定有一個自己的 ChannelPipline


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM