(二)Netty源碼學習筆記之服務端啟動


 

  尊重原創,轉載注明出處,原文地址:http://www.cnblogs.com/cishengchongyan/p/6129971.html 

 

  本文將不會對netty中每個點分類講解,而是一個服務端啟動的代碼走讀,在這個過程中再去了解和學習,這也是博主自己的學習歷程。下面開始正文~~~~

  眾所周知,在寫netty服務端應用的時候一般會有這樣的啟動代碼:

 (代碼一)
1
EventLoopGroup bossGroup = new NioEventLoopGroup(1); 2 EventLoopGroup workerGroup = new NioEventLoopGroup(); 3 try { 4 ServerBootstrap bootStrap = new ServerBootstrap(); 5 bootStrap.group(bossGroup, workerGroup) 6   .channel(NioServerSocketChannel.class) 7   .childHandler(new WebsocketChatServerInitializer()) 8   .option(ChannelOption.SO_BACKLOG, 128) 9   .childOption(ChannelOption.SO_KEEPALIVE, true); 10 11 ChannelFuture f = bootStrap.bind(port).sync(); 12 f.channel().closeFuture().sync(); 13 } finally { 14 ... 15 }

  本文將沿着這條主線來走讀代碼,但是在走讀之前首先要先認識一下Netty中的reactor模式是怎么玩的。

  首先先借用Doug Lea在Scalable IO in Java中的經典的圖示:

  

  這張圖是經典的運用了多路復用的Reactor模式,也大致說明了在netty中各線程的工作模式,mainReactor負責處理客戶端的請求,subReacor負責處理I/O的讀寫操作,同時還會有一些用戶的線程,用於異步處理I/O數據,在整個過程中通過角色細化,有效地將線程資源充分利用起來,構建了一條無阻塞通道,最后將耗時的業務邏輯交由業務線程去處理。本文不會對reactor做過多的解讀,而是結合netty的線程池模式來學習。

  回到剛剛的主題,在服務端啟動的時候首先會new兩個NioEventLoopGroup,一個叫bossGroup(boss線程池),一個叫workerGroup(worker線程池),而這兩個就分別對應了上述的mainReactor和subReacor。接下來我們來看在new的過程中發生了什么。

  代碼走到MultithreadEventLoopGroup的構造方法中:

