Netty源碼—一、server啟動(1)


說明:netty源碼系列是基於4.1.25版本的netty源碼的
Netty作為一個Java生態中的網絡組件有着舉足輕重的位置,各種開源中間件都使用Netty進行網絡通信,比如Dubbo、RocketMQ。可以說Netty是對Java NIO的封裝,比如ByteBuf、channel等的封裝讓網絡編程更簡單。

在介紹Netty服務器啟動之前需要簡單了解兩件事:

  1. reactor線程模型
  2. linux中的IO多路復用

reactor線程模型

關於reactor線程模型請參考這篇文章,通過不同的配置Netty可以實現對應的三種reactor線程模型

  • reactor單線程模型
  • reactor多線程模型
  • reactor主從多線程模型
 // reactor單線程模型,accept、connect、read、write都在一個線程中執行
EventLoopGroup group = new NioEventLoopGroup(1);
bootStrap.group(group);

// reactor多線程,accept在bossGroup中的一個線程執行,IO操作在workerGroup中的線程執行
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootStrap.group(bossGroup , workerGroup);

// reactor主從多線程,用來accept連接的是在一個線程池中執行,這個時候需要bind多個port,因為Netty一個bind的port會啟動一個線程來accept
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workerGroup = new NioEventLoopGroup();
bootStrap.group(bossGroup , workerGroup);

注意:本文后面的介紹如無特別說明都是基於reactor多線程模型

linux中的IO多路復用

linux中的網絡編程模型也是在不斷演變的,下面是依次演變的順序(具體可參考《UNIX網絡編程卷1:套接字聯網API》第三版的第4、6章)

accept

阻塞等待連接,接收到新的連接后新起線程來處理接收到的連接,然后在新的線程中阻塞等待新的數據到來

select

根據入參的不同有三種情況

  1. 永遠等下去,直到監聽的描述符有任意的IO事件才返回
  2. 等待一段固定時間,如果時間到之前有IO事件則提前返回,否則等待超時后返回
  3. 不等待,檢查描述符后立即返回,稱為輪詢

select會返回就緒的文件描述符的個數,需要輪詢所有socket,判斷每個socket的狀態來確定是否有事件、是什么事件

poll

相比較於selectpoll是阻塞等待的,只有有讀寫事件的時候才會返回,返回的是有讀寫事件的socket個數,並且將對應的socket的事件置位,自己從所有socket中找到具體的socket

epoll

相比較於poll,epoll可以將只有確實有IO事件的描述符返回,大並發下只有少量活躍連接的情況下使用

較poll的優勢

  1. 不用開發者重新准備文件描述符集合(較poll入參簡單)
  2. 無需遍歷所有監聽的描述符,只要遍歷哪些被內核IO事件異步喚醒而加入ready隊列的描述符集合

Java NIO在linux的實現就是基於epoll的。epoll的編程模型:

  1. 創建socket,socket方法
  2. 綁定服務器ip,port,bind方法
  3. 監聽綁定了ip:port的文件描述符,listen方法
  4. 創建epoll句柄(文件描述符),配置最大監聽的文件描述符個數,epoll_create方法
  5. 配置epoll監聽的文件描述符的事件:注冊、修改、刪除某個文件描述符對應的事件
  6. 監聽所有已配置的描述符,epoll_wait
  7. 有新的事件的時候遍歷返回的描述符,處理對應的事件
  8. 如果是來自客戶端的連接,則將accept到的文件描述符注冊到epoll中
  9. 如果是讀寫事件則分別處理

注意:Netty封裝的Java NIO是跨平台的,后面還是以linux平台為例來介紹

接下來言歸正傳,來看看Netty的服務器啟動過程做了什么事情。Netty作為一個網絡框架,和普通網絡編程做的事情基本上一樣,對應於上面epoll的編程模型,Netty的啟動過程為

  1. 初始化線程池,初始化selector
  2. 初始化NioServerSocketChannel
  3. 綁定服務器ip:port
  4. 將NioServerSocketChannel注冊到selector中
  5. 配置NioServerSocketChannel監聽的事件
  6. 使用selector.select等待新的IO事件
  7. 如果是來自客戶端的連接則將NioSocketChannel注冊到selector上(如果是新的線程則是新的selector)
  8. 如果是普通IO事件則在worker線程中處理

線程池初始化

在介紹NioEventLoopGroup之前先看下NioEventLoop

可以看到NioEventLoop繼承自SingleThreadEventExecutor,是一個單線程的executor,在線程中死循環監聽IO事件。主要方法有

// 初始化selector
io.netty.channel.nio.NioEventLoop#openSelector
// 將channel注冊到selector
io.netty.channel.nio.NioEventLoop#register
// 監聽selector上的事件
io.netty.channel.nio.NioEventLoop#select

一個NioEventLoop會初始化一個selector,處理selector上注冊的channel。

NioEventLoopGroup從名字上就可以看出來是由多個NioEventLoop組成,類關系圖如下

NioEventLoopGroup的重要屬性為:

// 包含的EventExecutor數組
private final EventExecutor[] children;
// 選擇哪一個EventExecutor執行task的選擇器,不同的選擇器有不同的策略
private final EventExecutorChooserFactory.EventExecutorChooser chooser;

重要方法有:

// 選擇下一個執行任務的線程
io.netty.util.concurrent.MultithreadEventExecutorGroup#next
// 創建EventLoop
io.netty.channel.nio.NioEventLoopGroup#newChild
// 在線程池中執行注冊channel的任務
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
// 創建默認的threadFactory
io.netty.channel.MultithreadEventLoopGroup#newDefaultThreadFactory

線程池初始化的代碼為

EventLoopGroup workerGroup = new NioEventLoopGroup();

如果使用無參的構造方法的話,最后會執行下面這個構造方法,這里面做要做了以下幾件事

  • 如果executor沒有初始化,使用默認的executor初始化
  • 初始化線程池中每個EventLoop
  • 如果其中一個初始化過程中拋出異常,關閉所有的NioEventLoop
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // 創建EventLoop
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    // 初始化chooser,決定選擇下一個線程的策略
    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

使用默認參數構造參數的話,上面這個構造方法的入參的值分別是

nThreads

// 默認的線程池大小
private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    // 如果配置了io.netty.eventLoopThreads參數的話,先取該參數的值
    // 如果沒有配置上面的參數,則取機器處理器個數的2倍
    // 如果上面算出的結果小於1則取1
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
        "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

// 默認沒有指定線程池大小,取DEFAULT_EVENT_LOOP_THREADS
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

executor

默認沒有指定executor,為null

chooserFactory

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
    this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// io.netty.util.concurrent.DefaultEventExecutorChooserFactory

使用默認的chooser,該類的主要功能是提供選擇下一個線程的策略

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
	// 單例
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            // 如果是2的冪次則使用這個chooser
            return new PowerOfTowEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        // 判斷一個數是否2的冪,方法很巧妙
        return (val & -val) == val;
    }

    private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // 如果是2的冪次個線程,可以使用位運算計算出下一個選出的線程的index
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // 使用求余的方法計算出下一個線程的index
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

可以看出上面兩個chooser計算出的最終結果是一致的,但是使用位運算更快一點,所以如果是線程池的大小剛好是2的冪次的話使用位運算的chooser。

args

// args[0],下面方法返回的provider,在linux平台上默認是EPollSelectorProvider
java.nio.channels.spi.SelectorProvider#provider
// args[1],決定eventLoop每次執行select還是執行隊列中的任務
io.netty.channel.DefaultSelectStrategyFactory
// args[2],等待隊列滿以后的拒絕策略
io.netty.util.concurrent.RejectedExecutionHandlers#REJECT

初始化NioEventLoopGroup過程主要是為了初始化線程池中每一個NioEventLoop,而每一個NioEventLoop包含一個selector。

初始化selector

接着上面說到的初始化NioEventLoop,調用newChild方法來初始化

// io.netty.channel.nio.NioEventLoopGroup#newChild
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // 下面這幾個參數上面已經介紹過
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // 調用父類構造方法初始化taskQueue,taskQueue的大小取Math.max(16, maxPendingTasks)
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    // 校驗selectorProvider
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    // 校驗EventLoop每次執行的select策略是否為空
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    // 初始化selector
    selector = openSelector();
    selectStrategy = strategy;
}


private Selector openSelector() {
    final Selector selector;
    try {
        // 調用的是sun.nio.ch.EPollSelectorProvider#openSelector
        // 返回的是sun.nio.ch.EPollSelectorImpl
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }

    // 是否使用SelectedSelectionKeySet優化,默認不禁用false
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }

    // Netty優化過后的
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

    // 嘗試獲取SelectorImpl對象,后續會使用反射操作這個類的屬性
    Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                return Class.forName(
                    "sun.nio.ch.SelectorImpl",
                    false,
                    PlatformDependent.getSystemClassLoader());
            } catch (ClassNotFoundException e) {
                return e;
            } catch (SecurityException e) {
                return e;
            }
        }
    });

    // 確保有權限訪問該類
    if (!(maybeSelectorImplClass instanceof Class) ||
        // ensure the current selector implementation is what we can instrument.
        !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
        if (maybeSelectorImplClass instanceof Exception) {
            Exception e = (Exception) maybeSelectorImplClass;
            logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
        }
        return selector;
    }

    final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

    Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
        @Override
        public Object run() {
            try {
                // 得到字段selectedKeys
                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                // 得到字段publicSelectedKeys
                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                selectedKeysField.setAccessible(true);
                publicSelectedKeysField.setAccessible(true);

                // 將selectedKeys、publicSelectedKeys均設置為Netty自定義的SelectedSelectionKeySet
                selectedKeysField.set(selector, selectedKeySet);
                publicSelectedKeysField.set(selector, selectedKeySet);
                return null;
            } catch (NoSuchFieldException e) {
                return e;
            } catch (IllegalAccessException e) {
                return e;
            } catch (RuntimeException e) {
                // JDK 9 can throw an inaccessible object exception here; since Netty compiles
                // against JDK 7 and this exception was only added in JDK 9, we have to weakly
                // check the type
                if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {
                    return e;
                } else {
                    throw e;
                }
            }
        }
    });

    if (maybeException instanceof Exception) {
        selectedKeys = null;
        Exception e = (Exception) maybeException;
        logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
    } else {
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", selector);
    }

    return selector;
}

初始化selector的過程中主要做了幾件事:

  • 使用平台相關的provider初始化對應的SelectorImpl,這里使用了Java的SPI來加載平台相關的provider,每一種provider又對應一種SelectorImpl
  • 如果沒有禁用selectedKey優化,Netty會使用自定的SelectedSelectionKeySet替換SelectorImpl的publicSelectedKeys、selectedKeys

對SelectorImpl.selectedKey優化的說明

  1. 利用反射將SelectorImpl.selectedKey替換成了SelectedSelectionKeySet,SelectedSelectionKeySet利用數組實現元素存放
  2. 在調用select方法的時候如果有事件進來的時候會調用SelectedSelectionKeySet#add,將有IO事件的selectKey添加到keyset中
  3. 使用數組遍歷(processSelectedKeysOptimized)要比使用set遍歷快一些,參考文后第一篇參考文章
  4. 在Java9以后這個優化就失效了,因為Java9引入了Jigsaw

接下來看看Selector創建過程,上面調用了EPollSelectorProvider#openSelector來開始初始化selector

public AbstractSelector openSelector() throws IOException {
    // 直接new 一個EPollSelectorImpl
    return new EPollSelectorImpl(this);
}

// 該構造方法只能是包內使用,供provider來調用
EPollSelectorImpl(SelectorProvider sp) throws IOException {
    // 調用父類SelectorImpl的構造方法初始化selectedKeys、publicKeys、publicSelectedKeys
    // 上面已經說過了,如果使用Netty的優化,publicKeys、publicSelectedKey會被替換
    super(sp);
    // 調用linux的pipe方法,創建一個管道,配置為非阻塞的
    long pipeFds = IOUtil.makePipe(false);
    // 高32為讀文件描述符
    fd0 = (int) (pipeFds >>> 32);
    // 低32位為寫文件描述符
    fd1 = (int) pipeFds;
    // EPollArrayWrapper包含一系列native方法來調用EPollArrayWrapper.c本地方法
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    // fdToKey用來保存文件描述符和SelectionKeyImpl的映射
    fdToKey = new HashMap<>();
}

EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    // 創建epoll的文件描述符
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

終於看到創建epoll文件描述符相關代碼了,上面這個還是看不到究竟調用了哪些本地方法,我們看看相關的c代碼

// jdk/src/solaris/native/sun/nio/ch/IOUtil.c
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];

    // 打開pipe
    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        // 配置管道為非阻塞
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    // 將讀寫文件描述符放入一個long型中返回
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}

// jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    // 這里調用linux函數epoll_create創建epoll的文件描述符
    int epfd = epoll_create(256);
    if (epfd < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

總結

經過上面說明,現在對於Netty啟動過程中線程池的初始化過程和selector初始化過程已經比較清晰了,對於native方法的分析讓我們對比linux中epoll編程,對於原理更加清楚。

接下來就是將需要監聽的描述符注冊到epoll上,對應到Netty就是講channel注冊到selector上,下一篇文章繼續寫Netty源碼—二、server啟動(2)

參考

Netty源碼分析——Reactor的processSelectedKeys

關於SelectedSelectionKeySet優化的討論

https://github.com/netty/netty/issues/2363

https://github.com/netty/netty/commit/cd579f75d2b5f236f35bc47f454cc07e50ae8037


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM