一、什么是Reactor模型
Reactor設計模式是event-driven architecture(事件驅動)的一種實現方式。Reactor會解耦並發請求的服務並分發給對應的事件處理器來處理。
目前,許多流行的開源框架都用到了Reactor模型。如:netty、node.js等,包括java的nio。
二、基於IO事件驅動的分發處理模型
1)分而治之
一個連接里完整的網絡處理過程一般分為accept、read、decode、process、encode、send這幾步。
Reactor模式將每個步驟映射為一個Task,服務端線程執行的最小邏輯單元不再是一次完整的網絡請求,而是Task,且采用非阻塞方式執行。
2)事件驅動
每個Task對應特定網絡事件。當Task准備就緒時,Reactor收到對應的網絡事件通知,並將Task分發給綁定了對應網絡事件的Handler執行。
3)幾個角色
reactor:負責綁定管理事件和處理接口;
selector:負責監聽響應事件,將事件分發給綁定了該事件的Handler處理;
Handler:事件處理器,綁定了某類事件,負責執行對應事件的Task對事件進行處理;
Acceptor:Handler的一種,綁定了connect事件。當客戶端發起connect請求時,Reactor會將accept事件分發給Acceptor處理。
三、Reactor三種線程模型
Netty是典型的Reactor模型結構,常見的Reactor線程模型有三種,分別是:Reactor單線程模型;Reactor多線程模型;主從Reactor多線程模型。
1、單線程模型
Reactor單線程模型,指的是所有的I/O操作都在同一個NIO線程上面完成,NIO線程的職責如下:
- 作為NIO服務端,接收客戶端的TCP連接;
- 作為NIO客戶端,向服務端發起TCP連接;
- 讀取通信對端的請求或者應答消息;
- 向通信對端發送消息請求或者應答消息;
Reactor線程是個多面手,負責多路分離套接字,Accept新連接,並分派請求到處理器鏈中。該模型 適用於處理器鏈中業務處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,所以實際使用的不多。如圖所示:所有的處理操作Reactor、Acceptor、Handler都是一個線程實現。
服務端線程啟動代碼如下:
public class ReactorServer { public static void main(String[] args) throws Exception{ new Thread(new Reactor(8080),"reactor-001").start(); } }
Reactor線程:
public class Reactor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); //打開一個Selector serverSocketChannel = ServerSocketChannel.open(); //建立一個Server端通道 serverSocketChannel.socket().bind(new InetSocketAddress(port)); //綁定服務端口 serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必須是非阻塞的 //Reactor是入口,最初給一個channel注冊上去的事件都是accept SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor(serverSocketChannel, selector));//綁定接收事件處理器 } @Override public void run() { try { while (!Thread.interrupted()) { selector.select(); //就緒事件到達之前,阻塞 Set selected = selector.selectedKeys(); //拿到本次select獲取的就緒事件 Iterator it = selected.iterator(); while (it.hasNext()) { //這里進行任務分發 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException e) { e.printStackTrace(); }finally { if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //這里很關鍵,拿到每次selectKey里面附帶的處理對象,然后調用其run,這個對象在具體的Handler里會進行創建,初始化的附帶對象為Acceptor(看上面構造器) //調用之前注冊的callback對象 if (r != null) { r.run();//只是拿到句柄執行run方法,並沒有新起線程 } } }
服務端Acceptor連接建立:
public class Acceptor implements Runnable { private final Selector selector; private final ServerSocketChannel serverSocketChannel; Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) { this.serverSocketChannel = serverSocketChannel; this.selector = selector; } @Override public void run() { SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); //三次握手 if (socketChannel != null) { System.out.println(String.format("收到來自 %s 的連接", socketChannel.getRemoteAddress())); new Handler(socketChannel, selector); //這里把客戶端通道傳給Handler, // Handler負責接下來的事件處理(除了連接事件以外的事件均可) // new AsyncHandler(socketChannel, selector); } } catch (IOException e) { e.printStackTrace(); } } }
服務端Handler代碼:
public class Handler implements Runnable { private final SelectionKey selectionKey; private final SocketChannel socketChannel; private ByteBuffer readBuffer = ByteBuffer.allocate(1024); private ByteBuffer sendBuffer = ByteBuffer.allocate(2048); private final static int READ = 0; private final static int SEND = 1; private int status = READ; public Handler(SocketChannel socketChannel, Selector selector) throws IOException { this.socketChannel = socketChannel; //接收客戶端連接 this.socketChannel.configureBlocking(false); //置為非阻塞模式(selector僅允非阻塞模式) selectionKey = socketChannel.register(selector, 0); //將該客戶端注冊到selector,得到一個SelectionKey,以后的select到的就緒動作全都是由該對象進行封裝 selectionKey.attach(this); //附加處理對象,當前是Handler對象,run是對象處理業務的方法 selectionKey.interestOps(SelectionKey.OP_READ); //走到這里,說明之前Acceptor里的建連已完成,那么接下來就是讀取動作,因此這里首先將讀事件標記為“感興趣”事件 selector.wakeup(); //讓阻塞的selector立即返回 ----> selector.select() } @Override public void run() { try { switch (status) { case READ: read(); break; case SEND: send(); break; default: } } catch (IOException e) { //這里的異常處理是做了匯總,常出的異常就是server端還有未讀/寫完的客戶端消息,客戶端就主動斷開連接,這種情況下是不會觸發返回-1的,這樣下面read和write方法里的cancel和close就都無法觸發,這樣會導致死循環異常(read/write處理失敗,事件又未被cancel,因此會不斷的被select到,不斷的報異常) System.err.println("read或send時發生異常!異常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e2) { System.err.println("關閉通道時發生異常!異常信息:" + e2.getMessage()); e2.printStackTrace(); } } } private void read() throws IOException { if (selectionKey.isValid()) { readBuffer.clear(); int count = socketChannel.read(readBuffer); //read方法結束,意味着本次"讀就緒"變為"讀完畢",標記着一次就緒事件的結束 if (count > 0) { System.out.println(String.format("收到來自 %s 的消息: %s", socketChannel.getRemoteAddress(),new String(readBuffer.array()))); status = SEND; selectionKey.interestOps(SelectionKey.OP_WRITE); //注冊寫方法 } else { //讀模式下拿到的值是-1,說明客戶端已經斷開連接,那么將對應的selectKey從selector里清除,否則下次還會select到,因為斷開連接意味着讀就緒不會變成讀完畢,也不cancel,下次select會不停收到該事件 //所以在這種場景下,(服務器程序)你需要關閉socketChannel並且取消key,最好是退出當前函數。注意,這個時候服務端要是繼續使用該socketChannel進行讀操作的話,就會拋出“遠程主機強迫關閉一個現有的連接”的IO異常。 selectionKey.cancel(); socketChannel.close(); System.out.println("read時-------連接關閉"); } } } void send() throws IOException { if (selectionKey.isValid()) { sendBuffer.clear(); sendBuffer.put(String.format("我收到來自%s的信息辣:%s, 200ok;", socketChannel.getRemoteAddress(), new String(readBuffer.array())).getBytes()); sendBuffer.flip(); int count = socketChannel.write(sendBuffer); //write方法結束, // 意味着本次寫就緒變為寫完畢,標記着一次事件的結束 if (count < 0) { //同上,write場景下,取到-1,也意味着客戶端斷開連接 selectionKey.cancel(); socketChannel.close(); System.out.println("send時-------連接關閉"); } //沒斷開連接,則再次切換到讀 status = READ; selectionKey.interestOps(SelectionKey.OP_READ); } } }
對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,主要原因如下:
- 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發送;
- 當NIO線程負載過重之后,處理速度將變慢,這會導致大量客戶端連接超時,超時之后往往進行重發,這更加重了NIO線程的負載,最終導致大量消息積壓和處理超時,NIO線程會成為系統的性能瓶頸;
- 可靠性問題。一旦NIO線程意外跑飛,或者進入死循環,會導致整個系統通訊模塊不可用,不能接收和處理外部信息,造成節點故障。
為了解決這些問題,演進出了Reactor多線程模型,下面我們一起學習下Reactor多線程模型。
2、多線程模型
Reactor多線程模型與單線程模型最大區別就是有一組NIO線程處理I/O操作,它的特點如下:
- 有一個專門的NIO線程--acceptor新城用於監聽服務端,接收客戶端的TCP連接請求;
- 網絡I/O操作--讀、寫等由一個NIO線程池負責,線程池可以采用標准的JDK線程池實現,它包含一個任務隊列和N個可用的線程,由這些NIO線程負責消息的讀取、解碼、編碼和發送;
- 1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應1個NIO線程,防止發生並發操作問題。
如圖所示:Reactor、Acceptor的處理操作是一個線程實現。Handler是另一個線程實現。
因此,如代碼所示,其余代碼一致,重寫Handler為:
public class AsyncHandler implements Runnable { private final Selector selector; private final SelectionKey selectionKey; private final SocketChannel socketChannel; private ByteBuffer readBuffer = ByteBuffer.allocate(1024); private ByteBuffer sendBuffer = ByteBuffer.allocate(2048); private final static int READ = 0; //讀取就緒 private final static int SEND = 1; //響應就緒 private final static int PROCESSING = 2; //處理中 private int status = READ; //所有連接完成后都是從一個讀取動作開始的 //開啟線程數為5的異步處理線程池 private static final ExecutorService workers = Executors.newFixedThreadPool(5); public AsyncHandler(SocketChannel socketChannel, Selector selector) throws IOException { this.socketChannel = socketChannel; this.socketChannel.configureBlocking(false); selectionKey = socketChannel.register(selector, 0); selectionKey.attach(this); selectionKey.interestOps(SelectionKey.OP_READ); this.selector = selector; this.selector.wakeup(); } @Override public void run() { //如果一個任務正在異步處理,那么這個run是直接不觸發任何處理的,read和send只負責簡單的數據讀取和響應,業務處理完全不阻塞這里的處理 switch (status) { case READ: read(); break; case SEND: send(); break; default: } } private void read() { if (selectionKey.isValid()) { try { readBuffer.clear(); int count = socketChannel.read(readBuffer); if (count > 0) { status = PROCESSING; //置為處理中,處理完成后該狀態為響應,表示讀入處理完成,接下來可以響應客戶端了 workers.execute(this::readWorker); //異步處理 } else { selectionKey.cancel(); socketChannel.close(); System.out.println("read時-------連接關閉"); } } catch (IOException e) { System.err.println("處理read業務時發生異常!異常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e1) { System.err.println("處理read業務關閉通道時發生異常!異常信息:" + e.getMessage()); } } } } void send() { if (selectionKey.isValid()) { status = PROCESSING; //置為執行中 workers.execute(this::sendWorker); //異步處理 selectionKey.interestOps(SelectionKey.OP_READ); //重新設置為讀 } } //讀入信息后的業務處理 private void readWorker() { // try { // Thread.sleep(5000L); // } catch (InterruptedException e) { // e.printStackTrace(); // } System.out.println(String.format("收到來自客戶端的消息: %s", new String(readBuffer.array()))); status = SEND; selectionKey.interestOps(SelectionKey.OP_WRITE); //把當前事件改為寫事件 this.selector.wakeup(); //喚醒阻塞在select的線程, // 因為該interestOps寫事件是放到子線程的, // select在該channel還是對read事件感興趣時又被調用 // ,因此如果不主動喚醒, // select可能並不會立刻select該讀就緒事件(在該例中,可能永遠不會被select到) } private void sendWorker() { try { sendBuffer.clear(); sendBuffer.put(String.format("我收到來自%s的信息辣:%s, 200ok;", socketChannel.getRemoteAddress(), new String(readBuffer.array())).getBytes()); sendBuffer.flip(); int count = socketChannel.write(sendBuffer); if (count < 0) { selectionKey.cancel(); socketChannel.close(); System.out.println("send時-------連接關閉"); } else { //再次切換到讀 status = READ; } } catch (IOException e) { System.err.println("異步處理send業務時發生異常!異常信息:" + e.getMessage()); selectionKey.cancel(); try { socketChannel.close(); } catch (IOException e1) { System.err.println("異步處理send業務關閉通道時發生異常!異常信息:" + e.getMessage()); } } } }
在絕大多數場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應用場景中,一個NIO線程負責監聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端並發連接,或者服務端需要對客戶端的握手信息進行安全認證,認證本身非常損耗性能。這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產生了第三種Reactor線程模型--主從Reactor多線程模型。
3、主從多線程模型
特點是:服務端用於接收客戶端連接的不再是1個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創建的SocketChannel注冊到I/O線程池(sub reactor線程池)的某個I/O線程上,由它負責SocketChannel的讀寫和編解碼工作。
Acceptor線程池只用於客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的I/O線程上,有I/O線程負責后續的I/O操作。
第三種模型比起第二種模型,是將Reactor分成兩部分,mainReactor負責監聽server socket,accept新連接,並將建立的socket分派給subReactor。subReactor負責多路分離已連接的socket,讀寫網絡數據,對業務處理功能,其扔給worker線程池完成。通常,subReactor個數上可與CPU個數等同。
如下圖所示,
新增SubReactor部分:
public class SubReactor implements Runnable { private final Selector selector; private boolean register = false; //注冊開關表示,為什么要加這么個東西,可以參考Acceptor設置這個值那里的描述 private int num; //序號,也就是Acceptor初始化SubReactor時的下標 SubReactor(Selector selector, int num) { this.selector = selector; this.num = num; } @Override public void run() { while (!Thread.interrupted()) { System.out.println(String.format("%d號SubReactor等待注冊中...", num)); while (!Thread.interrupted() && !register) { try { if (selector.select() == 0) { continue; } } catch (IOException e) { e.printStackTrace(); } Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while (it.hasNext()) { dispatch((SelectionKey)it.next()); it.remove(); } } } } private void dispatch(SelectionKey key) { Runnable r = (Runnable) (key.attachment()); if (r != null) { r.run(); } } void registering(boolean register) { this.register = register; } }
重寫Acceptor部分:
public class Acceptor implements Runnable { private final ServerSocketChannel serverSocketChannel; private final int coreNum = Runtime.getRuntime().availableProcessors(); // 獲取CPU核心數 private final Selector[] selectors = new Selector[coreNum]; // 創建selector給SubReactor使用,個數為CPU核心數(如果不需要那么多可以自定義,畢竟這里會吞掉一個線程) private int next = 0; // 輪詢使用subReactor的下標索引 private SubReactor[] subReactors = new SubReactor[coreNum]; // subReactor private Thread[] threads = new Thread[coreNum]; // subReactor的處理線程 Acceptor(ServerSocketChannel serverSocketChannel) throws IOException { this.serverSocketChannel = serverSocketChannel; // 初始化 for (int i = 0; i < coreNum; i++) { selectors[i] = Selector.open(); subReactors[i] = new SubReactor(selectors[i], i); //初始化sub reactor threads[i] = new Thread(subReactors[i]); //初始化運行sub reactor的線程 threads[i].start(); //啟動(啟動后的執行參考SubReactor里的run方法) } } @Override public void run() { SocketChannel socketChannel; try { socketChannel = serverSocketChannel.accept(); // 阻塞獲取連接 if (socketChannel != null) { //輪詢reactors[] 處理接收到的請求 System.out.println(String.format("收到來自 %s 的連接",socketChannel.getRemoteAddress())); socketChannel.configureBlocking(false); // subReactors[next].registering(true); /*讓線下一次subReactors的while循環不去執行 selector.select,但是select我們是使用的不超時阻塞的方式, 所以下一步需要執行wakeup() * */ selectors[next].wakeup(); //使一個阻塞住的selector操作立即返回 SelectionKey selectionKey = socketChannel.register(selectors[next], SelectionKey.OP_READ); // 當前客戶端通道SocketChannel // 向selector[next]注冊一個讀事件,返回key selectors[next].wakeup(); /*使一個阻塞住的selector操作立即返回,這樣才能對剛剛注冊的SelectionKey感興趣 */ subReactors[next].registering(false); // 本次事件注冊完成后,需要再次觸發select的執行 // ,因此這里Restart要在設置回false(具體參考SubReactor里的run方法) selectionKey.attach(new AsyncHandler(socketChannel, selectors[next])); // 綁定Handler //輪詢負載 if (++next == selectors.length) { next = 0; //越界后重新分配 } } } catch (IOException e) { e.printStackTrace(); } } }
四、NioEventLoopGroup 與 Reactor 線程模型的對應
Netty的線程模型並發固定不變,通過在啟動輔助類中創建不同的EventLoopGroup實例並進行適當的參數配置,就可以支持上述三種Reactor線程模型。
/** * Netty單線程模型服務端代碼示例 * @param port */ public void bind(int port) { EventLoopGroup reactorGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(reactorGroup, reactorGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { reactorGroup.shutdownGracefully(); } }
/** * Netty多線程模型代碼 * @param port */ public void bind2(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
/** * Netty主從線程模型代碼 * @param port */ public void bind3(int port) { EventLoopGroup acceptorGroup = new NioEventLoopGroup(); NioEventLoopGroup ioGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup, ioGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("http-codec", new HttpServerCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); //后面代碼省略 } }); Channel ch = b.bind(port).sync().channel(); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); ioGroup.shutdownGracefully(); } }
說完Reacotr模型的三種形式,那么Netty是哪種呢?其實,我還有一種Reactor模型的變種沒說,那就是去掉線程池的第三種形式的變種,這也 是Netty NIO的默認模式。在實現上,Netty中的Boss類充當mainReactor,NioWorker類充當subReactor(默認 NioWorker的個數是Runtime.getRuntime().availableProcessors())。在處理新來的請求 時,NioWorker讀完已收到的數據到ChannelBuffer中,之后觸發ChannelPipeline中的ChannelHandler流。
Netty是事件驅動的,可以通過ChannelHandler鏈來控制執行流向。因為ChannelHandler鏈的執行過程是在 subReactor中同步的,所以如果業務處理handler耗時長,將嚴重影響可支持的並發數。這種模型適合於像Memcache這樣的應用場景,但 對需要操作數據庫或者和其他模塊阻塞交互的系統就不是很合適。Netty的可擴展性非常好,而像ChannelHandler線程池化的需要,可以通過在 ChannelPipeline中添加Netty內置的ChannelHandler實現類–ExecutionHandler實現,對使用者來說只是 添加一行代碼而已。對於ExecutionHandler需要的線程池模型,Netty提供了兩種可 選:1) MemoryAwareThreadPoolExecutor 可控制Executor中待處理任務的上限(超過上限時,后續進來的任務將被阻 塞),並可控制單個Channel待處理任務的上限;2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子類,它還可以保證同一Channel中處理的事件流的順序性,這主要是控制事件在異步處 理模式下可能出現的錯誤的事件順序,但它並不保證同一Channel中的事件都在一個線程中執行(通常也沒必要)。一般來 說,OrderedMemoryAwareThreadPoolExecutor 是個很不錯的選擇。