源碼分析netty服務器創建過程vs java nio服務器創建


1.Java NIO服務端創建

首先,我們通過一個時序圖來看下如何創建一個NIO服務端並啟動監聽,接收多個客戶端的連接,進行消息的異步讀寫。

 

示例代碼(參考文獻【2】):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * User: mihasya
 * Date: Jul 25, 2010
 * Time: 9:09:03 AM
 */
public class JServer {
    public static void main (String[] args) {
        ServerSocketChannel sch = null;
        Selector sel = null;

        try {
            // setup the socket we're listening for connections on.
            InetSocketAddress addr = new InetSocketAddress(8400);
            sch = ServerSocketChannel.open();
            sch.configureBlocking(false);
            sch.socket().bind(addr);
            // setup our selector and register the main socket on it 
            sel = Selector.open();
            sch.register(sel, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            System.out.println("Couldn't setup server socket");
            System.out.println(e.getMessage());
            System.exit(1);
        }

        // fire up the listener thread, pass it our selector
        ListenerThread listener = new ListenerThread(sel);
        listener.run();
    }

/*
 * the thread is completely unnecessary, it could all just happen
 * in main()
 */
class ListenerThread extends Thread {
    Selector sel = null;
    ListenerThread(Selector sel) {
        this.sel = sel;
    }

    public void run() {
        while (true) {
            
            // our canned response for now
            ByteBuffer resp = ByteBuffer.wrap(new String("got it\n").getBytes());
            try {
                // loop over all the sockets that are ready for some activity
                while (this.sel.select() > 0) {
                    Set keys = this.sel.selectedKeys();
                    Iterator i = keys.iterator();
                    while (i.hasNext()) {
                        SelectionKey key = (SelectionKey)i.next();
                        if (key.isAcceptable()) {
                            // this means that a new client has hit the port our main
                            // socket is listening on, so we need to accept the  connection
                            // and add the new client socket to our select pool for reading
                            // a command later
                            System.out.println("Accepting connection!");
                            // this will be the ServerSocketChannel we initially registered
                            // with the selector in main()
                            ServerSocketChannel sch = (ServerSocketChannel)key.channel();
                            SocketChannel ch = sch.accept();
                            ch.configureBlocking(false);
                            ch.register(this.sel, SelectionKey.OP_READ);
                        } else if (key.isReadable()) {
                            // one of our client sockets has received a command and
                            // we're now ready to read it in
                            System.out.println("Accepting command!");                            
                            SocketChannel ch = (SocketChannel)key.channel();
                            ByteBuffer buf = ByteBuffer.allocate(200);
                            ch.read(buf);
                            buf.flip();
                            Charset charset = Charset.forName("UTF-8");
                            CharsetDecoder decoder = charset.newDecoder();
                            CharBuffer cbuf = decoder.decode(buf);
                            System.out.print(cbuf.toString());
                            // re-register this socket with the selector, this time
                            // for writing since we'll want to write something to it
                            // on the next go-around
                            ch.register(this.sel, SelectionKey.OP_WRITE);
                        } else if (key.isWritable()) {
                            // we are ready to send a response to one of the client sockets
                            // we had read a command from previously
                            System.out.println("Sending response!");
                            SocketChannel ch = (SocketChannel)key.channel();
                            ch.write(resp);
                            resp.rewind();
                            // we may get another command from this guy, so prepare
                            // to read again. We could also close the channel, but
                            // that sort of defeats the whole purpose of doing async
                            ch.register(this.sel, SelectionKey.OP_READ);
                        }
                        i.remove();
                    }
                }
            } catch (IOException e) {
                System.out.println("Error in poll loop");
                System.out.println(e.getMessage());
                System.exit(1);
            }
        }
    }
}
}

從上面的代碼可以看出java nio的通用步驟:

1.打開ServerSocketChannel,用於監聽客戶端的連接,它是所有客戶端連接的父通道,綁定監聽端口,設置客戶端連接方式為非阻塞模式。

2.打開多路復用器並啟動服務端監聽線程,將ServerSocketChannel注冊到Reactor線程的多路復用器Selector上,監聽ACCEPT狀態。

3.多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手后,與客戶端建立物理鏈路。

 

2. Netty服務端創建

 2.1 打開ServerSocketChannel

            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)

創建一個ServerSocketChannel的過程:

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

調用newSocket方法:

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }

其中的provider.openServerSocketChannel()就是java nio的實現。設置非阻塞模式包含在父類中:

/**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

 

 

2.2 打開多路復用器過程

NioEventLoop負責調度和執行Selector輪詢操作,選擇准備就緒的Channel集合,相關代碼如下:

@Override
    protected void run() {
        for (;;) {
            boolean oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select(oldWakenUp);

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    processSelectedKeys();
                    runAllTasks();
                } else {
                    final long ioStartTime = System.nanoTime();

                    processSelectedKeys();

                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }

                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        break;
                    }
                }
            } catch (Throwable t) {
                logger.warn("Unexpected exception in the selector loop.", t);

                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

2.2.1 綁定處理的key

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

以processSelectedKeysPlain為例:

 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
        // check if the set is empty and if so just return to not create garbage by
        // creating a new Iterator every time even if there is nothing to process.
        // See https://github.com/netty/netty/issues/597
        if (selectedKeys.isEmpty()) {
            return;
        }

        Iterator<SelectionKey> i = selectedKeys.iterator();
        for (;;) {
            final SelectionKey k = i.next();
            final Object a = k.attachment();
            i.remove();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (!i.hasNext()) {
                break;
            }

            if (needsToSelectAgain) {
                selectAgain();
                selectedKeys = selector.selectedKeys();

                // Create the iterator again to avoid ConcurrentModificationException
                if (selectedKeys.isEmpty()) {
                    break;
                } else {
                    i = selectedKeys.iterator();
                }
            }
        }
    }

 

 2.3 綁定端口,接收請求:

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(PORT).sync();

調用bind程序

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.executor = channel.eventLoop();
                    }
                    doBind0(regFuture, channel, localAddress, promise);
                }
            });
            return promise;
        }
    }

最終的綁定由dobind0來完成

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new OneTimeTask() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

具體實現:

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

 

參考文獻

【1】http://www.infoq.com/cn/articles/netty-server-create

【2】https://github.com/mihasya/sample-java-nio-server/blob/master/src/JServer.java


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM