具体实现原理和模型图,请参考:Reactor模型介绍
单Reactor单线程实现,请参考:基于NIO-Reactor的介绍和单Reactor单线程模型的实现
单Reactor多线程模型实现,请参考:基于NIO-单Reactor多线程模型的实现
在单Reactor多线程模型,只有一个selector,selector负责accpet,read,write事件的维护。
而主从Reactor多线程模型,将使用两个selector,一个主selector专门维护accept事件,当接收到accept事件,将该连接交给从selector,从selector维护该连接的read和write事件。
主从selector的方式,将连接和数据处理完全分开维护,将大大提高并发量。
主从Reactor的实现:
/** * 主从Reactor多线程模型 */ public class MainAndSubReactorMultiThreadMode { public static void main(String[] args) { /** * 初始化一个线程池,然后创建一个主Reactor,并加入一个从Reactor. */ ThreadPool.getPool().init(3); new MainReactor(8089).addSub(new SubReactor()).run(); } }
/** * 主Reactor */ class MainReactor{ /** * 维护一个从Reactor */ private SubReactor subReactor; private int port; private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; public MainReactor(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); SelectionKey sk = servChannel.register(selector, SelectionKey.OP_ACCEPT); stop=false; this.port=port; } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /** * 添加子Reactor * @param subReactor * @return */ public MainReactor addSub(SubReactor subReactor){ this.subReactor=subReactor; this.subReactor.run(); return this; } public void run() { System.out.println("主reactor开始启动了,监听端口:"+port+"......."); while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { disptach(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } } private void disptach(SelectionKey key) { if(key.isValid()){ /** * 主Reactor只关心Accept事件 */ if(key.isAcceptable()){ new MultiAcceptor(key).addSub(this.subReactor).run(); } //如果未使用了从Reactor if(this.subReactor==null){ if (key.isReadable()){ new MultiReadHandler(key).run(); } if(key.isWritable()){ new MultiWriteHandler(key).run(); } } } } }
/** * 从Reactor */ class SubReactor { private Selector selector; private volatile boolean stop; public SubReactor() { try { selector = SelectorProvider.provider().openSelector(); stop=false; } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /** * 将主Reactor中的Channel注册到从Reactor中的selector * @param sc */ public void register(SocketChannel sc){ try { sc.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { e.printStackTrace(); } } public void run() { ThreadPool.getPool().submit(new Runnable() { @Override public void run() { System.out.println("从reactor开始启动了。。。。。"); while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { disptach(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } } }); } private void disptach(SelectionKey key) { /** * 从Reactor只关心读和写事件 */ if(key.isValid()){ if (key.isReadable()){ new MultiReadHandler(key).run(); } if(key.isWritable()){ new MultiWriteHandler(key).run(); } } } }
class MultiAcceptor { private SubReactor subReactor; private SelectionKey selectionKey; public MultiAcceptor(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public MultiAcceptor addSub(SubReactor subReactor){ this.subReactor=subReactor; return this; } public void run() { try { ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); if(subReactor==null){ Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); }else { System.out.println("accept"); subReactor.register(sc); } }catch (Exception e){ e.printStackTrace(); } } }
class MultiReadHandler { private SelectionKey selectionKey; public MultiReadHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } public void run() { try { final SocketChannel sc = (SocketChannel) selectionKey.channel(); //使用线程池,异步处理读请求 ThreadPool.getPool().submit(new Runnable() { @Override public void run() { doRead(sc); } }); //处理完读请求,将通道注册为写 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_WRITE); }catch (Exception e){ e.printStackTrace(); } } private void doRead(SocketChannel ssc) { System.out.println("读取数据,然后做一些数据处理"); } }
class MultiWriteHandler { private SelectionKey selectionKey; public MultiWriteHandler(SelectionKey selectionKey) { this.selectionKey = selectionKey; } private void doWrite(SocketChannel sc) { System.out.println("处理写。。。"); } public void run() { try { final SocketChannel sc = (SocketChannel) selectionKey.channel(); //使用线程池,异步处理写请求 ThreadPool.getPool().submit(new Runnable() { @Override public void run() { doWrite(sc); } }); //写完后,将通道注册为读 Selector selector = selectionKey.selector(); SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); }catch (Exception e){ e.printStackTrace(); } } }
/** * 示例-线程池 */ class ThreadPool{ public static final ThreadPool pool = new ThreadPool(); private boolean init=false; private static ExecutorService executorService; private ThreadPool(){}; public synchronized void init(int size){ if(!init){ executorService=Executors.newFixedThreadPool(size); init=true; }else { System.out.println("the thread pool had inited"); } } public static ThreadPool getPool(){ return pool; } public void submit(Runnable runnable){ if(init){ executorService.submit(runnable); }else { throw new RuntimeException("the thread pool is not inited"); } } }