什么是netty--通俗易懂


 

一.Netty介紹

1.什么是netty

Netty 是由 JBOSS 提供的一個 Java 開源框架。Netty 提供異步的、基於事件驅動的網絡應用程序框架,用以快速開發高性能、高可靠性的網絡 IO 程序,是目前最流行的 NIO 框架,Netty 在互聯網領域、大數據分布式計算領域、游戲行業、通信行業等獲得了廣泛的應用,知名的 Elasticsearch Dubbo 框架內部都采用了 Netty

2.為什么要用netty

原生 NIO 存在問題:

1.NIO 的類庫和 API 繁雜

2.需要熟悉 Java 多線程編程,因為 NIO 編程涉及到 Reactor 模式,必須對多線程和網絡編程非常熟悉, 才能編寫出高質量的 NIO 程序

3.開發工作量和難度都非常大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常  流的處理等等處理起來難度會比較大。

4.JDK NIO Bug:例如臭名昭著的 Epoll Bug,它會導致 Selector 空輪詢,最終導致 CPU 100%。直到 JDK 1.7 版本該問題仍舊存在,沒有被根本解決。

3.Netty的優點

Netty JDK 自帶的 NIO API 進行了封裝,解決了上述問題。

1.設計優雅:適用於各種傳輸類型的統一 API 阻塞和非阻塞 Socket;基於靈活且可擴展的事件模型,可以清晰地分離關注點;高度可定制的線程模型 - 單線程,一個或多個線程池.

2.使用方便:詳細記錄的 Javadoc,用戶指南和示例;沒有其他依賴項,JDK 5Netty 3.x)或 6Netty 4.x)就足夠了。

3.高性能、吞吐量更高:延遲更低;減少資源消耗;最小化不必要的內存復制。

4.安全:完整的 SSL/TLS StartTLS 支持。

5.社區活躍、不斷更新:社區活躍,版本迭代周期短,發現的 Bug 可以被及時修復,同時更多的新功能會被加入

 

二.Reactor三種線程模型

1.現有的三種線程模型

不同的線程模式,對程序的性能有很大影響,目前存在的線程模型有:

.傳統阻塞 I/O 服務模型

Reactor 模式

Reactor 模式又有 3 種典型的實現

Reactor 單線程;

Reactor 多線程;

主從 Reactor 多線程

Netty 的線程模型是主要是基於主從 Reactor 多線程模型改成了主從 Reactor 多線程模型有多個 Reactor模式

2.傳統阻塞 I/O 服務模型介紹

特點:

采用阻塞IO模式獲取輸入的數據

每個連接都需要創建單獨的線程完成數據的輸入,業務處理數據返回

缺點:

當並發數很大,就會創建大量的線程,占用很大系統資源,在線程開銷和上下文切換上降低處理性能

連接創建后,如果當前線程暫時沒有數據可讀,該線程會阻塞在read 操作,造成線程資源浪費

 

 

 

 

黃色的框表示對象, 藍色的框表示線程 白色的框表示方法(API)

3. Reactor 模式

針對傳統阻塞 I/O 服務模型的 2 個缺點,解決方案:

I/O 復用模型:多個連接共用一個阻塞對象,應用程序只需要在一個阻塞對象上等待,無需阻塞等待所有連接。當某個連接有新的數據可以處理時,操作系統通知應用程序,線程從阻塞狀態返回,開始進行業務處理。

Reactor 對應的叫法: 1. 反應器模式 2. 分發者模式(Dispatcher) 3. 通知者模式(notifier)

基於線程池復用線程資源模式:不必再為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理,一個線程可以處理多個連接的業務。

I/O 復用結合線程池,就是 Reactor 模式基本設計思想

 

 

 

 

Reactor 模式,通過一個或多個輸入同時傳遞給服務處理器的模式,(基於事件驅動)

服務器端程序處理傳入的多個請求,並將它們同步分派到相應的處理線程, 因此Reactor模式也叫 Dispatcher模式

Reactor 模式使用IO復用監聽事件, 收到事件后,分發給某個線程(進程), 這點就是網絡服務器高並發處理關鍵

 

4.單 Reactor 單線程

1.工作原理:

①Select 是前面 I/O 復用模型介紹的標准網絡編程 API,可以實現應用程序通過一個阻塞對象監聽多路連接請求

②Reactor 對象通過 Select 監控客戶端請求事件,收到事件后通過 Dispatch 進行分發

如果是建立連接請求事件,則由 Acceptor 通過 Accept 處理連接請求,然后創建一個 Handler 對象處理連接完成后的后續業務處理

如果不是建立連接事件,則 Reactor 會分發調用連接對應的 Handler 來響應

⑤Handler 會完成 Read→業務處理→Send 的完整業務流程

2.優點:

模型簡單,沒有多線程、進程通信、競爭的問題,全部都在一個線程中完成

3.缺點:

性能問題,只有一個線程,無法完全發揮多核 CPU 的性能。

可靠性問題,線程意外終止,或者進入死循環,會導致整個系統通信模塊不可用,不能接收和處理外部消息,造成節點故障

服務器端用一個線程通過多路復用搞定所有的 IO 操作(包括連接,讀、寫等),編碼簡單,清晰明了,但是如果客戶端連接數量較多時,當對應多個讀時,還是會出現阻塞現象,當這種情況發生時將無法支撐高並發的場景。

4.應用場景:

客戶端的數量有限,業務處理非常快速(比如 Redis在業務處理的時間復雜度 O(1) 的情況)

 

 

 

 

5.Reactor多線程

1.工作原理:

①Reactor 對象通過select 監控客戶端請求事件, 收到事件后,通過dispatch進行分發

②如果是建立連接請求, 由Acceptor 通過accept 處理連接請求, 然后創建一個Handler對象處理完成連接后的各種事件

③如果不是連接請求,則由Reactor分發調用連接對應的handler 來處理

④handler 只負責響應事件,不做具體的業務處理, 通過read 讀取數據后,會分發給后面的worker線程池的某個線程處理業務

⑤worker 線程池會分配獨立線程完成真正的業務,並將結果返回給handler,handler收到響應后,通過send 將結果返回給client.

2.優點:

可以充分的利用多核cpu 的處理能力

3.缺點:

多線程數據共享和訪問比較復雜,Reactor處理所有的事件的監聽和響應,在單線程運行時,在高並發場景容易出現性能瓶頸.

 

6.主從 Reactor 多線程

1.工作原理:

①Reactor主線程 MainReactor 對象通過select 監聽連接事件, 收到事件后,通過Acceptor 處理連接事件

Acceptor  處理連接事件后,MainReactor 將連接分配給SubReactor

③subReactor 將連接加入到連接隊列進行監聽,並創建handler進行各種事件處理

當有新事件發生時, subreactor 就會調用對應的handler處理

⑤handler 通過read 讀取數據,分發給后面的worker 線程處理

⑥worker 線程池分配獨立的worker 線程進行業務處理,並返回結果

⑦handler 收到響應的結果后,再通過send 將結果返回給client

⑧Reactor 主線程可以對應多個Reactor 子線程, MainRecator 可以關聯多個SubReactor

 

 

 

 

三.Netty線程模型

1.工作原理

Netty抽象出兩組線程池 BossGroup 專門負責接收客戶端的連接, WorkerGroup 專門負責網絡的讀寫

BossGroup WorkerGroup 類型都是 NioEventLoopGroup

NioEventLoopGroup 相當於一個事件循環組, 這個組中含有多個事件循環 ,每一個事件循環是 NioEventLoop

NioEventLoop 表示一個不斷循環的執行處理任務的線程, 每個NioEventLoop 都有一個selector , 用於監聽綁定在該通道上的socket的網絡通訊

NioEventLoopGroup 可以有多個線程, 即可以含有多個NioEventLoop

每個Boss NioEventLoop 循環執行的步驟有3

輪詢accept 事件

處理accept 事件 , client端建立連接 , 生成NioScocketChannel , 並將其注冊到某個worker NIOEventLoop 上的 selector

處理任務隊列的任務 runAllTasks

每個 Worker NIOEventLoop 循環執行的步驟

輪詢read, write 事件

處理i/o事件, 即read , write 事件,在對應NioScocketChannel 處理

處理任務隊列的任務 runAllTasks

每個Worker NIOEventLoop  處理業務時,會使用pipeline(管道), pipeline 中包含了boss groupNioEventLoop注冊到worker selector channel , 即通過pipeline 可以獲取到對應通道, 管道中維護了很多的處理器

 

 

 

 

四.Netty入門

1.引入java包

(JDK 5(Netty 3.x)或 6(Netty 4.x))
<dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.20.Final</version>
</dependency>

2.hello world 編寫

入門的編寫一共需要4個類

netty server

netty server handler

netty client

netty client handler

2.1.netty server 端編寫

package com.zpb.netty.netty.helloWorld;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoop;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * @dec : netty入門
 * @Date: 2019/11/24
 * @Auther: pengbo.zhao
 * @version: 1.0
 * @demand:
 *
 *    {@link #main(String[] args)}
 *
 */
public class NettyServer {

        public static void main(String[] args) throws  Exception{

        //1.創建BossGroup 和 WorkerGroup
        //1.1 創建2個線程組
        //bossGroup只處理連接請求
        //workerGroup 處理客戶端的業務邏輯
        //2個都是無限循環
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //2.創建服務端的啟動對象,可以為服務端啟動配置一些服務參數
        ServerBootstrap bootStrap = new ServerBootstrap();

        //2.1使用鏈式編程來配置服務參數
        bootStrap.group(bossGroup,workerGroup)                          //設置2個線程組
                .channel(NioServerSocketChannel.class)                 //使用NioServerSocketChannel作為服務器的通道
                .option(ChannelOption.SO_BACKLOG,128)            //設置線程等待的連接個數
                .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE) //設置保持活動連接狀態
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    //給PipeLine設置處理器
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        //通過socketChannel得到pipeLine,然后向pipeLine中添加處理的handle
                        socketChannel.pipeline().addLast(new NettyServerHandle());
                    }
                }); //給workerGroup 的EventLoop對應的管道設置處理器(可以自定義/也可使用netty的)
        System.err.println("server is ready......");

        //啟動服務器,並綁定1個端口且同步生成一個ChannelFuture 對象
        ChannelFuture channelFuture = bootStrap.bind(8888).sync();

        //對關閉通道進行監聽(netty異步模型)
        //當通道進行關閉時,才會觸發這個關閉動作
        channelFuture.channel().closeFuture().sync();

    }
}

2.2.netty server handler編寫

package com.zpb.netty.netty.helloWorld;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

/**
 * @dec :
 * @Date: 2019/11/24
 * @Auther: pengbo.zhao
 * @version: 1.0
 * @demand:
 */
public class NettyServerHandle extends ChannelInboundHandlerAdapter {
    /**
     * 讀取數據
     *
     * @param: 1.ChannelHandlerContext ctx:上下文對象, 含有 管道 pipeline , 通道 channel, 地址
     * @param: 2. Object msg: 就是客戶端發送的數據 默認 Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.err.println("服務器讀取線程 " + Thread.currentThread().getName());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看 channel 和 pipeline 的關系");

        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈接, 出站入站

        //將 msg 轉成一個 ByteBuf,ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + channel.remoteAddress());
    }

    /**
     * 讀取數據完成后
     *
     * @param:
     * @return:
     * @auther:
     * @date:
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //writeAndFlush 是 write + flush
        //將數據寫入到緩存,並刷新
        //一般講,我們對這個發送的數據進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵", CharsetUtil.UTF_8));
    }

    //處理異常, 一般是需要關閉通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2.3.netty client端編寫

package com.zpb.netty.netty.helloWorld;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * @dec :
 * @Date: 2019/11/24
 * @Auther: pengbo.zhao
 * @version: 1.0
 * @demand:
 */
public class NettyClient {

    public static void main(String[] args) throws Exception {

        //1.客戶端定義一個循環事件組
        EventLoopGroup group = new NioEventLoopGroup();

        try {

            //2.創建客戶端啟動對象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)                      //設置線程組
                    .channel(NioSocketChannel.class)   //設置客戶端通道實現類
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new NettyClientHandle());
                        }
                    });
            System.err.println("client is ready......");

            //3.啟動客戶端去連接服務端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();

            //4.設置通道關閉監聽(當監聽到通道關閉時,關閉client)
            channelFuture.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

2.4.netty client handler端編寫

package com.zpb.netty.netty.helloWorld;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @dec :
 * @Date: 2019/11/24
 * @Auther: pengbo.zhao
 * @version: 1.0
 * @demand:
 */
public class NettyClientHandle extends ChannelInboundHandlerAdapter{

    //如果client 端服務啟動完成后
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.err.println("client "+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello,netty server...",CharsetUtil.UTF_8));
    }

    //當通道有讀事件時
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf byteBuf = (ByteBuf) msg;
        System.err.println("服務器端回復消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        System.err.println("服務器端地址是:"+ctx.channel().remoteAddress());
    }

    //當通道有異常時

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

五.Netty三種任務隊列的使用

當我們在處理的handle中如果出現了阻塞的情況,或者處理業務邏輯比較耗時,我們不能讓程序處於阻塞,

當有客戶端請求時,我們想讓程序定時的去執行業務邏輯,

當需要對一些用戶需要進行推送活動時,根據用戶標識,找到對應的 Channel 引用,向該用戶推送特定消息時

可以采用以下三種任務隊列:

1.提交到execute(Runnable command)中時

ctx.channel().eventLoop().execute(new Runnable() { })

業務邏輯交給線程去處理,線程不會阻塞在這里,而是直接返回,直到有數據才返回給客戶端,如果有多個線程runnable需要處理,那么只能等上一個處理完才會處理下一個,(假如第1個任務需要10S,第2個需要20s,執行完共需30S)

2.提交到 scheduledTaskQueue中

schedule(Runnable command, long delay, TimeUnit unit)

①Runnable command:執行業務邏輯處理的線程

long delay:定時時長

③TimeUnit unit:定時類型

業務邏輯交給定時線程去處理。

3.通過傳輸的內容的標識

在解碼客戶端發送的內容中,讀取到客戶端的特殊標識,利用這個標識來進行推送消息處理,這個在粘包、拆包中進行說明

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM