具體實現原理和模型圖,請參考:Reactor模型介紹
單Reactor單線程實現,請參考:基於NIO-Reactor的介紹和單Reactor單線程模型的實現
單Reactor多線程模型實現,請參考:基於NIO-單Reactor多線程模型的實現
在單Reactor多線程模型,只有一個selector,selector負責accpet,read,write事件的維護。
而主從Reactor多線程模型,將使用兩個selector,一個主selector專門維護accept事件,當接收到accept事件,將該連接交給從selector,從selector維護該連接的read和write事件。
主從selector的方式,將連接和數據處理完全分開維護,將大大提高並發量。
主從Reactor的實現:
/** * 主從Reactor多線程模型 */ public class MainAndSubReactorMultiThreadMode { public static void main(String[] args) { /** * 初始化一個線程池,然后創建一個主Reactor,並加入一個從Reactor. */ ThreadPool.getPool().init(3); new MainReactor(8089).addSub(new SubReactor()).run(); } }
/** * 主Reactor */ class MainReactor{ /** * 維護一個從Reactor */ private SubReactor subReactor; private int port; private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; public MainReactor(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); SelectionKey sk = servChannel.register(selector, SelectionKey.OP_ACCEPT); stop=false; this.port=port; } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /** * 添加子Reactor * @param subReactor * @return */ public MainReactor addSub(SubReactor subReactor){ this.subReactor=subReactor; this.subReactor.run(); return this; } public void run() { System.out.println("主reactor開始啟動了,監聽端口:"+port+"......."); while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { disptach(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } } private void disptach(SelectionKey key) { if(key.isValid()){ /** * 主Reactor只關心Accept事件 */ if(key.isAcceptable()){ new MultiAcceptor(key).addSub(this.subReactor).run(); } //如果未使用了從Reactor if(this.subReactor==null){ if (key.isReadable()){ new MultiReadHandler(key).run(); } if(key.isWritable()){ new MultiWriteHandler(key).run(); } } } } }
/** * 從Reactor */ class SubReactor { private Selector selector; private volatile boolean stop; public SubReactor() { try { selector = SelectorProvider.provider().openSelector(); stop=false; } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /** * 將主Reactor中的Channel注冊到從Reactor中的selector * @param sc */ public void register(SocketChannel sc){ try { sc.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { e.printStackTrace(); } } public void run() { ThreadPool.getPool().submit(new Runnable() { @Override public void run() { System.out.println("從reactor開始啟動了。。。。。"); while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { disptach(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } } }); } private void disptach(SelectionKey key) { /** * 從Reactor只關心讀和寫事件 */ if(key.isValid()){ if (key.isReadable()){ new MultiReadHandler(key).run(); } if(key.isWritable()){ new MultiWriteHandler(key).run(); } } } }
class MultiAcceptor { private SubReactor subReactor; private SelectionKey selectionKey; public MultiAcceptor(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public MultiAcceptor addSub(SubReactor subReactor){ this.subReactor=subReactor; return this; } public void run() { try { ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); if(subReactor==null){ Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); }else { System.out.println("accept"); subReactor.register(sc); } }catch (Exception e){ e.printStackTrace(); } } }
class MultiReadHandler { private SelectionKey selectionKey; public MultiReadHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public void run() { try { final SocketChannel sc = (SocketChannel) selectionKey.channel(); //使用線程池,異步處理讀請求 ThreadPool.getPool().submit(new Runnable() { @Override public void run() { doRead(sc); } }); //處理完讀請求,將通道注冊為寫 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_WRITE); }catch (Exception e){ e.printStackTrace(); } } private void doRead(SocketChannel ssc) { System.out.println("讀取數據,然后做一些數據處理"); } }
class MultiWriteHandler { private SelectionKey selectionKey; public MultiWriteHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } private void doWrite(SocketChannel sc) { System.out.println("處理寫。。。"); } public void run() { try { final SocketChannel sc = (SocketChannel) selectionKey.channel(); //使用線程池,異步處理寫請求 ThreadPool.getPool().submit(new Runnable() { @Override public void run() { doWrite(sc); } }); //寫完后,將通道注冊為讀 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); }catch (Exception e){ e.printStackTrace(); } } }
/** * 示例-線程池 */ class ThreadPool{ public static final ThreadPool pool = new ThreadPool(); private boolean init=false; private static ExecutorService executorService; private ThreadPool(){}; public synchronized void init(int size){ if(!init){ executorService=Executors.newFixedThreadPool(size); init=true; }else { System.out.println("the thread pool had inited"); } } public static ThreadPool getPool(){ return pool; } public void submit(Runnable runnable){ if(init){ executorService.submit(runnable); }else { throw new RuntimeException("the thread pool is not inited"); } } }