什么是新連接接入?以及新連接接入前,Netty處於什么狀態
netty的服務端NioServerSocketChannel
初始化,注冊在BossGroup中的一條NioEventLoop
中,並且給NioServerSocketChannel
中維護的jdk原生的ServerSocketChannel
綁定好了端口后, EventLoop啟動,開始輪詢工作...
這時候 EventLoop 它在輪詢什么? 其實它在輪詢監聽當初NioServerSocketChannel經過二次注冊感興趣的事件時, 告訴 Selector,讓Selector關注自己身上可能會出現 OP_ACCEPT
事件, 這合情合理,因為對於Netty的主從Reactor線程模型中, BossGroup中的channel只關心OP_ACCEPT
也就是用戶的請求建立連接事件
netty的新連接接入要做哪些工作?
看上圖,netty的新連接接入,對應這個線程模型中我圈出來的部分, 主要步驟如下
- 服務端Selector輪詢到客戶端請求建立連接
- 處理請求
- 從服務端維護的JDK 原生ServerSocketChannel中accept()客戶端的channel
- 使用new的方法 將客戶端的Channel封裝成 NioSocketChannel
- 層層往上調用super(),初始化channel的組件
- 創建channel的配置類對象 config
- 向下傳播channelRead事件
- 給客戶端的channel設置相關參數
- 將客戶端的channel注冊在 workerGroup 中的輪詢算法選出的 EventLoop
- 將jdk原生的SocketChanel注冊進 EventLoop中的選擇器中
- 傳播channelregist事件
- 傳播channelActive事件
- 給客戶端的channel二次注冊netty可以處理的感興趣的事件
這是我總結的新連接接入的流程,從上面分析的開始檢查新鏈接,終止的標志是,把客戶端的NioSocketChannel二次注冊在EventLoop上,成為Netty可以處理的chanel為止
入口:NioEventLoop
處理IO事件
當服務端的事件循環檢測到有io事件時,使用它的processSelectedKeys();
處理,源碼如下:
private void processSelectedKeys() {
// todo selectedKeys 就是經過優化后的keys(底層是數組) , 默認不為null
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
當有了新IO請求進來, jdk原生的Selector將SelectionKey放入存放感興趣的key的集合中,而這個集合現在就是netty通過反射的方式強制替換為以數組為數據結構的selectedKeys
, 數組不為空, 跟進processSelectedKeysOptimized();
,源碼如下: 解析寫在源碼下面:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// todo 數組輸出空項, 從而允許在channel 關閉時對其進行垃圾回收
// See https://github.com/netty/netty/issues/2363
// todo 數組中當前循環對應的keys質空, 這種感興趣的事件只處理一次就行
selectedKeys.keys[i] = null;
// todo 獲取出 attachment,默認情況下就是注冊進Selector時,傳入的第三個參數 this===> NioServerSocketChannel
// todo 一個Selector中可能被綁定上了成千上萬個Channel, 通過K+attachment 的手段, 精確的取出發生指定事件的channel, 進而獲取channel中的unsafe類進行下一步處理
final Object a = k.attachment();
// todo
if (a instanceof AbstractNioChannel) {
// todo 進入這個方法, 傳進入 感興趣的key + NioSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
處理感興趣的事件, 想想,需要什么他才能進一步處理呢? 需要下面兩點:
- 這個感興趣的事件是啥?
- 在這了就是上面的 k
- 哪個channel出現的Selector感興趣的事件?
- 在這里是通過 attachment拿到的 a ,其實不就是服務端的
NioServerSocketChannel
?
- 在這里是通過 attachment拿到的 a ,其實不就是服務端的
另外它把NioServerSocketChannel
向上強轉成了AbstractNioChannel
這是為什么呢?
答:
第一點:
在我寫的上一篇Chanel的架構體系中,我們知道,Netty的NioXXXChannel其實是netty的,基於原生的jdk的chanel的封裝,而在他的整個繼承體系中,這個AbstractNioChannel
就負責維護jdk原生的channel, 知道了這有啥用? 當然有用,我們要去給客戶端channel接生了,原生服務端channel.accept()==客戶端channel
第二點:
針對數據的讀寫都是unsafe中,回想是哪個類中定義了讀取channel中IO數據的抽象模板函數呢? AbstractNioChannel
, 是它新增的內部接口,從而進客戶端和服務對針對chanel的不同特化read進行不同的實現
好, 有了這兩個條件,繼續跟進processSelectedKey(k, (AbstractNioChannel) a);
看它是如何處理, 源碼如下:
- 獲取到服務端的unsafe對象(數據讀寫)
- 根據k的readOps,進行計算決定執行
unsafe.read();
// todo 服務端啟動后,方法被用用處理新鏈接, 可以模擬 telnet localhost 8899 新鏈接的介入
// todo 處理selectedkey
// todo netty底層對數據的讀寫都是 unsafe完成的
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// todo 這個unsafe 也是可channel 也是和Channel進行唯一綁定的對象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { // todo 確保Key的合法
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) { // todo 確保多線程下的安全性
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
// todo NioServerSocketChannel和selectKey都合法的話, 就進入下面的 處理階段
try {
// todo 獲取SelectedKey 的 關心的選項
int readyOps = k.readyOps();
// todo 在read() write()之前我們需要調用 finishConnect() 方法, 否則 NIO JDK拋出異常
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps( );
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// todo 同樣是檢查 readOps是否為零, 來檢查是否出現了 jdk 空輪詢的bug
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
下面我們進入 unsafe.read();
, 直接跳進去,直接進入到了AbstractNioChannel
的抽象內部類,因為上面說了,做了向上強制類型轉換,我們源碼如下:
/**
* Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
*/
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
void forceFlush();
}
具體的實現是誰? 因為我們是服務端的channel, 所以實現類是:NioMessageUnsafe
, 進入查看他的源碼: 下面這段代碼真的是挺長的, 它的解析我寫在他的下面:
@Override
public void read() {
// todo 同樣是斷言, 當前的線程必須是在 EventLoop 里面的線程才有資格執行
assert eventLoop().inEventLoop( );
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// todo 用於查看服務端接受的速率, 說白了就是控制服務端是否接着read 客戶端的IO事件
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// todo 進入
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
//todo 對讀到的連接,進行簡單的計數
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// todo 處理新的連接的邏輯來到這, 意思是讓pipeline中發生事件傳播,
// todo pipeline是誰的呢? 現在是NioMessageUnsafe 所以是服務端的,
// todo 事件是如何傳播的呢? head-->>ServerBootStraptAcceptor-->>tail 依次傳播,
// todo 傳播的什么事件? ChannelRead, 也就是說,會去調用 ServerBootStraptAcceptor的ChannelRead方法,跟進去
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
read()三部曲:
針對這段代碼,我們值關心下面幾部分, 這三部分結束, 整個新鏈接的建立就完成了,
下面三部曲的 大前提都是,當前我們是在AbstractNioMessageChannel
doReadMessages(readBuf)
allocHandle.incMessagesRead(localRead);
pipeline.fireChannelRead(readBuf.get(i));
第一步:
如何創建出jdk原生的 客戶端channel,對它做了什么?
第一步doReadMessages(readBuf)
這是AbstractNioMessageChannel
的抽象方法,從chanel讀取內容我們需要一個維護特化chanenl引用的對象,誰呢? 它的子類NioServerSocketChannel
, 源碼如下: 解析依然寫在代碼下面
//todo doReadMessage 其實就是 doChannel
// todo 處理新連接, 現在是在 NioServReaderSocketChannel里面
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// todo java Nio底層在這里 創建jdk底層的 原生channel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// todo 把java原生的channel, 封裝成 Netty自定義的封裝的channel , 這里的buf是list集合對象,由上一層傳遞過來的
// todo this -- NioServerSocketChannel
// todo ch -- SocketChnnel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
這是個跨越性的操作, 上面的代碼主要進行如下面幾步工作:
- 從原生的jdk ServerSocketChannel中 accept出 jdk原生的 SocketChanel
- 將jdk原生的 Socket封裝成Netty對它的封裝類型
NioChannel
為啥,服務端的channel需要反射創建,而客戶的的channel直接new?
我的理解是,netty不僅可以做 NIO編程模型的服務器, 傳統的阻塞式IO,或者其他類型的服務器他也可以做, 我們傳遞進入的服務端Chanel的類型決定了他可以成為的服務器的類型, netty的設計者是不知道,用戶想用netty做些什么的,於是設計成通過反射創建
但是,一旦服務端的channel類型確定了,對應的客戶端的channel也一定知道了,直接new 就好了
NioSocketChannel的創建過程
我們跟進new NioSocketChannel(this, ch)
,繼續閱讀, 其中的 this,是服務端的NioServerSocketChannel
, ch 是 jdk原生的 SocketChannel, 方法調用鏈 的源碼如下:
public NioSocketChannel(Channel parent, SocketChannel socket) {
// todo 向上傳遞
super(parent, socket);
// todo 主要是設置 禁用了 NoDelay算法
config = new NioSocketChannelConfig(this, socket.socket());
}
跟進去, 看, 他把SelectionKey.OP_READ
,傳遞給了他的父類, 稍后 會用這個參數進行 cannel的二次注冊,使得NioSocketChannel
可以被netty處理它發生的感興趣的事件, 我們發現,和服務端的chanel明顯不同的是, 服務端的NioChannel關注用戶的accept,而這里的客戶端的channel關注的是read事件,它標志着,服務端的Selector會關心它當中傳遞進客戶端發送的數據,告訴Selector應該讀
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
繼續跟進,到AbstractNioChannel
, 他做了如下工作:
- super(parent) 把NioServerSocketChannel設置為NioSokcetChannel的父parent
- 自己維護原生的JDK SocketChannel
- 保存感性趣的選項
- 設置為非阻塞
源碼如下:
*/ // todo 無論是服務端的channel 還是客戶端的channel都會使用這個方法進行初始化
// // TODO: 2019/6/23 null ServerSocketChannel accept
// todo 如果是在創建NioSocketChannel parent==NioServerSocketChannel ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);// todo 繼續向上跟,創建基本的組件
// todo 如果是創建NioSocketChannel 這就是在保存原生的jdkchannel
// todo 如果是創建NioServerSocketChannel 這就是在保存ServerSocketChannel
this.ch = ch;
// todo 設置上感興趣的事件
this.readInterestOp = readInterestOp;
try {
// todo 作為服務端, ServerSocketChannel 設置為非阻塞的
// todo 作為客戶端 SocketChannel 設置為非阻塞的
ch.configureBlocking(false);
第二步:
現在NioSocketChannel已經創建完成了,代碼的調用棧重新返回上面的NioMessageUnsafe.read()
方法,我們接着往下看
//todo 對讀到的連接,進行簡單的計數
allocHandle.incMessagesRead(localRead);
第三步 pipeline.fireChannelRead(readBuf.get(i));
往下傳播channelRead(), 在管道中傳遞事件 channel, 對於服務端來說, 現在他的pipeline是怎么個狀態呢?
Header --> ServerBootStraptAcceptor --> tail
channel的pipeline組件是基於雙向鏈表實現,其中head和tail是默認的鏈表頭和尾, 中間的ServerBootStraptAcceptor是什么呢? 其實他是在創建服務端的NioServerSocketChannel
時,是在channel注冊完畢之后,通過回調,將ServerBootStrap
的init()
函數,給channel添加channelInitializer
時添加進去的; ServerBootStraptAcceptor
本質上就是handler, 回顧第一個圖, 他就是圖中的Acceptor
ok,現在我們去直接去ServerBootStraptAcceptor
中,他是ServerBootStrap
的內部類,我們看它的channelRead()方法,源碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// todo 給這個來連接的通道添加 childHandler,是我在Server中添加的childHandler, 實際上是那個MyChannelInitializer , 最終目的是添加handler
child.pipeline().addLast(childHandler);
// todo 給新來的Channel設置 options 選項
setChannelOptions(child, childOptions, logger);
// todo 給新來的Channel設置 attr屬性
for (Entry<AttributeKey<?>, Object> e : childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//todo 這里這!! 把新的channel注冊進 childGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
我們可以看到,如下工作:
- 初始化屬性
- 把客戶端的channel的注冊進childGroup中的EventLoop
在這里補一張channelGroup的繼承圖
我們看這個childGroup.regist()
方法, 我們知道childGroup
是workerGroup,在本類中,它的類型是EventLoopGroup
,這是個接口類型的變量, 我們用點進去查看源碼自然跳轉進接口中,但是我們需要找他的實現類, 那么,是誰重寫的它的方法呢?
大家去看看上面的圖,它的直接實現類只有一個MultiThreadEventGroup
, 其實大家想想看,現在的任務是將原生的客戶端channel,注冊進WorkerGroup中的EventLoop,那第一步是啥呢? 不得先從這個 事件循環組中拿出一個事件循環嗎? 好,進去看MultiThreadEventGroup
是如何實現的, 源碼如下:
@Override
public ChannelFuture register(Channel channel) {
// todo next() -- 就在上面-> 根據輪詢算法獲取一個事件的執行器 EventExecutor
// todo, 而每一個EventLoop對應一個EventExecutor 這里之所以是個組, 是因為, 我的機器內核決定我的 事件循環組有八個線程,
// todo ?? ????
// todo 但是一會的責任並沒有一直循環, 難道有效的bossGroup只有一條
// todo 再進去就是SingleThreadEventLoop對此方法的實現了
return next().register(channel);
}
是的,確實在獲取事件循環,我們進行跟進 next().register(channel)
, 現在是 eventloop.regist()
,當我們進入方法時,再次來到EventLoopGroup
對這個方法的實現, ok,大家重新去看上面的圖,一個eventloop.regist(),現在不再是 循環組.regist 而是 事件循環.regist, 而在圖上,我們可以很輕松的看到 , 對EventLoopGroup
接口的實現就是 SingleThreadEventLoop
, 好,接着進去看它的實現, 源碼如下:
// todo register來到這里
@Override
public ChannelFuture register(Channel channel) {
// todo ChannelPromise == channel+Executor 跟進去
// todo 再次調用 register, 就在下面
return register(new DefaultChannelPromise(channel, this));
}
調用本類的register(new DefaultChannelPromise(channel, this)
,接着進去,源碼如下: 同樣解析寫在源碼下面
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// todo 重點來了
// todo channel() 獲取通道對象
// todo unsafe() 獲取僅供內部使用的unsafe對象 它定義在Channel接口中, 具體的對象是 Channel的子類, AbstractNioChannel
// todo unsafe對象進行下一步注冊 register
* promise.channel().unsafe().register(this, promise);
return promise;
}
promise.channel()
取出的是客戶端的NioSocketChanenl
promise.channel().unsafe()
是AbstractUnsafe
來到regist()
的實現類
方法調用鏈:
- 本類方法
register
- 本類方法
register0()
- 本類抽象方法
doRegister()
pipeline.fireChannelRegistered();
傳播channel注冊事件pipeline.fireChannelActive();
傳播channel Active事件- 二次注冊事件
- 本類抽象方法
- 本類方法
其中,上面的doRegister()
是真正的將jdk原生的channel注冊進原生的selector
pipeline.fireChannelRegistered();
是在 header --> ServerBootStraptAccptor --> 用戶自己添加的handler --> tail 中,挨個傳遞 ChannelRegistered
, 就是從頭開始調用它們的函數, 我們着重看下面的第三個
pipeline.fireChannelActive();
其實是比較繞的,涉及到了pipeline中事件的傳遞,但是它的作用很大,通過傳播channelActive挨個回調他們的狀態,netty成功的給這條客戶端的新連接注冊上了netty能處理的感興趣的事件
整體源碼太長了我不一一貼出來了, 直接看關於pipeline.fireChannelActive();
的源碼,如下:
if (isActive()) {
if (firstRegistration) {
// todo 在pipeline中傳播ChannelActive的行為,跟進去
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
// todo 可以接受客戶端的數據了
beginRead();
}
第一個判斷, if (isActive())
針對兩個channel,存在兩種情況
- 如果是服務端的channel, 只有在channel綁定完端口后,才會處於active的狀態
- 如果是客戶端的channel, 注冊到selector+處於連接狀態, 他就是active狀態
滿足條件,進入第一個分支判斷,同樣滿足第一次注冊的條件,開始傳播事件
回想一下,現在程序進行到什么狀態? 看上圖的subReactor
每一個藍色的箭頭都是一個客戶端的channel
, 問題是netty還處理不了這些channel上的會發生的感興趣的事件,因為第一步我們只是把jdk原生的chanel和原生的selector之間進行了關聯, 而netty對他們的封裝類還沒有關聯,於是下一步就通過傳播active的行為去二次注冊關聯感興趣的事件
關於pipeline中的事件傳遞太多內容了,在下篇博客中寫,連載
現在直接給結果,
傳遞到header的read()
源碼如下
@Override
public void read(ChannelHandlerContext ctx) {
// todo 如果是服務端: NioMessageUnsafe
// todo 如果是客戶端: NioSocketChannelUnsafe
unsafe.beginRead();
}
接着跟進
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
//todo 如果是服務端: 這里的SelectionKey就是我們在把NioServerSocketChannel 注冊進BoosGroup中的eventLoop時,返回的selectionKey , 就在上面
//todo 如果是客戶端: 這里的SelectionKey就是我們在把NioSocketChannel 注冊進BoosGroup中的eventLoop時,返回的selectionKey , 就在上面
final SelectionKey selectionKey = this.selectionKey;
// todo 這SelectionKey 就是我們把 NioServerSocketChannel中的ServerSocketChannel注冊進BossGroup時, 附加的第三個參數 0
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// todo 獲取這個Selection 的感興趣的事件,實際就是當時注冊時的第二個參數 0
final int interestOps = selectionKey.interestOps();
// todo 如果是服務端, readInterestOp是創建服務端channel時設置的 op_accept
// todo 如果是客戶端的新連接,readInterestOp是創建客戶端channel時設置的 op_read
if ((interestOps & readInterestOp) == 0) {
// todo interestOps | readInterestOp兩者進行或運算,原來是0事件 , 現在又增加了一個事件, accept事件或者是read
// todo 進而 從新注冊到SelectionKey上面去。。。 0P_Accept 或者 OP_Read
selectionKey.interestOps(interestOps | readInterestOp);
}
}
ok, 到這里netty的新鏈接接入就完成了....
連載下一篇, pipeline中的事件傳播