Netty中的三種Reactor(反應堆)


目錄:

Reactor(反應堆)和Proactor(前攝器)

I/O模型之三:兩種高性能 I/O 設計模式 Reactor 和 Proactor

【轉】第8章 前攝器(Proactor):用於為異步事件多路分離和分派處理器的對象行為模式

Java NIO系列教程(八)JDK AIO編程》-- java AIO的proactor模式

Java NIO系列教程(七) selector原理 Epoll版的Selector》--java NIO的Reactor模式

Netty中的三種Reactor(反應堆)》 

 

Netty的I/O線程NioEventLoop由於聚合了多路復用器Selector,可以同時並發處理成百上千個客戶端SocketChannel。由於讀寫操作都是非阻塞的,這就可以充分提升I/O線程的運行效率,避免由頻繁的I/O阻塞導致的線程掛起。另外,由於Netty采用了異步通信模式,一個I/O線程可以並發處理N個客戶端連接和讀寫操作,這從根本上解決了傳統同步阻塞I/O一連接一線程模型,架構的性能、彈性伸縮能力和可靠性都得到了極大的提升。

高效的Reactor線程模型

常見的Reactor線程模型有三種,分別如下:

  1. Reactor單線程模型;
  2. Reactor多線程模型;
  3. 主從Reactor多線程模型;

Netty是典型的Reactor模型結構,關於Reactor的詳盡闡釋,可參考POSA2,這里不做概念性的解釋。而應用Java NIO構建Reactor模式,Doug Lea(就是那位讓人無限景仰的大爺)在“Scalable IO in Java”中給了很好的闡述。這里截取其PPT中經典的圖例說明 Reactor模式的典型實現:

1、Reactor單線程模型

Reactor單線程模型,指的是所有的I/O操作都在同一個NIO線程上面完成,NIO線程的職責如下:

  1. 作為NIO服務端,接收客戶端的TCP連接;
  2. 作為NIO客戶端,向服務端發起TCP連接;
  3. 讀取通信對端的請求或者應答消息;
  4. 向通信對端發送消息請求或者應答消息;

Reactor線程是個多面手,負責多路分離套接字,Accept新連接,並分派請求到處理器鏈中。該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,所以實際使用的不多。

對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,主要原因如下:

  1. 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送;
  2. 當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往進行重發,這更加重了NIO線程的負載,最終導致大量消息積壓和處理超時,NIO線程會成為系統的性能瓶頸;
  3. 可靠性問題。一旦NIO線程意外跑飛,或者進入死循環,會導致整個系統通訊模塊不可用,不能接收和處理外部信息,造成節點故障。

為了解決這些問題,演進出了Reactor多線程模型,下面我們一起學習下Reactor多線程模型。

2、Reactor多線程模型

Reactor多線程模型與單線程模型最大區別就是有一組NIO線程處理I/O操作,它的特點如下:

  1. 有一個專門的NIO線程--acceptor新城用於監聽服務端,接收客戶端的TCP連接請求;
  2. 網絡I/O操作--讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
  3. 1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應1個NIO線程,防止發生並發操作問題。

在絕大多數場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應用場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端並發連接,或者服務端需要對客戶端的握手信息進行安全認證,認證本身非常損耗性能。這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產生了第三種Reactor線程模型--主從Reactor多線程模型。

3、主從Reactor多線程模型

特點是:服務端用於接收客戶端連接的不再是1個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創建的SocketChannel注冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工作。

Acceptor線程池只用於客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的I/O線程上,有I/O線程負責后續的I/O操作。

第三種模型比起第二種模型,是將Reactor分成兩部分,mainReactor負責監聽server socket,accept新連接,並將建立的socket分派給subReactor。subReactor負責多路分離已連接的socket,讀寫網 絡數據,對業務處理功能,其扔給worker線程池完成。通常,subReactor個數上可與CPU個數等同。

 

NioEventLoopGroup 與 Reactor 線程模型的對應

Netty的線程模型並發固定不變,通過在啟動輔助類中創建不同的EventLoopGroup實例並進行適當的參數配置,就可以支持上述三種Reactor線程模型。

Netty單線程模型服務端代碼示例如下:

/**
     * Netty單線程模型服務端代碼示例
     * @param port
     */
    public void bind(int port) {
        EventLoopGroup reactorGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(reactorGroup, reactorGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //后面代碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reactorGroup.shutdownGracefully();
        }
    }

Netty多線程模型代碼如下:

/**
     * Netty多線程模型代碼
     * @param port
     */
    public void bind2(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //后面代碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

Netty主從線程模型代碼如下:

/**
     * Netty主從線程模型代碼
     * @param port
     */
    public void bind3(int port) {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        NioEventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(acceptorGroup, ioGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("http-codec", new HttpServerCodec());
                        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                        ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                        //后面代碼省略
                    }
                });
        
            Channel ch = b.bind(port).sync().channel();
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

 

說完Reacotr模型的三種形式,那么Netty是哪種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉線程池的第三種形式的變種,這也 是Netty NIO的默認模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(默認 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的數據到ChannelBuffer中,之后觸發ChannelPipeline中的ChannelHandler流。

Netty是事件驅動的,可以通過ChannelHandler鏈來控制執行流向。因為ChannelHandler鏈的執行過程是在 subReactor中同步的,所以如果業務處理handler耗時長,將嚴重影響可支持的並發數。這種模型適合於像Memcache這樣的應用場景,但 對需要操作數據庫或者和其他模塊阻塞交互的系統就不是很合適。Netty的可擴展性非常好,而像ChannelHandler線程池化的需要,可以通過在 ChannelPipeline中添加Netty內置的ChannelHandler實現類–ExecutionHandler實現,對使用者來說只是 添加一行代碼而已。對於ExecutionHandler需要的線程池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,后續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是  MemoryAwareThreadPoolExecutor 的子類,它還可以保證同一Channel中處理的事件流的順序性,這主要是控制事件在異步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個線程中執行(通常也沒必要)。一般來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇,當然,如果有需要,也可以DIY一個。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM