Netty源碼閱讀(一) ServerBootstrap啟動


Netty源碼閱讀(一) ServerBootstrap啟動

轉自我的Github

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。本文講會對Netty服務啟動的過程進行分析,主要關注啟動的調用過程,從這里面進一步理解Netty的線程模型,以及Reactor模式。

netty.jpg

這是我畫的一個Netty啟動過程中使用到的主要的類的概要類圖,當然是用到的類比這個多得多,而且我也忽略了各個類的繼承關系,關於各個類的細節,可能以后會寫單獨的博客進行分析。在這里主要注意那么幾個地方:

1. ChannelPromise關聯了Channel和Executor,當然channel中也會有EventLoop的實例。
2. 每個channel有自己的pipeline實例。
3. 每個NioEventLoop中有自己的Executor實例和Selector實例。 

網絡請求在NioEventLoop中進行處理,當然accept事件也是如此,它會把接收到的channel注冊到一個EventLoop的selector中,以后這個channel的所有請求都由所注冊的EventLoop進行處理,這也是Netty用來處理競態關系的機制,即一個channel的所有請求都在一個線程中進行處理,也就不會存在跨線程的沖突,因為這些調用都線程隔離了。

下面我們先看一段Netty源碼里面帶的example代碼,直觀感受一下Netty的使用:

 1         // Configure the server.
 2         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
 3         EventLoopGroup workerGroup = new NioEventLoopGroup();
 4         try {
 5             ServerBootstrap b = new ServerBootstrap();
 6             b.group(bossGroup, workerGroup)
 7              .channel(NioServerSocketChannel.class)
 8              .option(ChannelOption.SO_BACKLOG, 100) // 設置tcp協議的請求等待隊列
 9              .handler(new LoggingHandler(LogLevel.INFO))
10              .childHandler(new ChannelInitializer<SocketChannel>() {
11                  @Override
12                  public void initChannel(SocketChannel ch) throws Exception {
13                      ChannelPipeline p = ch.pipeline();
14                      if (sslCtx != null) {
15                          p.addLast(sslCtx.newHandler(ch.alloc()));
16                      }
17                      p.addLast(new EchoServerHandler());
18                  }
19              });
20 
21             // Start the server.
22             ChannelFuture f = b.bind(PORT).sync();
23 
24             // Wait until the server socket is closed.
25             f.channel().closeFuture().sync();
26         } finally {
27             // Shut down all event loops to terminate all threads.
28             bossGroup.shutdownGracefully();
29             workerGroup.shutdownGracefully();
30         }

 

首先我們先來了解Netty的主要類:

EventLoop 這個相當於一個處理線程,是Netty接收請求和處理IO請求的線程。

EventLoopGroup 可以理解為將多個EventLoop進行分組管理的一個類,是EventLoop的一個組。

ServerBootstrap 從命名上看就可以知道,這是一個對服務端做配置和啟動的類。

ChannelPipeline 這是Netty處理請求的責任鏈,這是一個ChannelHandler的鏈表,而ChannelHandler就是用來處理網絡請求的內容的。

ChannelHandler 用來處理網絡請求內容,有ChannelInboundHandler和ChannelOutboundHandler兩種,ChannlPipeline會從頭到尾順序調用ChannelInboundHandler處理網絡請求內容,從尾到頭調用ChannelOutboundHandler處理網絡請求內容。這也是Netty用來靈活處理網絡請求的機制之一,因為使用的時候可以用多個decoder和encoder進行組合,從而適應不同的網絡協議。而且這種類似分層的方式可以讓每一個Handler專注於處理自己的任務而不用管上下游,這也是pipeline機制的特點。這跟TCP/IP協議中的五層和七層的分層機制有異曲同工之妙。

現在看上面的代碼,首先創建了兩個EventLoopGroup對象,作為group設置到ServerBootstrap中,然后設置Handler和ChildHandler,最后調用bind()方法啟動服務。下面按照Bootstrap啟動順序來看代碼。

 1     public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
 2         super.group(parentGroup);
 3         if (childGroup == null) {
 4             throw new NullPointerException("childGroup");
 5         }
 6         if (this.childGroup != null) {
 7             throw new IllegalStateException("childGroup set already");
 8         }
 9         this.childGroup = childGroup;
10         return this;
11     }

 

首先是設置EverLoopGroup,parentGroup一般用來接收accpt請求,childGroup用來處理各個連接的請求。不過根據開發的不同需求也可以用同一個group同時作為parentGroup和childGroup同時處理accpt請求和其他io請求。

1     public B channel(Class<? extends C> channelClass) {
2         if (channelClass == null) {
3             throw new NullPointerException("channelClass");
4         }
5         return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
6     }

 

接下來的channel()方法設置了ServerBootstrap的ChannelFactory,這里傳入的參數是NioServerSocketChannel.class,也就是說這個ReflectiveChannelFactory創建的就是NioServerSocketChannel的實例。

后面的option(),handler()和childHandler()分別是設置Socket連接的參數,設置parentGroup的Handler,設置childGroup的Handler。childHandler()傳入的ChannelInitializer實現了一個initChannel方法,用於初始化Channel的pipeline,以處理請求內容。

之前都是在對ServerBootstrap做設置,接下來的ServerBootstrap.bind()才是啟動的重頭戲。我們繼續按照調用順序往下看。

 1     public ChannelFuture bind(int inetPort) {
 2         return bind(new InetSocketAddress(inetPort));
 3     }
 4 
 5       /**
 6      * Create a new {@link Channel} and bind it.
 7      */
 8     public ChannelFuture bind(SocketAddress localAddress) {
 9         validate();
10         if (localAddress == null) {
11             throw new NullPointerException("localAddress");
12         }
13         return doBind(localAddress);
14     }
15 
16     // AbstractBootstrap
17       private ChannelFuture doBind(final SocketAddress localAddress) {
18         final ChannelFuture regFuture = initAndRegister();
19         final Channel channel = regFuture.channel();
20         if (regFuture.cause() != null) {
21             return regFuture;
22         }
23 
24         if (regFuture.isDone()) {
25             // At this point we know that the registration was complete and successful.
26             ChannelPromise promise = channel.newPromise();
27             doBind0(regFuture, channel, localAddress, promise);
28             return promise;
29         } else {
30             // Registration future is almost always fulfilled already, but just in case it's not.
31             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
32             regFuture.addListener(new ChannelFutureListener() {
33                 @Override
34                 public void operationComplete(ChannelFuture future) throws Exception {
35                     Throwable cause = future.cause();
36                     if (cause != null) {
37                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
38                         // IllegalStateException once we try to access the EventLoop of the Channel.
39                         promise.setFailure(cause);
40                     } else {
41                         // Registration was successful, so set the correct executor to use.
42                         // See https://github.com/netty/netty/issues/2586
43                         promise.registered();
44 
45                         doBind0(regFuture, channel, localAddress, promise);
46                     }
47                 }
48             });
49             return promise;
50         }
51     }

 

我們可以看到bind()的調用最終調用到了doBind(final SocketAddress),在這里我們看到先調用了initAndRegister()方法進行初始化和register操作。了解JavaNIO框架的同學應該能看出來是在這個方法中將channel注冊到selector中的。最后程序再調用了doBind0()方法進行綁定,先按照順序看initAndRegister方法做了什么操作。

 1     // AbstractBootstrap
 2     final ChannelFuture initAndRegister() {
 3         Channel channel = null;
 4         try {
 5             channel = channelFactory.newChannel();
 6             init(channel);
 7         } catch (Throwable t) {
 8           // ...
 9         }
10 
11         ChannelFuture regFuture = config().group().register(channel);
12         // ...
13         return regFuture;
14     }

為了簡單其間,我忽略了處理異常分支的代碼,同學們有興趣可以自行下載Netty源碼對照。在這里終於看到channel的創建了,調用的是ServerBootstrap的channelFactory,之前的代碼我們也看到了這里的工廠是一個ReflectChannelFactory,在構造函數中傳入的是NioServerSocketChannel.class,所以這里創建的是一個NioServerSocketChannel的對象。接下來init(channel)對channel進行初始化。

 1     // ServerBootstrap
 2     void init(Channel channel) throws Exception {
 3         final Map<ChannelOption<?>, Object> options = options0();
 4         synchronized (options) {
 5             channel.config().setOptions(options);
 6         }
 7         
 8         // 設置channel.attr
 9         final Map<AttributeKey<?>, Object> attrs = attrs0();
10         synchronized (attrs) {
11             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
12                 @SuppressWarnings("unchecked")
13                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
14                 channel.attr(key).set(e.getValue());
15             }
16         }
17 
18         ChannelPipeline p = channel.pipeline();
19 
20         final EventLoopGroup currentChildGroup = childGroup;
21         // childGroup的handler
22         final ChannelHandler currentChildHandler = childHandler;
23         final Entry<ChannelOption<?>, Object>[] currentChildOptions;
24         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
25         synchronized (childOptions) {
26             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
27         }
28         synchronized (childAttrs) {
29             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
30         }
31         // 給channelpipeline添加handler
32         p.addLast(new ChannelInitializer<Channel>() {
33             @Override
34             public void initChannel(Channel ch) throws Exception {
35                 final ChannelPipeline pipeline = ch.pipeline();
36                 // group的handler
37                 ChannelHandler handler = config.handler();
38                 if (handler != null) {
39                     pipeline.addLast(handler);
40                 }
41 
42                 // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
43                 // In this case the initChannel(...) method will only be called after this method returns. Because
44                 // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
45                 // placed in front of the ServerBootstrapAcceptor.
46                 ch.eventLoop().execute(new Runnable() {
47                     @Override
48                     public void run() {
49                         pipeline.addLast(new ServerBootstrapAcceptor(
50                                 currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
51                     }
52                 });
53             }
54         });
55     }

先是設置了channel的option和attr,然后將handler加入到channelpipleline的handler鏈中,這里大家請特別注意ServerBootstrapAcceptor這個Handler,因為接下來對於客戶端請求的處理以及工作channl的注冊可全是這個Handler處理的。不過由於現在channel還沒有注冊,所以還不會調用initChannel()方法,而是將這個handler對應的context加入到一個任務隊列中,等到channel注冊成功了再執行。關於ChannelPipeline的內容我們以后再說。然后在initAndRegister()方法中調用config().group().register(channel)對channel進行注冊。config().group()獲取到的其實就是bossGroup,在這個例子中就是一個NioEventLoopGroup,由於它繼承了MultithreadEventLoopGroup所以這里調用的其實是這個類的方法。

 1     // MultithreadEventLoopGroup
 2     public ChannelFuture register(Channel channel) {
 3         return next().register(channel);
 4     }
 5 
 6     public EventLoop next() {
 7         return (EventLoop) super.next();
 8     }
 9 
10     // SingleThreadEventLoop
11     public ChannelFuture register(Channel channel) {
12         return register(new DefaultChannelPromise(channel, this));
13     }
14 
15     @Override
16     public ChannelFuture register(final ChannelPromise promise) {
17         ObjectUtil.checkNotNull(promise, "promise");
18         promise.channel().unsafe().register(this, promise);
19         return promise;
20     }

這里會獲取EventLoopGroup中的一個EventLoop,其實我們用的是NioEventLoopGroup所以這里獲取到的其實是NioEventLoop,而NioEventLoop繼承了SingleThreadEventLoop,這里register方法調用的就是SingleThreadEventLoop中的方法。我們重遇來到了channel最終注冊的地方,這里其實是調用了channel的unsafe對象中的register方法,也就是NioServerSocketChannel的方法,這個方法是在AbstractChannel祖先類中實現的,代碼如下:

 1          public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 2             if (eventLoop == null) {
 3                 throw new NullPointerException("eventLoop");
 4             }
 5             if (isRegistered()) {
 6                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 7                 return;
 8             }
 9             if (!isCompatible(eventLoop)) {
10                 promise.setFailure(
11                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
12                 return;
13             }
14             // 設置eventLoop
15             AbstractChannel.this.eventLoop = eventLoop;
16             // 這里是跟Netty的線程模型有關的,注冊的方法只能在channel的工作線程中執行
17             if (eventLoop.inEventLoop()) {
18                 register0(promise);
19             } else {
20                 try {
21                     eventLoop.execute(new Runnable() {
22                         @Override
23                         public void run() {
24                             register0(promise);
25                         }
26                     });
27                 } catch (Throwable t) {
28                     logger.warn(
29                             "Force-closing a channel whose registration task was not accepted by an event loop: {}",
30                             AbstractChannel.this, t);
31                     closeForcibly();
32                     closeFuture.setClosed();
33                     safeSetFailure(promise, t);
34                 }
35             }
36         }
37       
38       // AbstractNioChannel
39       protected void doRegister() throws Exception {
40         boolean selected = false;
41         for (;;) {
42             try {
43                 selectionKey = javaChannel().register(eventLoop().selector, 0, this);
44                 return;
45             } catch (CancelledKeyException e) {
46               // ...
47             }
48         }
49       }
50     
51     // AbstractSelectableChannel
52     public final SelectionKey register(Selector sel, int ops,Object att)
53         throws ClosedChannelException
54     {
55         synchronized (regLock) {
56             if (!isOpen())
57                 throw new ClosedChannelException();
58             if ((ops & ~validOps()) != 0)
59                 throw new IllegalArgumentException();
60             if (blocking)
61                 throw new IllegalBlockingModeException();
62             SelectionKey k = findKey(sel);
63             if (k != null) {
64                 k.interestOps(ops);
65                 k.attach(att);
66             }
67             if (k == null) {
68                 // New registration
69                 synchronized (keyLock) {
70                     if (!isOpen())
71                         throw new ClosedChannelException();
72                     k = ((AbstractSelector)sel).register(this, ops, att);
73                     addKey(k);
74                 }
75             }
76             return k;
77         }
78     }

這里先設置了channel的eventLoop屬性,然后在接下來的一段代碼中判斷當前線程是否是channel的處理線程,也就是是不是eventLoop的線程,如果不是那么就將注冊作為一個任務用EventLoop.execute執行。按照這里的執行順序,當前線程肯定不是eventLoop的線程,所以會執行else分支,其實eventLoop的線程也是在這個調用中啟動的。最后的注冊是在AbstractSelectableChannel類的register()方法中執行的。這里有個很奇怪的地方,這里注冊的ops是0,也就是沒有感興趣的事件。這個地方我們后面在分析。

將channel注冊到selector的代碼就是這些了,我們回頭分析EventLoop.execute(…),其實注冊的代碼是在這里面被調用的。

 1     // SingleThreadEventExecutor
 2     public void execute(Runnable task) {
 3         if (task == null) {
 4             throw new NullPointerException("task");
 5         }
 6 
 7         boolean inEventLoop = inEventLoop();
 8         if (inEventLoop) {
 9             addTask(task);
10         } else {
11             startThread();
12             addTask(task);
13             if (isShutdown() && removeTask(task)) {
14                 reject();
15             }
16         }
17 
18         if (!addTaskWakesUp && wakesUpForTask(task)) {
19             wakeup(inEventLoop);
20         }
21     }

如果當前線程是EventLoop的線程,就把task加到任務隊列中去,如果不是,那么啟動線程,然后再把task加入到任務隊列。

 1     // SingleThreadEventLoop
 2     private void startThread() {
 3         if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
 4             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
 5                 doStartThread();
 6             }
 7         }
 8     }
 9 
10     private void doStartThread() {
11         assert thread == null;
12         executor.execute(new Runnable() {
13             @Override
14             public void run() {
15                 thread = Thread.currentThread();
16                 if (interrupted) {
17                     thread.interrupt();
18                 }
19 
20                 boolean success = false;
21                 updateLastExecutionTime();
22                 try {
23                     SingleThreadEventExecutor.this.run();
24                     success = true;
25                 } catch (Throwable t) {
26                     logger.warn("Unexpected exception from an event executor: ", t);
27                 } finally {
28                     // Some clean work
29                 }
30             }
31         });
32     }

其實最后的線程還是要落到EventLoop中的executor里面,而NioEventLoop初始化的時候executor屬性設置的是一個ThreadPerTaskExecutor,顧名思義也就是每個任務新建一個線程去執行,而在這個Task里面對EventLoop的thread屬性進行了設置,並且最后執行SingleThreadEventExecutor.this.run(),這個run方法在NioEventLoop中實現。

 1  protected void run() {
 2         for (;;) {
 3             try {
 4                 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
 5                     case SelectStrategy.CONTINUE:
 6                         continue;
 7                     case SelectStrategy.SELECT:
 8                         select(wakenUp.getAndSet(false));
 9                         if (wakenUp.get()) {
10                             selector.wakeup();
11                         }
12                     default:
13                         // fallthrough
14                 }
15 
16                 cancelledKeys = 0;
17                 needsToSelectAgain = false;
18                 final int ioRatio = this.ioRatio;
19                 if (ioRatio == 100) {
20                     processSelectedKeys();
21                     runAllTasks();
22                 } else {
23                     final long ioStartTime = System.nanoTime();
24 
25                     processSelectedKeys();
26 
27                     final long ioTime = System.nanoTime() - ioStartTime;
28                     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
29                 }
30 
31                 if (isShuttingDown()) {
32                     closeAll();
33                     if (confirmShutdown()) {
34                         break;
35                     }
36                 }
37             } catch (Throwable t) {
38                 logger.warn("Unexpected exception in the selector loop.", t);
39 
40                 // Prevent possible consecutive immediate failures that lead to
41                 // excessive CPU consumption.
42                 try {
43                     Thread.sleep(1000);
44                 } catch (InterruptedException e) {
45                     // Ignore.
46                 }
47             }
48         }
49     }
50     
51       private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
52         if (selectedKeys.isEmpty()) {
53             return;
54         }
55 
56         Iterator<SelectionKey> i = selectedKeys.iterator();
57         for (;;) {
58             final SelectionKey k = i.next();
59             final Object a = k.attachment();
60             i.remove();
61 
62             if (a instanceof AbstractNioChannel) {
63                 // 處理ServerSocketChannl的事件,如accept
64                 processSelectedKey(k, (AbstractNioChannel) a);
65             } else {
66                 @SuppressWarnings("unchecked")
67                 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
68                 processSelectedKey(k, task);
69             }
70 
71             if (!i.hasNext()) {
72                 break;
73             }
74 
75             if (needsToSelectAgain) {
76                 selectAgain();
77                 selectedKeys = selector.selectedKeys();
78 
79                 // Create the iterator again to avoid ConcurrentModificationException
80                 if (selectedKeys.isEmpty()) {
81                     break;
82                 } else {
83                     i = selectedKeys.iterator();
84                 }
85             }
86         }
87     }

這個就是Netty最后的Reactor模式的事件循環了,在這個循環中調用selector的select方法查詢需要處理的key,然后processSelectedKeys方法進行處理。在這里因為之前在注冊NioServerSocketChannel的時候把channel當作attachment當做attachment,所以如果key的attachement是AbstractNioChannel說明這個是ServerSocketChannel的事件,如connect,read,accept。

其實還有一些問題沒有寫清楚,如下:

1. ServerSocketChannel的interestOps的注冊
2. accept請求的處理
3. 線程模型
4. pipeline的鏈式調用
5. buffer 
。。。
這些我會繼續寫文章進行說明~(希望可以=_=)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM