前言
在前文中,我們分析了服務器是如何啟動的。而服務器啟動后肯定是要接受客戶端請求並返回客戶端想要的信息的,否則要你服務器干啥子呢?所以,我們今天就分析分析 Netty 在啟動之后是如何接受客戶端請求的。
開始吧!
1. 從源頭開始
從之前服務器啟動的源碼中,我們得知,服務器最終注冊了一個 Accept 事件等待客戶端的連接。我們也知道,NioServerSocketChannel 將自己注冊到了 boss 單例線程池(reactor 線程)上,也就是 EventLoop 。
樓主還沒有仔細介紹 EventLoop ,但樓主這里先稍微講一下他的邏輯:
EventLoop 的作用是一個死循環,而這個循環中做3件事情:
- 有條件的等待 Nio 事件。
- 處理 Nio 事件。
- 處理消息隊列中的任務。
而我們今天看的就是第二個步驟。
首先需要進入到 NioEventLoop 源碼中。
2. 開始 debug
進入到 NioEventLoop 源碼中后,找到 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 方法 ,斷點打在下方:
debug 啟動我們的 EchoServer 的 main 方法。在瀏覽器鍵入 http://localhost:8007/,開始訪問我們的 Netty 服務器,這時候,斷點開始卡住。
從上圖中的斷點我們可以看到, readyOps 是16 ,也就是 Accept 事件。說明瀏覽器的請求已經進來了。那么這個 unsafe 是誰呢?就是 boss 線程中 NioServerSocketChannel 的AbstractNioMessageChannel$NioMessageUnsafe 對象。
我們進入到 AbstractNioMessageChannel$NioMessageUnsafe 的read 方法中。
AbstractNioMessageChannel$NioMessageUnsafe # read 方法
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
樓主限於篇幅,精簡了很多代碼,我們拆解一下代碼:
- 檢查該 eventloop 線程是否是當前線程。
- 執行 doReadMessages 方法,並傳入一個 readBuf 變量,這個變量是一個 List,也就是容器。
- 循環容器,執行 pipeline.fireChannelRead(readBuf.get(i));
我們分析一下上面的步驟:doReadMessages 肯定是讀取 boss 線程中的 NioServerSocketChannel 接受到的請求。並把這些請求放進容器,然后呢?循環容器中的所有請求,調用 pipeline 的 fireChannelRead 方法,用於處理這些接受的請求或者其他事件。
那么我們就來驗證一下。進入 doReadMessages 方法。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
樓主精簡了代碼,可以看到,和我們猜的不差,該方法很簡單,通過工具類,調用 NioServerSocketChannel 內部封裝的 serverSocketChannel 的 accept 方法,熟悉的 Nio 做法。然后獲取到一個 JDK 的 SocketChannel,然后,使用 NioSocketChannel 進行封裝。最后添加到容器中。
3. NioSocketChannel 是如何創建的?
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
buf.add(new NioSocketChannel(this, ch));
return 1;
}
我們另起一段研究這段代碼,先看 SocketUtils.accept(javaChannel());
public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() {
@Override
public SocketChannel run() throws IOException {
return serverSocketChannel.accept();
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
該方法調用了 NioServerSocketChannel 中的 serverSocketChannel.accept() 方法。返回了一個 Nio 的通道,注意:這個通道,就是我們剛剛 Boss 線程監聽到的 Accept 事件,相當於一個 Tcp 連接。
然后我們看 NioSocketChannel 的創建過程,其中參數 this 是 NioServerSocketChannel ,這個就是 SocketChannel 的 parent 屬性,ch 是 SocketChannel 。構造方法如下:
和 ServerSocket 類似,還記得 ServerSocket 是怎么創建的嗎:
還是很相似的。
我們先略過 config 的創建過程,先看 super。
這里設置了 SelectableChannel 屬性為 JDK 的 Nio 的 SocketChannel 和 感興趣的事件。設置非阻塞。
進入到 super 構造方法中:
也是和 ServerSocket 一樣了,注意:這里的 unsafe 就和 ServerSocket 不同了,此方法被重寫了,返回的是
NioSocketChannel$NioSocketChannelUnsafe, 是 NioSocketChannel 的內部類。再看 pipeline ,是相同的 DefaultChannelPipeline。同樣 pipeline 也會自己創建自己的 head 節點和 tail 節點。
好了,到這里,NioSocketChannel 就創建完畢了。
回到 最初的 read 方法中。
4. 循環執行 pipeline.fireChannelRead 方法
從上面我們可以看到,doReadMessages 方法的作用是通過 ServerSocket 的 accept 方法獲取到 Tcp 連接,然后封裝成 Netty 的 NioSocketChannel 對象。最后添加到 容器中。
然后再 read 方法中,循環調用 ServerSocket 的 pipeline 的 fireChannelRead 方法。從這個方法的名字可以感受到:開始執行 管道中的 handler 的 ChannelRead 方法。
那么我們就看看:
到這里,樓主就不一個一個 dubug 了,實際上,我們知道,pipeline 里面又 4 個 handler ,分別是 Head,LoggingHandler,ServerBootstrapAcceptor,Tail。我們重點關注 ServerBootstrapAcceptor。debug 之后,斷點會進入到 ServerBootstrapAcceptor 中來。我們來看看 ServerBootstrapAcceptor 的 channelRead 方法。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {// 將客戶端連接注冊到 worker 線程池
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
我們講該方法拆解:
- msg 強轉成 Channel ,實際上就是 NioSocketChannel 。
- 添加 NioSocketChannel 的 pipeline 的 handler ,就是我們 main 方法里面設置的 childHandler 方法里的。
- 設置 NioSocketChannel 的各種屬性。
- 最重要的,將該 NioSocketChannel 注冊到 childGroup 中的一個 EventLoop 上,並添加一個監聽器。
我們重點看最后一步,這個 childGroup 就是我們 main 方法創建的數組大小為 16 的 workerGroup。在創建 ServerBootstrapAcceptor 添加進來的。
進入 register 方法查看:
這里的 next 方法我們之前介紹過了,使用位運算獲取數組中的EventLoop。
這里創建 DefaultChannelPromise 我們之前也看過了,最后該方法返回的就是這個 DefaultChannelPromise。
這里鏈式調用說明一下:
- premise 的 channel 方法返回的是 NioSocketChannel。
- promise.channel().unsafe() 返回的是 NioSocketChannel$NioSocketChannelUnsafe。
所以最終調用的是 NioSocketChannel 的內部類的 register 方法。參數是當前的 EventLoop 和 promise。
查看這個 register 方法:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {// 開始真正的異步,boss 線程開始啟動
@Override
public void run() {
register0(promise);
}
});
}
樓主精簡了一下代碼邏輯,其實就是同步或者異步的調用 register0 方法。大家可以向一下到底是異步還是同步?應該是異步的。應為此時的線程是 boss 線程,而不是 worder 線程,所以肯定無法通過 inEventLoop 判斷。
進入到異步線程中查看 register0 方法。其實和我們之前分析的注冊 ServerSocket 的過程是一樣的。
其中最核心的就是 doRegister 方法。
doRegister 方法
pipeline.invokeHandlerAddedIfNeeded() 方法
回到 register0 中,該方法在成功注冊到 selector 的讀事件后,繼續執行管道中可能存在的任務。那么管道中會存在什么任務呢?我們來看看:
到這里,我們發出疑問,這個 task 從哪里來的?
經過查找,我們發現,這個 pendingHandlerCallbackHead 變量來自我們 addLast 的時候,如果該 pipeline 還沒有注冊到這個 eventLoop 上,則將這個包裝過 handler 的 context 放進變量 pendingHandlerCallbackHead 中,事實上,這個 pendingHandlerCallbackHead 就是個鏈表的表頭,后面的 Context 會被包裝成一個任務,追加到鏈表的尾部。
那么這個 execute 方法如何執行呢?
主要是執行 callHandlerAdded0 方法,並且傳入這個 Context:
注意:這里調用了包裝了自定義 handler 的 Context 的 handlerAdded 方法,並且傳入了這個 Context。然后這個方法我們並沒有重寫,我們看父類中方法邏輯:
完美,調用了 initChannel 方法。但注意,這里調用的並不是我們重寫的 initChannel 方法,因為參數不是同一個類型,我們重寫的方法的參數是 SocketChannel,而不是ChannelHandlerContext,所以,肯定還需要再調用一層。
這里才是調用用戶代碼的地方。
我們的用戶代碼添加了兩個處理器,還有一個自定義的處理器。當然,現在添加處理器不會再添加到那個 pendingHandlerCallbackHead 任務鏈表里了,因為已經注冊過了,if 判斷過不了。
operationComplete 方法
然后設置promise 的 operationComplete 方法。還記得我們在ServerBootstrap 的 channelRead 方法中的代碼嗎?
在這里調用了我們之前的設置的監聽器的 operationComplete 方法。
pipeline.fireChannelRegistered() 方法
好,再然后調用 pipeline.fireChannelRegistered() 的方法。大家可以按照之前的 pipeline 的路子想一下,會如何執行?pipeline 作為管道,其中有我們設置的 handler 鏈表,這里,肯定會順序執行我們的 handler,比如 main 方法中的 childerHandler。我們繼續驗證一下。
該方法會繼續調用橋梁 Context 的 fireChannelRegistered 方法,Context 包裝的就是我們自定義的 handler。當然我們沒有重寫該方法。我們只重寫了 initChannel 方法。
pipeline.fireChannelActive() 方法
回到 register0 方法中,我們繼續向下走,如果是第一次注冊的話,執行pipeline.fireChannelActive()代碼,也就是執行 pipeline 管道中的 handler 的 ChannelActive 方法。
同樣,我們也沒有重寫該方法,父類會繼續回調 fireChannelActive 方法。而這個方法里會繼續尋找下一個 Context,然后繼續調用,直到遇到 pipeline 的 channelActive(ChannelHandlerContext ctx) 方法:
這里有一行 readIfIsAutoRead 方法,我們注意一下,上面的 ChannelActive 方法都執行結束后,也就是已經連接已經成功后,便開始調用read方法。
同樣的,如果熟悉服務器啟動過程的同學肯定看出來了,這里最終會調用 doBeginRead 方法,也就是 AbstractNioChannel 類的方法。
在之前的 doRegister 方法中,只是注冊了0,為什么呢?如果直接注冊1,也就是讀事件,但系統還沒有准備好讀取,現在一切都初始化了,就可以讀取了。而這里是管道的 head 節點調用 unsafe 方法的。
到這里,針對於這個客戶端的連接就完成了,接下來就可以監聽讀事件了。
總結:服務器接受客戶端過程
- 服務器輪詢 Accept 事件,獲取事件后調用 unsafe 的 read 方法,這個 unsafe 是 ServerSocket 的內部類,該方法內部由2部分組成。
- doReadMessages 用於創建 NioSocketChannel 對象,該對象包裝 JDK 的 Nio Channel 客戶端。該方法會像創建 ServerSocketChanel 類似創建相關的 pipeline , unsafe,config。
- 隨后執行 執行 pipeline.fireChannelRead 方法,並將自己綁定到一個 chooser 選擇器選擇的 workerGroup 中的一個 EventLoop。並且注冊一個0,表示注冊成功,但並沒有注冊讀(1)事件.
- 在注冊的同時,調用用戶程序中設置的 ChannelInitializer handler,向管道中添加一個自定義的處理器,隨后立即刪除這個 ChannelInitializer handler,為什么呢?因為初始化好了,不再需要。
- 其中再調用管道的 channelActive 方法中,會將曾經注冊過的 Nio 事件改成讀事件,開始真正的讀監聽。到此完成所有客戶端連接的讀前准備。
總的來說就是:接受連接----->創建一個新的NioSocketChannel----------->注冊到一個 worker EventLoop 上--------> 注冊selecot Read 事件。
當然,關於,獲取到讀事件后該怎么處理還沒有說,限於篇幅,留在下篇文章中。
good luck!!!!