多路多線程 reactor 模型的實現


 

/**
 * @Author Niuxy
 * @Date 2020/6/10 9:42 下午
 * @Description 多 selector 多線程的 NIO 服務端
 * 使用 NIO 時一定要摒棄 BIO 的阻塞思維,我們的代碼面向的是事件,而不是連接
 * 至於多次事件完成一個連接的情況,我們可以通過 attachment 記錄該連接上次事件處理的結果。
 * 上面做法的前提是一個連接只允許注冊一個感興趣的事件。
 */
public class CurrentReactor implements Runnable {
    // CPU 核心數
    int cpuNums = Runtime.getRuntime().availableProcessors() + 3;
    // selector 數
    int selectorNums;
    // 監聽讀寫事件的循環
    Selector[] selectorArr;
    // 事件處理線程池
    ExecutorService executorService;
    ServerSocketChannel serverSocketChannel;
    //當前使用的 selector 坐標
    Integer currentSelector;


    CurrentReactor(ServerSocketChannel serverSocketChannel) {
        this.serverSocketChannel = serverSocketChannel;
        //讀寫事件從 1 開始使用,第一個 selector 用於監聽連接事件
        currentSelector = 1;
        try {
            this.selectorNums = 3;
            selectorArr = new Selector[3];
            // 四核以上服務器較好
            executorService = Executors.newFixedThreadPool(cpuNums + selectorNums);
            for (int i = 0; i < selectorNums; i++) {
                selectorArr[i] = SelectorProvider.provider().openSelector();
            }
            // 注冊 server 連接事件
            SelectionKey key = serverSocketChannel.register(selectorArr[0], SelectionKey.OP_ACCEPT);
            key.attach(new Acceptor());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    //事件分發器
    private void dispatch(SelectionKey key) {
        if (key == null || key.attachment() == null) {
            return;
        }
        try {
            executorService.execute((Runnable) key.attachment());
        } catch (Exception e) {
            //任務提交異常則什么都不做,因為 NIO 的水平觸發機制會繼續觸發事件
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            Selector selector = selectorArr[i];
            executorService.execute(
                    () -> {
                        while (!Thread.currentThread().isInterrupted()) {
                            try {
                                selector.select(100);
                                Set<SelectionKey> keys = selector.selectedKeys();
                                Iterator iterator = keys.iterator();
                                while (iterator.hasNext()) {
                                    SelectionKey key = (SelectionKey) iterator.next();
                                    iterator.remove();
                                    dispatch(key);
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
            );
        }
    }

    /**
     * @Author Niuxy
     * @Date 2020/6/10 9:01 下午
     * @Description 連接事件處理器
     */
    class Acceptor implements Runnable {
        Selector handlerSelector;

        Acceptor() {
            // 采用輪詢的負載均衡策略選取 selector
            synchronized (currentSelector) {
                if (currentSelector == 3) {
                    currentSelector = 1;
                }
                this.handlerSelector = selectorArr[currentSelector];
                currentSelector++;
            }
        }

        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    return;
                }
                socketChannel.configureBlocking(false);
                SelectionKey key = socketChannel.register(this.handlerSelector, SelectionKey.OP_READ);
                //報文接收策略,與連接一一綁定
                MessageHandler messageHandler = new PrintMessageHandlerImpl();
                // register 與 select 方法競爭鎖,防止 register 被 select 阻塞
                this.handlerSelector.wakeup();
                ReciveRegister reciveRegister = new HLRegisterImpl(2, messageHandler);
                //注冊 key 的同時 將事件處理的 "回調" 函數綁定到 key 上
                key.attach(new Handler(socketChannel, key, reciveRegister));
            } catch (ClosedChannelException ce) {
                ce.printStackTrace();
                //to do
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * @Author Niuxy
     * @Date 2020/6/10 8:50 下午
     * @Description 就緒讀寫事件處理器。粗暴的將方法全部加鎖,一個連接不應該有多個線程同時處理,但
     * Reactor 模式下不同的事件提交線程池后可能造成多個線程處理同一個鏈接
     * 不能依靠 key 的 Readable 或 Writeable 狀態決定當前是讀是寫,讀寫應當由完整的請求進行分割,一讀一寫,再處理下次請求
     * 因此需要自己維護讀寫狀態位
     */
    class Handler implements Runnable {
        public static final int READING = 0, WRITING = 1;
        volatile SocketChannel socketChannel;
        volatile SelectionKey key;
        /**
         * @Author Niuxy
         * @Date 2020/6/2 9:29 下午
         * @Description 在響應上一個請求前,我們不希望處理下一個請求,因此在 Handler 維護一個狀態位,標識目前應當
         * 處理讀事件還是寫事件
         * 我們必須保證接收和回復的順序性,保證客戶端可以對響應做出正確的處理
         * 當然也有其它的處理方式,我們將響應數據裝入一個有序隊列,並順序的處理這些響應。或者通過令牌將請求和響應
         * 進行對應。
         */
        int state = READING;
        ReciveRegister reciveRegister;
        String readResult = null;
        ByteBuffer writeBuffer = null;

        Handler(SocketChannel channel, SelectionKey key, ReciveRegister reciveRegister) {
            /**
             * @Author Niuxy
             * @Date 2020/6/4 9:39 下午
             * @Description 重要!必須保證構造方法與其它方法的互斥
             *              否則可能造成構造方法沒有執行完,其它線程已開始執行該對象的其它方法
             *              場景不容易復現,但在大劑量請求到達時 Reactor 模式中時很容易出現
             */
            synchronized (this) {
                this.socketChannel = channel;
                this.key = key;
                this.reciveRegister = reciveRegister;
            }
        }

        @Override
        public synchronized void run() {
            try {
                if (state == READING) {
                    read();
                } else {
                    write();
                }
            } catch (RuntimeException rex) {
                throw rex;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private synchronized void read() throws Exception {
            String re = reciveRegister.doRecive(socketChannel);
            if (re != null && re != "") {
                readResult = re;
                state = WRITING;
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }

        private synchronized void write() throws IOException {
            if (this.readResult == null || readResult == "") {
                return;
            }
            //如果不是第一次觸發寫事件,接着上次的寫
            if (writeBuffer == null) {
                writeBuffer = ByteBuffer.wrap(this.readResult.getBytes());
            }

            //該循環處理發送緩沖區處理速度小於網卡發送速度,無法一次性將 buffer 中數據寫入發送緩沖區的情況
            socketChannel.write(writeBuffer);
            if (writeBuffer.position() != writeBuffer.limit()) {
                return;
            }
            writeBuffer = null;
            readResult = null;
            state = READING;
            //寫完將興趣移除,否則會將 CPU 跑滿
            key.interestOps(SelectionKey.OP_READ);
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM