前言
前面小飛已經講解了NIO和Netty服務端啟動,這一講是Client的啟動過程。
源碼系列的文章依舊還是遵循大白話+畫圖的風格來講解,本文Netty源碼及以后的文章版本都基於:4.1.22.Final
本篇是以NettyClient啟動為切入點,帶大家一步步進入Netty源碼的世界。
Client啟動流程揭秘
1、探秘的入口:netty-client demo
這里用netty-exmaple中的EchoClient來作為例子:
public final class EchoClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
代碼沒有什么獨特的地方,我們上一篇文章時也梳理過Netty網絡編程的一些套路,這里就不再贅述了。
(忘記的小朋友可以查看Netty系列文章中查找~)
上面的客戶端代碼雖然簡單, 但是卻展示了Netty 客戶端初始化時所需的所有內容:
EventLoopGroup:Netty服務端或者客戶端,都必須指定EventLoopGroup,客戶端指定的是NioEventLoopGroupBootstrap:Netty客戶端啟動類,負責客戶端的啟動和初始化過程channel()類型:指定Channel的類型,因為這里是客戶端,所以使用的是NioSocketChannel,服務端會使用NioServerSocketChannelHandler:設置數據的處理器bootstrap.connect(): 客戶端連接netty服務的方法
2、NioEventLoopGroup 流程解析
我們先從NioEventLoopGroup開始,一行行代碼解析,先看看其類結構:

上面是大致的類結構,而 EventLoop 又繼承自EventLoopGroup,所以類的大致結構我們可想而知。這里一些核心邏輯會在MultithreadEventExecutorGroup中,包含EventLoopGroup的創建和初始化操作等。
接着從NioEventLoopGroup構造方法開始看起,一步步往下跟(代碼都只展示重點的部分,省去很多暫時不需要關心的代碼,以下代碼都遵循這個原則):
EventLoopGroup group = new NioEventLoopGroup();
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
這里通過調用this()和super()方法一路往下傳遞,期間會構造一些默認屬性,一直傳遞到MultithreadEventExecutorGroup類中,接着往西看。
2.1、MultithreadEventExecutorGroup
上面構造函數有一個重要的參數傳遞:DEFAULT_EVENT_LOOP_THREADS,這個值默認是CPU核數 * 2。
為什么要傳遞這個參數呢?我們之前說過EventLoopGroup可以理解成一個線程池,MultithreadEventExecutorGroup有一個線程數組EventExecutor[] children屬性,而傳遞過來的DEFAULT_EVENT_LOOP_THREADS就是數組的長度。
先看下MultithreadEventExecutorGroup中的構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
// ... 省略
}
這段代碼執行邏輯可以理解為:
- 通過
ThreadPerTaskExecutor構造一個Executor執行器,后面會細說,里面包含了線程執行的execute()方法 - 接着創建一個
EventExecutor數組對象,大小為傳遞進來的threads數量,這個所謂的EventExecutor可以理解為我們的EventLoop,在這個demo中就是NioEventLoop對象 - 最后調用
newChild方法逐個初始化EventLoopGroup中的EventLoop對象
上面只是大概說了下MultithreadEventExecutorGroup中的構造方法做的事情,后面還會一個個詳細展開,先不用着急,我們先有個整體的認知就好。
再回到MultithreadEventExecutorGroup中的構造方法入參中,有個EventExecutorChooserFactory對象,這里面是有個很亮眼的細節設計,通過它我們來洞悉Netty的良苦用心。
2.1、亮點設計:DefaultEventExecutorChooserFactory

EventExecutorChooserFactory這個類的作用是用來選擇EventLoop執行器的,我們知道EventLoopGroup是一個包含了CPU * 2個數量的EventLoop數組對象,那每次選擇EventLoop來執行任務是選擇數組中的哪一個呢?
我們看一下這個類的具體實現,紅框中都是需要重點查看的地方:

DefaultEventExecutorChooserFactory是一個選擇器工廠類,調用里面的next()方法達到一個輪詢選擇的目的。
數組的長度是length,執行第n次,取數組中的哪個元素就是對length取余

繼續回到代碼的實現,這里的優化就是在於先通過isPowerOfTwo()方法判斷數組的長度是否為2的n次冪,判斷的方式很巧妙,使用val & -val == val,這里我不做過多的解釋,網上還有很多判斷2的n次冪的優秀解法,我就不班門弄斧了。(可參考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/)
當然我認為這里還有更容易理解的一個算法:x & (x - 1) == 0 大家可以看下面的圖就懂了,這里就不延展了:

BUT!!! 這里為什么要去煞費苦心的判斷數組的長度是2的n次冪?
不知道小伙伴們是否還記得大明湖畔的HashMap?一般我們要求HashMap數組的長度需要是2的n次冪,因為在key值尋找數組位置的方法:(n - 1) & hash n是數組長度,這里如果數組長度是2的n次冪就可以通過位運算來提升性能,當length為2的n次冪時下面公式是等價的:
n & (length - 1) <=> n % length
還記得上面說過,數組的長度默認都是CPU * 2,而一般服務器CPU核心數都是2、4、8、16等等,所以這一個小優化就很實用了,再仔細想想,原來數組長度的初始化也是很講究的。
這里位運算的好處就是效率遠遠高於與運算,Netty針對於這個小細節都做了優化,真是太棒了。
2.3、線程執行器:ThreadPerTaskExecutor
接着看下ThreadPerTaskExecutor線程執行器,每次執行任務都會通過它來創建一個線程實體。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
傳遞進來的threadFactory為DefaultThreadFactory,這里面會構造NioEventLoop線程命名規則為nioEventLoop-1-xxx,我們就不細看這個了。當線程執行的時候會調用execute()方法,這里會創建一個FastThreadLocalThread線程,具體看代碼:
public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
這里通過newThread()來創建一個線程,然后初始化線程對象數據,最終會調用到Thread.init()中。
2.4、EventLoop初始化
接着繼續看MultithreadEventExecutorGroup構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
// .... 省略部分代碼
}
}
上面代碼的最后一部分是 newChild 方法, 這個是一個抽象方法, 它的任務是實例化 EventLoop 對象. 我們跟蹤一下它的代碼, 可以發現, 這個方法在 NioEventLoopGroup 類中實現了, 其內容很簡單:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
其實就是實例化一個 NioEventLoop 對象, 然后返回。NioEventLoop構造函數中會保存provider和事件輪詢器selector,在其父類中還會創建一個MpscQueue隊列,然后保存線程執行器executor。
再回過頭來想一想,MultithreadEventExecutorGroup 內部維護了一個 EventExecutor[] children數組, Netty 的 EventLoopGroup 的實現機制其實就建立在 MultithreadEventExecutorGroup 之上。
每當 Netty 需要一個 EventLoop 時, 會調用 next() 方法從EventLoopGroup數組中獲取一個可用的 EventLoop對象。其中next方法的實現是通過NioEventLoopGroup.next()來完成的,就是用的上面有過講解的通過輪詢算法來計算得出的。
最后總結一下整個 EventLoopGroup 的初始化過程:

EventLoopGroup(其實是MultithreadEventExecutorGroup) 內部維護一個類型為EventExecutor children數組,數組長度是nThreads- 如果我們在實例化
NioEventLoopGroup時, 如果指定線程池大小, 則nThreads就是指定的值, 反之是處理器核心數 * 2 MultithreadEventExecutorGroup中會調用newChild抽象方法來初始化children數組- 抽象方法
newChild是在NioEventLoopGroup中實現的, 它返回一個NioEventLoop實例. NioEventLoop屬性:SelectorProvider provider屬性:NioEventLoopGroup構造器中通過SelectorProvider.provider()獲取一個SelectorProviderSelector selector屬性:NioEventLoop構造器中通過調用通過selector = provider.openSelector()獲取一個selector對象.
2.5、NioSocketChannel
在Netty中,Channel是對Socket的抽象,每當Netty建立一個連接后,都會有一個與其對應的Channel實例。
我們在開頭的Demo中,設置了channel(NioSocketChannel.class),NioSocketChannel的類結構如下:

接着分析代碼,當我們調用b.channel()時實際上會進入AbstractBootstrap.channel()邏輯,接着看AbstractBootstrap中代碼:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
可以看到,這里ReflectiveChannelFactory其實就是返回我們指定的channelClass:NioSocketChannel, 然后指定AbstractBootstrap中的channelFactory = new ReflectiveChannelFactory()。
2.6、Channel初始化流程
到了這一步,我們已經知道NioEventLoopGroup和channel()的流程,接着來看看Channel的 初始化流程,這也是Netty客戶端啟動的的核心流程之一:
ChannelFuture f = b.connect(HOST, PORT).sync();
接着就開始從b.connect()為入口一步步往后跟,先看下NioSocketChannel構造的整體流程:

從connet往后梳理下整體流程:
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
為了更易讀,這里代碼都做了簡化,只保留了一些重要的代碼。
緊接着我們看看channelFactory.newChannel()做了什么,這里channelFactory是ReflectiveChannelFactory,我們在上面的章節分析過:
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
這里的clazz是NioSocketChannel,同樣是在上面章節講到過,這里是調用NioSocketChannel的構造函數然后初始化一個Channel實例。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
}
這里其實也很簡單,就是創建一個Java NIO SocketChannel而已,接着看看NioSocketChannel的父類還做了哪些事情,這里梳理下類的關系:
NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
ch.configureBlocking(false);
}
}
這里會調用父類的構造參數,並且傳遞readInterestOp = SelectionKey.OP_READ:,這里還有一個很重要的點,配置 Java NIO SocketChannel 為非阻塞的,我們之前在NIO章節的時候講解過,這里也不再贅述。
接着繼續看AbstractChannel的構造函數:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
這里創建一個ChannelId,創建一個Unsafe對象,這里的Unsafe並不是Java中的Unsafe,后面也會講到。然后創建一個ChannelPipeline,后面也會講到,到了這里,一個完整的NioSocketChannel 就初始化完成了,我們再來總結一下:
Netty的SocketChannel會與Java原生的SocketChannel綁定在一起;- 會注冊
Read事件; - 會為每一個
Channel分配一個channelId; - 會為每一個
Channel創建一個Unsafe對象; - 會為每一個
Channel分配一個ChannelPipeline;
2.7、Channel 注冊流程
還是回到最上面initAndRegister方法,我們上面都是在分析里面newChannel的操作,這個方法是NioSocketChannel創建的一個流程,接着我們在繼續跟init()和register()的過程:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
}
init()就是將一些參數options和attrs設置到channel中,我們重點需要看的是register方法,其調用鏈為:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
這里最后到了unsafe的register()方法,最終調用到AbstractNioChannel.doRegister():
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
javaChannel()就是Java NIO中的SocketChannel,這里是將SocketChannel注冊到與eventLoop相關聯的selector上。

最后我們整理一下服務啟動的整體流程:
initAndRegister()初始化並注冊什么呢?
channelFactory.newChannel()- 通過反射創建一個
NioSocketChannel - 將
Java原生Channel綁定到NettyChannel中 - 注冊
Read事件 - 為
Channel分配id - 為
Channel創建unsafe對象 - 為
Channel創建ChannelPipeline(默認是head<=>tail的雙向鏈表)
- `init(channel)``
- 把
Bootstrap中的配置設置到Channel中
register(channel)
- 把
Channel綁定到一個EventLoop上 - 把
Java原生Channel、Netty的Channel、Selector綁定到SelectionKey中 - 觸發
Register相關的事件
2.8 unsafe初始化
上面有提到過在初始化Channel的過程中會創建一個Unsafe的對象,然后綁定到Channel上:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
newUnsafe直接調用到了NioSocketChannel中的方法:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
NioSocketChannelUnsafe是NioSocketChannel中的一個內部類,然后向上還有幾個父類繼承,這里主要是對應到相關Java底層的Socket操作。
2.9 pipeline初始化
我們還是回到pipeline初始化的過程,來看一下newChannelPipeline()的具體實現:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
我們調用 DefaultChannelPipeline 的構造器, 傳入了一個 channel, 而這個 channel 其實就是我們實例化的 NioSocketChannel。
DefaultChannelPipeline 會將這個 NioSocketChannel 對象保存在channel 字段中. DefaultChannelPipeline 中, 還有兩個特殊的字段, 即 head 和 tail, 而這兩個字段是一個雙向鏈表的頭和尾. 其實在 DefaultChannelPipeline 中, 維護了一個以 AbstractChannelHandlerContext 為節點的雙向鏈表, 這個鏈表是 Netty 實現 Pipeline 機制的關鍵.
關於 DefaultChannelPipeline 中的雙向鏈表以及它所起的作用, 我們會在后續章節詳細講解。這里只是對pipeline做個初步的認識。
HeadContext 的繼承層次結構如下所示:

TailContext 的繼承層次結構如下所示:

我們可以看到, 鏈表中 head 是一個 ChannelOutboundHandler, 而 tail 則是一個 ChannelInboundHandler.
3.0、客戶端connect過程
客戶端連接的入口方法還是在Bootstrap.connect()中,上面也分析過一部分內容,請求的具體流程是:
Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
看到這里,還是用Java NIO SocketChannel發送的connect請求進行客戶端連接請求。
總結
本篇文章以一個Netty Client demo為入口,然后解析了NioEventLoopGroup創建的流程、Channel的創建和注冊的流程,以及客戶端發起connect的具體流程,這里對於很多細節並沒有很深的深入下去,這些會放到后續的源碼分析文章,敬請期待~

