netty 詳解(一)架構設計、異步模型、任務隊列、入門案例


錄:

1、netty 是什么
2、netty 架構設計
    2.1、線程模型
    2.2、傳統阻塞 I/O 服務模型
    2.3、Reactor 模式
    2.4、單 Reactor 單線程
    2.5、單 Reactor 多線程
    2.6、主從 Reactor 多線程
    2.7、Netty工作原理架構圖
3、Netty 編程之 helloworld
4、自定義 ChannelInboundHandlerAdapter  收發消息
5、任務隊列 taskQueue 和 scheduledTaskQueue
6、Netty 異步模型
7、Netty 入門案例--HTTP 服務

1、netty 是什么    <--返回目錄

Netty 是一個異步事件驅動的網絡應用框架,用於快速開發可維護的高性能服務器和客戶端。

下面是我總結的使用 Netty 不使用 JDK 原生 NIO 的原因

  • 使用 JDK 自帶的 NIO 需要了解太多的概念,編程復雜,一不小心 bug 橫飛
  • Netty 底層 IO 模型隨意切換,而這一切只需要做微小的改動,改改參數,Netty 可以直接從 NIO 模型變身為 IO 模型
  • Netty 自帶的拆包解包,異常檢測等機制讓你從 NIO 的繁重細節中脫離出來,讓你只需要關心業務邏輯
  • Netty 解決了 JDK 的很多包括空輪詢在內的 bug
  • Netty 底層對線程,selector 做了很多細小的優化,精心設計的 reactor 線程模型做到非常高效的並發處理
  • 自帶各種協議棧讓你處理任何一種通用協議都幾乎不用親自動手
  • Netty 社區活躍,遇到問題隨時郵件列表或者 issue
  • Netty 已經歷各大 rpc 框架,消息中間件,分布式通信中間件線上的廣泛驗證,健壯性無比強大

2、netty 架構設計    <--返回目錄

  不同的線程模式,對程序的性能有很大影響,為了搞清 netty 線程模式,我們來系統分析下各個線程模式,最后看看 netty 線程模型有什么優越性。

2.1、線程模型    <--返回目錄

  目前存在的線程模型有:

  • 傳統阻塞 I/O 服務模型
  • Reactor 模式(反應器模式、分發者模式 Dispatcher、通知者模式 Notifier)

  根據 Reactor 的數量和處理資源池線程的數量不同,有 3 種典型的實現:

  • 單 Reactor 單線程
  • 單 Reactor 多線程
  • 主從 Reactor 多線程

  netty 線程模式:netty 主要基於主從 Reactor 多線程模型做了一定的改進,其中主從 Reactor 多線程模式有多個 Reactor

 

2.2、傳統阻塞 I/O 服務模型    <--返回目錄

  模型特點:

  • 采用阻塞 IO 模式 獲取輸入的數據
  • 每個連接都需要獨立的線程完成數據的輸入,業務處理,數據返回

問題分析:

  • 當並發數很大,就會創建大量的線程,占用很大系統資源
  • 連接創建后,如果當前線程暫時沒有數據可讀,該線程回阻塞在 read 操作,造成線程資源的浪費

 

2.3、Reactor 模式    <--返回目錄

  針對傳統阻塞 IO 服務模型的 2 個缺點,解決方案:

  • 基於 IO 復用模型:多個連接共用一個阻塞對象,應用程序只需要在一個阻塞對象等待,無需阻塞等待所有連接。當某個連接有新的數據可以處理時,操作系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。
  • 基於線程池復用線程資源:不必為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理,一個線程可以處理多個連接的業務。

  IO 復用結合線程池,就是 Reactor 模式基本設計思想。

 

2.4、單 Reactor 單線程    <--返回目錄

  方案說明:

  • select 是前面 IO 復用模型介紹的標准網絡編程 API,可以實現應用程序通過一個阻塞對象監聽多路連接請求
  • Reactor 對象通過 Select 監控客戶端請求事件,收到事件后通過 Dispatch 進行分發
  • 如果是建立連接請求事件,則由 Acceptor 通過 accept 處理連接請求,然后創建一個 handler 對象處理連接完成后的后續業務處理
  • 如果不是建立連接事件,則 Reactor 會分發調用連接對應的 handler 來響應
  • handler 會完成 read -> 業務處理 -> send 的完整業務流程

  服務器端用一個線程通過多路復用搞定所有的 IO 操作(包括連接、讀寫等),編碼簡單,清晰明了,但是如果客戶端連接數量較多,將無法支撐。

 

2.5、單 Reactor 多線程    <--返回目錄

  方案說明:

  • Reactor 對象通過 select 監控客戶端請求事件,收到事件后,通過 dispatch 進行分發
  • 如果建立連接請求,則由 Acceptor 通過 accept 處理連接請求,然后創建一個 handler 對象處理完成連接后的各種事件
  • 如果不是連接請求,則由 reactor 分發調用連接對象對應的 handler 來處理
  • handler 之負責響應事件,不做具體的業務處理,通過 read 讀取數據后,會分發給后面的 worker 線程池的某個線程處理業務
  • worker 線程池會分配獨立線程完成真正的業務,並將結果返回給 handler
  • handler 收到響應后,通過 send 將結果返回給 cliet

 

  缺點:reactor 處理所有的事件的監聽和響應,在單線程運行,在高並發場景容易出現性能瓶頸。

 

2.6、主從 Reactor 多線程    <--返回目錄

  主 Reactor 負責連接事件;子 Reactor 負責監聽讀寫事件

 

2.7、Netty工作原理架構圖    <--返回目錄

  1)Netty 抽象出兩組線程池,BossGroup 專門負責接受客戶端的連接,WorkerGroup 專門負責網絡的讀寫

  2)BossGroup 和 WorkGroup 類型都是 NioEventLoopGroup

  3)NioEventLoopGroup 相當於一個事件循環組,這個組含有多個事件循環,每個事件循環時 NioEventLoop

  4)NioEventLoop 表示一個不斷循環的執行處理任務的線程,每個 NioEventLoop 都有一個 Selector,用於監聽綁定在其上的 socket 的網絡通訊

  5)NioEventLoopGroup 可以有多個線程,即可以含有多個 NioEventLoop

  6)每個 Boss NioEventLoop 循環執行的步驟

  • 輪詢 accept 事件
  • 處理 accept 事件,與 client 建立連接,生成 NiobiumSocketChannel,並將其注冊到 worker NioEveltLoop 上的 Selector
  • 處理任務隊列的任務,即 runAllTasks

  7) 每個 Worker NioEventLoop 循環執行的步驟

  • 輪詢 read/write 事件
  • 處理 read/write 事件,在對應 NioSocketChannel 處理
  • 處理任務隊列的任務,即 runAllTasks

 

3、Netty 編程之 helloworld    <--返回目錄

  pom 引入依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.22.Final</version>
</dependency>
View Code

  NettyServer

package com.oy;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class NettyServer {

    public static void main(String[] args) throws Exception {

        // 1.創建 BossGroup 和 workerGroup
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();

        // 2.創建服務器端的啟動對象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.鏈式編程,配置參數
        serverBootstrap
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列得到連接個數
                .childOption(ChannelOption.SO_KEEPALIVE, true) // 設置保持獲得連接狀態
                .childHandler(new ChannelInitializer<NioSocketChannel>() { // 給 WorkerGroup 的 EventLoop 對應的管道設置處理器
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new StringDecoder());
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                System.out.println(msg);
                            }
                        });
                    }
                });

        // 4.綁定端口,運行服務器
        ChannelFuture future = serverBootstrap.bind(8000).sync();
        System.out.println("server started and listen " + 8000);

        // 5.對關閉通道進行監聽
        future.channel().closeFuture().sync();
    }

}

  NettyClient

package com.oy;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Date;

public class NettyClient {
    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();

        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) {
                        ch.pipeline().addLast(new StringEncoder());
                    }
                });

        Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();

        while (true) {
            channel.writeAndFlush(new Date() + ": hello world11111111!");
            Thread.sleep(2000);
        }
    }
}

 

4、自定義 ChannelInboundHandlerAdapter  收發消息    <--返回目錄

  NettyServer

package com.oy;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

public class NettyServer {

    private int port;

    public static void main(String[] args) {
        new NettyServer(8080).start();
    }

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() {
        /**
         * 創建兩個EventLoopGroup,即兩個線程池,boss線程池用於接收客戶端的連接,一個線程監聽一個端口,一般只會監聽一個端口所以只需一個線程
         * work池用於處理網絡連接數據讀寫或者后續的業務處理(可指定另外的線程處理業務,work完成數據讀寫)
         */
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            /**
             * 實例化一個服務端啟動類,
             * group()指定線程組
             * channel()指定用於接收客戶端連接的類,對應java.nio.ServerSocketChannel
             * childHandler()設置編碼解碼及處理連接的類
             */
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast("decoder", new StringDecoder())
                                    //.addLast("encoder", new StringEncoder())
                                    .addLast(new NettyServerHandler());
                        }
                    });

            // 綁定端口
            ChannelFuture future = server.bind().sync();
            System.out.println("server started and listen " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    public static class NettyServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldServerHandler active");
        }

        /**
         * 讀取客戶端發送的數據
         * ChannelHandlerContext ctx: 上下文對象,含有管道 pipeline,通道 channel,連接地址
         * Object msg: 客戶端發送的數據
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead...");

            // 讀取客戶端發送的數據
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8));

            //System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
            // 返回消息
            //ctx.write("server write, 收到消息" + msg);
            //ctx.flush();
        }

        /**
         * 數據讀取完畢
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端", CharsetUtil.UTF_8));
        }

        /**
         * 處理異常,關閉通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }
}

  NettyClient

package com.oy;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

public class NettyClient {
    private static final String HOST = "127.0.0.1";
    private static final int PORT= 8080;

    public static void main(String[] args){
        new NettyClient().start(HOST, PORT);
    }

    public void start(String host, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap client = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    //.addLast("decoder", new StringDecoder())
                                    //.addLast("encoder", new StringEncoder())
                                    .addLast(new NettyClientHandler());
                        }
                    });

            ChannelFuture future = client.connect(host, port).sync();
            //future.channel().writeAndFlush("Hello Netty Server ,I am a netty client");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
        /**
         * 通道就緒觸發該方法
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldClientHandler Active");
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服務器", CharsetUtil.UTF_8));
        }

        /**
         * 當通道有讀取事件時觸發該方法
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("收到服務器響應: " + buf.toString(CharsetUtil.UTF_8));
        }
    }
}

 

5、任務隊列 taskQueue 和 scheduledTaskQueue    <--返回目錄

  任務隊列中的 Task 有三種典型使用場景

  • 用戶程序自定義的普通任務
  • 用戶自定義定時任務
  • 非當前 Reactor 線程調用 Channel 的各種方法。例如在推送系統的業務線程里,根據用戶的標識,找到對應的 Channel 引用,然后調用 Write 類方法向該用戶推送消息,就會進入到這種場景。最終的 Write 會提交到任務隊列中被異步消費
public static class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("HelloWorldServerHandler active");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead...");

        // 讀取客戶端發送的數據
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8));

        // 模擬業務處理耗時
        //Thread.sleep(5 * 1000);
        //ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端1\n", CharsetUtil.UTF_8));

        // 用戶自定義的任務,任務添加到 taskQueue 中
        ctx.channel().eventLoop().execute(new Runnable() {

            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客戶端1\n", CharsetUtil.UTF_8));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

        });

        // 用戶自定義定義任務, 任務添加到 scheduledTaskQueue 中
        ctx.channel().eventLoop().schedule(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客戶端 shedule\n", CharsetUtil.UTF_8));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },5 , TimeUnit.SECONDS);
    }

    /**
     * 數據讀取完畢
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客戶端2", CharsetUtil.UTF_8));
    }

    /**
     * 處理異常,關閉通道
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
    }
}

 

6、Netty 異步模型    <--返回目錄

  異步的概念和同步相對。當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用的組件在完成后,通過狀態、通知和回調來通知調用者。

  Netty 中的 IO 操作時異步的,包括 Bind、Write、Connect 等操作會簡單的返回一個 ChannelFuture。

  調用者並不能立刻獲得結果,而是通過 Future-Listener 機制,用戶可以方便的主動獲取或者通過通知機制獲得 IO 操作結果。

  Netty 的異步模型是建立在 Future 和 callback 之上的,callback 就是回調。重點說 Future,它的核心思想是:假設一個方法 fun,計算過程可能非常耗時,等待 fun 返回顯然不合適,那么可以在調用 fun 的時候,立馬返回一個 Future,后續可以通過 Future 取監控方法 fun 的處理過程(即 Future-Listener 機制)。

    // 綁定端口
    final ChannelFuture future = server.bind(8080).sync();

    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            if (future.isDone()) {
                System.out.println("監聽端口 8080 成功");
            } else {
                System.out.println("監聽端口 8080 失敗");
            }
        }
    });

 

7、Netty 入門案例--HTTP 服務    <--返回目錄

 

 

   HttpServer

package com.oy.http;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class HttpServer {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            ServerBootstrap server = new ServerBootstrap()
                    .group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    //.localAddress(new InetSocketAddress(port))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new MyChannelInitializer());

            // 綁定端口
            ChannelFuture future = server.bind(8080).sync();
            System.out.println("server started and listen " + 8080);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
View Code

  MyChannelInitializer

package com.oy.http;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {

        /* 向管道加入處理器 */
        ChannelPipeline pipeline = socketChannel.pipeline();
        // HttpServerCodec: netty 提供的處理 http 的編-解碼器
        pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
        // 添加自定義的處理器
        pipeline.addLast("MyHttpServerHandler", new MyHttpServerHandler());

    }

}
View Code

  MyHttpServerHandler

package com.oy.http;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

import java.net.URI;

public class MyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {

    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println("===================================");
        System.out.println("msg 類型: " + msg.getClass().getName());
        System.out.println("客戶端地址:" + ctx.channel().remoteAddress());

        // 判斷 msg 是否是 http request 請求
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            URI uri = new URI(request.uri());
            System.out.println("請求 uri: " + uri.getPath());
            if ("/favicon.ico".equals(uri.getPath())) {
                System.out.println("請求 favicon.icon,不做響應");
                return;
            }

            // 回復信息給瀏覽器
            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服務器", CharsetUtil.UTF_8);
            // 構造一個 http 的響應,即 http response
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                    HttpResponseStatus.OK, content);
            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            ctx.writeAndFlush(response);
        }
    }

}
View Code

 

參考:

  1)netty 官網:https://netty.io/

  2)《跟閃電俠學Netty》開篇:Netty是什么?

  3)掘金小冊:Netty 入門與實戰:仿寫微信 IM 即時通訊系統

  4)Netty整體架構

  5)Netty工作原理架構圖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM