前言
前面小飛已經講解了NIO
和Netty
服務端啟動,這一講是Client
的啟動過程。
源碼系列的文章依舊還是遵循大白話+畫圖的風格來講解,本文Netty
源碼及以后的文章版本都基於:4.1.22.Final
本篇是以NettyClient
啟動為切入點,帶大家一步步進入Netty
源碼的世界。
Client啟動流程揭秘
1、探秘的入口:netty-client demo
這里用netty-exmaple
中的EchoClient
來作為例子:
public final class EchoClient {
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
代碼沒有什么獨特的地方,我們上一篇文章時也梳理過Netty
網絡編程的一些套路,這里就不再贅述了。
(忘記的小朋友可以查看Netty
系列文章中查找~)
上面的客戶端代碼雖然簡單, 但是卻展示了Netty
客戶端初始化時所需的所有內容:
EventLoopGroup
:Netty
服務端或者客戶端,都必須指定EventLoopGroup
,客戶端指定的是NioEventLoopGroup
Bootstrap
:Netty
客戶端啟動類,負責客戶端的啟動和初始化過程channel()
類型:指定Channel
的類型,因為這里是客戶端,所以使用的是NioSocketChannel
,服務端會使用NioServerSocketChannel
Handler
:設置數據的處理器bootstrap.connect()
: 客戶端連接netty
服務的方法
2、NioEventLoopGroup 流程解析
我們先從NioEventLoopGroup
開始,一行行代碼解析,先看看其類結構:
上面是大致的類結構,而 EventLoop
又繼承自EventLoopGroup
,所以類的大致結構我們可想而知。這里一些核心邏輯會在MultithreadEventExecutorGroup
中,包含EventLoopGroup
的創建和初始化操作等。
接着從NioEventLoopGroup
構造方法開始看起,一步步往下跟(代碼都只展示重點的部分,省去很多暫時不需要關心的代碼,以下代碼都遵循這個原則):
EventLoopGroup group = new NioEventLoopGroup();
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
這里通過調用this()
和super()
方法一路往下傳遞,期間會構造一些默認屬性,一直傳遞到MultithreadEventExecutorGroup
類中,接着往西看。
2.1、MultithreadEventExecutorGroup
上面構造函數有一個重要的參數傳遞:DEFAULT_EVENT_LOOP_THREADS
,這個值默認是CPU核數 * 2
。
為什么要傳遞這個參數呢?我們之前說過EventLoopGroup
可以理解成一個線程池,MultithreadEventExecutorGroup
有一個線程數組EventExecutor[] children
屬性,而傳遞過來的DEFAULT_EVENT_LOOP_THREADS
就是數組的長度。
先看下MultithreadEventExecutorGroup
中的構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
// ... 省略
}
這段代碼執行邏輯可以理解為:
- 通過
ThreadPerTaskExecutor
構造一個Executor
執行器,后面會細說,里面包含了線程執行的execute()
方法 - 接着創建一個
EventExecutor
數組對象,大小為傳遞進來的threads
數量,這個所謂的EventExecutor
可以理解為我們的EventLoop
,在這個demo中就是NioEventLoop
對象 - 最后調用
newChild
方法逐個初始化EventLoopGroup
中的EventLoop
對象
上面只是大概說了下MultithreadEventExecutorGroup
中的構造方法做的事情,后面還會一個個詳細展開,先不用着急,我們先有個整體的認知就好。
再回到MultithreadEventExecutorGroup
中的構造方法入參中,有個EventExecutorChooserFactory
對象,這里面是有個很亮眼的細節設計,通過它我們來洞悉Netty
的良苦用心。
2.1、亮點設計:DefaultEventExecutorChooserFactory
EventExecutorChooserFactory
這個類的作用是用來選擇EventLoop
執行器的,我們知道EventLoopGroup
是一個包含了CPU * 2
個數量的EventLoop
數組對象,那每次選擇EventLoop
來執行任務是選擇數組中的哪一個呢?
我們看一下這個類的具體實現,紅框中
都是需要重點查看的地方:
DefaultEventExecutorChooserFactory
是一個選擇器工廠類,調用里面的next()
方法達到一個輪詢選擇的目的。
數組的長度是length,執行第n次,取數組中的哪個元素就是對length取余
繼續回到代碼的實現,這里的優化就是在於先通過isPowerOfTwo()
方法判斷數組的長度是否為2的n次冪,判斷的方式很巧妙,使用val & -val == val
,這里我不做過多的解釋,網上還有很多判斷2的n次冪的優秀解法,我就不班門弄斧了。(可參考:https://leetcode-cn.com/problems/power-of-two/solution/2de-mi-by-leetcode/)
當然我認為這里還有更容易理解的一個算法:x & (x - 1) == 0
大家可以看下面的圖就懂了,這里就不延展了:
BUT!!! 這里為什么要去煞費苦心的判斷數組的長度是2的n次冪?
不知道小伙伴們是否還記得大明湖畔的HashMap
?一般我們要求HashMap
數組的長度需要是2的n次冪,因為在key
值尋找數組位置的方法:(n - 1) & hash
n是數組長度,這里如果數組長度是2的n次冪就可以通過位運算來提升性能,當length
為2的n次冪時下面公式是等價的:
n & (length - 1) <=> n % length
還記得上面說過,數組的長度默認都是CPU * 2
,而一般服務器CPU核心數都是2、4、8、16等等,所以這一個小優化就很實用了,再仔細想想,原來數組長度的初始化也是很講究的。
這里位運算的好處就是效率遠遠高於與運算,Netty
針對於這個小細節都做了優化,真是太棒了。
2.3、線程執行器:ThreadPerTaskExecutor
接着看下ThreadPerTaskExecutor
線程執行器,每次執行任務都會通過它來創建一個線程實體。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
傳遞進來的threadFactory
為DefaultThreadFactory
,這里面會構造NioEventLoop
線程命名規則為nioEventLoop-1-xxx
,我們就不細看這個了。當線程執行的時候會調用execute()
方法,這里會創建一個FastThreadLocalThread
線程,具體看代碼:
public class DefaultThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
這里通過newThread()
來創建一個線程,然后初始化線程對象數據,最終會調用到Thread.init()
中。
2.4、EventLoop初始化
接着繼續看MultithreadEventExecutorGroup
構造方法:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
// .... 省略部分代碼
}
}
上面代碼的最后一部分是 newChild
方法, 這個是一個抽象方法, 它的任務是實例化 EventLoop
對象. 我們跟蹤一下它的代碼, 可以發現, 這個方法在 NioEventLoopGroup
類中實現了, 其內容很簡單:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
其實就是實例化一個 NioEventLoop
對象, 然后返回。NioEventLoop
構造函數中會保存provider
和事件輪詢器selector
,在其父類中還會創建一個MpscQueue隊列
,然后保存線程執行器executor
。
再回過頭來想一想,MultithreadEventExecutorGroup
內部維護了一個 EventExecutor[] children
數組, Netty
的 EventLoopGroup
的實現機制其實就建立在 MultithreadEventExecutorGroup
之上。
每當 Netty
需要一個 EventLoop
時, 會調用 next()
方法從EventLoopGroup
數組中獲取一個可用的 EventLoop
對象。其中next
方法的實現是通過NioEventLoopGroup.next()
來完成的,就是用的上面有過講解的通過輪詢算法來計算得出的。
最后總結一下整個 EventLoopGroup
的初始化過程:
EventLoopGroup
(其實是MultithreadEventExecutorGroup
) 內部維護一個類型為EventExecutor children
數組,數組長度是nThreads
- 如果我們在實例化
NioEventLoopGroup
時, 如果指定線程池大小, 則nThreads
就是指定的值, 反之是處理器核心數 * 2
MultithreadEventExecutorGroup
中會調用newChild
抽象方法來初始化children
數組- 抽象方法
newChild
是在NioEventLoopGroup
中實現的, 它返回一個NioEventLoop
實例. NioEventLoop
屬性:SelectorProvider provider
屬性:NioEventLoopGroup
構造器中通過SelectorProvider.provider()
獲取一個SelectorProvider
Selector selector
屬性:NioEventLoop
構造器中通過調用通過selector = provider.openSelector()
獲取一個selector
對象.
2.5、NioSocketChannel
在Netty
中,Channel
是對Socket
的抽象,每當Netty
建立一個連接后,都會有一個與其對應的Channel
實例。
我們在開頭的Demo
中,設置了channel(NioSocketChannel.class)
,NioSocketChannel
的類結構如下:
接着分析代碼,當我們調用b.channel()
時實際上會進入AbstractBootstrap.channel()
邏輯,接着看AbstractBootstrap
中代碼:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
可以看到,這里ReflectiveChannelFactory
其實就是返回我們指定的channelClass:NioSocketChannel
, 然后指定AbstractBootstrap
中的channelFactory = new ReflectiveChannelFactory()
。
2.6、Channel初始化流程
到了這一步,我們已經知道NioEventLoopGroup
和channel()
的流程,接着來看看Channel
的 初始化流程,這也是Netty
客戶端啟動的的核心流程之一:
ChannelFuture f = b.connect(HOST, PORT).sync();
接着就開始從b.connect()
為入口一步步往后跟,先看下NioSocketChannel
構造的整體流程:
從connet
往后梳理下整體流程:
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> AbstractBootstrap.initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
為了更易讀,這里代碼都做了簡化,只保留了一些重要的代碼。
緊接着我們看看channelFactory.newChannel()
做了什么,這里channelFactory
是ReflectiveChannelFactory
,我們在上面的章節分析過:
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
這里的clazz
是NioSocketChannel
,同樣是在上面章節講到過,這里是調用NioSocketChannel
的構造函數然后初始化一個Channel
實例。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
private static SocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
}
這里其實也很簡單,就是創建一個Java NIO SocketChannel
而已,接着看看NioSocketChannel
的父類還做了哪些事情,這里梳理下類的關系:
NioSocketChannel -> extends AbstractNioByteChannel -> exntends AbstractNioChannel
public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
ch.configureBlocking(false);
}
}
這里會調用父類的構造參數,並且傳遞readInterestOp = SelectionKey.OP_READ:
,這里還有一個很重要的點,配置 Java NIO SocketChannel
為非阻塞的,我們之前在NIO
章節的時候講解過,這里也不再贅述。
接着繼續看AbstractChannel
的構造函數:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
}
這里創建一個ChannelId
,創建一個Unsafe
對象,這里的Unsafe
並不是Java中的Unsafe,后面也會講到。然后創建一個ChannelPipeline
,后面也會講到,到了這里,一個完整的NioSocketChannel
就初始化完成了,我們再來總結一下:
Netty
的SocketChannel
會與Java
原生的SocketChannel
綁定在一起;- 會注冊
Read
事件; - 會為每一個
Channel
分配一個channelId
; - 會為每一個
Channel
創建一個Unsafe
對象; - 會為每一個
Channel
分配一個ChannelPipeline
;
2.7、Channel 注冊流程
還是回到最上面initAndRegister
方法,我們上面都是在分析里面newChannel
的操作,這個方法是NioSocketChannel
創建的一個流程,接着我們在繼續跟init()
和register()
的過程:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
}
init()
就是將一些參數options
和attrs
設置到channel
中,我們重點需要看的是register
方法,其調用鏈為:
AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register
這里最后到了unsafe
的register()
方法,最終調用到AbstractNioChannel.doRegister()
:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
}
javaChannel()
就是Java NIO
中的SocketChannel
,這里是將SocketChannel
注冊到與eventLoop
相關聯的selector
上。
最后我們整理一下服務啟動的整體流程:
initAndRegister()
初始化並注冊什么呢?
channelFactory.newChannel()
- 通過反射創建一個
NioSocketChannel
- 將
Java
原生Channel
綁定到NettyChannel
中 - 注冊
Read
事件 - 為
Channel
分配id
- 為
Channel
創建unsafe
對象 - 為
Channel
創建ChannelPipeline
(默認是head<=>tail
的雙向鏈表)
- `init(channel)``
- 把
Bootstrap
中的配置設置到Channel
中
register(channel)
- 把
Channel
綁定到一個EventLoop
上 - 把
Java
原生Channel、Netty
的Channel、Selector
綁定到SelectionKey
中 - 觸發
Register
相關的事件
2.8 unsafe初始化
上面有提到過在初始化Channel
的過程中會創建一個Unsafe
的對象,然后綁定到Channel
上:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
newUnsafe
直接調用到了NioSocketChannel
中的方法:
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
NioSocketChannelUnsafe
是NioSocketChannel
中的一個內部類,然后向上還有幾個父類繼承,這里主要是對應到相關Java
底層的Socket
操作。
2.9 pipeline初始化
我們還是回到pipeline
初始化的過程,來看一下newChannelPipeline()
的具體實現:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
我們調用 DefaultChannelPipeline
的構造器, 傳入了一個 channel
, 而這個 channel
其實就是我們實例化的 NioSocketChannel
。
DefaultChannelPipeline
會將這個 NioSocketChannel
對象保存在channel
字段中. DefaultChannelPipeline
中, 還有兩個特殊的字段, 即 head
和 tail
, 而這兩個字段是一個雙向鏈表的頭和尾. 其實在 DefaultChannelPipeline
中, 維護了一個以 AbstractChannelHandlerContext
為節點的雙向鏈表, 這個鏈表是 Netty
實現 Pipeline
機制的關鍵.
關於 DefaultChannelPipeline
中的雙向鏈表以及它所起的作用, 我們會在后續章節詳細講解。這里只是對pipeline
做個初步的認識。
HeadContext
的繼承層次結構如下所示:
TailContext
的繼承層次結構如下所示:
我們可以看到, 鏈表中 head
是一個 ChannelOutboundHandler
, 而 tail
則是一個 ChannelInboundHandler
.
3.0、客戶端connect過程
客戶端連接的入口方法還是在Bootstrap.connect()
中,上面也分析過一部分內容,請求的具體流程是:
Bootstrap.connect() -> AbstractChannel.coonnect() -> NioSocketChannel.doConnect()
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
看到這里,還是用Java NIO SocketChannel
發送的connect
請求進行客戶端連接請求。
總結
本篇文章以一個Netty Client demo
為入口,然后解析了NioEventLoopGroup
創建的流程、Channel
的創建和注冊的流程,以及客戶端發起connect
的具體流程,這里對於很多細節並沒有很深的深入下去,這些會放到后續的源碼分析文章,敬請期待~