netty--NioEventLoop滴干活


netty是最近項目要用到的nio框架,找了各種資料,發現稱贊它的有點多,所以決定用它:其實也就二選一嘛,mina或netty或自己寫。對於mina,也不熟,不過看各種介紹,貌似netty干活還是很不錯的,尤其是最新的4.x和5.x重構后,且使用結構清晰就先了解了解了。


首先要把應用跑起來啦(官網的例子比較多),我這是一個關於mqtt的一個例子:

 

 1 m_bossGroup = new NioEventLoopGroup();
 2         m_workerGroup = new NioEventLoopGroup();
 3         
 4         final NettyMQTTHandler handler = new NettyMQTTHandler();
 5         handler.setMessaging(messaging);
 6 
 7         ServerBootstrap b = new ServerBootstrap();
 8             b.group(m_bossGroup, m_workerGroup)
 9              .channel(NioServerSocketChannel.class) 
10              .childHandler(new ChannelInitializer<SocketChannel>() { 
11                  @Override
12                  public void initChannel(SocketChannel ch) throws Exception {
13                     ChannelPipeline pipeline = ch.pipeline();
14                     //pipeline.addFirst("metrics", new BytesMetricsHandler(m_metricsCollector));
15                     pipeline.addFirst("idleStateHandler", new IdleStateHandler(0, 0, Constants.DEFAULT_CONNECT_TIMEOUT));
16                     pipeline.addAfter("idleStateHandler", "idleEventHandler", new MoquetteIdleTimoutHandler());
17                     //pipeline.addLast("logger", new LoggingHandler("Netty", LogLevel.ERROR));
18                     pipeline.addLast("decoder", new MQTTDecoder());
19                     pipeline.addLast("encoder", new MQTTEncoder());
20                     pipeline.addLast("metrics", new MessageMetricsHandler(m_metricsCollector));
21                     pipeline.addLast("handler", handler);
22                  }
23              })
24              .option(ChannelOption.SO_BACKLOG, 128)
25              .option(ChannelOption.SO_REUSEADDR, true)
26              .childOption(ChannelOption.SO_KEEPALIVE, true); 
27         try {    
28             // Bind and start to accept incoming connections.
29             ChannelFuture f = b.bind(Constants.PORT);
30             LOG.info("Server binded");
31             f.sync();
32         } catch (InterruptedException ex) {
33             LOG.error(null, ex);
34         }

再回想下,我們自己寫serversocket的時候是怎么寫的呢(這是一個笨拙的實例代碼):

    ServerSocket socket;
        channel = ServerSocketChannel.open(); // 打開通道 
        socket = channel.socket(); //得到與通到相關的socket對象 
        socket.bind(new InetSocketAddress(port)); //將scoket榜定在制定的端口上 
        //配置通到使用非阻塞模式,在非阻塞模式下,可以編寫多道程序同時避免使用復雜的多線程 
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_ACCEPT);
        try {
            while (true) {
                this.selector.select();
                Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    this.handleKey(key);

                }
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }

原理還是那些,channel.open(),然后register key,然后遍歷,再然后才進行handleKey()的干活。

那netty的寫法為什么那么瀟灑呢,懷着這個莫名的疑問,我先不管它的結構什么的,直接進行search,發現了這么個東東:

1   NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
2         super(parent, threadFactory, false);
3         if (selectorProvider == null) {
4             throw new NullPointerException("selectorProvider");
5         }
6         provider = selectorProvider;
7         selector = openSelector();
8     }

其中第8行從名稱上來看,有點點意思了,往下看:

1  private Selector openSelector() {
2         final Selector selector;
3         try {
4             selector = provider.openSelector();

其中的provider就是我們熟悉的:java.nio.channels.spi.SelectorProvider類。

所以這個就是做了selector.open的工作。

接下來能看到NioEventLoop:

1     protected void run() {
2         for (;;) {
3             oldWakenUp = wakenUp.getAndSet(false);
4             try {
5                 if (hasTasks()) {
6                     selectNow();
7                 } else {
8                     select();

再繼續看,該類中處理的selectedKey:

 1         final NioUnsafe unsafe = ch.unsafe();
 2         if (!k.isValid()) {
 3             // close the channel if the key is not valid anymore
 4             unsafe.close(unsafe.voidPromise());
 5             return;
 6         }
 7 
 8         try {
 9             int readyOps = k.readyOps();
10             if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
11                 unsafe.read();
12                 if (!ch.isOpen()) {
13                     // Connection already closed - no need to handle write.
14                     return;
15                 }
16             }
17             if ((readyOps & SelectionKey.OP_WRITE) != 0) {
18                 // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
19                 ch.unsafe().forceFlush();
20             }
21             if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
22                 // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
23                 // See https://github.com/netty/netty/issues/924
24                 int ops = k.interestOps();
25                 ops &= ~SelectionKey.OP_CONNECT;
26                 k.interestOps(ops);
27 
28                 unsafe.finishConnect();
29             }
30         } catch (CancelledKeyException e) {
31             unsafe.close(unsafe.voidPromise());
32         }
33     

現在明白了吧,其實netty也是走這么一套邏輯。

然后再網上看,邏輯是這樣:

NioEventLoopGroup extends MultithreadEventExecutorGroup,其初始化了n個單線程的線程池(children = new SingleThreadEventExecutor[nThreads];)

每個單線程的對象child[i]=NioEventLoop對象,每個NioEventLoop有一個Selector字段。

其run方法是該group都需要干活的具體業務邏輯代碼。

后續再加上別的類說明。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM