Netty源碼看這篇就夠了


前言

后面打算開始擼其他框架源碼,而Netty對Java NIO的一層封裝,提供了一套簡單易用的API,經常被其他框架拿來用,我先花了點時間研究了下。這里整理下對源碼的解讀,以及對幾個關鍵對象的介紹。分析了之前兩篇流水賬式的源碼分析的不足,這次嘗試聚焦幾個不同重點進行分析。

個人netty注釋版本:https://gitee.com/Nortyr/netty

原netty地址:https://github.com/netty/netty

看完能收獲什么

  • Java網絡編程介紹
  • 一個簡單的EchoServerDemo
  • Bootstrap
  • Channel
  • ChannelPipeline & ChannelHandler
  • EventLoopGroup & EventLoop
  • ChannelFuture

Java網絡編程介紹

BIO模型。龐大的線程消耗,消費消息如果很漫長,這個服務就是個災難。

public void server(int port) throws IOException{
    final ServerSocket socket=new ServerSocket(port);
    for (;;){
        //接受連接
        final Socket clientSock = socket.accept();
        System.out.println("Accept connection from"+ clientSock);
        //創建一個線程來處理連接
        new Thread(new Runnable() {
            @Override
            public void run() {
                OutputStream out;
                try {
                    out=clientSock.getOutputStream();
                    ... doSomeThing...
                    //將消息寫給已連接的客戶端
                    out.write("Hello world".getBytes(Charset.forName("UTF-8")));
                    out.flush();
                    clientSock.close();
                }
                ...略...
            }
        }).start();
    }
    
}

故此Java設計出了NIO,這里找到個Doug Lea大神的一篇 NIOppt http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf 感興趣的可以看一下,十分精簡。下面這部分會結合這個ppt進行講解,看完這個ppt的可以直接略過到下一個部分。

public static void main(String[] args) throws IOException {
    //創建ServerSocketChannel,處理接入連接
    ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
    //創建Selector
    Selector selector=Selector.open();
    //設置是否為非阻塞
    serverSocketChannel.configureBlocking(false);
    //創建注冊channel進selector的創立連接時間
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    //綁定端口號
    serverSocketChannel.socket().bind(new InetSocketAddress(8080));
    while (true){
        if(serverSocketChannel.isOpen()){
            // 通過 Selector 選擇 Channel
            int selectNums = selector.select(1000L);
            if (selectNums == 0) {
                continue;
            }
            // 遍歷可選擇的 Channel 的 SelectionKey 集合
            for (SelectionKey selectKey:selector.selectedKeys()) {
                // 忽略無效的 SelectionKey
                if (!selectKey.isValid()) {
                    continue;
                }
                //新建立的連接
                if(selectKey.isAcceptable()){
                    //獲取新連接創建的channel
                    SocketChannel socketChannel= ((ServerSocketChannel) selectKey.channel()).accept();
                    if(socketChannel!=null){
                        //設置為非阻塞
                        socketChannel.configureBlocking(false);
                        //注冊進selector
                        socketChannel.register(selector,SelectionKey.OP_READ);
                    }
                }
                //處理讀時間
                if(selectKey.isReadable()){
                    SocketChannel socketChannel= (SocketChannel) selectKey.channel();
                    if(socketChannel!=null){
                        //讀取數據
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int bytesRead = socketChannel.read(buffer);
                        if(bytesRead==-1){
                            socketChannel.register(selector,0);
                            socketChannel.close();
                        }else{
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            System.arraycopy(buffer.array(), buffer.position(), bytes, 0, buffer.remaining());
                            System.out.println(new String(bytes, "UTF-8"));
                        }
                    }
                }
            }
        }
    }
}

Java NIO的幾個核心api

  • Channels
    • 與支持非阻塞讀取的文件,socket等建立連接。
  • Buffers
    • 本質是一塊內存,用於和NIO通道進行交互。
  • Selectors
    • 把Channel和需要的事件注冊到Selector上面,告訴一組channel中的哪一個有IO事件。
  • SelectionKeys
    • 維護IO事件狀態和綁定

幾個核心api的關系

  • Channel和Buffer
    • 2個交互關系如圖所示
      image.png
  • Selector/Channel/SelectionKey
    • 一個Selector可以監聽多個Channel
    • 一個Selector和Channel的綁定關系為SelectionKey

Channel

image.png

這里我們以NioServerSocketChannel為例,看一下Channel

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();
    EventLoop eventLoop();
    Channel parent();
    ChannelConfig config();
    boolean isOpen();
    boolean isRegistered();
    boolean isActive();
    ChannelMetadata metadata();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    ChannelFuture closeFuture();
    boolean isWritable();
    long bytesBeforeUnwritable();
    long bytesBeforeWritable();
    Unsafe unsafe();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
    @Override
    Channel read();
    @Override
    Channel flush();
    /**
     * 調用Javanio方法封裝
     */
    interface Unsafe {

        RecvByteBufAllocator.Handle recvBufAllocHandle();
        /**
         * 返回地址
         */
        SocketAddress localAddress();
        /**
         * 返回遠程地址
         */
        SocketAddress remoteAddress();
        /**
         * 注冊Channel,注冊完成后通知ChannelPromise
         */
        void register(EventLoop eventLoop, ChannelPromise promise);
        /**
         * 將ip地址綁定到Channel,完成后通知ChannelPromise
         */
        void bind(SocketAddress localAddress, ChannelPromise promise);
        /**
         * 連接遠程ip地址
         */
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        /**
         * 斷開連接,完成后通知ChannelPromise
         */
        void disconnect(ChannelPromise promise);
        /**
         * 關閉channel,通知ChannelPromise
         */
        void close(ChannelPromise promise);
        /**
         * 關閉,不處罰任何事件
         */
        void closeForcibly();
        /**
         * 注銷channel,通知ChannelPromise
         */
        void deregister(ChannelPromise promise);
        /**
         * 調用讀取操作
         */
        void beginRead();
        /**
         * 調用寫操作
         */
        void write(Object msg, ChannelPromise promise);
        /**
         * 清空所有通過ChannelPromise預定的寫操作
         */
        void flush();
        ChannelPromise voidPromise();
        /**
         * 返回存儲待處理寫入請求的Channel的ChannelOutboundBuffer。
         */
        ChannelOutboundBuffer outboundBuffer();
    }
}
public abstract class AbstractNioChannel extends AbstractChannel {
    private final SelectableChannel ch;
    protected final int readInterestOp;
    volatile SelectionKey selectionKey;
    boolean readPending;
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };
    private ChannelPromise connectPromise;
    private Future<?> connectTimeoutFuture;
    private SocketAddress requestedRemoteAddress;
 }

以上是Channel接口和AbstractNioChannel的抽象類,這里給大家精簡了下,從Channel定義的各個方法可以看出,netty的Channel是對原始Channel的一層封裝。其中所有的nio的操作封裝在了Unsafe中,並進行了一定的增強,例如回調之類的。從AbstractNioChannel可以更加直觀的看出,netty對Channel SelectionKey的封裝,並添加了自己的回調ChannelPromise從而使方法更加易於使用。

ChannelPipeline & ChannelHandler

基礎講解

ChannelPipeline的初始化

 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        ...其余略...
        private final DefaultChannelPipeline pipeline;
        
	protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
	}

	protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}

ChannelPipeline內部結構概述

public class DefaultChannelPipeline implements ChannelPipeline {
	final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    private final Channel channel;
    private final ChannelFuture succeededFuture;

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }
    ...略

    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            setAddComplete();
        }
    	... 略
    }

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, HeadContext.class);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
        ...略
    }


}

上面列舉了ChannelPipeline的創建,以及ChannelPipeline的內部結構。可以看出它維護了一個雙向鏈表。我們在添加handler的時候就是往這個鏈表中添加的。

image.png

7B15FA95-9788-4E6F-BE73-A78CE0F1BBC4.png

ChannelHandler 添加進ChannelPipeline后會被封裝成ChannelHandlerContext,會判斷是ChannelInboundHandler還是ChannelOutboundHandler的子類,對inboundoutbound這兩個屬性進行賦值,ChannelInboundHandler的子類inbound為true,outbound為false,ChannelOutboundHandler反之。ChannelPipeline內部調用方法時,會使用fireXXXXX()的方法,會利用責任鏈模式進行調用,這時候會用到這個屬性進行判斷,是否有對應方法,從而進行調用(后面會詳細講解下)。

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
    DefaultChannelHandlerContext(
	            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
            super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            this.handler = handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
 
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    
   
    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
}

方法調用

這里就用了責任鏈的方式調用方法,確定下一個調用哪一個節點,就是通過inbound outbound
這兩個字段決定的。

public final void read() {
    ...省略掉部分無用代碼
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);

            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } 
}


@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}


static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    //如果msg實現了ReferenceCounted,進行特殊操作
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            //調用下一個節點的channelRead方法
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            invokeExceptionCaught(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

下面就是調用你自定義的ChannelInboundHandler子類的覆蓋方法了,這里就不過多贅述。

EventLoopGroup & EventLoop

image.png

初始化

EventLoopGroup初始化創建的時候,會創建對應數量的EventLoop,如果沒有指定,默認創建cpu核心數量*2個EventLoop

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
        //默認線程數是cpu核心的2倍
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                //創建對應數量的EventLoop
                children[i] = newChild(executor, args);
                success = true;
            } 
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
}    

將EventLoop封裝進EventExecutorChooser

@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
  if (isPowerOfTwo(executors.length)) {
      return new PowerOfTwoEventExecutorChooser(executors);
  } else {
      return new GenericEventExecutorChooser(executors);
  }
}

方法調用

此處借助EchoServer啟動分析EventLoop方法執行過程(不感興趣的可以跳過)

如果你服務設置了主從線程,在啟動的時候,就會使用主線程啟動服務。

