netty是最近項目要用到的nio框架,找了各種資料,發現稱贊它的有點多,所以決定用它:其實也就二選一嘛,mina或netty或自己寫。對於mina,也不熟,不過看各種介紹,貌似netty干活還是很不錯的,尤其是最新的4.x和5.x重構后,且使用結構清晰就先了解了解了。
首先要把應用跑起來啦(官網的例子比較多),我這是一個關於mqtt的一個例子:
1 m_bossGroup = new NioEventLoopGroup(); 2 m_workerGroup = new NioEventLoopGroup(); 3 4 final NettyMQTTHandler handler = new NettyMQTTHandler(); 5 handler.setMessaging(messaging); 6 7 ServerBootstrap b = new ServerBootstrap(); 8 b.group(m_bossGroup, m_workerGroup) 9 .channel(NioServerSocketChannel.class) 10 .childHandler(new ChannelInitializer<SocketChannel>() { 11 @Override 12 public void initChannel(SocketChannel ch) throws Exception { 13 ChannelPipeline pipeline = ch.pipeline(); 14 //pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector)); 15 pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT)); 16 pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler()); 17 //pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR)); 18 pipeline.addLast("decoder", new MQTTDecoder()); 19 pipeline.addLast("encoder", new MQTTEncoder()); 20 pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector)); 21 pipeline.addLast("handler", handler); 22 } 23 }) 24 .option(ChannelOption.SO_BACKLOG, 128) 25 .option(ChannelOption.SO_REUSEADDR, true) 26 .childOption(ChannelOption.SO_KEEPALIVE, true); 27 try { 28 // Bind and start to accept incoming connections. 29 ChannelFuture f = b.bind(Constants.PORT); 30 LOG.info("Server binded"); 31 f.sync(); 32 } catch (InterruptedException ex) { 33 LOG.error(null, ex); 34 }
再回想下,我們自己寫serversocket的時候是怎么寫的呢(這是一個笨拙的實例代碼):
ServerSocket socket; channel = ServerSocketChannel.open(); // 打開通道 socket = channel.socket(); //得到與通到相關的socket對象 socket.bind(new InetSocketAddress(port)); //將scoket榜定在制定的端口上 //配置通到使用非阻塞模式,在非阻塞模式下,可以編寫多道程序同時避免使用復雜的多線程 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_ACCEPT); try { while (true) { this.selector.select(); Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); this.handleKey(key); } } } catch (IOException ex) { ex.printStackTrace(); }
原理還是那些,channel.open(),然后register key,然后遍歷,再然后才進行handleKey()的干活。
那netty的寫法為什么那么瀟灑呢,懷着這個莫名的疑問,我先不管它的結構什么的,直接進行search,發現了這么個東東:
1 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { 2 super(parent, threadFactory, false); 3 if (selectorProvider == null) { 4 throw new NullPointerException("selectorProvider"); 5 } 6 provider = selectorProvider; 7 selector = openSelector(); 8 }
其中第8行從名稱上來看,有點點意思了,往下看:
1 private Selector openSelector() { 2 final Selector selector; 3 try { 4 selector = provider.openSelector();
其中的provider就是我們熟悉的:java.nio.channels.spi.SelectorProvider類。
所以這個就是做了selector.open的工作。
接下來能看到NioEventLoop:
1 protected void run() { 2 for (;;) { 3 oldWakenUp = wakenUp.getAndSet(false); 4 try { 5 if (hasTasks()) { 6 selectNow(); 7 } else { 8 select();
再繼續看,該類中處理的selectedKey:
1 final NioUnsafe unsafe = ch.unsafe(); 2 if (!k.isValid()) { 3 // close the channel if the key is not valid anymore 4 unsafe.close(unsafe.voidPromise()); 5 return; 6 } 7 8 try { 9 int readyOps = k.readyOps(); 10 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 11 unsafe.read(); 12 if (!ch.isOpen()) { 13 // Connection already closed - no need to handle write. 14 return; 15 } 16 } 17 if ((readyOps & SelectionKey.OP_WRITE) != 0) { 18 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write 19 ch.unsafe().forceFlush(); 20 } 21 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { 22 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking 23 // See https://github.com/netty/netty/issues/924 24 int ops = k.interestOps(); 25 ops &= ~SelectionKey.OP_CONNECT; 26 k.interestOps(ops); 27 28 unsafe.finishConnect(); 29 } 30 } catch (CancelledKeyException e) { 31 unsafe.close(unsafe.voidPromise()); 32 } 33
現在明白了吧,其實netty也是走這么一套邏輯。
然后再網上看,邏輯是這樣:
NioEventLoopGroup extends MultithreadEventExecutorGroup,其初始化了n個單線程的線程池(children = new SingleThreadEventExecutor[nThreads];)
每個單線程的對象child[i]=NioEventLoop對象,每個NioEventLoop有一個Selector字段。
其run方法是該group都需要干活的具體業務邏輯代碼。
后續再加上別的類說明。