前言
上一篇文章,我們對 Netty
做了一個基本的概述,知道什么是Netty
以及Netty
的簡單應用。
本篇文章我們就來說說Netty
的架構設計,解密高並發之道。學習一個框架之前,我們首先要弄懂它的設計原理,然后再進行深層次的分析。
接下來我們從三個方面來分析 Netty 的架構設計。
Selector 模型
Java NIO
是基於 Selector 模型來實現非阻塞的 I/O
。Netty 底層是基於 Java NIO
實現的,因此也使用了 Selector 模型。
Selector
模型解決了傳統的阻塞 I/O 編程一個客戶端一個線程的問題。Selector 提供了一種機制,用於監視一個或多個 NIO 通道,並識別何時可以使用一個或多個 NIO 通道進行數據傳輸。這樣,一個線程就可以管理多個通道,從而管理多個網絡連接。
Selector
提供了選擇執行已經就緒的任務的能力。從底層來看,Selector 會輪詢 Channel 是否已經准備好執行每個 I/O 操作。Selector 允許單線程處理多個 Channel 。Selector 是一種多路復用的技術。
SelectableChannel
並不是所有的 Channel 都是可以被 Selector 復用的,只有抽象類 SelectableChannel
的子類才能被 Selector 復用。
例如,FileChannel
就不能被選擇器復用,因為 FileChannel
不是SelectableChannel
的子類。
為了與 Selector 一起使用,SelectableChannel
必須首先通過register
方法來注冊此類的實例。此方法返回一個新的SelectionKey
對象,該對象表示Channel
已經在Selector
進行了注冊。向Selector
注冊后,Channel
將保持注冊狀態,直到注銷為止。
一個 Channel 最多可以使用任何一個特定的 Selector 注冊一次,但是相同的 Channel 可以注冊到多個 Selector 上。可以通過調用 isRegistered
方法來確定是否向一個或多個 Selector 注冊了 Channel。
SelectableChannel
可以安全的供多個並發線程使用。
Channel 注冊到 Selector
使用 SelectableChannel
的register
方法,可將Channel
注冊到Selector
。方法接口源碼如下:
public final SelectionKey register(Selector sel, int ops)
throws ClosedChannelException
{
return register(sel, ops, null);
}
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
其中各選項說明如下:
sel
:指定Channel
要注冊的Selector
。ops
: 指定Selector
需要查詢的通道的操作。
一個Channel在Selector注冊其代表的是一個SelectionKey
事件,SelectionKey
的類型包括:
OP_READ
:可讀事件;值為:1<<0
OP_WRITE
:可寫事件;值為:1<<2
OP_CONNECT
:客戶端連接服務端的事件(tcp連接),一般為創建SocketChannel
客戶端channel;值為:1<<3
OP_ACCEPT
:服務端接收客戶端連接的事件,一般為創建ServerSocketChannel
服務端channel;值為:1<<4
具體的注冊代碼如下:
// 1.創建通道管理器(Selector)
Selector selector = Selector.open();
// 2.創建通道ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.channel要注冊到Selector上就必須是非阻塞的,所以FileChannel是不可以使用Selector的,因為FileChannel是阻塞的
serverSocketChannel.configureBlocking(false);
// 4.第二個參數指定了我們對 Channel 的什么類型的事件感興趣
SelectionKey key = serverSocketChannel.register(selector , SelectionKey.OP_READ);
// 也可以使用或運算|來組合多個事件,例如
SelectionKey key = serverSocketChannel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);
值得注意的是
:一個 Channel
僅僅可以被注冊到一個 Selector
一次, 如果將 Channel
注冊到 Selector
多次, 那么其實就是相當於更新 SelectionKey
的 interest set
。
SelectionKey
Channel
和 Selector
關系確定后之后,並且一旦 Channel
處於某種就緒狀態,就可以被選擇器查詢到。這個工作再調用 Selector
的 select
方法完成。select
方法的作用,就是對感興趣的通道操作進行就緒狀態的查詢。
// 當注冊事件到達時,方法返回,否則該方法會一直阻塞
selector.select();
SelectionKey
包含了 interest
集合,代表了所選擇的感興趣的事件集合。可以通過 SelectionKey 讀寫 interest 集合,例如:
// 返回當前感興趣的事件列表
int interestSet = key.interestOps();
// 也可通過interestSet判斷其中包含的事件
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
// 可以通過interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);
可以看到,用位與
操作 interest 集合和給定的 SelectionKey 常量,可以確定某個確定的事件是否在 interest 集合中。
SelectionKey 包含了ready
集合。ready 集合是通道已經准備就緒的操作的集合。在一次選擇之后,會首先訪問這個 ready 集合。可以這樣訪問 ready 集合:
int readySet = key.readyOps();
// 也可通過四個方法來分別判斷不同事件是否就緒
key.isReadable(); //讀事件是否就緒
key.isWritable(); //寫事件是否就緒
key.isConnectable(); //客戶端連接事件是否就緒
key.isAcceptable(); //服務端連接事件是否就緒
我們可以通過SelectionKey
來獲取當前的channel
和selector
//返回當前事件關聯的通道,可轉換的選項包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();
//返回當前事件所關聯的Selector對象
Selector selector = key.selector();
可以將一個對象或者其他信息附着到 SelectionKey 上,這樣就能方便地識別某個特定的通道。
key.attach(theObject);
Object attachedObj = key.attachment();
還可以在用 register()
方法向 Selector 注冊 Channel 的時候附加對象。
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
遍歷 SelectionKey
一旦調用了 select
方法,並且返回值表明有一個或更多個通道就緒了,然后可以通過調用 selector
的 selectedKey()
方法,訪問 SelectionKey
集合中的就緒通道,如下所示:
Set<SelectionKey> selectionKeys = selector.selectedKeys();
可以遍歷這個已選擇的鍵集合來訪問就緒的通道,代碼如下:
// 獲取監聽事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 迭代處理
while (iterator.hasNext()) {
// 獲取事件
SelectionKey key = iterator.next();
// 移除事件,避免重復處理
iterator.remove();
// 可連接
if (key.isAcceptable()) {
...
}
// 可讀
if (key.isReadable()) {
...
}
//可寫
if(key.isWritable()){
...
}
}
事件驅動
Netty是一款異步的事件驅動的網絡應用程序框架。在 Netty 中,事件是指對某些操作感興趣的事。例如,在某個
Channel
注冊了OP_READ
,說明該Channel
對讀感興趣,當Channel
中有可讀的數據時,它會得到一個事件的通知。
在 Netty
事件驅動模型中包括以下核心組件。
Channel
Channel(管道)是 Java NIO 的一個基本抽象,代表了一個連接到如硬件設備、文件、網絡 socket 等實體的開放連接,或者是一個能夠完成一種或多種不同的
I/O
操作的程序。
回調
回調 就是一個方法,一個指向已經被提供給另外一個方法的方法的引用。這使得后者可以在適當的時候調用前者,Netty 在內部使用了回調來處理事件;當一個回調被觸發時,相關的事件可以被一個
ChannelHandler
接口處理。
例如:在上一篇文章中,Netty 開發的服務端的管道處理器代碼中,當Channel
中有可讀的消息時,NettyServerHandler
的回調方法channelRead
就會被調用。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取數據實際(這里我們可以讀取客戶端發送的消息)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx =" + ctx);
Channel channel = ctx.channel();
//將 msg 轉成一個 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + channel.remoteAddress());
}
//處理異常, 一般是需要關閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
Future
Future 可以看作是一個異步操作的結果的占位符;它將在未來的某個時刻完成,並提供對其結果的訪問,Netty 提供了
ChannelFuture
用於在異步操作的時候使用,每個 Netty 的出站 I/O 操作都將返回一個ChannelFuture
(完全是異步和事件驅動的)。
以下是一個 ChannelFutureListener
使用的示例。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
//..
}
});
}
事件及處理器
在 Netty 中事件按照出/入站數據流進行分類:
入站數據或相關狀態更改觸發的事件包括:
- 連接已被激活或者失活。
- 數據讀取。
- 用戶事件。
- 錯誤事件,
出站事件是未來將會出發的某個動作的操作結果:
- 打開或者關閉到遠程節點的連接。
- 將數據寫或者沖刷到套接字。
每個事件都可以被分發給ChannelHandler
類中的某個用戶實現的方法。如下圖展示了一個事件是如何被一個這樣的ChannelHandler
鏈所處理的。
ChannelHandler
為處理器提供了基本的抽象,可理解為一種為了響應特定事件而被執行的回調。
責任鏈模式
責任鏈模式(Chain of Responsibility Pattern)是一種行為型設計模式,它為請求創建了一個處理對象的鏈。其鏈中每一個節點都看作是一個對象,每個節點處理的請求均不同,且內部自動維護一個下一節點對象。當一個請求從鏈式的首端發出時,會沿着鏈的路徑依次傳遞給每一個節點對象,直至有對象處理這個請求為止。
責任鏈模式的重點在這個 "鏈"上,由一條鏈去處理相似的請求,在鏈中決定誰來處理這個請求,並返回相應的結果。在Netty中,定義了ChannelPipeline
接口用於對責任鏈的抽象。
責任鏈模式會定義一個抽象處理器(Handler)角色,該角色對請求進行抽象,並定義一個方法來設定和返回對下一個處理器的引用。在Netty中,定義了ChannelHandler
接口承擔該角色。
責任鏈模式的優缺點
優點:
- 發送者不需要知道自己發送的這個請求到底會被哪個對象處理掉,實現了發送者和接受者的解耦。
- 簡化了發送者對象的設計。
- 可以動態的添加節點和刪除節點。
缺點:
- 所有的請求都從鏈的頭部開始遍歷,對性能有損耗。
- 不方便調試。由於該模式采用了類似遞歸的方式,調試的時候邏輯比較復雜。
使用場景:
- 一個請求需要一系列的處理工作。
- 業務流的處理,例如文件審批。
- 對系統進行擴展補充。
ChannelPipeline
Netty 的ChannelPipeline
設計,就采用了責任鏈設計模式, 底層采用雙向鏈表的數據結構,,將鏈上的各個處理器串聯起來。
客戶端每一個請求的到來,Netty都認為,ChannelPipeline
中的所有的處理器都有機會處理它,因此,對於入棧的請求,全部從頭節點開始往后傳播,一直傳播到尾節點(來到尾節點的msg會被釋放掉)。
入站事件:通常指 IO 線程生成了入站數據(通俗理解:從 socket 底層自己往上冒上來的事件都是入站)。
比如EventLoop
收到selector
的OP_READ
事件,入站處理器調用socketChannel.read(ByteBuffer)
接受到數據后,這將導致通道的ChannelPipeline
中包含的下一個中的channelRead
方法被調用。
出站事件:通常指 IO 線程執行實際的輸出操作(通俗理解:想主動往 socket 底層操作的事件的都是出站)。
比如bind
方法用意時請求server socket
綁定到給定的SocketAddress
,這將導致通道的ChannelPipeline
中包含的下一個出站處理器中的bind
方法被調用。
將事件傳遞給下一個處理器
處理器必須調用ChannelHandlerContext
中的事件傳播方法,將事件傳遞給下一個處理器。
入站事件和出站事件的傳播方法如下圖所示:
以下示例說明了事件傳播通常是如何完成的:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
System.out.println("Closing...");
ctx.close(promise);
}
}
總結
正是由於 Netty 的分層架構設計非常合理,基於 Netty 的各種應用服務器和協議棧開發才能夠如雨后春筍般得到快速發展。
結尾
我是一個正在被打擊還在努力前進的碼農。如果文章對你有幫助,記得點贊、關注喲,謝謝!