final ChannelFuture initAndRegister() {
    ...省略部分代碼
    ChannelFuture regFuture = config().group().register(channel);
}

@Override
public ChannelFuture register(Channel channel) {
    //從EventExecutorChooser獲取到EventLoop注冊Channel
    return next().register(channel);
}

protected abstract class AbstractUnsafe implements Unsafe {
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        }
    }     
}
//將任務先添加進隊列,
private void execute(Runnable task, boolean immediate) {
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        //主線程輪循,監聽事件
        startThread();

    }

    ...省略無用代碼
}
protected void addTask(Runnable task) {
    ObjectUtil.checkNotNull(task, "task");
    if (!offerTask(task)) {
        reject(task);
    }
}
final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    return taskQueue.offer(task);
}

startThread(); 比較核心單獨說一下,他會啟動一個線程

private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            ...省略無用代碼
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } 
        }
    });
}
//execute沒啥好說的了,就是啟動線程
public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}
protected void run() {
    for (;;) {
        else if (strategy > 0) {
            final long ioStartTime = System.nanoTime();
            try {
                processSelectedKeys();
            } finally {
                // Ensure we always run tasks.
                final long ioTime = System.nanoTime() - ioStartTime;

                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }
        } else {
            //執行前天添加的任務
            ranTasks = runAllTasks(0); // This will run the minimum number of tasks
        }
    }
}

processSelectedKeys();就是正常執行讀取連接的操作入口,runAllTasks( ); 就是上面添加的匿名內部類的執行入口

new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
}

ChannelFuture

image.png
這個從圖中可以看出它就是對java.util.concurrent.Future的拓展。

這里我們主要看一下它擴展的回調機制

 public Promise<V> await() throws InterruptedException {
    //根據有無結果判斷當前任務是否完成
    if (isDone()) {
        return this;
    }

    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }

    checkDeadLock();

    synchronized (this) {
        while (!isDone()) {
            incWaiters();
            try {
                //線程進入等待狀態
                wait();
            } finally {
                decWaiters();
            }
        }
    }
    return this;
}

private static boolean isDone0(Object result) {
    return result != null && result != UNCANCELLABLE;
}

與之對應的就是notify了

image.png

image.png
調用回調最終會調用

private void notifyListenersNow() {
    Object listeners;
    synchronized (this) {
        // Only proceed if there are listeners to notify and we are not already notifying listeners.
        if (notifyingListeners || this.listeners == null) {
            return;
        }
        //用完就刪
        notifyingListeners = true;
        listeners = this.listeners;
        this.listeners = null;
    }
    for (;;) {
        if (listeners instanceof DefaultFutureListeners) {
            notifyListeners0((DefaultFutureListeners) listeners);
        } else {
            notifyListener0(this, (GenericFutureListener<?>) listeners);
        }
        synchronized (this) {
            if (this.listeners == null) {
                // Nothing can throw from within this method, so setting notifyingListeners back to false does not
                // need to be in a finally block.
                notifyingListeners = false;
                return;
            }
            listeners = this.listeners;
            this.listeners = null;
        }
    }
}

private static void notifyListener0(Future future, GenericFutureListener l) {
    try {
        //調用自定義listener
        l.operationComplete(future);
    } catch (Throwable t) {
        if (logger.isWarnEnabled()) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
}

Bootstrap

最后我們再來看下Bootstrap,核心部分上面已經講完了,這里就不多贅述,這就簡述下

public abstract class AbstractBootstrap{
    private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
    private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
    volatile EventLoopGroup group;
    private volatile ChannelFactory<? extends C> channelFactory;
    private volatile SocketAddress localAddress;
    private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private volatile ChannelHandler handler;
}

public class ServerBootstrap extends AbstractBootstrap{
    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;
}
private ChannelFuture doBind(final SocketAddress localAddress) {
    //初始化並注冊一個 Channel 對象,pipeline中添加ServerBootstrapAcceptor,處理連理連接事件,
    //        ChannelFuture regFuture = config().group().register(channel);啟動線程循環監聽事件
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    //因為是異步,不能保證是否完成
    //綁定Channel端口,並注冊channel到selectionKey中
    if (regFuture.isDone()) {
        // 注冊完成
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 注冊還未完成
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // EventLoop 上的注冊失敗,因此一旦我們嘗試訪問 Channel 的 EventLoop,就直接使 ChannelPromise 失敗,以免導致 IllegalStateException。
                    promise.setFailure(cause);
                } else {
                    // 注冊成功,所以設置正確的執行器來使用。
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    //綁定端口
                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

至此,Netty源碼分析就結束了,大部分都已經講完,感興趣的朋友可以跟着ServerBootstrap的源碼跑一下,大部分都明白了,本來上一篇博客寫了ServerBootstrap啟動過程分析,但是覺得又臭又長,就給刪了。就是跑一邊代碼,誰不會呢,這里就簡述下關鍵的部分。還有其他部分,后面看心情決定要不要寫博客了,反正也沒人看~~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM