Netty服務端啟動流程源碼分析

前記
哈嘍,自從上篇《Netty之旅二:口口相傳的高性能Netty到底是什么?》后,遲遲兩周才開啟今天的Netty源碼系列。源碼分析的第一篇文章,下一篇我會分享客戶端的啟動過程源碼分析。通過源碼的閱讀,我們將會知道,Netty 服務端啟動的調用鏈是非常長的,同時肯定也會發現一些新的問題,隨着我們源碼閱讀的不斷深入,相信這些問題我們也會一一攻破。
廢話不多說,直接上號!

一、從EchoServer示例入手

示例從哪里來?任何開源框架都會有自己的示例代碼,Netty源碼也不例外,如模塊netty-example中就包括了最常見的EchoServer示例,下面通過這個示例進入服務端啟動流程篇章。
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// 1. 聲明Main-Sub Reactor模式線程池:EventLoopGroup
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 創建 EchoServerHandler 對象
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
// 2. 聲明服務端啟動引導器,並設置相關屬性
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// 3. 綁定端口即啟動服務端,並同步等待
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// 4. 監聽服務端關閉,並阻塞等待
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// 5. 優雅地關閉兩個EventLoopGroup線程池
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
- [代碼行18、19]聲明
Main-Sub Reactor模式線程池:EventLoopGroup
創建兩個 EventLoopGroup 對象。其中,bossGroup用於服務端接受客戶端的連接,workerGroup用於進行客戶端的 SocketChannel 的數據讀寫。
(關於EventLoopGroup不是本文重點所以在后續文章中進行分析)
- [代碼行23-39]聲明服務端啟動引導器,並設置相關屬性
AbstractBootstrap是一個幫助類,通過方法鏈(method chaining)的方式,提供了一個簡單易用的方式來配置啟動一個Channel。io.netty.bootstrap.ServerBootstrap ,實現 AbstractBootstrap 抽象類,用於 Server 的啟動器實現類。io.netty.bootstrap.Bootstrap ,實現 AbstractBootstrap 抽象類,用於 Client 的啟動器實現類。如下類圖所示:

(在EchoServer示例代碼中,我們看到 ServerBootstrap 的 group、channel、option、childHandler 等屬性鏈式設置都放到關於AbstractBootstrap體系代碼中詳細介紹。)
- [代碼行43]綁定端口即啟動服務端,並同步等待
先調用 #bind(int port) 方法,綁定端口,后調用 ChannelFuture#sync() 方法,阻塞等待成功。對於bind操作就是本文要詳細介紹的"服務端啟動流程"。
- [代碼行47]監聽服務端關閉,並阻塞等待
先調用 #closeFuture() 方法,監聽服務器關閉,后調用 ChannelFuture#sync() 方法,阻塞等待成功。 注意,此處不是關閉服務器,而是channel的監聽關閉。
- [代碼行51、52]優雅地關閉兩個
EventLoopGroup線程池
finally代碼塊中執行說明服務端將最終關閉,所以調用 EventLoopGroup#shutdownGracefully() 方法,分別關閉兩個EventLoopGroup對象,終止所有線程。
二、服務啟動過程
在服務啟動過程的源碼分析之前,這里回顧一下我們在通過JDK NIO編程在服務端啟動初始的代碼:
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
這5行代碼標示一個最為熟悉的過程:
- 打開
serverSocketChannel - 配置非阻塞模式
- 為
channel的socket綁定監聽端口 - 創建
Selector - 將
serverSocketChannel注冊到selector
后面等分析完Netty的啟動過程后,會對這些步驟有一個新的認識。在EchoServer示例中,進入 #bind(int port) 方法,AbstractBootstrap#bind()其實有多個方法,方便不同地址參數的傳遞,實際調用的方法是AbstractBootstrap#doBind(final SocketAddress localAddress) 方法,代碼如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
- [代碼行2] :調用
#initAndRegister()方法,初始化並注冊一個Channel對象。因為注冊是異步的過程,所以返回一個ChannelFuture對象。詳細解析,見 「initAndRegister()」。 - [代碼行4-6]]:若發生異常,直接進行返回。
- [代碼行9-34]:因為注冊是異步的過程,有可能已完成,有可能未完成。所以實現代碼分成了【第 10 至 14 行】和【第 15 至 36 行】分別處理已完成和未完成的情況。
- 核心在[第 11 、29行],調用
#doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise)方法,綁定 Channel 的端口,並注冊 Channel 到SelectionKey中。 - 如果異步注冊對應的
ChanelFuture未完成,則調用ChannelFuture#addListener(ChannelFutureListener)方法,添加監聽器,在注冊完成后,進行回調執行#doBind0(...)方法的邏輯。
- 核心在[第 11 、29行],調用
通過doBind方法可以知道服務端啟動流程大致如下幾個步驟:

1. 創建Channel

從#doBind(final SocketAddress localAddress)進入到initAndRegister():
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
[代碼行4]調用 ChannelFactory#newChannel() 方法,創建Channel對象。 ChannelFactory類繼承如下:

可以在ChannelFactory注釋看到@deprecated Use {@link io.netty.channel.ChannelFactory} instead.,這里只是包名的調整,對於繼承結構不變。netty默認使用ReflectiveChannelFactory,我們可以看到重載方法:
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
很明顯,正如其名是通過反射機制構造Channel對象實例的。constructor是在其構造方法初始化的:this.constructor = clazz.getConstructor();這個clazz按理說應該是我們要創建的Channel的Class對象。那Class對象是什么呢?我們接着看channelFactory是怎么初始化的。
首先在AbstractBootstrap找到如下代碼:
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
調用這個方法的遞推向上看到:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
這個方法正是在EchoServer中ServerBootstrap鏈式設置時調用.channel(NioServerSocketChannel.class)的方法。我們看到,channelClass就是NioServerSocketChannel.class,channelFactory也是以ReflectiveChannelFactory作為具體實例,並且將NioServerSocketChannel.class作為構造參數傳遞初始化的,所以這回答了反射機制構造的是io.netty.channel.socket.nio.NioServerSocketChannel對象。
繼續看NioServerSocketChannel構造方法邏輯做了什么事情,看之前先給出NioServerSocketChannel類繼承關系:

NioServerSocketChannel與NioSocketChannel分別對應服務端和客戶端,公共父類都是AbstractNioChannel和AbstractChannel,下面介紹創建過程可以參照這個Channel類繼承圖。進入NioServerSocketChannel構造方法:
/**
* Create a new instance
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
點擊newSocket進去:
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
以上傳進來的provider是DEFAULT_SELECTOR_PROVIDER即默認的java.nio.channels.spi.SelectorProvider,[代碼行9]就是熟悉的jdk nio創建ServerSocketChannel。這樣newSocket(DEFAULT_SELECTOR_PROVIDER)就返回了結果ServerSocketChannel,回到NioServerSocketChannel()#this()點進去:
/**
* Create a new instance using the given {@link ServerSocketChannel}.
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
以上super代表父類AbstractNioMessageChannel構造方法,點進去看到:
/**
* @see AbstractNioChannel#AbstractNioChannel(Channel, SelectableChannel, int)
*/
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
以上super代表父類AbstractNioChannel構造方法,點進去看到:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
以上[代碼行3]將ServerSocketChannel保存到了AbstractNioChannel#ch成員變量,在上面提到的NioServerSocketChannel構造方法的[代碼行6]javaChannel()拿到的就是ch保存的ServerSocketChannel變量。
以上[代碼行6]就是熟悉的jdk nio編程設置ServerSocketChannel非阻塞方式。這里還有super父類構造方法,點擊進去看到:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
以上構造方法中:
parent屬性,代表父Channel對象。對於NioServerSocketChannel的parent為null。id屬性,Channel編號對象。在構造方法中,通過調用#newId()方法進行創建。(這里不細展開Problem-1)unsafe屬性,Unsafe對象。因為Channel真正的具體操作,是通過調用對應的Unsafe對象實施。所以需要在構造方法中,通過調用#newUnsafe()方法進行創建。這里的Unsafe並不是我們常說的jdk自帶的sun.misc.Unsafe,而是io.netty.channel.Channel#Unsafe。(這里不細展開Problem-2)pipeline屬性默認是DefaultChannelPipeline對象,賦值后在后面為channel綁定端口的時候會用到
通過以上創建channel源碼過程分析,總結的流程時序圖如下:

2. 初始化Channel

回到一開始創建Channel的initAndRegister()入口方法,在創建Channel后緊接着init(channel)進入初始化流程,因為是服務端初始化,所以是ServerBootstrap#init(Channel channel),代碼如下:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
-
[代碼 3 - 6 行]:
options0()方法返回的options保存了用戶在EchoServer中設置自定義的可選項集合,這樣ServerBootstrap將配置的選項集合,設置到了Channel的可選項集合中。 -
[代碼 8 - 15 行]:
attrs0()方法返回的attrs保存了用戶在EchoServer中設置自定義的屬性集合,這樣ServerBootstrap將配置的屬性集合,設置到了Channel的屬性集合中。 -
[代碼21-28行]:通過局部變量
currentChildOptions和currentChildAttrs保存了用戶自定義的childOptions和childAttrs,用於[代碼43行]ServerBootstrapAcceptor構造方法。 -
[代碼30-47]]:創建
ChannelInitializer對象,添加到pipeline中,用於后續初始化ChannelHandler到pipeline中,包括用戶在EchoServer配置的LoggingHandler和創建的創建ServerBootstrapAcceptor對象。-
[代碼行34-37]:添加啟動器配置的
LoggingHandler到pipeline中。 -
[代碼行39-45]:創建
ServerBootstrapAcceptor對象,添加到pipeline中。從名字上就可以看出來,ServerBootstrapAcceptor也是一個ChannelHandler實現類,專門用於接受客戶端的新連接請求,把新的請求扔給某個事件循環器,我們先不做過多分析。我們發現是使用EventLoop.execute執行添加的過程,這是為什么呢?同樣記錄問題(Problem-3) -
需要說明的是
pipeline在之前介紹Netty核心組件的時候提到是一個包含ChannelHandlerContext的雙向鏈表,每一個context對於唯一一個ChannelHandler,這里初始化后,ChannelPipeline里就是如下一個結構:
-
3. 注冊Channel

初始化Channel一些基本配置和屬性完畢后,回到一開始創建Channel的initAndRegister()入口方法,在初始化Channel后緊接着[代碼行17] ChannelFuture regFuture = config().group().register(channel);明顯這里是通過EventLoopGroup進入注冊流程(EventLoopGroup體系將在后續文章講解)
在EchoServer中啟動器同樣通過ServerBootstrap#group()設置了NioEventLoopGroup,它繼承自MultithreadEventLoopGroup,所以注冊流程會進入MultithreadEventLoopGroup重載的register(Channel channel)方法,代碼如下:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
這里會調用 next() 方法選擇出來一個 EventLoop 來注冊 Channel,里面實際上使用的是一個叫做 EventExecutorChooser 的東西來選擇,它實際上又有兩種實現方式 ——PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,本質上就是從 EventExecutor 數組中選擇一個 EventExecutor,我們這里就是 NioEventLoop,那么,它們有什么區別呢?(Problem-4:在介紹EventLoopGroup體系的后續文章中將會詳細講解,這里簡單地提一下,本質都是按數組長度取余數 ,不過,2 的 N 次方的形式更高效。)
接着,來到 NioEventLoop 的 register(channel) 方法,你會不會問找不到該方法?提示NioEventLoop 繼承SingleThreadEventLoop,所以父類方法:
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
可以看到,先創建了一個叫做 ChannelPromise 的東西,它是 ChannelFuture 的子類。[代碼行9]又調回了 Channel 的 Unsafe 的 register () 方法,這里第一個參數是 this,也就是 NioEventLoop,第二個參數是剛創建的 ChannelPromise。
點擊 AbstractUnsafe#register(EventLoop eventLoop, final ChannelPromise promise) 方法進去,代碼如下:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
[代碼行15]這行代碼是設置 Channel 的 eventLoop 屬性。這行前面的代碼主要是在校驗傳入的 eventLoop 參數非空,校驗是否有注冊過以及校驗 Channel 和 eventLoop 類型是否匹配。
[代碼18、24]接着,跟蹤到 AbstractUnsafe#register0(ChannelPromise promise) 方法中:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
[代碼行9]進入 AbstractNioChannel#doRegister() 方法:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
[代碼行5]關鍵一行代碼,將 Java 原生NIO Selector與 Java 原生 NIO 的 Channel 對象(ServerSocketChannel) 綁定在一起,並將當前 Netty 的Channel通過 attachment 的形式綁定到 SelectionKey 上:
- 調用
#unwrappedSelector()方法,返回 Java 原生NIO Selector對象,而且每個NioEventLoop與Selector唯一一對應。 - 調用
SelectableChannel#register(Selector sel, int ops, Object att)方法,注冊 Java 原生NIO的Channel對象到NIO Selector對象上。
通過以上注冊channel源碼分析,總結流程的時序圖如下:

4. 綁定端口

注冊完Channel最后回到AbstractBootstrap#doBind() 方法,分析 Channel 的端口綁定邏輯。進入doBind0代碼如下:
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
- [代碼行7]:在前面
Channel注冊成功的條件下,調用EventLoop執行Channel的端口綁定邏輯。但是,實際上當前線程已經是EventLoop所在的線程了,為何還要這樣操作呢?答案在【第 5 至 6 行】的英語注釋,這里作為一個問題記着(Problem-5)。 - [代碼行11]:進入
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise),同樣立即異步返回並添加ChannelFutureListener.CLOSE_ON_FAILURE監聽事件。 - [代碼行13]:如果綁定端口之前的操作並沒有成功,自然也就不能進行端口綁定操作了,通過promise記錄異常原因。
AbstractChannel#bind(SocketAddress localAddress, ChannelPromise promise)方法如下:
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
pipeline是之前創建channel的時候創建的DefaultChannelPipeline,進入該方法:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
[在分析初始化流程的時候最后畫一個DefaultChannelPipeline內部的結構,能夠便於分析后面進入DefaultChannelPipeline一系列bind方法。]
首先,tail代表TailContext,進入AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
//省略部分代碼
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
[代碼行3]:findContextOutbound方法里主要是執行ctx = ctx.prev;那么得到的next就是綁定LoggingHandler的context
[代碼行6]:進入invokeBind(localAddress, promise)方法並直接執行LoggingHandler#bind(this, localAddress, promise),進入后的方法如下:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "BIND", localAddress));
}
ctx.bind(localAddress, promise);
}
設置了LoggingHandler的日志基本級別為默認的INFO后,進行綁定操作的信息打印。接着,繼續循環到AbstractChannelHandlerContext# bind(final SocketAddress localAddress, final ChannelPromise promise)方法執行ctx = ctx.prev取出HeadContext進入到bind方法:
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
兜兜轉轉,最終跳出了pipeline輪回到AbstractUnsafe#bind(final SocketAddress localAddress, final ChannelPromise promise) 方法,Channel 的端口綁定邏輯。代碼如下:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
//此處有省略...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//此處有省略...
}
做實事方法doBind進入后如下:
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
到了此處,服務端的 Java 原生 NIO ServerSocketChannel 終於綁定上了端口。
三、問題歸納
- Problem-1: 創建
Channel流程中AbstractChannel構造函數中為channel分配ID的算法如何實現? - Problem-2:
AbstractChannel內部類AbstractUnsafe的作用? - Problem-3: 初始化
channel流程中pipeline添加ServerBootstrapAcceptor是通過EventLoop.execute執行添加的過程,這是為什么呢? - Problem-4:注冊
channel流程中PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser的區別和優化原理? - Problem-5:綁定端口流程中調用
EventLoop執行Channel的端口綁定邏輯。但是,實際上當前線程已經是EventLoop所在的線程了,為何還要這樣操作呢?
小結
通過對Netty服務端啟動流程源碼分析,我們發現了在使用NIO的模式下,服務端啟動流程其實就是封裝了JDK NIO編程在服務端啟動的流程。只不過對原生JDK NIO進行了增強和優化,同時從架構設計上簡化了服務端流程的編寫。
最重要的是感謝彤哥、艿艿和俞超-閃電俠這些大佬前期的分享,能夠讓更多人學習源碼的旅途少走很多彎路,謝謝!
歡迎關注:

