/** * @Author Niuxy * @Date 2020/6/10 9:42 下午 * @Description 多 selector 多線程的 NIO 服務端 * 使用 NIO 時一定要摒棄 BIO 的阻塞思維,我們的代碼面向的是事件,而不是連接 * 至於多次事件完成一個連接的情況,我們可以通過 attachment 記錄該連接上次事件處理的結果。 * 上面做法的前提是一個連接只允許注冊一個感興趣的事件。 */ public class CurrentReactor implements Runnable { // CPU 核心數 int cpuNums = Runtime.getRuntime().availableProcessors() + 3; // selector 數 int selectorNums; // 監聽讀寫事件的循環 Selector[] selectorArr; // 事件處理線程池 ExecutorService executorService; ServerSocketChannel serverSocketChannel; //當前使用的 selector 坐標 Integer currentSelector; CurrentReactor(ServerSocketChannel serverSocketChannel) { this.serverSocketChannel = serverSocketChannel; //讀寫事件從 1 開始使用,第一個 selector 用於監聽連接事件 currentSelector = 1; try { this.selectorNums = 3; selectorArr = new Selector[3]; // 四核以上服務器較好 executorService = Executors.newFixedThreadPool(cpuNums + selectorNums); for (int i = 0; i < selectorNums; i++) { selectorArr[i] = SelectorProvider.provider().openSelector(); } // 注冊 server 連接事件 SelectionKey key = serverSocketChannel.register(selectorArr[0], SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); } catch (Exception e) { e.printStackTrace(); } } //事件分發器 private void dispatch(SelectionKey key) { if (key == null || key.attachment() == null) { return; } try { executorService.execute((Runnable) key.attachment()); } catch (Exception e) { //任務提交異常則什么都不做,因為 NIO 的水平觸發機制會繼續觸發事件 e.printStackTrace(); } } @Override public void run() { for (int i = 0; i < 3; i++) { Selector selector = selectorArr[i]; executorService.execute( () -> { while (!Thread.currentThread().isInterrupted()) { try { selector.select(100); Set<SelectionKey> keys = selector.selectedKeys(); Iterator iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); iterator.remove(); dispatch(key); } } catch (Exception e) { e.printStackTrace(); } } } ); } } /** * @Author Niuxy * @Date 2020/6/10 9:01 下午 * @Description 連接事件處理器 */ class Acceptor implements Runnable { Selector handlerSelector; Acceptor() { // 采用輪詢的負載均衡策略選取 selector synchronized (currentSelector) { if (currentSelector == 3) { currentSelector = 1; } this.handlerSelector = selectorArr[currentSelector]; currentSelector++; } } @Override public void run() { try { SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel == null) { return; } socketChannel.configureBlocking(false); SelectionKey key = socketChannel.register(this.handlerSelector, SelectionKey.OP_READ); //報文接收策略,與連接一一綁定 MessageHandler messageHandler = new PrintMessageHandlerImpl(); // register 與 select 方法競爭鎖,防止 register 被 select 阻塞 this.handlerSelector.wakeup(); ReciveRegister reciveRegister = new HLRegisterImpl(2, messageHandler); //注冊 key 的同時 將事件處理的 "回調" 函數綁定到 key 上 key.attach(new Handler(socketChannel, key, reciveRegister)); } catch (ClosedChannelException ce) { ce.printStackTrace(); //to do } catch (Exception e) { e.printStackTrace(); } } } /** * @Author Niuxy * @Date 2020/6/10 8:50 下午 * @Description 就緒讀寫事件處理器。粗暴的將方法全部加鎖,一個連接不應該有多個線程同時處理,但 * Reactor 模式下不同的事件提交線程池后可能造成多個線程處理同一個鏈接 * 不能依靠 key 的 Readable 或 Writeable 狀態決定當前是讀是寫,讀寫應當由完整的請求進行分割,一讀一寫,再處理下次請求 * 因此需要自己維護讀寫狀態位 */ class Handler implements Runnable { public static final int READING = 0, WRITING = 1; volatile SocketChannel socketChannel; volatile SelectionKey key; /** * @Author Niuxy * @Date 2020/6/2 9:29 下午 * @Description 在響應上一個請求前,我們不希望處理下一個請求,因此在 Handler 維護一個狀態位,標識目前應當 * 處理讀事件還是寫事件 * 我們必須保證接收和回復的順序性,保證客戶端可以對響應做出正確的處理 * 當然也有其它的處理方式,我們將響應數據裝入一個有序隊列,並順序的處理這些響應。或者通過令牌將請求和響應 * 進行對應。 */ int state = READING; ReciveRegister reciveRegister; String readResult = null; ByteBuffer writeBuffer = null; Handler(SocketChannel channel, SelectionKey key, ReciveRegister reciveRegister) { /** * @Author Niuxy * @Date 2020/6/4 9:39 下午 * @Description 重要!必須保證構造方法與其它方法的互斥 * 否則可能造成構造方法沒有執行完,其它線程已開始執行該對象的其它方法 * 場景不容易復現,但在大劑量請求到達時 Reactor 模式中時很容易出現 */ synchronized (this) { this.socketChannel = channel; this.key = key; this.reciveRegister = reciveRegister; } } @Override public synchronized void run() { try { if (state == READING) { read(); } else { write(); } } catch (RuntimeException rex) { throw rex; } catch (Exception e) { e.printStackTrace(); } } private synchronized void read() throws Exception { String re = reciveRegister.doRecive(socketChannel); if (re != null && re != "") { readResult = re; state = WRITING; key.interestOps(SelectionKey.OP_WRITE); } } private synchronized void write() throws IOException { if (this.readResult == null || readResult == "") { return; } //如果不是第一次觸發寫事件,接着上次的寫 if (writeBuffer == null) { writeBuffer = ByteBuffer.wrap(this.readResult.getBytes()); } //該循環處理發送緩沖區處理速度小於網卡發送速度,無法一次性將 buffer 中數據寫入發送緩沖區的情況 socketChannel.write(writeBuffer); if (writeBuffer.position() != writeBuffer.limit()) { return; } writeBuffer = null; readResult = null; state = READING; //寫完將興趣移除,否則會將 CPU 跑滿 key.interestOps(SelectionKey.OP_READ); } } }