編者注:Netty是Java領域有名的開源網絡庫,特點是高性能和高擴展性,因此很多流行的框架都是基於它來構建的,比如我們熟知的Dubbo、Rocketmq、Hadoop等,針對高性能RPC,一般都是基於Netty來構建,比如soft-bolt。總之一句話,Java小伙伴們需要且有必要學會使用Netty並理解其實現原理。
關於Netty的入門講解可參考:Netty 入門,這一篇文章就夠了
Netty的啟動流程(ServerBootstrap
),就是創建NioEventLoopGroup
(內部可能包含多個NioEventLoop,每個eventLoop是一個線程,內部包含一個FIFO的taskQueue和Selector)和ServerBootstrap實例,並進行bind的過程(bind流程涉及到channel的創建和注冊),之后就可以對外提供服務了。
Netty的啟動流程中,涉及到多個操作,比如register、bind、注冊對應事件等,為了不影響main線程執行,這些工作以task的形式提交給NioEventLoop,由NioEventLoop來執行這些task,也就是register、bind、注冊事件等操作。
NioEventLoop(准確來說是SingleThreadEventExecutor
)中包含了private volatile Thread thread
,該thread變量的初始化是在new的線程第一次執行run方式時才賦值的,這種形式挺新穎的。
Netty啟動流程圖如下所示:
大致了解了Netty啟動流程之后,下面就按照Netty啟動流程中涉及到的源碼來進行分析。
netty啟動流程分為server端和client端,不同之處就是前者監聽端口,對外提供服務(socket->bind->listen操作),對應類ServerBootstrap;后者主動去連接遠端端口(socket->connect),對應類Bootstrap。
server端啟動流程
server端啟動流程可以理解成創建ServerBootstrap實例的過程,就以下面代碼為例進行分析(echo服務):
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// bossGroup處理connect事件
// workerGroup處理read/write事件
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 當連接建立后(register到childWorkerGroup前)初始化channel.pipeline
ch.pipeline().addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
EventLoopGroup創建
EventLoopGroup中可能包含了多個EventLoop,EventLoop是一個Reactor模型的事件處理器,一個EventLoop對應一個線程,其內部會維護一個selector和taskQueue,負責處理客戶端請求和內部任務,內部任務如ServerSocketChannel注冊和ServerSocket綁定操作等。關於NioEventLoop,后續專門寫一篇文章分析,這里就不再展開,只需知道個大概即可,其架構圖如下:
EventLoopGroup創建本質就是創建多個NioEventLoop,這里創建NioEventLoop就是初始化一個Reactor,包括selector和taskQueue。主要邏輯如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// 創建NioEventLoop實例
children = new EventExecutor[nThreads];
// 初始化NioEventLoop,實際調用的是NioEventLoopGroup.newChild方法
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
// 多個NioEventLoop中選擇策略
chooser = chooserFactory.newChooser(children);
}
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 創建taskQueue
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
// 是不是很熟悉,java nio selector操作
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
EventLoopGroup創建OK后,啟動的第一步就算完成了,接下來該進行bind、listen操作了。
ServerBootstrap流程
bind操作
bind操作是ServerBootstrap流程重要的一環,bind流程涉及到NioChannel的創建、初始化和注冊(到Selector),啟動NioEventLoop,之后就可以對外提供服務了。
public ChannelFuture bind(SocketAddress localAddress) {
validate(); // 參數校驗
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1. 初始化注冊操作
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 2. doBind0操作
if (regFuture.isDone()) {
// register已完成,這里直接調用doBind0
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// register還未完成,注冊listener回調,在回調中調用doBind0
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
/**
* channel register完成(注冊到Selector並且調用了invokeHandlerAddedIfNeeded)之后,
* 會調用safeSetSuccess,觸發各個ChannelFutureListener,最終會調用到這里的operationComplete方法
*/
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
這里涉及到2個操作,一個是channel的創建、初始化、注冊操作,另一個是bind操作,下面兵分兩路,分別來講。
注意,這里如果main線程執行到regFuture.isDone()時,register還未完成,那么main線程是不會直接調用bind操作的,而是往regFuture上注冊一個Listenner,這樣channel register完成(注冊到Selector並且調用了invokeHandlerAddedIfNeeded)之后,會調用safeSetSuccess,觸發各個ChannelFutureListener,最終會調用到這里的operationComplete方法,進而在執行bind操作。
channel初始化、注冊操作
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1.創建(netty自定義)Channel實例,並初始化
// channel為 NioServerSocketChannel 實例,NioServerSocketChannel的父類AbstractNioChannel保存有nio的ServerSocketChannel
channel = channelFactory.newChannel();
// 2.初始化channel()
init(channel);
} catch (Throwable t) {
}
// 3.向Selector注冊channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
這里重點關注下初始化channel流程,主要操作是設置channel屬性、設置channel.pipeline的ChannelInitializer,注意,ChannelInitializer是在channel注冊到selector之后被回調的。
/**
* 初始channel屬性,也就是ChannelOption對應socket的各種屬性。
* 比如 SO_KEEPALIVE SO_RCVBUF ... 可以與Linux中的setsockopt函數對應起來。
* 最后將ServerBootstrapAcceptor添加到對應channel的ChannelPipeline中。
*/
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
ChannelPipeline p = channel.pipeline();
// 獲取childGroup和childHandler,傳遞給ServerBootstrapAcceptor
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
/**
* 在register0中,將channel注冊到Selector之后,會調用invokeHandlerAddedIfNeeded,
* 進而調用到這里的initChannel方法
*/
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 這里注冊一個添加ServerBootstrapAcceptor的任務
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加ServerBootstrapAcceptor
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
channel初始化之后就該將其注冊到selector,即下面的register流程:
public ChannelFuture register(Channel channel) {
// next()挑選一個EventLoop,默認輪詢選擇某個NioEventLoop
return next().register(channel);
}
public ChannelFuture register(final ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
// AbstractChannel
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
// 直接執行register0或者以任務方式提交執行
// 啟動時,首先執行到這里的是main線程,所以是以任務的方式來提交執行的。
// 也就是說,該任務是NioEventLoop第一次執行的任務,即調用register0
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
// 往NioEventLoop中(任務隊列)添加任務時,如果NioEventLoop線程還未啟動,則啟動該線程
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
register操作
register操作之后伴隨着多個回調及listener的觸發:
// AbstractChannel$AbstractUnsafe
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
// 這里調用的是AbstractNioChannel.doRegister
// 這里將channel注冊上去,並沒有關注對應的事件(read/write事件)
doRegister();
neverRegistered = false;
registered = true;
// 調用handlerAdd事件,這里就會調用initChannel方法,設置channel.pipeline,也就是添加 ServerBootstrapAcceptor
pipeline.invokeHandlerAddedIfNeeded();
// 調用operationComplete回調
safeSetSuccess(promise);
// 回調fireChannelRegistered
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
// 回調fireChannelActive
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
上面代碼中的initChannel回調也就是設置對外監聽channel的channelHanlder為ServerBootstrapAcceptor;operationComplete回調也就是觸發ChannelFutureListener.operationComplete
,這里會進行后續的doBind操作。
// AbstractBootstrap
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// doBind0向EventLoop任務隊列中添加一個bind任務來完成后續操作。
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// bind操作
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
});
}
bind操作
在回顧上面的bind操作代碼,bind操作是在register之后進行的,因為register0是由NioEventLoop執行的,所以main線程需要先判斷下future是否完成,如果完成直接進行doBind即可,否則添加listener回調進行doBind。
bind操作及后續初始化操作(channelActive回調、設置監聽事件)
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
try {
// 調用底層bind操作
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
// 最后底層bind邏輯bind入參包括了backlog,也就是底層會進行listen操作
// DefaultChannelPipeline.headContext -> NioMessageUnsafe -> NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回調fireChannelActive
ctx.fireChannelActive();
// 設置selectKey監聽事件,對於監聽端口就是SelectionKey.OP_ACCEPT,對於新建連接就是SelectionKey.OP_READ
readIfIsAutoRead();
}
到這里為止整個netty啟動流程就基本接近尾聲,可以對外提供服務了。
推薦閱讀
- Netty 入門,這一篇文章就夠了
- Java常見幾種動態代理的對比
- 程序員必看| mockito原理淺析
- Eureka 原理分析
- MQ初窺門徑【面試必看的Kafka和RocketMQ存儲區別】
- java lambda 深入淺出
歡迎小伙伴關注【TopCoder】閱讀更多精彩好文。