首先附上一個簡單的服務端啟動代碼
1 public void bind(int port) throws Exception { 2 // 線程組 一個用於接受客戶端連接 一個用於IO操作 3 // parentGroup用於serverBootstrap的父類AbstractBootstrap使用的線程池 4 // AbstractBootstrap是個工廠類,用於接受客戶端連接 5 // 如果只監聽一個端口則只需要一個線程即可,因為一個NioServerSocketChannel只能夠與一個NioEventLoop綁定,該channel的所有操作均由綁定的NioEventLoop進行 6 EventLoopGroup parentGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("parent")); 7 // childGroup是ServerBootstrap使用的線程池 8 EventLoopGroup childGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("child")); 9 // netty啟動輔助類 10 ServerBootstrap bootstrap = new ServerBootstrap(); 11 try { 12 // 綁定線程組 13 bootstrap.group(parentGroup, childGroup) 14 // 設置使用的channel 15 .channel(NioServerSocketChannel.class) 16 // 設置參數 17 .option(ChannelOption.SO_BACKLOG, 1024) 18 // 綁定事件處理類 19 .childHandler(new ChildChannelHandler()); 20 // 綁定端口並且同步等待操作完成 21 ChannelFuture channelFuture = bootstrap.bind(port); 22 // 等待服務端鏈路關閉之后main函數才退出 23 channelFuture.channel().closeFuture().sync(); 24 } finally { 25 // 優雅關閉線程組,釋放資源 26 parentGroup.shutdownGracefully(); 27 childGroup.shutdownGracefully(); 28 } 29 }
其中第21行的bind方法便是啟動入口方法。該方法最終會調用AbstrractBootstrap#doBind方法。
1 private ChannelFuture doBind(final SocketAddress localAddress) { 2 // 創建、初始化、注冊channel 3 final ChannelFuture regFuture = initAndRegister(); 4 final Channel channel = regFuture.channel(); 5 if (regFuture.cause() != null) { 6 return regFuture; 7 } 8 9 final ChannelPromise promise; 10 // 將channel綁定到對應地址上 11 if (regFuture.isDone()) { 12 promise = channel.newPromise(); 13 doBind0(regFuture, channel, localAddress, promise); 14 } else { 15 // Registration future is almost always fulfilled already, but just in case it's not. 16 promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE); 17 regFuture.addListener(new ChannelFutureListener() { 18 @Override 19 public void operationComplete(ChannelFuture future) throws Exception { 20 doBind0(regFuture, channel, localAddress, promise); 21 } 22 }); 23 } 24 return promise; 25 }
首先創建、初始化、注冊netty的channel,由於注冊是一個異步的過程,所以initAndRegister()方法返回的是一個ChannelFuture,該類是netty對java的Future的拓展類。
在執行綁定操作之前需要先判斷是否注冊成功,如果成功則執行綁定方法,否則添加一個listener到Future中,listener會在future成功之后會被調用。
接下來看下initAndRegister()方法
1 final ChannelFuture initAndRegister() { 2 Channel channel; 3 try { 4 // 創建channel 5 channel = createChannel(); 6 } catch (Throwable t) { 7 return VoidChannel.INSTANCE.newFailedFuture(t); 8 } 9 10 try { 11 // 初始化channel 12 init(channel); 13 } catch (Throwable t) { 14 channel.unsafe().closeForcibly(); 15 return channel.newFailedFuture(t); 16 } 17 18 ChannelPromise regFuture = channel.newPromise(); 19 // 注冊channel 20 channel.unsafe().register(regFuture); 21 if (regFuture.cause() != null) { 22 if (channel.isRegistered()) { 23 channel.close(); 24 } else { 25 channel.unsafe().closeForcibly(); 26 } 27 } 28 return regFuture; 29 }
首先是創建channel實例,這里的channel是我們啟動時設置的,該方法是通過反射的方式創建channel,源碼如下
1 @Override 2 Channel createChannel() { 3 EventLoop eventLoop = group().next(); 4 return channelFactory().newChannel(eventLoop, childGroup); 5 } 6 7 private static final class ServerBootstrapChannelFactory<T extends ServerChannel> 8 implements ServerChannelFactory<T> { 9 10 private final Class<? extends T> clazz; 11 ServerBootstrapChannelFactory(Class<? extends T> clazz) { 12 this.clazz = clazz; 13 } 14 15 @Override 16 public T newChannel(EventLoop eventLoop, EventLoopGroup childGroup) { 17 try { 18 Constructor<? extends T> constructor = clazz.getConstructor(EventLoop.class, EventLoopGroup.class); 19 return constructor.newInstance(eventLoop, childGroup); 20 } catch (Throwable t) { 21 throw new ChannelException("Unable to create Channel from class " + clazz, t); 22 } 23 } 24 25 @Override 26 public String toString() { 27 return StringUtil.simpleClassName(clazz) + ".class"; 28 } 29 }
首先是從parentGroup中獲取一個eventLoop,然后將childGroup作為參數傳遞。
通過反射調用channel的構造方法創建實例,在創建channel的過程中做了三件重要的事情
1、與eventLoop綁定以及將childGroup作為IO線程池處理IO操作
2、創建對應的java socketChannel
3、初始化readInterestOp,這個參數后面會提到
具體源碼讀者自行查看NioServerSockerChannel的構造方法,其實現簡單,讀者可以自行跟蹤。
然后是初始化channel,將channel、childHandler的參數設置到對應對象上,然后將childHandler添加到channel的pipleline中,源碼如下
1 @Override 2 void init(Channel channel) throws Exception { 3 final Map<ChannelOption<?>, Object> options = options(); 4 synchronized (options) { 5 channel.config().setOptions(options); 6 } 7 8 final Map<AttributeKey<?>, Object> attrs = attrs(); 9 synchronized (attrs) { 10 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { 11 @SuppressWarnings("unchecked") 12 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); 13 channel.attr(key).set(e.getValue()); 14 } 15 } 16 17 ChannelPipeline p = channel.pipeline(); 18 if (handler() != null) { 19 p.addLast(handler()); 20 } 21 22 final ChannelHandler currentChildHandler = childHandler; 23 final Entry<ChannelOption<?>, Object>[] currentChildOptions; 24 final Entry<AttributeKey<?>, Object>[] currentChildAttrs; 25 synchronized (childOptions) { 26 currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); 27 } 28 synchronized (childAttrs) { 29 currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); 30 } 31 32 p.addLast(new ChannelInitializer<Channel>() { 33 @Override 34 public void initChannel(Channel ch) throws Exception { 35 ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions, 36 currentChildAttrs)); 37 } 38 }); 39 }
前面都是在初始化屬性,重要的最后一段代碼,它注冊了一個ChannelInitializer到channelpipline上,該ChannelInitializer的initChannel方法創建了一個ServerBootstrapAcceptor注冊到channel上,這個ServerBootstrapAcceptor會在服務器處理連接請求時使用,這個在我的另一篇服務端如何接受並分發請求文章中提到。
最后是注冊方法,代碼如下
1 public final void register(final ChannelPromise promise) { 2 if (eventLoop.inEventLoop()) { 3 register0(promise); 4 } else { 5 try { 6 eventLoop.execute(new Runnable() { 7 @Override 8 public void run() { 9 register0(promise); 10 } 11 }); 12 } catch (Throwable t) { 13 logger.warn( 14 "Force-closing a channel whose registration task was not accepted by an event loop: {}", 15 AbstractChannel.this, t); 16 closeForcibly(); 17 closeFuture.setClosed(); 18 promise.setFailure(t); 19 } 20 } 21 }
先判斷當前線程是否是線程池的線程,如果是則直接執行注冊方法,否則提交任務到線程池。為什么要這樣做呢?
《Netty權威指南 第二版》中是這樣說到——首先判斷是否是NioEventLoop自身發起的操作。如果是,則不存在並發操作,直接執行Channel注冊;如果由其他線程發起,則封裝成一個Task放入消息隊列中異步執行。此處,由於是由ServerBootstrap所在線程執行的注冊操作,所以會將其封裝成Task投遞到NioEventLoop中執行。
個人覺得並非如此,注冊方法最終會調用到AbstractSelectableChannel#register方法(下面會講到),該方法使用synchronized關鍵字同步進行注冊,是線程安全的。所以這里並不是用來防止並發操作,僅僅是由於注冊操作結果用戶並不關心且注冊是耗時操作,所以這里為了提升性能做成了異步;同時還有一個作用是如果線程池還有未啟動的線程,提交任務能夠啟動一個線程。如有錯誤,希望指正。
我們繼續看register0方法
private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!ensureOpen(promise)) { return; } doRegister(); registered = true; // 將線程執行結果設置為成功 promise.setSuccess(); // 通知ChannelRegistered事件 pipeline.fireChannelRegistered(); if (isActive()) { pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } } }
doRegister方法是真正執行注冊操作的方法,注冊操作成功后會將promise的執行結果設置為成功,該方法同時會調用注冊的listener。
接下來看看AbstractNioChannel#doRegister方法
1 protected void doRegister() throws Exception { 2 boolean selected = false; 3 for (;;) { 4 try { 5 // 將channel注冊到多路復用器上 6 selectionKey = javaChannel().register(eventLoop().selector, 0, this); 7 return; 8 } catch (CancelledKeyException e) { 9 if (!selected) { 10 // Force the Selector to select now as the "canceled" SelectionKey may still be 11 // cached and not removed because no Select.select(..) operation was called yet. 12 eventLoop().selectNow(); 13 selected = true; 14 } else { 15 // We forced a select operation on the selector before but the SelectionKey is still cached 16 // for whatever reason. JDK bug ? 17 throw e; 18 } 19 } 20 } 21 }
這里將channel注冊到執行當前操作的eventLoop的多路復用器上,並且將附件設置channel作為附件,注冊的操作為0。
在SelectionKey類中定義了4中多路復用器的操作值,如下
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;
這里注冊的操作是0,也就是不關心任何操作,這是為什么呢?
《Netty權威指南 第二版》中是這樣說到——注冊方法是多台的,它既可以被NioServerSocketChannel用來監聽客戶端的連接接入,也可以注冊socketChannel用來監聽網絡讀或者寫操作。
那么什么時候會將操作設置為正確的值呢?請往后看。
經過以上流程遍完成了channel的創建、初始化、注冊,這里只講了大致的流程,其中比較細節的,例如流程中用到的參數是何時初始化的等問題讀者可以自行閱讀源碼。
接下來回頭看doBind方法,執行完了initAndRegister方法后,接下來會執行綁定操作,讓我們看看doBind0方法源碼
1 private static void doBind0( 2 final ChannelFuture regFuture, final Channel channel, 3 final SocketAddress localAddress, final ChannelPromise promise) { 4 5 // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up 6 // the pipeline in its channelRegistered() implementation. 7 channel.eventLoop().execute(new Runnable() { 8 @Override 9 public void run() { 10 if (regFuture.isSuccess()) { 11 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); 12 } else { 13 promise.setFailure(regFuture.cause()); 14 } 15 } 16 }); 17 }
doBind0方法很簡單,將綁定操作提交到線程池中,這樣做的原因與注冊操作是一樣的。
channel的bind方法最終會執行到AbstractUnsafe#bind方法
@Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } // 省略部分代碼 // 判斷channel是否已經激活 boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { promise.setFailure(t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } promise.setSuccess(); }
doBind方法是真正執行綁定操作的方法,是調用java的的ServerSocket#bind方法。
綁定成功之后判斷是否是第一次注冊,如果是則通知channelActive事件,代碼如下
1 @Override 2 public ChannelPipeline fireChannelActive() { 3 head.fireChannelActive(); 4 if (channel.config().isAutoRead()) { 5 channel.read(); 6 } 7 return this; 8 }
通知完channelActive事件后會進行判斷,channel是否是自動讀,該值默認為true,所以會默認調用channel.read方法,該方法最終會調用AbstractNioUnsafe#doBeginRead方法,該方法代碼如下
1 protected void doBeginRead() throws Exception { 2 if (inputShutdown) { 3 return; 4 } 5 final SelectionKey selectionKey = this.selectionKey; 6 if (!selectionKey.isValid()) { 7 return; 8 } 9 final int interestOps = selectionKey.interestOps(); 10 if ((interestOps & readInterestOp) == 0) { 11 selectionKey.interestOps(interestOps | readInterestOp); 12 } 13 }
注意第11行代碼,將selectionKey的注冊操作改為了readInterestOp,該值是一個NioServerSocketChannel的父類AbstractNioChannel的屬性,讓我們看下NioServerSocketChannel的構造函數
1 public NioServerSocketChannel(EventLoop eventLoop, EventLoopGroup childGroup) { 2 super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT); 3 config = new DefaultServerSocketChannelConfig(this, javaChannel().socket()); 4 }
可以看到,NioServerSocketChannel將readInterestOp設置成了OP_ACCEPT。所以當服務端channel完成綁定操作之后會將注冊到多路復用器上的操作變為OP_ACCEPT。
以上便是服務端的啟動流程,客戶端的啟動流程實際上與服務端類似,讀者可以自行閱讀源碼。
在文章開頭的啟動代碼中,還有一個地方沒有講到,那就是最后的優雅停機,這個實現較為簡單但是涉及到許多代碼這里不贅述,這里僅大概說一下他的原理。優雅停機與java的線程池關閉類似,都是通過一個state變量來表示線程池的狀態,當線程池中的線程判斷到state變為關閉等狀態時,便會執行退出操作,當所有線程都退出之后便完成了優雅停機。
大家可以看到在netty服務端啟動過程中大量的涉及到了線程池操作,線程池是netty的核心之一,所有的操作都是在NioEventLoop中進行,感興趣的可以自行了解一下NioEventLoop的run方法,關於netty的線程模型可以看下這篇文章https://www.infoq.cn/article/netty-threading-model。