(代碼二)
1
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { 2 3 private static final int DEFAULT_EVENT_LOOP_THREADS; 4 5 static { 6 DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( 7 "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); 8 9 if (logger.isDebugEnabled()) { 10 logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); 11 } 12 } 13 14 protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { 15 super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); 16 } 17 ... 18 }

  

  可以看到如果參數傳入了thread個數就取這個數目,如果沒有傳入就取可用處理器(CPU)個數的2倍。因此【代碼一】中boss只有1個線程,而worker有2*cpu個數個線程。

  繼續往下走到了核心代碼MultithreadEventExecutorGroup中:

(代碼三)
1
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup { 2 3 private final EventExecutor[] children; 4 private final AtomicInteger childIndex = new AtomicInteger(); 5 private final AtomicInteger terminatedChildren = new AtomicInteger(); 6 private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); 7 private final EventExecutorChooser chooser; 8 9 protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { 10 if (nThreads <= 0) { 11 throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); 12 } 13 14 if (threadFactory == null) { 15 threadFactory = newDefaultThreadFactory(); 16 } 17 18 children = new SingleThreadEventExecutor[nThreads]; 19 if (isPowerOfTwo(children.length)) { 20 chooser = new PowerOfTwoEventExecutorChooser(); 21 } else { 22 chooser = new GenericEventExecutorChooser(); 23 } 24 25 for (int i = 0; i < nThreads; i ++) { 26 boolean success = false; 27 try { 28 children[i] = newChild(threadFactory, args); 29 success = true; 30 } catch (Exception e) { 31 // TODO: Think about if this is a good exception type 32 throw new IllegalStateException("failed to create a child event loop", e); 33 } finally { 34 if (!success) { 35 ...50 } 51 } 52 } 53 54 final FutureListener<Object> terminationListener = new FutureListener<Object>() { 55 @Override 56 public void operationComplete(Future<Object> future) throws Exception { 57 if (terminatedChildren.incrementAndGet() == children.length) { 58 terminationFuture.setSuccess(null); 59 } 60 } 61 }; 62 63 for (EventExecutor e: children) { 64 e.terminationFuture().addListener(terminationListener); 65 } 66 }

  

  首先new一個線程工廠newDefaultThreadFactory,然后給變量children賦值【PS:children是線程執行器的集合,幾個線程就會有幾個EventExecutor。因此EventExecutor是Reactor模式中真正執行工作的對象,它繼承自ScheduledExecutorService,所以應該明白它本質上是什么了吧】

  children是賦值new了給定線程數數量的SingleThreadEventExecutor,看其內部代碼,SingleThreadEventExecutor構造方法:

(代碼四)
1
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor { 2 ... 3 private final EventExecutorGroup parent; 4 private final Queue<Runnable> taskQueue; 5 private final Thread thread; 6 ... 7 protected SingleThreadEventExecutor( 8 EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { 9 10 if (threadFactory == null) { 11 throw new NullPointerException("threadFactory"); 12 } 13 14 this.parent = parent; 15 this.addTaskWakesUp = addTaskWakesUp; 16 17 thread = threadFactory.newThread(new Runnable() { 18 @Override 19 public void run() { 20 boolean success = false; 21 updateLastExecutionTime(); 22 try { 23 SingleThreadEventExecutor.this.run(); 24 success = true; 25 } catch (Throwable t) { 26 logger.warn("Unexpected exception from an event executor: ", t); 27 } finally { 28 ... 29 } 30 } 31 }); 32 threadProperties = new DefaultThreadProperties(thread); 33 taskQueue = newTaskQueue(); 34 } 35 ... 36 }

 

  回到剛剛的主題(代碼三),發現在children[i] = newChild(threadFactory, args);而newChild是抽象方法,由於最開始我們初始化的是NioEventLoopGroup,因此是在NioEventLoopGroup中調用的:

(代碼五)
1
protected EventExecutor newChild( 2 ThreadFactory threadFactory, Object... args) throws Exception { 3 return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); 4 }

  因此相當於我們有多少個work或boss線程就有多少個NioEventLoop,而每一個NioEventLoop都綁定了一個selector。所以,相當於一個NioEventLoopGroup有自定義線程數量的NioEventLoop。

  【PS:EventLoopGroup顧名思義是EventLoop的group,即包含了一組EventGroup。在實際的業務處理中,EventLoopGroup會通過EventLoop next()方法選擇一個 EventLoop,然后將實際的業務處理交給這個被選出的EventLoop去做。對於 NioEventLoopGroup來說,其真實功能都會交給EventLoopGroup去實現。】

  接下來我們重點去看一下EventLoop和EventLoopGroup,自己畫了這一塊的UML圖來理一下類關系:

  可以看出,EventLoop也繼承自EventLoopGroup,因此也是EventLoopGroup的一種。同時看到,這一堆類都實現自ScheduledExecutorService,那么大家應該理解EventLoop和EventLoopGroup本質上是什么東西了吧。這里先不鋪展開,下文中在講注冊邏輯時會對EventLoopGroup做一個更詳細的了解。

  我們先回到【代碼五主線】,我們接下來繼續看初始化邏輯:

(代碼六)
1
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { 2 super(parent, threadFactory, false); 3 if (selectorProvider == null) { 4 throw new NullPointerException("selectorProvider"); 5 } 6 provider = selectorProvider; 7 selector = openSelector(); 8 }

  初始化NioEventLoop時調用了openSelector來打開當前操作系統中一個默認的selector實現。

  回到【代碼一主線】,服務端初始化了boss和worker線程之后調用ServerBootstrap.group()來綁定兩個線程池調度器。接下來調用ServerBootstrap.channel(NioServerSocketChannel.class)。這塊邏輯很簡單就是在bootstrap內部初始化了一個class類型是NioServerSocketChannel的ChannelFactory,【PS:ChannelFactory不會指定生產對象的具體類型,只要繼承自Channel就可以了】。

  接下來,ServerBootstrap.childHandler()作用就是設置ChannelHandler來響應Channel的請求。一般這里都會設置抽象類ChannelInitializer,並且實現模板方法initChannel,在ChannelHandler注冊(初始化)的時候會調用initChannel來完成ChannelPipeline的初始化。

(代碼七)
1
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter { 2 3 protected abstract void initChannel(C ch) throws Exception; 4 5 @Override 6 @SuppressWarnings("unchecked") 7 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { 8 initChannel((C) ctx.channel()); 9 ctx.pipeline().remove(this); 10 ctx.fireChannelRegistered(); 11 } 12 ... 13 }

  關於ChannelHandler我們后面會做詳細的介紹,這里只需要了解到此就可以了。

  回到【代碼一主線】,接下來bootStrap.option()和childOption()分別是給boss線程和worder線程設置參數,這里先忽略。

  然后是綁定端口ChannelFuture f = bootStrap.bind(port);在這一步中不僅僅是綁定端口,實際上需要做大量的初始化工作。我們先看一下AbstractBootstrap中的核心代碼:

 (代碼八)
1
  private ChannelFuture doBind(final SocketAddress localAddress) { 2 final ChannelFuture regFuture = initAndRegister(); 3 final Channel channel = regFuture.channel(); 4 if (regFuture.cause() != null) { 5 return regFuture; 6 } 7 8 if (regFuture.isDone()) { 9 ChannelPromise promise = channel.newPromise(); 10 doBind0(regFuture, channel, localAddress, promise); 11 return promise; 12 } else { 13 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); 14 regFuture.addListener(new ChannelFutureListener() { 15 @Override 16 public void operationComplete(ChannelFuture future) throws Exception { 17 Throwable cause = future.cause(); 18 if (cause != null) { 19 promise.setFailure(cause); 20 } else { 21 promise.executor = channel.eventLoop(); 22 } 23 doBind0(regFuture, channel, localAddress, promise); 24 } 25 }); 26 return promise; 27 } 28 }

   

  【代碼八主線】首先是initAndRegister(),看一下代碼:

(代碼九)
1
final ChannelFuture initAndRegister() { 2 final Channel channel = channelFactory().newChannel(); 3 try { 4 init(channel); 5 } catch (Throwable t) { 6 channel.unsafe().closeForcibly(); 7 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); 8 } 9 10 ChannelFuture regFuture = group().register(channel); 11 if (regFuture.cause() != null) { 12 if (channel.isRegistered()) { 13 channel.close(); 14 } else { 15 channel.unsafe().closeForcibly(); 16 } 17 } 18 19 return regFuture; 20 }

  

  首先調用工廠方法生成一個新Channel,我們剛剛說過,ChannelFactory不限定Channel的具體類型,而我們注冊的是NioServerSocketChannel,那么這里生產的就是該類型的Channel,然后調用init(),具體實現在ServerBootstrap中:

(代碼十)
1
  @Override 2 void init(Channel channel) throws Exception { 3 final Map<ChannelOption<?>, Object> options = options(); 4 synchronized (options) { 5 channel.config().setOptions(options); 6 } 7 8 final Map<AttributeKey<?>, Object> attrs = attrs(); 9 synchronized (attrs) { 10 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { 11 @SuppressWarnings("unchecked") 12 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); 13 channel.attr(key).set(e.getValue()); 14 } 15 } 16 17 ChannelPipeline p = channel.pipeline(); 18 19 final EventLoopGroup currentChildGroup = childGroup; 20 final ChannelHandler currentChildHandler = childHandler; 21 final Entry<ChannelOption<?>, Object>[] currentChildOptions; 22 final Entry<AttributeKey<?>, Object>[] currentChildAttrs; 23 synchronized (childOptions) { 24 currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); 25 } 26 synchronized (childAttrs) { 27 currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); 28 } 29 30 p.addLast(new ChannelInitializer<Channel>() { 31 @Override 32 public void initChannel(Channel ch) throws Exception { 33 ChannelPipeline pipeline = ch.pipeline(); 34 ChannelHandler handler = handler(); 35 if (handler != null) { 36 pipeline.addLast(handler); 37 } 38 pipeline.addLast(new ServerBootstrapAcceptor( 39 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 40 } 41 }); 42 }

  在init()中做了大致這么幾件事:1,配置channel的option;2,配置channel的attr;3,ChannelPipeline增加兩個Handler,一個是bootstrap中的私有handler,一個是ServerBootstrapAcceptor(這個Handler用於接收客戶連接后設置其初始化參數)。

  【代碼九主線】完成了init之后調用EventLoopGroup.register(channel)完成了channel的注冊,實際上就是將channel注冊到EventLoop中的selector上。這塊我們可以了解一下其中的實現:

  先看一下EventLoopGroup接口:

(代碼十一)
1
public interface EventLoopGroup extends EventExecutorGroup { 2 3 @Override 4 EventLoop next(); 5 6 ChannelFuture register(Channel channel); 7 8 ChannelFuture register(Channel channel, ChannelPromise promise); 9 }

 

  其中next方法返回EventLoopGroup里的一個EventLoop,register用於注冊Channel到EventLoop里。【PS:EventLoopGroup顧名思義是EventLoop的group,即包含了一組EventGroup。在實際的業務處理中,EventLoopGroup會通過EventLoop next()方法選擇一個 EventLoop,然后將實際的業務處理交給這個被選出的EventLoop去做。對於 NioEventLoopGroup來說,其真實功能都會交給EventLoopGroup去實現】

  我們詳細看一下register到底如何實現的,往下看是在SingleThreadEventLoop里實現了該方法:  

 

(代碼十二)
1
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { 2   ... 3 @Override 4 public ChannelFuture register(Channel channel) { 5 return register(channel, new DefaultChannelPromise(channel, this)); 6 } 7 8 @Override 9 public ChannelFuture register(final Channel channel, final ChannelPromise promise) { 10 if (channel == null) { 11 throw new NullPointerException("channel"); 12 } 13 if (promise == null) { 14 throw new NullPointerException("promise"); 15 } 16 17 channel.unsafe().register(this, promise); 18 return promise; 19 } 20   ... 21 }

 

  注意,在這里調用了Channel的Unsafe內部類完成了注冊,因此接下來的東西都是NIO中的 【PS:Unsafe是定義在Channel中的內部接口,是不會被用戶代碼調用到的,但是在channel的I/O操作中實際上都是由unsafe來完成的。Unsafe不論是接口還是類,都會定義到channel的內部(例如Channel接口中定義了Unsafe接口,AbstractChannel抽象類中定義了AbstractUnsafe抽象類),因此如果將nio類比為一個linux系統的話,那么unsafe就是其中的內核空間】

  具體的register操作是在AbstractUnsafe中完成,在register()方法中調用了模板方法,我們看一下在AbstractNioChannel中的核心實現:

 

(代碼十三)
1
@Override 2 protected void doRegister() throws Exception { 3 boolean selected = false; 4 for (;;) { 5 try { 6 selectionKey = javaChannel().register(eventLoop().selector, 0, this); 7 return; 8 } catch (CancelledKeyException e) { 9 if (!selected) { 10 eventLoop().selectNow(); 11 selected = true; 12 } else { 13 throw e; 14 } 15 } 16 } 17 } 18 }

  

  這里實際上調用的是SelectableChannel中的register方法,作用就是將本channel注冊到本channel的eventLoop的Selector中,那么問題又來了,什么是SelectableChannel?【PS:它實現Channel接口,代碼注釋說明其是一種可以被Selector使用用於多路復用的Channel,SelectableChannel可以通過 register方法將自己注冊在Selector上,並提供其所關注的事件類型。因此,繼承自SelectableChannel的Channel才可以真正和Selector打交道,例如ServerSocketChannel和SocketChannel】

  繼續看其中的SelectableChannel中的實現:

  

 (代碼十四)
1
public final SelectionKey register(Selector sel, int ops, 2 Object att)throws ClosedChannelException{ 3 synchronized (regLock) { 4 ... 5 SelectionKey k = findKey(sel); 6 if (k != null) { 7 k.interestOps(ops); 8 k.attach(att); 9 } 10 if (k == null) { 11 // New registration 12 synchronized (keyLock) { 13 if (!isOpen()) 14 throw new ClosedChannelException(); 15 k = ((AbstractSelector)sel).register(this, ops, att); 16 addKey(k); 17 } 18 } 19 return k; 20 } 21 }

 

  這里的邏輯很清晰,如果該channel有在Selector中注冊過(有對應的SelectionKey),那么將這個key強制綁定到入參的Channel中(可能會導致之前綁定失效),如果該channel沒有在Selector中注冊過,那么調用AbstractSelector(底層JDK實現)該register邏輯。至此我們完成了register邏輯代碼的走讀。

  繼續回歸【代碼八主線】,我們已經完成了initAndRegister邏輯,如果不出意外那么regFuture.isDone()將是true,接下來調用了doBind0():

 (代碼十五)
1
  private static void doBind0( 2 final ChannelFuture regFuture, final Channel channel, 3 final SocketAddress localAddress, final ChannelPromise promise) { 4 5 channel.eventLoop().execute(new Runnable() { 6 @Override 7 public void run() { 8 if (regFuture.isSuccess()) { 9 channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); 10 } else { 11 promise.setFailure(regFuture.cause()); 12 } 13 } 14 }); 15 }

  

  這里有必要了解一下ChannelPromise,它擴展了Promise和ChannelFuture,是一個可寫入的ChannelFuture。我再在網上搜了很多資料里說它具備監聽器的功能。但是我自己不這么認為,我們看Promise接口在future的基礎上增加了setSuccess(), setFailure()這些方法,而ChannelFuture里success和failuer都是不可寫的。為什么呢?從定義上來看,ChannelFuture本來就是異步執行的結果,既然已經異步了那么在返回的時候本來就無法確定其成功或者失敗,而有的時候我們做校驗或者寫一些業務邏輯的時候可以確定其結果,因此我覺得ChannelPromise作為一個可寫的ChannelFuture是對其的一個補充,可以標記異步任務成功或者失敗,因此它是netty異步框架中實際使用的異步執行結果。在這里調用channel.bind(localAddress, promise);作用很明確就是給該channel綁定端口,然后該方法會立即返回一個ChannelPromise(不論這個實際的異步操作有沒有做完)。一般用法也是這樣的,方法定義時返回值都是ChannelFuture,而實現時實際返回的都是ChannelPromise。

  最后給立即返回的這個ChannelFuture添加一個listener。netty中有兩種方式獲取異步執行的真正結果,一種是調用老祖宗Future的get方法來獲取(阻塞等待),一種是添加listener(異步回調),netty中推薦使用第二種方式,在整個的netty異步框架中也大量使用了這種方式。剛剛添加的那個listener的作用是:如果注冊失敗了,那么就關閉該Channel。最后bind返回異步的ChannelPromise,完成整個bind流程。

  至此【代碼一主線】走讀完畢,我們大致瀏覽了一遍server端bootstrap啟動流程。

 

  最后大致總結一下服務端啟動的主流程:

  1. 初始化boss和worker線程調度器NioEventLoopGroup,打開其中的Selector對象並配置相關參數。
  2. ServerBootstrap綁定這兩個NioEventLoopGroup。
  3. 為server端確定綁定Channel的class類型(即將要使用什么類型),在本文的例子中綁定的是NioServerSocketChannel,實質上只是初始化ChannelFactory。(此時還沒有初始化該Channel,也沒有為Selector注冊該Channel)。
  4. 初始化用戶定義的ChannelInitializer,也就是在ChannelPipeline中添加用戶自己的ChannelHandler(此時還沒有注冊,只是初始化變量而已)。
  5. 調用bind(port)啟動監聽,整個bind的過程非常復雜,做了最核心的初始化工作:

    1) ChannelFactory生成核心的NioServerSocketChannel實例,為該Channel初始化參數,然后為NioServerSocketChannel的pipeline中再添加兩個netty框架的Handler。

    2) 將NioServerSocketChannel實例綁定到boss線程調度器的Selector中,此時boss線程被激活並開始接受I/O請求,同時所有的Pipeline中的Handler也會完成注冊。

    3) 異步為NioServerSocketChannel綁定注冊的端口。

  

  至此,ServerBootstrap啟動完畢,開始接收I/O請求。本文大致走讀了一遍服務端啟動的代碼,在走讀的過程中對一些概念進行解讀,相信大家在大腦中對netty的基本成員已經有了一個輪廓。那么服務端啟動之后,netty是如何接收並分發socket請求,pipeline中又是如何組織並調用handler,以及boss和worker如何協同工作將在下一篇博客中進行解讀。

  

  

  

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM