Netty源碼閱讀(一) ServerBootstrap啟動
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。本文講會對Netty服務啟動的過程進行分析,主要關注啟動的調用過程,從這里面進一步理解Netty的線程模型,以及Reactor模式。

這是我畫的一個Netty啟動過程中使用到的主要的類的概要類圖,當然是用到的類比這個多得多,而且我也忽略了各個類的繼承關系,關於各個類的細節,可能以后會寫單獨的博客進行分析。在這里主要注意那么幾個地方:
1. ChannelPromise關聯了Channel和Executor,當然channel中也會有EventLoop的實例。
2. 每個channel有自己的pipeline實例。
3. 每個NioEventLoop中有自己的Executor實例和Selector實例。
網絡請求在NioEventLoop中進行處理,當然accept事件也是如此,它會把接收到的channel注冊到一個EventLoop的selector中,以后這個channel的所有請求都由所注冊的EventLoop進行處理,這也是Netty用來處理競態關系的機制,即一個channel的所有請求都在一個線程中進行處理,也就不會存在跨線程的沖突,因為這些調用都線程隔離了。
下面我們先看一段Netty源碼里面帶的example代碼,直觀感受一下Netty的使用:
1 // Configure the server. 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); 3 EventLoopGroup workerGroup = new NioEventLoopGroup(); 4 try { 5 ServerBootstrap b = new ServerBootstrap(); 6 b.group(bossGroup, workerGroup) 7 .channel(NioServerSocketChannel.class) 8 .option(ChannelOption.SO_BACKLOG, 100) // 設置tcp協議的請求等待隊列 9 .handler(new LoggingHandler(LogLevel.INFO)) 10 .childHandler(new ChannelInitializer<SocketChannel>() { 11 @Override 12 public void initChannel(SocketChannel ch) throws Exception { 13 ChannelPipeline p = ch.pipeline(); 14 if (sslCtx != null) { 15 p.addLast(sslCtx.newHandler(ch.alloc())); 16 } 17 p.addLast(new EchoServerHandler()); 18 } 19 }); 20 21 // Start the server. 22 ChannelFuture f = b.bind(PORT).sync(); 23 24 // Wait until the server socket is closed. 25 f.channel().closeFuture().sync(); 26 } finally { 27 // Shut down all event loops to terminate all threads. 28 bossGroup.shutdownGracefully(); 29 workerGroup.shutdownGracefully(); 30 }
首先我們先來了解Netty的主要類:
EventLoop 這個相當於一個處理線程,是Netty接收請求和處理IO請求的線程。
EventLoopGroup 可以理解為將多個EventLoop進行分組管理的一個類,是EventLoop的一個組。
ServerBootstrap 從命名上看就可以知道,這是一個對服務端做配置和啟動的類。
ChannelPipeline 這是Netty處理請求的責任鏈,這是一個ChannelHandler的鏈表,而ChannelHandler就是用來處理網絡請求的內容的。
ChannelHandler 用來處理網絡請求內容,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannlPipeline會從頭到尾順序調用ChannelInboundHandler處理網絡請求內容,從尾到頭調用ChannelOutboundHandler處理網絡請求內容。這也是Netty用來靈活處理網絡請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網絡協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。
現在看上面的代碼,首先創建了兩個EventLoopGroup對象,作為group設置到ServerBootstrap中,然后設置Handler和ChildHandler,最后調用bind()方法啟動服務。下面按照Bootstrap啟動順序來看代碼。
1 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { 2 super.group(parentGroup); 3 if (childGroup == null) { 4 throw new NullPointerException("childGroup"); 5 } 6 if (this.childGroup != null) { 7 throw new IllegalStateException("childGroup set already"); 8 } 9 this.childGroup = childGroup; 10 return this; 11 }
首先是設置EverLoopGroup,parentGroup一般用來接收accpt請求,childGroup用來處理各個連接的請求。不過根據開發的不同需求也可以用同一個group同時作為parentGroup和childGroup同時處理accpt請求和其他io請求。
1 public B channel(Class<? extends C> channelClass) { 2 if (channelClass == null) { 3 throw new NullPointerException("channelClass"); 4 } 5 return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); 6 }
接下來的channel()方法設置了ServerBootstrap的ChannelFactory,這里傳入的參數是NioServerSocketChannel.class,也就是說這個ReflectiveChannelFactory創建的就是NioServerSocketChannel的實例。
后面的option(),handler()和childHandler()分別是設置Socket連接的參數,設置parentGroup的Handler,設置childGroup的Handler。childHandler()傳入的ChannelInitializer實現了一個initChannel方法,用於初始化Channel的pipeline,以處理請求內容。
之前都是在對ServerBootstrap做設置,接下來的ServerBootstrap.bind()才是啟動的重頭戲。我們繼續按照調用順序往下看。
1 public ChannelFuture bind(int inetPort) { 2 return bind(new InetSocketAddress(inetPort)); 3 } 4 5 /** 6 * Create a new {@link Channel} and bind it. 7 */ 8 public ChannelFuture bind(SocketAddress localAddress) { 9 validate(); 10 if (localAddress == null) { 11 throw new NullPointerException("localAddress"); 12 } 13 return doBind(localAddress); 14 } 15 16 // AbstractBootstrap 17 private ChannelFuture doBind(final SocketAddress localAddress) { 18 final ChannelFuture regFuture = initAndRegister(); 19 final Channel channel = regFuture.channel(); 20 if (regFuture.cause() != null) { 21 return regFuture; 22 } 23 24 if (regFuture.isDone()) { 25 // At this point we know that the registration was complete and successful. 26 ChannelPromise promise = channel.newPromise(); 27 doBind0(regFuture, channel, localAddress, promise); 28 return promise; 29 } else { 30 // Registration future is almost always fulfilled already, but just in case it's not. 31 final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); 32 regFuture.addListener(new ChannelFutureListener() { 33 @Override 34 public void operationComplete(ChannelFuture future) throws Exception { 35 Throwable cause = future.cause(); 36 if (cause != null) { 37 // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an 38 // IllegalStateException once we try to access the EventLoop of the Channel. 39 promise.setFailure(cause); 40 } else { 41 // Registration was successful, so set the correct executor to use. 42 // See https://github.com/netty/netty/issues/2586 43 promise.registered(); 44 45 doBind0(regFuture, channel, localAddress, promise); 46 } 47 } 48 }); 49 return promise; 50 } 51 }
我們可以看到bind()的調用最終調用到了doBind(final SocketAddress),在這里我們看到先調用了initAndRegister()方法進行初始化和register操作。了解JavaNIO框架的同學應該能看出來是在這個方法中將channel注冊到selector中的。最后程序再調用了doBind0()方法進行綁定,先按照順序看initAndRegister方法做了什么操作。
1 // AbstractBootstrap 2 final ChannelFuture initAndRegister() { 3 Channel channel = null; 4 try { 5 channel = channelFactory.newChannel(); 6 init(channel); 7 } catch (Throwable t) { 8 // ... 9 } 10 11 ChannelFuture regFuture = config().group().register(channel); 12 // ... 13 return regFuture; 14 }
為了簡單其間,我忽略了處理異常分支的代碼,同學們有興趣可以自行下載Netty源碼對照。在這里終於看到channel的創建了,調用的是ServerBootstrap的channelFactory,之前的代碼我們也看到了這里的工廠是一個ReflectChannelFactory,在構造函數中傳入的是NioServerSocketChannel.class,所以這里創建的是一個NioServerSocketChannel的對象。接下來init(channel)對channel進行初始化。
1 // ServerBootstrap 2 void init(Channel channel) throws Exception { 3 final Map<ChannelOption<?>, Object> options = options0(); 4 synchronized (options) { 5 channel.config().setOptions(options); 6 } 7 8 // 設置channel.attr 9 final Map<AttributeKey<?>, Object> attrs = attrs0(); 10 synchronized (attrs) { 11 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { 12 @SuppressWarnings("unchecked") 13 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); 14 channel.attr(key).set(e.getValue()); 15 } 16 } 17 18 ChannelPipeline p = channel.pipeline(); 19 20 final EventLoopGroup currentChildGroup = childGroup; 21 // childGroup的handler 22 final ChannelHandler currentChildHandler = childHandler; 23 final Entry<ChannelOption<?>, Object>[] currentChildOptions; 24 final Entry<AttributeKey<?>, Object>[] currentChildAttrs; 25 synchronized (childOptions) { 26 currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); 27 } 28 synchronized (childAttrs) { 29 currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); 30 } 31 // 給channelpipeline添加handler 32 p.addLast(new ChannelInitializer<Channel>() { 33 @Override 34 public void initChannel(Channel ch) throws Exception { 35 final ChannelPipeline pipeline = ch.pipeline(); 36 // group的handler 37 ChannelHandler handler = config.handler(); 38 if (handler != null) { 39 pipeline.addLast(handler); 40 } 41 42 // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. 43 // In this case the initChannel(...) method will only be called after this method returns. Because 44 // of this we need to ensure we add our handler in a delayed fashion so all the users handler are 45 // placed in front of the ServerBootstrapAcceptor. 46 ch.eventLoop().execute(new Runnable() { 47 @Override 48 public void run() { 49 pipeline.addLast(new ServerBootstrapAcceptor( 50 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); 51 } 52 }); 53 } 54 }); 55 }
先是設置了channel的option和attr,然后將handler加入到channelpipleline的handler鏈中,這里大家請特別注意ServerBootstrapAcceptor這個Handler,因為接下來對於客戶端請求的處理以及工作channl的注冊可全是這個Handler處理的。不過由於現在channel還沒有注冊,所以還不會調用initChannel()方法,而是將這個handler對應的context加入到一個任務隊列中,等到channel注冊成功了再執行。關於ChannelPipeline的內容我們以后再說。然后在initAndRegister()方法中調用config().group().register(channel)對channel進行注冊。config().group()獲取到的其實就是bossGroup,在這個例子中就是一個NioEventLoopGroup,由於它繼承了MultithreadEventLoopGroup所以這里調用的其實是這個類的方法。
1 // MultithreadEventLoopGroup 2 public ChannelFuture register(Channel channel) { 3 return next().register(channel); 4 } 5 6 public EventLoop next() { 7 return (EventLoop) super.next(); 8 } 9 10 // SingleThreadEventLoop 11 public ChannelFuture register(Channel channel) { 12 return register(new DefaultChannelPromise(channel, this)); 13 } 14 15 @Override 16 public ChannelFuture register(final ChannelPromise promise) { 17 ObjectUtil.checkNotNull(promise, "promise"); 18 promise.channel().unsafe().register(this, promise); 19 return promise; 20 }
這里會獲取EventLoopGroup中的一個EventLoop,其實我們用的是NioEventLoopGroup所以這里獲取到的其實是NioEventLoop,而NioEventLoop繼承了SingleThreadEventLoop,這里register方法調用的就是SingleThreadEventLoop中的方法。我們重遇來到了channel最終注冊的地方,這里其實是調用了channel的unsafe對象中的register方法,也就是NioServerSocketChannel的方法,這個方法是在AbstractChannel祖先類中實現的,代碼如下:
1 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 2 if (eventLoop == null) { 3 throw new NullPointerException("eventLoop"); 4 } 5 if (isRegistered()) { 6 promise.setFailure(new IllegalStateException("registered to an event loop already")); 7 return; 8 } 9 if (!isCompatible(eventLoop)) { 10 promise.setFailure( 11 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); 12 return; 13 } 14 // 設置eventLoop 15 AbstractChannel.this.eventLoop = eventLoop; 16 // 這里是跟Netty的線程模型有關的,注冊的方法只能在channel的工作線程中執行 17 if (eventLoop.inEventLoop()) { 18 register0(promise); 19 } else { 20 try { 21 eventLoop.execute(new Runnable() { 22 @Override 23 public void run() { 24 register0(promise); 25 } 26 }); 27 } catch (Throwable t) { 28 logger.warn( 29 "Force-closing a channel whose registration task was not accepted by an event loop: {}", 30 AbstractChannel.this, t); 31 closeForcibly(); 32 closeFuture.setClosed(); 33 safeSetFailure(promise, t); 34 } 35 } 36 } 37 38 // AbstractNioChannel 39 protected void doRegister() throws Exception { 40 boolean selected = false; 41 for (;;) { 42 try { 43 selectionKey = javaChannel().register(eventLoop().selector, 0, this); 44 return; 45 } catch (CancelledKeyException e) { 46 // ... 47 } 48 } 49 } 50 51 // AbstractSelectableChannel 52 public final SelectionKey register(Selector sel, int ops,Object att) 53 throws ClosedChannelException 54 { 55 synchronized (regLock) { 56 if (!isOpen()) 57 throw new ClosedChannelException(); 58 if ((ops & ~validOps()) != 0) 59 throw new IllegalArgumentException(); 60 if (blocking) 61 throw new IllegalBlockingModeException(); 62 SelectionKey k = findKey(sel); 63 if (k != null) { 64 k.interestOps(ops); 65 k.attach(att); 66 } 67 if (k == null) { 68 // New registration 69 synchronized (keyLock) { 70 if (!isOpen()) 71 throw new ClosedChannelException(); 72 k = ((AbstractSelector)sel).register(this, ops, att); 73 addKey(k); 74 } 75 } 76 return k; 77 } 78 }
這里先設置了channel的eventLoop屬性,然后在接下來的一段代碼中判斷當前線程是否是channel的處理線程,也就是是不是eventLoop的線程,如果不是那么就將注冊作為一個任務用EventLoop.execute執行。按照這里的執行順序,當前線程肯定不是eventLoop的線程,所以會執行else分支,其實eventLoop的線程也是在這個調用中啟動的。最后的注冊是在AbstractSelectableChannel類的register()方法中執行的。這里有個很奇怪的地方,這里注冊的ops是0,也就是沒有感興趣的事件。這個地方我們后面在分析。
將channel注冊到selector的代碼就是這些了,我們回頭分析EventLoop.execute(…),其實注冊的代碼是在這里面被調用的。
1 // SingleThreadEventExecutor 2 public void execute(Runnable task) { 3 if (task == null) { 4 throw new NullPointerException("task"); 5 } 6 7 boolean inEventLoop = inEventLoop(); 8 if (inEventLoop) { 9 addTask(task); 10 } else { 11 startThread(); 12 addTask(task); 13 if (isShutdown() && removeTask(task)) { 14 reject(); 15 } 16 } 17 18 if (!addTaskWakesUp && wakesUpForTask(task)) { 19 wakeup(inEventLoop); 20 } 21 }
如果當前線程是EventLoop的線程,就把task加到任務隊列中去,如果不是,那么啟動線程,然后再把task加入到任務隊列。
1 // SingleThreadEventLoop 2 private void startThread() { 3 if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { 4 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { 5 doStartThread(); 6 } 7 } 8 } 9 10 private void doStartThread() { 11 assert thread == null; 12 executor.execute(new Runnable() { 13 @Override 14 public void run() { 15 thread = Thread.currentThread(); 16 if (interrupted) { 17 thread.interrupt(); 18 } 19 20 boolean success = false; 21 updateLastExecutionTime(); 22 try { 23 SingleThreadEventExecutor.this.run(); 24 success = true; 25 } catch (Throwable t) { 26 logger.warn("Unexpected exception from an event executor: ", t); 27 } finally { 28 // Some clean work 29 } 30 } 31 }); 32 }
其實最后的線程還是要落到EventLoop中的executor里面,而NioEventLoop初始化的時候executor屬性設置的是一個ThreadPerTaskExecutor,顧名思義也就是每個任務新建一個線程去執行,而在這個Task里面對EventLoop的thread屬性進行了設置,並且最后執行SingleThreadEventExecutor.this.run(),這個run方法在NioEventLoop中實現。
1 protected void run() { 2 for (;;) { 3 try { 4 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 5 case SelectStrategy.CONTINUE: 6 continue; 7 case SelectStrategy.SELECT: 8 select(wakenUp.getAndSet(false)); 9 if (wakenUp.get()) { 10 selector.wakeup(); 11 } 12 default: 13 // fallthrough 14 } 15 16 cancelledKeys = 0; 17 needsToSelectAgain = false; 18 final int ioRatio = this.ioRatio; 19 if (ioRatio == 100) { 20 processSelectedKeys(); 21 runAllTasks(); 22 } else { 23 final long ioStartTime = System.nanoTime(); 24 25 processSelectedKeys(); 26 27 final long ioTime = System.nanoTime() - ioStartTime; 28 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 29 } 30 31 if (isShuttingDown()) { 32 closeAll(); 33 if (confirmShutdown()) { 34 break; 35 } 36 } 37 } catch (Throwable t) { 38 logger.warn("Unexpected exception in the selector loop.", t); 39 40 // Prevent possible consecutive immediate failures that lead to 41 // excessive CPU consumption. 42 try { 43 Thread.sleep(1000); 44 } catch (InterruptedException e) { 45 // Ignore. 46 } 47 } 48 } 49 } 50 51 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { 52 if (selectedKeys.isEmpty()) { 53 return; 54 } 55 56 Iterator<SelectionKey> i = selectedKeys.iterator(); 57 for (;;) { 58 final SelectionKey k = i.next(); 59 final Object a = k.attachment(); 60 i.remove(); 61 62 if (a instanceof AbstractNioChannel) { 63 // 處理ServerSocketChannl的事件,如accept 64 processSelectedKey(k, (AbstractNioChannel) a); 65 } else { 66 @SuppressWarnings("unchecked") 67 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; 68 processSelectedKey(k, task); 69 } 70 71 if (!i.hasNext()) { 72 break; 73 } 74 75 if (needsToSelectAgain) { 76 selectAgain(); 77 selectedKeys = selector.selectedKeys(); 78 79 // Create the iterator again to avoid ConcurrentModificationException 80 if (selectedKeys.isEmpty()) { 81 break; 82 } else { 83 i = selectedKeys.iterator(); 84 } 85 } 86 } 87 }
這個就是Netty最后的Reactor模式的事件循環了,在這個循環中調用selector的select方法查詢需要處理的key,然后processSelectedKeys方法進行處理。在這里因為之前在注冊NioServerSocketChannel的時候把channel當作attachment當做attachment,所以如果key的attachement是AbstractNioChannel說明這個是ServerSocketChannel的事件,如connect,read,accept。
其實還有一些問題沒有寫清楚,如下:
1. ServerSocketChannel的interestOps的注冊
2. accept請求的處理
3. 線程模型
4. pipeline的鏈式調用
5. buffer
。。。
這些我會繼續寫文章進行說明~(希望可以=_=)
