尊重原創,轉載注明出處,原文地址:http://www.cnblogs.com/cishengchongyan/p/6160194.html
本文我們將先從NioEventLoop開始來學習服務端的處理流程。話不多說,開始學習~~~~
我們從上文中已經知道server在啟動的時候會開啟兩個線程:bossGroup和workerGroup,這兩個線程分別是boss線程池(用於接收client請求)和worker線程池(用於處理具體的讀寫操作),這兩個線程調度器都是NioEventLoopGroup,bossGroup有一個NioEventLoop,而worker線程池有n*cup數量個NioEventLoop。那么我們看看在NioEventLoop中的是如何開始的:
NioEventLoop本質上是一個線程調度器(繼承自ScheduledExecutorService),當bind之后就開始run起一個線程:
(代碼一)
1 @Override 2 protected void run() { 3 for (;;) { 4 boolean oldWakenUp = wakenUp.getAndSet(false); 5 try { 6 if (hasTasks()) { 7 selectNow(); 8 } else { 9 select(oldWakenUp); 10
11 if (wakenUp.get()) { 12 selector.wakeup(); 13 } 14 } 15
16 cancelledKeys = 0; 17 needsToSelectAgain = false; 18 final int ioRatio = this.ioRatio; 19 if (ioRatio == 100) { 20 processSelectedKeys(); 21 runAllTasks(); 22 } else { 23 final long ioStartTime = System.nanoTime(); 24
25 processSelectedKeys(); 26
27 final long ioTime = System.nanoTime() - ioStartTime; 28 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 29 } 30
31 if (isShuttingDown()) { 32 closeAll(); 33 if (confirmShutdown()) { 34 break; 35 } 36 } 37 } catch (Throwable t) { 38 ... 39 } 40 } 41 }
這個for(;;)里面就是boss線程的核心處理流程:
【代碼一主線】1,不斷地監聽selector拿到socket句柄然后創建channel。每次run的時候先拿到wakeup的值,並且set進去false(PS:wakeup是什么鬼?一個AtomicBoolean,代表是否用戶喚醒,如果不人為將其set成true,永遠是false)。
【代碼一主線】2,如果任務隊列中已有任務,那么selectNow(),(PS:selectNow是什么鬼?我們知道selector.select()是一個阻塞調用,而selectNow方法是個非阻塞方法,如果沒有到達的socket句柄則返回0),因此若隊列中已有任務的話應該立即開始執行,而不能阻塞到selector.select()上,否則則調用select()方法,繼續看select()里面:
(代碼二)
1 private void select(boolean oldWakenUp) throws IOException { 2 Selector selector = this.selector; 3 try { 4 int selectCnt = 0; 5 long currentTimeNanos = System.nanoTime(); 6 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 7 for (;;) { 8 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 9 if (timeoutMillis <= 0) { 10 if (selectCnt == 0) { 11 selector.selectNow(); 12 selectCnt = 1; 13 } 14 break; 15 } 16
17 int selectedKeys = selector.select(timeoutMillis); 18 selectCnt ++; 19
20 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { 21 // 如果selectedKeys不為空、或者被用戶喚醒、或者隊列中有待處理任務、或者調度器中有任務,則break
22 break; 23 } 24 if (Thread.interrupted()) { 25 //如果線程被中斷則重置selectedKeys,同時break出本次循環,所以不會陷入一個繁忙的循環。
26 selectCnt = 1; 27 break; 28 } 29
30 long time = System.nanoTime(); 31 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { 32 // selector超時
33 selectCnt = 1; 34 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { 35 // selector多次過早返回,重新建立並打開Selector
36 ... 37 } 38
39 currentTimeNanos = time; 40 } 41 ... 42 } catch (CancelledKeyException e) { 43 ... 44 } 45 }
我們看到,select()方法進入一個for循環去select阻塞等待socket(這里的selector的實現在是根據操作系統和netty的版本來定的,在最新的netty中是使用的linux的epoll模型),同時入參里有“超時時間”,如果超過了這個時間仍然沒有socket到來則重新將selectCnt置為1重新循環等待,直到有socket到來。如果selectedKeys不為空、或者被用戶喚醒、或者隊列中有待處理任務、或者調度器中有任務,那么就是說該eventLoop有活干了,先break出去去干活,完了再打開selector重新阻塞等待。正常情況下會等待到一個socket,break出去之后回到代碼一
【代碼一主線】3,根據ioRatio來選擇任務執行策略(PS:ioRatio是什么鬼?看了下用途應該是這樣的,這個ioRatio代表該eventLoop期望在I/O操作上花費時間的比例)。而NioEventLoop中有兩類操作,一類是I/O操作(讀寫之類),調用processSelectedKeys;一類是非I/O操作(例如register等),調用runAllTasks。如果ioRatio是100的話那么會按照順序執行I/O操作->非I/O操作;如果不是會按照這個比例算出一個超時時間,在run任務隊列的時候如果超過了這個時間會立即返回,確保I/O操作可以得到及時的調用。
我們關心的是I/O操作,那么進入processSelectedKeys()看下發生了什么吧。
(代碼三)
1 private void processSelectedKeys() { 2 if (selectedKeys != null) { 3 processSelectedKeysOptimized(selectedKeys.flip()); 4 } else { 5 processSelectedKeysPlain(selector.selectedKeys()); 6 } 7 }
正常情況下會走到processSelectedKeysOptimized中:
(代碼四)
1 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { 2 for (int i = 0;; i ++) { 3 final SelectionKey k = selectedKeys[i]; 4 if (k == null) { 5 break; 6 } 7 selectedKeys[i] = null; 8
9 final Object a = k.attachment(); 10
11 if (a instanceof AbstractNioChannel) { 12 processSelectedKey(k, (AbstractNioChannel) a); 13 } else { 14 @SuppressWarnings("unchecked") 15 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; 16 processSelectedKey(k, task); 17 } 18
19 if (needsToSelectAgain) { 20 for (;;) { 21 if (selectedKeys[i] == null) { 22 break; 23 } 24 selectedKeys[i] = null; 25 i++; 26 } 27
28 selectAgain(); 29 selectedKeys = this.selectedKeys.flip(); 30 i = -1; 31 } 32 } 33 }
遍歷拿到所有的SelectionKey,然后判斷每個SelectionKey的attachment,上篇文章中已經分析過給ServerBootstrap注冊的Channel是NioServerSocketChannel(繼承自AbstractNioChannel),因此進入processSelectedKey中:
(代碼五)
1 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { 2 final NioUnsafe unsafe = ch.unsafe(); 3 if (!k.isValid()) { 4 unsafe.close(unsafe.voidPromise()); 5 return; 6 } 7
8 try { 9 int readyOps = k.readyOps(); 10 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 11 unsafe.read(); 12 if (!ch.isOpen()) { 13 return; 14 } 15 } 16 if ((readyOps & SelectionKey.OP_WRITE) != 0) { 17 ch.unsafe().forceFlush(); 18 } 19 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { 20 int ops = k.interestOps(); 21 ops &= ~SelectionKey.OP_CONNECT; 22 k.interestOps(ops); 23
24 unsafe.finishConnect(); 25 } 26 } catch (CancelledKeyException ignored) { 27 unsafe.close(unsafe.voidPromise()); 28 } 29 }
在這里根據傳入的SelectionKey的已就緒操作類型來決定下一步的操作,如果是一個讀操作,那么進入AbstractNioMessageChannel$NioMessageUnsafe的read實現,這里代碼很多,我們只貼一下核心的代碼:
(代碼六)
1 @Override 2 public void read() { 3 ... 4 final ChannelPipeline pipeline = pipeline(); 5 ... 6 try { 7 int size = readBuf.size(); 8 for (int i = 0; i < size; i ++) { 9 pipeline.fireChannelRead(readBuf.get(i)); 10 } 11 ... 12 readBuf.clear(); 13 pipeline.fireChannelReadComplete(); 14 } finally { 15 } 16 }
核心就是這個pipeline.fireChannelRead(readBuf.get(i));,這已經到了pipeline階段,可能有些人會誤以為這是不是已經到了worker線程中,但是不可能啊,我們的代碼其實在處於processSelectedKeys的邏輯里面。實際上,不論是boss還是worker,他們都是NioEventLoopGroup,玩法都是一樣的,只不過職責不一樣而已。boss也有自己的handler,上篇文章中我們提到了netty中的reactor模式的玩法,從Doug Lea的圖中可以看出,boss(實際上就是mainReactor)的handler其實就是這個acceptor。
在此我們順便學習一下netty中的handler:

從用途上來說,handler分為ChannelInboundHandler(讀)和ChannelOutboundHandler(寫),增加一層適配器產生了兩handler的Adapter,我們使用到的類都是繼承自這兩個Adapter。我們經常用到的SimpleChannelInboundHandler就繼承ChannelInboundHandlerAdapter,用於初始化用戶handler鏈的ChannelInitializer和boss線程綁定的ServerBootstrapAcceptor也都繼承於此。
回到【代碼六主線】我們從pipeline.fireChannelRead繼續追蹤下去會追到ChannelInboundHandler的channelRead的實現,而這里的Hander就是ServerBootstrapAcceptor。
(代碼七)
1 @Override 2 @SuppressWarnings("unchecked") 3 public void channelRead(ChannelHandlerContext ctx, Object msg) { 4 final Channel child = (Channel) msg; 5
6 child.pipeline().addLast(childHandler); 7
8 for (Entry<ChannelOption<?>, Object> e: childOptions) { 9 try { 10 if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { 11 } 12 } catch (Throwable t) { 13 } 14 } 15
16 for (Entry<AttributeKey<?>, Object> e: childAttrs) { 17 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); 18 } 19
20 try { 21 childGroup.register(child).addListener(new ChannelFutureListener() { 22 @Override 23 public void operationComplete(ChannelFuture future) throws Exception { 24 if (!future.isSuccess()) { 25 forceClose(child, future.cause()); 26 } 27 } 28 }); 29 } catch (Throwable t) { 30 forceClose(child, t); 31 } 32 }
由於ServerBootstrapAcceptor 很重要,我們先看一下都有什么內容:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; }
我自己的理解:
childGroup就是subReactor(也就是worker線程);childHandler就是xxx;childOptions和childAttrs是為channel准備的一些參數。
回到【代碼七主線】在這里做了3件事:
1.為客戶端channel的pipeline中添加childHandler,那么這個childHandler是什么鬼呢?回憶一下上文中的服務端啟動代碼,有bootStrap.childHandler(xxx)這樣的代碼,所以此處就是把在服務端啟動時我們定義好的Handler鏈綁定給每個channel。
2.把我們服務端初始化時的參數綁定到每個channel中。
3.childGroup.register(child).addListener(new ChannelFutureListener()),后面這個異步listener作用很明確,問題是這個childGroup是什么鬼?我理解應該就是worker線程了。詳細說一下childGroup.register(child),繼續跟下去,跟到AbstractChannel$AbstractUnsafe中
(代碼八)
1 @Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 ... 4 AbstractChannel.this.eventLoop = eventLoop; 5
6 if (eventLoop.inEventLoop()) { 7 register0(promise); 8 } else { 9 ... 10 } catch (Throwable t) { 11 } 12 } 13 }
繼續register0:
(代碼九)
1 private void register0(ChannelPromise promise) { 2 try { 3 if (!promise.setUncancellable() || !ensureOpen(promise)) { 4 return; 5 } 6 boolean firstRegistration = neverRegistered; 7 doRegister(); 8 neverRegistered = false; 9 registered = true; 10 safeSetSuccess(promise); 11 pipeline.fireChannelRegistered(); 12 if (firstRegistration && isActive()) { 13 pipeline.fireChannelActive(); 14 } 15 } catch (Throwable t) { 16 } 17 }
這里核心有兩步:
1.doRegister(),其實我們在上篇文章中分析過,就是將channel綁定到selector上。此處有點懵逼,我猜測是綁定到worker線程的selector中,如果有大神知道請留言我的微博。
2.pipeline.fireChannelRegistered(),繼續往下跟跟進到ChannelInboundHandler的channelRegistered方法中,而此時會調用我們定義的ChannelInitializer,將我們定義的handler注冊到pipeline中。
至此【代碼一主線】執行完畢,我們瀏覽了一遍boss線程的在接收socket請求期間的處理流程,過程中是結合reactor模式去理解的,有些地方自己也有點不懂,還請各位指正。
總結一下:
1.boss線程就是個loop循環,打開selector -> 獲得監聽到的SelectionKey -> 處理I/O請求 -> 處理非I/O請求,而我們最關心的就是處理I/O請求(在processSelectedKeys()方法中完成)。
2.遍歷准備就緒的SelectionKey,根據其可操作類型(read or write。。)來決定下一步的具體操作,我們着重去了解了read邏輯。
3.NioServerSocketChannel調用父類AbstractNioMessageChannel的unsafe類NioMessageUnsafe來處理讀取邏輯:調用pipeline處理readbuf。
4.pipeline.fireChannelRead會調用ServerBootstrapAcceptor的channelRead:初始化客戶端channel參數,將該channel綁定到worker線程的selector中,為channel注冊用戶定義的handler鏈。
再精煉一點:
boss線程只是接收客戶端socket並初始化客戶端channle,將channel丟給acceptor,acceptor會將這個channel注冊到worker線程中。整個loop過程都是一個非阻塞過程(全部異步化),同時boss中不會做耗時的I/O讀取,只是將channel丟給worker。因此是一個高效的loop過程。
下文中我們將分析worker線程的處理流程,敬請期待。。。
