原文:http://budairenqin.iteye.com/blog/2215896
源碼來自Netty5.x版本, 本系列文章不打算從架構的角度去討論netty, 只想從源碼細節展開, 又不想通篇的貼代碼, 如果沒有太大的必要, 我會盡量避免貼代碼或是去掉不影響主流程邏輯的代碼, 盡量多用語言描述. 這個過程中我會把我看到的netty對代碼進行優化的一些細節提出來探討, 大家共同學習, 更希望能拋磚引玉.
java nio api細節這里不會討論, 不過推薦一個非常好入門系列 http://ifeve.com/overview/
先從一個簡單的代碼示例開始
服務端啟動代碼示例
// Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); // Start the server. ChannelFuture f = b.bind(PORT).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
在看這個示例之前, 先拋出netty中幾個重要組件以及他們之間的簡單關系, 方便理解后續的代碼展開.
1.EventLoopGroup
2.EventLoop
3.boss/worker
4.channel
5.event(inbound/outbound)
6.pipeline
7.handler
--------------------------------------------------------------------
1.EventLoopGroup中包含一組EventLoop
2.EventLoop的大致數據結構是
a.一個任務隊列
b.一個延遲任務隊列(schedule)
c.EventLoop綁定了一個Thread, 這直接避免了pipeline中的線程競爭(在這里更正一下4.1.x以及5.x由於引入了FJP[4.1.x現在又去掉了FJP], 線程模型已經有所變化, EventLoop.run()可能被不同的線程執行,但大多數scheduler(包括FJP)在EventLoop這種方式的使用下都能保證在handler中不會"可見性(visibility)"問題, 所以為了理解簡單, 我們仍可以理解為為EventLoop綁定了一個Thread)
d.每個EventLoop有一個Selector, boss用Selector處理accept, worker用Selector處理read,write等
3.boss可簡單理解為Reactor模式中的mainReactor的角色, worker可簡單理解為subReactor的角色
a.boss和worker共用EventLoop的代碼邏輯
b.在不bind多端口的情況下bossEventLoopGroup中只需要包含一個EventLoop
c.workerEventLoopGroup中一般包含多個EventLoop
d.netty server啟動后會把一個監聽套接字ServerSocketChannel注冊到bossEventLoop中
e.通過上一點我們知道bossEventLoop一個主要責任就是負責accept連接(channel)然后dispatch到worker
f.worker接到boss爺賞的channel后負責處理此chanel后續的read,write等event
4.channel分兩大類ServerChannel和channel, ServerChannel對應着監聽套接字(ServerSocketChannel), channel對應着一個網絡連接
5.有兩大類event:inbound/outbound(上行/下行)
6.event按照一定順序在pipeline里面流轉, 流轉順序參見下圖
7.pipeline里面有多個handler, 每個handler節點過濾在pipeline中流轉的event, 如果判定需要自己處理這個event,則處理(用戶可以在pipeline中添加自己的handler)
IO線程組的創建:NioEventLoopGroup
構造方法:
public NioEventLoopGroup(int nEventLoops, Executor executor, final SelectorProvider selectorProvider) { super(nEventLoops, executor, selectorProvider); }
nEventLoops:
Group內EventLoop個數, 每個EventLoop都綁定一個線程, 默認值為cpu cores * 2, 對worker來說, 這是一個經驗值, 當然如果worker完全是在處理cpu密集型任務也可以設置成 cores + 1 或者是根據自己場景測試出來的最優值.
一般boss group這個參數設置為1就可以了, 除非需要bind多個端口.
boss和worker的關系可以參考Reactor模式,網上有很多資料.簡單的理解就是:boss負責accept連接然后將連接轉交給worker, worker負責處理read,write等
executor:
Netty 4.1.x版本以及5.x版本采用Doug Lea在jsr166中的ForkJoinPool作為默認的executor, 每個EventLoop在一次run方法調用的生命周期內都是綁定在fjp中一個Thread身上(EventLoop父類SingleThreadEventExecutor中的thread實例變量)
目前netty由於線程模型的關系並沒有利用fjp的work−stealing, 關於fjp可參考這個paper http://gee.cs.oswego.edu/dl/papers/fj.pdf
selectorProvider:
group內每一個EventLoop都要持有一個selector, 就由它提供了
上面反復提到過每個EventLoop都綁定了一個Thread(可以這么理解,但5.x中實際不是這樣子), 這是netty4.x以及5.x版本相對於3.x版本最大變化之一, 這個改變從根本上避免了outBound/downStream事件在pipeline中的線程競爭
父類構造方法:
private MultithreadEventExecutorGroup(int nEventExecutors, Executor executor, boolean shutdownExecutor, Object... args) { // ...... if (executor == null) { executor = newDefaultExecutorService(nEventExecutors); // 默認fjp shutdownExecutor = true; } children = new EventExecutor[nEventExecutors]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nEventExecutors; i++) { boolean success = false; try { children[i] = newChild(executor, args); // child即EventLoop success = true; } catch (Exception e) { // ...... } finally { if (!success) { // 失敗處理...... } } } // ...... }
同時參考《http://www.cnblogs.com/guazi/p/6612375.html》中關於這一塊的內容。
1.如果之前沒有指定executor默認為fjp, fjp的parallelism值即為nEventExecutors
executor(scheduler)可以由用戶指定, 這給了第三方很大的自由度, 總會有高級用戶想完全的控制scheduler, 比如Twitter的Finagle. https://github.com/netty/netty/issues/2250
2.接下來創建children數組, 即EventLoop[],現在可以知道 EventLoop與EventLoopGroup的關系了.
3.后面會講到boss把一個就緒的連接轉交給worker時會從children中取模拿出一個EventLoop然后將連接交給它.
值得注意的是由於這段代碼是熱點代碼, 作為"優化狂魔"netty團隊豈會放過這種優化細節? 如果children個數為2的n次方, 會采用和HashMap同樣的優化方式[位操作]來代替取模操作:
children[childIndex.getAndIncrement() & children.length - 1]
4.接下來的newChild()是構造EventLoop, 下面會詳細展開
接下來我們分析NioEventLoop
PS:Netty 4.0.16版本開始由Norman Maurer提供了EpollEventLoop, 基於Linux Epoll ET實現的JNI(java nio基於Epoll LT)Edge Triggered(ET) VS Level Triggered(LT)http://linux.die.net/man/7/epoll.這在一定程度上提供了更高效的傳輸層, 同時也減少了java層的gc, 這里不詳細展開了, 感興趣的可看這里 Native transport for Linux wikihttp://netty.io/wiki/native-transports.html
NioEventLoop
接上面的newchild() protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider) args[0]); } 構造方法: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) { super(parent, executor, false); // ...... provider = selectorProvider; selector = openSelector(); } 父類構造方法: protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { super(parent); // ...... this.addTaskWakesUp = addTaskWakesUp; this.executor = executor; taskQueue = newTaskQueue(); }
1.我們看到首先是打開一個selector, selector的優化細節我們下面會講到
2.接着在父類中會構造一個task queue, 這是一個lock-free的MPSC隊列, netty的線程(比如worker)一直在一個死循環狀態中(引入fjp后是不斷自己調度自己)去執行IO事件和非IO事件.
除了IO事件, 非IO事件都是先丟到這個MPSC隊列再由worker線程去異步執行.
MPSC即multi-producer single-consumer(多生產者, 單消費者) 完美貼合netty的IO線程模型(消費者就是EventLoop自己咯), 情不自禁再給"優化狂魔"點32個贊.
跑題一下:
對lock-free隊列感興趣可以仔細看看MpscLinkedQueue的代碼, 其中一些比如為了避免偽共享的long padding優化也是比較有意思的.
如果還對類似並發隊列感興趣的話請轉戰這里 https://github.com/JCTools/JCTools
另外報個八卦料曾經也有人提出在這里引入disruptor后來不了了之, 相信用disruptor也會很有趣 https://github.com/netty/netty/issues/447
接下來展開openSelector()詳細分析
private Selector openSelector() { final Selector selector; try { selector = provider.openSelector(); } catch (IOException ignored) {} if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } try { SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Class<?> selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); // Ensure the current selector implementation is what we can instrument. if (!selectorImplClass.isAssignableFrom(selector.getClass())) { return selector; } Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); selectedKeysField.setAccessible(true); publicSelectedKeysField.setAccessible(true); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); selectedKeys = selectedKeySet; logger.trace("Instrumented an optimized java.util.Set into: {}", selector); } catch (Throwable t) { selectedKeys = null; logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t); } return selector; }
1.首先openSelector, 這是jdk的api就不詳細展開了
2.接着DISABLE_KEYSET_OPTIMIZATION是判斷是否需要對sun.nio.ch.SelectorImpl中的selectedKeys進行優化, 不做配置的話默認需要優化.
3.哪些優化呢?原來SelectorImpl中的selectedKeys和publicSelectedKeys是個HashSet, 新的數據結構是雙數組A和B, 初始大小1024, 避免了HashSet的頻繁自動擴容,
processSelectedKeys時先使用數組A,再一次processSelectedKeys時調用flip的切換到數組B, 如此反復
另外我大膽胡說一下我個人對這個優化的理解, 如果對於這個優化只是看到避免了HashSet的自動擴容, 我還是認為這有點小看了"優化狂魔"們, 我們知道HashSet用拉鏈法解決哈希沖突, 也就是說它的數據結構是數組+鏈表,
而我們又知道, 對於selectedKeys, 最重要的操作是遍歷全部元素, 但是數組+鏈表的數據結構對於cpu的 cache line 來說肯定是不夠友好的.如果是直接遍歷數組的話, cpu會把數組中相鄰的元素一次加載到同一個cache line里面(一個cache line的大小一般是64個字節), 所以遍歷數組無疑效率更高.
有另一隊優化狂魔是上面論調的支持者及推廣者 disruptor https://github.com/LMAX-Exchange/disruptor
EventLoop構造方法的部分到此介紹完了, 接下來看看EventLoop怎么啟動的, 啟動后都做什么
EventLoop的父類SingleThreadEventExecutor中有一個startExecution()方法, 它最終會調用如下代碼: private final Runnable asRunnable = new Runnable() { @Override public void run() { updateThread(Thread.currentThread()); if (firstRun) { firstRun = false; updateLastExecutionTime(); } try { SingleThreadEventExecutor.this.run(); } catch (Throwable t) { cleanupAndTerminate(false); } } }; 這個Runnable不詳細解釋了, 它用來實現IO線程在fjp中死循環的自己調度自己, 只需要看 SingleThreadEventExecutor.this.run() 便知道, 接下來要轉戰EventLoop.run()方法了 protected void run() { boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { cleanupAndTerminate(true); return; } } } catch (Throwable t) { try { Thread.sleep(1000); } catch (InterruptedException ignored) {} } scheduleExecution(); }
為了避免代碼占用篇幅過大, 我去掉了注釋部分
首先強調一下EventLoop執行的任務分為兩大類:IO任務和非IO任務.
1)IO任務比如: OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE
2)非IO任務比如: bind、channelActive等
接下來看這個run方法的大致流程:
1.先調用hasTask()判斷是否有非IO任務, 如果有的話, 選擇調用非阻塞的selectNow()讓select立即返回, 否則以阻塞的方式調用select. 后續再分析select方法, 目前先把run的流程梳理完.
2.兩類任務執行的時間比例由ioRatio來控制, 你可以通過它來限制非IO任務的執行時間, 默認值是50, 表示允許非IO任務獲得和IO任務相同的執行時間, 這個值根據自己的具體場景來設置.
3.接着調用processSelectedKeys()處理IO事件, 后邊會再詳細分析.
4.執行完IO任務后就輪到非IO任務了runAllTasks().
5.最后scheduleExecution()是自己調度自己進入下一個輪回, 如此反復, 生命不息調度不止, 除非被shutDown了, isShuttingDown()方法就是去檢查state是否被標記為ST_SHUTTING_DOWN.
接下來分析阻塞select方法都做了什么, selectNow就略過吧
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } if (Thread.interrupted()) { selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } // ... } catch (CancelledKeyException ignored) {} }
1.首先執行delayNanos(currentTimeNanos), 這個方法是做什么的呢?
1)要了解delayNanos我們需要知道每個EventLoop都有一個延遲執行任務的隊列(在父類SingleThreadEventExecutor中), 是的現在我們知道EventLoop有2個隊列了.
2)delayNanos就是去這個延遲隊列里面瞄一眼是否有非IO任務未執行, 如果沒有則返回1秒鍾
3)如果很不幸延遲隊列里面有任務, delayNanos的計算結果就等於這個task的deadlineNanos到來之前的這段時間, 也即是說select在這個task按預約到期執行的時候就返回了, 不會耽誤這個task.
4)如果最終計算出來的可以無憂無慮select的時間(selectDeadLineNanos - currentTimeNanos)小於500000L納秒, 就認為這點時間是干不出啥大事業的, 還是selectNow一下直接返回吧, 以免耽誤了延遲隊列里預約好的task.
5)如果大於500000L納秒, 表示很樂觀, 就以1000000L納秒為時間片, 放肆的去執行阻塞的select了, 阻塞時間就是timeoutMillis(n * 1000000L納秒時間片).
2.阻塞的select返回后,如果遇到以下幾種情況則立即返回
a)如果select到了就緒連接(selectedKeys > 0)
b)被用戶waken up了
c)任務隊列(上面介紹的那個MPSC)來了一個任務
d)延遲隊列里面有個預約任務到期需要執行了
3.如果上面情況都不滿足, 代表select返回0了, 並且還有時間繼續愉快的玩耍
4.這其中有一個統計select次數的計數器selectCnt, select過多並且都返回0, 默認512就代表過多了, 這表示需要調用rebuildSelector()重建selector了, 為啥呢, 因為nio有個臭名昭著的epoll cpu 100%的bug, 為了規避這個bug, 無奈重建吧. 參考下面鏈接
http://bugs.java.com/view_bug.do?bug_id=6403933
https://github.com/netty/netty/issues/327
5.rebuildSelector的實際工作就是:
重新打開一個selector, 將原來的那個selector中已注冊的所有channel重新注冊到新的selector中, 並將老的selectionKey全部cancel掉, 最后將的selector關閉
6.重建selector后, 不死心的再selectNow一下
select過后, 有了一些就緒的讀啊寫啊等事件, 就需要processSelectedKeys()登場處理了, 我只分析一下優化了selectedKeys的處理方法processSelectedKeysOptimized(selectedKeys.flip())
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { for (;;) { if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; i++; } selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } } }
1.第一眼就看到這里要遍歷SelectionKey[]了, 上面提到HashSet-->array的優化就是為了這一步.
2.每次拿到一個之后SelectionKey立即釋放array對這個key的強引用
selectedKeys[i] = null;
這么做是為了幫助GC, 這個key處理完了就應該被GC回收了, 如果array對這個key繼續維持強引用, 在循環處理后續其他key的時候可能要消耗很長時間, 對GC, 還是能幫則幫吧, Doug lea在設計jsr166也就是jdk中juc包下面的代碼也有用到過類似小優化.
3.憑啥k.attachment()就是AbstractNioChannel呢?后續分析到register會看到如下一行代碼:
selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
其中this就是channel咯, 具體情況后續章節再詳細說
4.接下來拿到channel調用processSelectedKey(), 下面再詳細分析
5.有的時候需要select again, 比如被cancel的時候needsToSelectAgain被標記為true
6.接下來那個for循環中的處理同樣是 help gc
7. selectAgain()調用的是非阻塞的selectNow(), 然后重置index為-1重新開始新的循環
再看processSelectedKey方法:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
1.終於見到熟悉的NIO處理代碼了, 首先netty中每個channel都有一個unsafe,
1)作為NioSocketChannel它對應的unsafe是NioByteUnsafe
2)作為NioServerSocketChannel它對應的unsafe是NioMessageUnsafe
以上兩個的區別后續章節再詳細解釋, 先簡要說明下1)跟worker的channel相關, 2)跟boss的serverChannel相關
2.接下來就是根據readyOps來dispatch了, 后續都由unsafe來處理, unsafe留着以后章節分析
執行完IO任務以后, 輪到非IO任務了
protected boolean runAllTasks(long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; }
1. 先是fetchFromScheduledTaskQueue, 將延遲任務隊列中已到期的task拿到非IO任務的隊列中,此隊列即為上文中提到的MPSC隊列.
2. task即是從MPSC queue中彈出的任務
3. 又是計算一個deadline
4. 注意到 0x3F 了吧?轉換成10進制就是64-1, 就是每執行64個任務就檢查下時間, 如果到了deadline, 就退出, 沒辦法, IO任務是親生的, 非IO任務是后媽生的, 資源肯定要先緊IO任務用.
我們使用netty時也要注意, 不要產生大量耗時的非IO任務, 以免影響了IO任務.