通讀本文,你會了解到
1.netty如何接受新的請求
2.netty如何給新請求分配reactor線程
3.netty如何給每個新連接增加ChannelHandler
netty中的reactor線程
netty中最核心的東西莫過於兩種類型的reactor線程,可以看作netty中兩種類型的發動機,驅動着netty整個框架的運轉
一種類型的reactor線程是boos線程組,專門用來接受新的連接,然后封裝成channel對象扔給worker線程組;還有一種類型的reactor線程是worker線程組,專門用來處理連接的讀寫
不管是boos線程還是worker線程,所做的事情均分為以下三個步驟
- 輪詢注冊在selector上的IO事件
- 處理IO事件
- 執行異步task
對於boos線程來說,第一步輪詢出來的基本都是 accept 事件,表示有新的連接,而worker線程輪詢出來的基本都是read/write事件,表示網絡的讀寫事件
新連接的建立
簡單來說,新連接的建立可以分為三個步驟
1.檢測到有新的連接
2.將新的連接注冊到worker線程組
3.注冊新連接的讀事件
檢測到有新連接進入
我們已經知道,當服務端綁啟動之后,服務端的channel已經注冊到boos reactor線程中,reactor不斷檢測有新的事件,直到檢測出有accept事件發生
NioEventLoop.java
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); //檢查該SelectionKey是否有效,如果無效,則關閉channel if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 如果准備好READ或ACCEPT則觸發unsafe.read() ,檢查是否為0,如上面的源碼英文注釋所說:解決JDK可能會產生死循環的一個bug。 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件 // Connection already closed - no need to handle write. return; } } // 如果准備好了WRITE則將緩沖區中的數據發送出去,如果緩沖區中數據都發送完成,則清除之前關注的OP_WRITE標記 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 如果是OP_CONNECT,則需要移除OP_CONNECT否則Selector.select(timeout)將立即返回不會有任何阻塞,這樣可能會出現cpu 100% if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
該方法主要是對SelectionKey k進行了檢查,有如下幾種不同的情況
1)OP_ACCEPT,接受客戶端連接
2)OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取。
3)OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數據。
4)OP_CONNECT, 連接建立事件, 即 TCP 連接已經建立, Channel 處於 active 狀態。
本篇博文主要來看下當boss線程 selector檢測到OP_ACCEPT事件時,內部干了些什么。
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) {//如果已經關閉,則直接返回即可,不需要再處理該channel的其他事件 // Connection already closed - no need to handle write. return; } }
boos reactor線程已經輪詢到 SelectionKey.OP_ACCEPT
事件,說明有新的連接進入,此時將調用channel的 unsafe
來進行實際的操作,此時的channel為 NioServerSocketChannel,則unsafe為NioServerSocketChannel的屬性NioMessageUnsafe
那么,我們進入到它的read
方法,進入新連接處理的第二步
注冊到reactor線程
NioMessageUnsafe.java
private final List<Object> readBuf = new ArrayList<Object>(); public void read() { assert eventLoop().inEventLoop(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } } while (allocHandle.continueReading()); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete(); }
調用 doReadMessages
方法不斷地讀取消息,用 readBuf
作為容器,這里,其實可以猜到讀取的是一個個連接,然后調用 pipeline.fireChannelRead()
,將每條新連接經過一層服務端channel的洗禮,之后清理容器,觸發 pipeline.fireChannelReadComplete()
下面我們具體看下這兩個方法
1.doReadMessages(List)
2.pipeline.fireChannelRead(NioSocketChannel)
doReadMessages()
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
我們終於窺探到netty調用jdk底層nio的邊界 javaChannel().accept();
,由於netty中reactor線程第一步就掃描到有accept事件發生,因此,這里的accept
方法是立即返回的,返回jdk底層nio創建的一條channel
ServerSocketChannel有阻塞和非阻塞兩種模式:
a、阻塞模式:ServerSocketChannel.accept() 方法監聽新進來的連接,當 accept()方法返回的時候,它返回一個包含新進來的連接的 SocketChannel。阻塞模式下, accept()方法會一直阻塞到有新連接到達。
b、非阻塞模式:,accept() 方法會立刻返回,如果還沒有新進來的連接,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null.
在NioServerSocketChannel的構造函數分析中,我們知道,其通過ch.configureBlocking(false);語句設置當前的ServerSocketChannel為非阻塞的。
netty將jdk的 SocketChannel
封裝成自定義的 NioSocketChannel
,加入到list里面,這樣外層就可以遍歷該list,做后續處理
從上一篇文章中,我們已經知道服務端的創建過程中會創建netty中一系列的核心組件,包括pipeline,unsafe等等,那么,接受一條新連接的時候是否也會創建這一系列的組件呢?
帶着這個疑問,我們跟進去
NioSocketChannel.java
public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); }
我們重點分析 super(parent, socket),
NioSocketChannel
的父類為 AbstractNioByteChannel
AbstractNioByteChannel.java
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { super(parent, ch, SelectionKey.OP_READ); }
這里,我們看到jdk nio里面熟悉的影子—— SelectionKey.OP_READ
,一般在原生的jdk nio編程中,也會注冊這樣一個事件,表示對channel的讀感興趣
我們繼續往上,追蹤到AbstractNioByteChannel
的父類 AbstractNioChannel
, 這里,我相信讀了上一篇文章你對於這部分代碼肯定是有印象的
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { try { ch.close(); } catch (IOException e2) { if (logger.isWarnEnabled()) { logger.warn( "Failed to close a partially initialized socket.", e2); } } throw new ChannelException("Failed to enter non-blocking mode.", e); } }
在創建服務端channel的時候,最終也會進入到這個方法,super(parent)
, 便是在AbstractChannel
中創建一系列和該channel綁定的組件,如下
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
而這里的 readInterestOp
表示該channel關心的事件是 SelectionKey.OP_READ
,后續會將該事件注冊到selector,之后設置該通道為非阻塞模式,在channel中創建 unsafe 和一條 pipeline
pipeline.fireChannelRead(NioSocketChannel)
對於 pipeline
我們前面已經了解過,在netty的各種類型的channel中,都會包含一個pipeline,字面意思是管道,我們可以理解為一條流水線工藝,流水線工藝有起點,有結束,中間還有各種各樣的流水線關卡,一件物品,在流水線起點開始處理,經過各個流水線關卡的加工,最終到流水線結束
對應到netty里面,流水線的開始就是HeadContxt
,流水線的結束就是TailConext
,HeadContxt
中調用Unsafe
做具體的操作,TailConext
中用於向用戶拋出pipeline中未處理異常以及對未處理消息的警告
通過前面的文章中,我們已經知道在服務端的channel初始化時,在pipeline中,已經自動添加了一個pipeline處理器 ServerBootstrapAcceptor
, 並已經將用戶代碼中設置的一系列的參數傳入了構造函數,接下來,我們就來看下ServerBootstrapAcceptor
ServerBootstrapAcceptor.java
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; ServerBootstrapAcceptor( EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; } public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } } }
前面的 pipeline.fireChannelRead(NioSocketChannel);
最終通過head->unsafe->ServerBootstrapAcceptor的調用鏈,調用到這里的 ServerBootstrapAcceptor
的channelRead
方法,而 channelRead
一上來就把這里的msg強制轉換為 Channel
然后,拿到該channel,也就是我們之前new出來的 NioSocketChannel中
對應的pipeline,將用戶代碼中的 childHandler
,添加到pipeline,這里的 childHandler
在用戶代碼中的體現為
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } });
其實對應的是 ChannelInitializer
,到了這里,NioSocketChannel
中pipeline對應的處理器為 head->ChannelInitializer->tail,牢記,后面會再次提到!
接着,設置 NioSocketChannel
對應的 attr和option,然后進入到 childGroup.register(child)
,這里的childGroup就是我們在啟動代碼中new出來的NioEventLoopGroup
我們進入到NioEventLoopGroup
的register
方法,代理到其父類MultithreadEventLoopGroup
MultithreadEventLoopGroup.java
public ChannelFuture register(Channel channel) { return next().register(channel); }
這里又扯出來一個 next()方法,我們跟進去
MultithreadEventLoopGroup.java
@Override public EventLoop next() { return (EventLoop) super.next(); }
回到其父類
MultithreadEventExecutorGroup.java
@Override public EventExecutor next() { return chooser.next(); }
這里的chooser對應的類為 EventExecutorChooser
,字面意思為事件執行器選擇器,放到我們這里的上下文中的作用就是從worker reactor線程組中選擇一個reactor線程
public interface EventExecutorChooserFactory { /** * Returns a new {@link EventExecutorChooser}. */ EventExecutorChooser newChooser(EventExecutor[] executors); /** * Chooses the next {@link EventExecutor} to use. */ @UnstableApi interface EventExecutorChooser { /** * Returns the new {@link EventExecutor} to use. */ EventExecutor next(); } }
chooser的實現有兩種
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory { public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory(); private DefaultEventExecutorChooserFactory() { } @SuppressWarnings("unchecked") @Override public EventExecutorChooser newChooser(EventExecutor[] executors) { if (isPowerOfTwo(executors.length)) { return new PowerOfTowEventExecutorChooser(executors); } else { return new GenericEventExecutorChooser(executors); } } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; } private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTowEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; } } private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } @Override public EventExecutor next() { return executors[Math.abs(idx.getAndIncrement() % executors.length)]; } } }
默認情況下,chooser通過 DefaultEventExecutorChooserFactory
被創建,在創建reactor線程選擇器的時候,會判斷reactor線程的個數,如果是2的冪,就創建PowerOfTowEventExecutorChooser
,否則,創建GenericEventExecutorChooser
兩種類型的選擇器在選擇reactor線程的時候,都是通過Round-Robin的方式選擇reactor線程,唯一不同的是,PowerOfTowEventExecutorChooser
是通過與運算,而GenericEventExecutorChooser
是通過取余運算,與運算的效率要高於求余運算
選擇完一個reactor線程,即 NioEventLoop
之后,我們回到注冊的地方
public ChannelFuture register(Channel channel) { return next().register(channel); }
SingleThreadEventLoop.java
@Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
其實,這里已經和服務端啟動的過程一樣了,可以參考我前面的文章
AbstractNioChannel.java
private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } }
和服務端啟動過程一樣,先是調用 doRegister();
做真正的注冊過程,如下
protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
將該條channel綁定到一個selector
上去,一個selector被一個reactor線程使用,后續該channel的事件輪詢,以及事件處理,異步task執行都是由此reactor線程來負責
綁定完reactor線程之后,調用 pipeline.invokeHandlerAddedIfNeeded()
前面我們說到,到目前為止NioSocketChannel
的pipeline中有三個處理器,head->ChannelInitializer->tail,最終會調用到 ChannelInitializer
的 handlerAdded
方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { initChannel(ctx); } }
handlerAdded
方法調用 initChannel
方法之后,調用remove(ctx);
將自身刪除,如下
AbstractNioChannel.java
private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }
而這里的 initChannel
方法又是神馬玩意?讓我們回到用戶方法,比如下面這段用戶代碼
用戶代碼
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoServerHandler()); } });
原來最終跑到我們自己的代碼里去了啊!完了之后,NioSocketChannel
綁定的pipeline的處理器就包括 head->LoggingHandler->EchoServerHandler->tail
注冊讀事件
接下來,我們還剩下這些代碼沒有分析完
AbstractNioChannel.java
private void register0(ChannelPromise promise) { // .. pipeline.fireChannelRegistered(); if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } }
pipeline.fireChannelRegistered();
,其實沒有干啥有意義的事情,最終無非是再調用一下業務pipeline中每個處理器的 ChannelHandlerAdded
方法處理下回調
isActive()
在連接已經建立的情況下返回true,所以進入方法塊,進入到 pipeline.fireChannelActive();
在這里我詳細步驟先省略,直接進入到關鍵環節
AbstractNioChannel.java
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
這里其實就是將 SelectionKey.OP_READ
事件注冊到selector中去,表示這條通道已經可以開始處理read事件了
總結
至此,netty中關於新連接的處理已經向你展示完了,我們做下總結
1.boos reactor線程輪詢到有新的連接進入
2.通過封裝jdk底層的channel創建 NioSocketChannel
以及一系列的netty核心組件
3.將該條連接通過chooser,選擇一條worker reactor線程綁定上去
4.注冊讀事件,開始新連接的讀寫