目錄:
Reactor(反應堆)和Proactor(前攝器)
《I/O模型之三:兩種高性能 I/O 設計模式 Reactor 和 Proactor》
《【轉】第8章 前攝器(Proactor):用於為異步事件多路分離和分派處理器的對象行為模式》
《Java NIO系列教程(八)JDK AIO編程》-- java AIO的proactor模式
《Java NIO系列教程(七) selector原理 Epoll版的Selector》--java NIO的Reactor模式
Netty的I/O線程NioEventLoop由於聚合了多路復用器Selector,可以同時並發處理成百上千個客戶端SocketChannel。由於讀寫操作都是非阻塞的,這就可以充分提升I/O線程的運行效率,避免由頻繁的I/O阻塞導致的線程掛起。另外,由於Netty采用了異步通信模式,一個I/O線程可以並發處理N個客戶端連接和讀寫操作,這從根本上解決了傳統同步阻塞I/O一連接一線程模型,架構的性能、彈性伸縮能力和可靠性都得到了極大的提升。
高效的Reactor線程模型
常見的Reactor線程模型有三種,分別如下:
- Reactor單線程模型;
- Reactor多線程模型;
- 主從Reactor多線程模型;
Netty是典型的Reactor模型結構,關於Reactor的詳盡闡釋,可參考POSA2,這里不做概念性的解釋。而應用Java NIO構建Reactor模式,Doug Lea(就是那位讓人無限景仰的大爺)在“Scalable IO in Java”中給了很好的闡述。這里截取其PPT中經典的圖例說明 Reactor模式的典型實現:
1、Reactor單線程模型
Reactor單線程模型,指的是所有的I/O操作都在同一個NIO線程上面完成,NIO線程的職責如下:
- 作為NIO服務端,接收客戶端的TCP連接;
- 作為NIO客戶端,向服務端發起TCP連接;
- 讀取通信對端的請求或者應答消息;
- 向通信對端發送消息請求或者應答消息;
Reactor線程是個多面手,負責多路分離套接字,Accept新連接,並分派請求到處理器鏈中。該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,所以實際使用的不多。
對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,主要原因如下:
- 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送;
- 當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往進行重發,這更加重了NIO線程的負載,最終導致大量消息積壓和處理超時,NIO線程會成為系統的性能瓶頸;
- 可靠性問題。一旦NIO線程意外跑飛,或者進入死循環,會導致整個系統通訊模塊不可用,不能接收和處理外部信息,造成節點故障。
為了解決這些問題,演進出了Reactor多線程模型,下面我們一起學習下Reactor多線程模型。
2、Reactor多線程模型
Reactor多線程模型與單線程模型最大區別就是有一組NIO線程處理I/O操作,它的特點如下:
- 有一個專門的NIO線程--acceptor新城用於監聽服務端,接收客戶端的TCP連接請求;
- 網絡I/O操作--讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
- 1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應1個NIO線程,防止發生並發操作問題。
在絕大多數場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應用場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端並發連接,或者服務端需要對客戶端的握手信息進行安全認證,認證本身非常損耗性能。這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產生了第三種Reactor線程模型--主從Reactor多線程模型。
3、主從Reactor多線程模型
特點是:服務端用於接收客戶端連接的不再是1個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創建的SocketChannel注冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工作。
Acceptor線程池只用於客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的I/O線程上,有I/O線程負責后續的I/O操作。
第三種模型比起第二種模型,是將Reactor分成兩部分,mainReactor負責監聽server socket,accept新連接,並將建立的socket分派給subReactor。subReactor負責多路分離已連接的socket,讀寫網 絡數據,對業務處理功能,其扔給worker線程池完成。通常,subReactor個數上可與CPU個數等同。
NioEventLoopGroup 與 Reactor 線程模型的對應
Netty的線程模型並發固定不變,通過在啟動輔助類中創建不同的EventLoopGroup實例並進行適當的參數配置,就可以支持上述三種Reactor線程模型。
Netty單線程模型服務端代碼示例如下:
/** * Netty單線程模型服務端代碼示例 * @param port */ public void bind(int port) { EventLoopGroup reactorGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(reactorGroup, reactorGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { reactorGroup.shutdownGracefully(); } }
Netty多線程模型代碼如下:
/** * Netty多線程模型代碼 * @param port */ public void bind2(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
Netty主從線程模型代碼如下:
/** * Netty主從線程模型代碼 * @param port */ public void bind3(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
說完Reacotr模型的三種形式,那么Netty是哪種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉線程池的第三種形式的變種,這也 是Netty NIO的默認模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(默認 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的數據到ChannelBuffer中,之后觸發ChannelPipeline中的ChannelHandler流。
Netty是事件驅動的,可以通過ChannelHandler鏈來控制執行流向。因為ChannelHandler鏈的執行過程是在 subReactor中同步的,所以如果業務處理handler耗時長,將嚴重影響可支持的並發數。這種模型適合於像Memcache這樣的應用場景,但 對需要操作數據庫或者和其他模塊阻塞交互的系統就不是很合適。Netty的可擴展性非常好,而像ChannelHandler線程池化的需要,可以通過在 ChannelPipeline中添加Netty內置的ChannelHandler實現類–ExecutionHandler實現,對使用者來說只是 添加一行代碼而已。對於ExecutionHandler需要的線程池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,后續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子類,它還可以保證同一Channel中處理的事件流的順序性,這主要是控制事件在異步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個線程中執行(通常也沒必要)。一般來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇,當然,如果有需要,也可以DIY一個。