全文圍繞下圖,Netty-Channel的簡化版架構體系圖展開,從頂層Channel接口開始入手,往下遞進,閑言少敘,直接開擼
概述: 從圖中可以看到,從頂級接口Channel開始,在接口中定義了一套方法當作規范,緊接着的是來兩個抽象的接口實現類,在這個抽象類中對接口中的方法,進行了部分實現,然后開始根據不同的功能分支,分成服務端的Channel和客戶端的Channel
回顧
Channel的分類
根據服務端和客戶端,Channel可以分成兩類(這兩大類的分支見上圖):
- 服務端:
NioServerSocketChannel
- 客戶端:
NioSocketChannel
什么是Channel?
channel是一個管道,用於連接字節緩沖區Buf和另一端的實體,這個實例可以是Socket,也可以是File, 在Nio網絡編程模型中, 服務端和客戶端進行IO數據交互(得到彼此推送的信息)的媒介就是Channel
Netty對Jdk原生的ServerSocketChannel
進行了封裝和增強封裝成了NioXXXChannel
, 相對於原生的JdkChannel, Netty的Channel增加了如下的組件
- id 標識唯一身份信息
- 可能存在的parent Channel
- 管道 pepiline
- 用於數據讀寫的unsafe內部類
- 關聯上相伴終生的NioEventLoop
本篇博客,會追溯上圖中的體系關系,找出NioXXXChannel
的相對於jdk原生channel在哪里添加的上面的新組件
源碼開始-Channel
現在來到上圖的Channel
部分, 他是一個接口, netty用它規定了一個Channel
應該具有的功能,在它的文檔對Channel的
是什么,以及對各個組件進行了描述
- 闡述了channel是什么,有啥用
Channel
通過ChannelPipeline
中的多個Handler
處理器,Channel
使用它處理IO數據Channel
中的所有Io操作都是異步的,一經調用就馬上返回,於是Netty基於Jdk原生的Future
進行了封裝,ChannelFuture
, 讀寫操作會返回這個對象,實現自動通知IO操作已完成Channel
是可以有parent的, 如下
// 創建客戶端channel時,會把服務端的Channel設置成自己的parent
// 於是就像下面:
服務端的channel = 客戶端的channel.parent();
服務的channel.parent()==null;
此外,Channel還定義了大量的抽象方法, 如下:
/**
* todo 返回一個僅供內部使用的unsafe對象, Chanel上 IO數據的讀寫都是借助這個類完成的
*/
Unsafe unsafe();
// 返回Channel的管道
ChannelPipeline pipeline();
ByteBufAllocator alloc();
@Override // todo 進入第一個實現 , 讀取Channel中的 IO數據
Channel read();
// 返回Channel id
ChannelId id();
// todo 返回channel所注冊的 eventLoop
EventLoop eventLoop();
// 返回當前Channel的父channel
Channel parent();
// todo 描述了 關於channel的 一些列配置信息
ChannelConfig config();
// 檢查channel是否開啟
boolean isOpen();
// 檢查channel是否注冊
boolean isRegistered();
// todo 什么是active 他說的是channel狀態, 什么狀態呢? 當前channel 若和Selector正常的通信就說明 active
boolean isActive();
// 返回channel的元數據
ChannelMetadata metadata();
// 服務器的ip地址
SocketAddress localAddress();
// remoteAddress 客戶端的ip地址
SocketAddress remoteAddress();
ChannelFuture closeFuture();
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
@Override
Channel flush();
Channel重要的內部接口 unsafe
Netty中,真正幫助Channel完成IO讀寫操作的是它的內部類unsafe
, 源碼如下, 很多重要的功能在這個接口中定義, 下面列舉的常用的方法
interface Unsafe {
// 把channel注冊進EventLoop
void register(EventLoop eventLoop, ChannelPromise promise);
// todo 給channel綁定一個 adress,
void bind(SocketAddress localAddress, ChannelPromise promise);
// 把channel注冊進Selector
void deregister(ChannelPromise promise);
// 從channel中讀取IO數據
void beginRead();
// 往channe寫入數據
void write(Object msg, ChannelPromise promise);
...
...
AbstractChanel
接着往下看,下面來到Channel
接口的直接實現類,AbstractChannel
他是個抽象類, AbstractChannel
重寫部分Channel
接口預定義的方法, 它的抽象內部類AbstractUnsafe
實現了Channel
的內部接口unsafe
我們現在是從上往下看,但是當我們創建對象使用的時候其實是使用的特化的對象,創建特化的對象就難免會調層層往上調用父類的構造方法, 所以我們看看AbstractChannel
的構造方法干了什么活? 源碼如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
// todo channelId 代表Chanel唯一的身份標志
id = newId();
// todo 創建一個unsafe對象
unsafe = newUnsafe();
// todo 在這里初始化了每一個channel都會有的pipeline組件
pipeline = newChannelPipeline();
}
我們看,AbstractChannel
構造函數, 接受的子類傳遞進來的參數只有一個parent CHannel,而且,還不有可能為空, 所以在AbstractChannel
是沒有維護jdk底層的Channel的, 相反他會維護着Channel關聯的EventLoop,我是怎么知道的呢? 首先,它的屬性中存在這個字段,而且,將channel注冊進selector的Register()方法是AbastractChannel
重寫的,Selector在哪呢? 在EventLoop里面,它怎么得到的呢? 它的子類傳遞了給了它
終於看出來點眉目,構造方法做了四件事
- 設置parent
- 如果當前創建的channel是客戶端的channel,把parent初始化為他對應的parent
- 如果為服務端的channel,這就是null
- 創建唯一的id
- 創建針對channel進行io讀寫的
unsafe
- 創建channel的處理器handler鏈
channelPipeline
AbstractChannel中維護着EventLoop
AbstractChanel
的重要抽象內部類AbstractUnsafe
繼承了Channel
的內部接口Unsafe
他的源碼如下,我貼出來了兩個重要的方法, 關於這兩個方法的解析,我寫在代碼的下面
protected abstract class AbstractUnsafe implements Unsafe {
@Override
// todo 入參 eventLoop == SingleThreadEventLoop promise == NioServerSocketChannel + Executor
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// todo 賦值給自己的 事件循環, 把當前的eventLoop賦值給當前的Channel上 作用是標記后續的所有注冊的操作都得交給我這個eventLoop處理, 正好對應着下面的判斷
// todo 保證了 即便是在多線程的環境下一條channel 也只能注冊關聯上唯一的eventLoop,唯一的線程
AbstractChannel.this.eventLoop = eventLoop;
// todo 下面的分支判斷里面執行的代碼是一樣的!!, 為什么? 這是netty的重點, 它大量的使用線程, 線程之間就會產生同步和並發的問題
// todo 下面的分支,目的就是把線程可能帶來的問題降到最低限度
// todo 進入inEventLoop() --> 判斷當前執行這行代碼的線程是否就是 SingleThreadEventExecutor里面維護的那條唯一的線程
// todo 解釋下面分支的必要性, 一個eventLoop可以注冊多個channel, 但是channel的整個生命周期中所有的IO事件,僅僅和它關聯上的thread有關系
// todo 而且,一個eventLoop在他的整個生命周期中,只和唯一的線程進行綁定,
//
// todo 當我們注冊channel的時候就得確保給他專屬它的thread,
// todo 如果是新的連接到了,
if (eventLoop.inEventLoop()) {
// todo 進入regist0()
register0(promise);
} else {
try {
// todo 如果不是,它以一個任務的形式提交 事件循環 , 新的任務在新的線程開始, 規避了多線程的並發
// todo 他是SimpleThreadEventExucutor中execute()實現的,把任務添加到執行隊列執行
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// todo 進入這個方法doRegister()
// todo 它把系統創建的ServerSocketChannel 注冊進了選擇器
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// todo 確保在 notify the promise前調用 handlerAdded(...)
// todo 這是必需的,因為用戶可能已經通過ChannelFutureListener中的管道觸發了事件。
// todo 如果需要的話,執行HandlerAdded()方法
// todo 正是這個方法, 回調了前面我們添加 Initializer 中添加 Accpter的重要方法
pipeline.invokeHandlerAddedIfNeeded();
// todo !!!!!!! 觀察者模式!!!!!! 通知觀察者,誰是觀察者? 暫時理解ChannelHandler 是觀察者
safeSetSuccess(promise);
// todo 傳播行為, 傳播什么行為呢? 在head---> ServerBootStraptAccptor ---> tail傳播事件ChannelRegistered , 也就是挨個調用它們的ChannelRegisted函數
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// todo 對於服務端: javaChannel().socket().isBound(); 即 當Channel綁定上了端口 isActive()才會返回true
// todo 對於客戶端的連接 ch.isOpen() && ch.isConnected(); 返回true , 就是說, Channel是open的 打開狀態的就是true
if (isActive()) {
if (firstRegistration) {
// todo 在pipeline中傳播ChannelActive的行為,跟進去
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
// todo 可以接受客戶端的數據了
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
// todo 由於端口的綁定未完成,所以 wasActive是 false
try {
// todo 綁定端口, 進去就是NIO原生JDK綁定端口的代碼
doBind(localAddress);
// todo 端口綁定完成 isActive()是true
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// todo 根據上面的邏輯判斷, 結果為 true
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
// todo 來到這里很重要, 向下傳遞事件行為, 傳播行為的時候, 從管道的第一個節點開始傳播, 第一個節點被封裝成 HeadContext的對象
// todo 進入方法, 去 HeadContext里面查看做了哪些事情
// todo 她會觸發channel的read, 最終重新為 已經注冊進selector 的 chanel, 二次注冊添加上感性趣的accept事件
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
// todo 觀察者模式, 設置改變狀態, 通知觀察者的方法回調
safeSetSuccess(promise);
}
AbstractChannel
抽象內部類的register(EventLoop,channelPromise)方法
這個方法,是將channel注冊進EventLoop的Selector, 它的調用順序如下:
本類方法 regist()--> 本類方法 register0() --> 本類抽象方法doRegister()
doRegister() 在這里設計成抽象方法,等着子類去具體的實現, 為啥這樣做呢?
剛才說了,AbstractChannel
本身就是個模板,而且它僅僅維護了EventLoop,沒有拿到channel引用的它根本不可能進行注冊的邏輯,那誰有jdk原生channel的引用呢? 它的直接子類AbstractNioChannel
下面是AbstractNioChannel
的構造方法, 它自己維護jdk原生的Channel,所以由他重寫doRegister()
,
*/ // todo 無論是服務端的channel 還是客戶端的channel都會使用這個方法進行初始化
// // TODO: 2019/6/23 null ServerSocketChannel accept
// todo 如果是在創建NioSocketChannel parent==NioServerSocketChannel ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);// todo 繼續向上跟,創建基本的組件
// todo 如果是創建NioSocketChannel 這就是在保存原生的jdkchannel
// todo 如果是創建NioServerSocketChannel 這就是在保存ServerSocketChannel
this.ch = ch;
// todo 設置上感興趣的事件
this.readInterestOp = readInterestOp;
try {
// todo 作為服務端, ServerSocketChannel 設置為非阻塞的
// todo 作為客戶端 SocketChannel 設置為非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
AbstractChannel
抽象內部類的bind()方法
bind()方法的調用順序, 本類方法 bind()--> 本類的抽象方法 dobind()
方法的目的是給Channel綁定上屬於它的端口,同樣有一個抽象方法,等着子類去實現,因為我們已經知道了AbstractChannel
不維護channel的引用,於是我就去找dobind()
這個抽象函數的實現, 結果發現,AbstractChannel
的直接子類AbstractNioChannel
中根本不沒有他的實現,這是被允許的,因為AbstractNioChannel
本身也是抽象類, 到底是誰實現呢? 如下圖:在NioServerSocketChannel中獲取出 Jdk原生的channel, 客戶端和服務端的channel又不同,所以綁定端口這中特化的任務,交給他們自己實現
AbstractChannel
的beginRead()()方法
上面完成注冊之后,就去綁定端口,當端口綁定完成,就會channel處於active
狀態,下一步就是執行beginRead()
,執行的流程如下
本類抽象方法 beginRead()
--> 本類抽象方法doBeginRead()
這個read()
就是從已經綁定好端口的channel中讀取IO數據,和上面的方法一樣,對於沒有channel
引用的AbstractChannel
來說,netty把它設計成抽象方法,交給擁有jdk 原生channel
引用的AbstractNioChannel
實現
小結:
AbstractChannel
作為Channel的直接實現類,本身又是抽象類,於是它實現了Channel的預留的一些抽象方法, 初始化了channel的四個組件 id pipeline unsafe parent, 更為重要的是它的抽象內部類 實現了 關於nettyChannel的注冊,綁定,讀取數據的邏輯,而且以抽象類的方法,挖好了填空題等待子類的特化實現
遞進AbstractNioChannel
跟進構造方法
依然是來到AbstractNioChannel
的構造方法,發現它做了如下的構造工作:
- 把parent傳遞給了
AbstractChannel
- 把子類傳遞過來的Channel要告訴Selector的感興趣的選項保存
- 設置channel為非阻塞
// todo 無論是服務端的channel 還是客戶端的channel都會使用這個方法進行初始化
// // TODO: 2019/6/23 null ServerSocketChannel accept
// todo 如果是在創建NioSocketChannel parent==NioServerSocketChannel ch == SocketChanel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);// todo 繼續向上跟,創建基本的組件
// todo 如果是創建NioSocketChannel 這就是在保存原生的jdkchannel
// todo 如果是創建NioServerSocketChannel 這就是在保存ServerSocketChannel
this.ch = ch;
// todo 設置上感興趣的事件
this.readInterestOp = readInterestOp;
try {
// todo 作為服務端, ServerSocketChannel 設置為非阻塞的
// todo 作為客戶端 SocketChannel 設置為非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
重寫了它父類的doRegister()
AbstractNioChannel
維護channel的引用,真正的實現把 jdk 原生的 channel注冊進 Selector中
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// todo javaChannel() -- 返回SelectableChanel 可選擇的Channel,換句話說,可以和Selector搭配使用,他是channel體系的頂級抽象類, 實際的類型是 ServerSocketChannel
// todo eventLoop().unwrappedSelector(), -- > 獲取選擇器, 現在在AbstractNioChannel中 獲取到的eventLoop是BossGroup里面的
// todo 到目前看, 他是把ServerSocketChannel(系統創建的) 注冊進了 EventLoop的選擇器
// todo 這里的 最后一個參數是 this是當前的channel , 意思是把當前的Channel當成是一個 attachment(附件) 綁定到selector上 作用???
// todo 現在知道了attachment的作用了
// todo 1. 當channel在這里注冊進 selector中返回一個selectionKey, 這個key告訴selector 這個channel是自己的
// todo 2. 當selector輪詢到 有channel出現了自己的感興趣的事件時, 需要從成百上千的channel精確的匹配出 出現Io事件的channel,
// todo 於是seleor就在這里提前把channel存放入 attachment中, 后來使用
// todo 最后一個 this 參數, 如果是服務啟動時, 他就是NioServerSocketChannel 如果是客戶端他就是 NioSocketChannel
// todo 到目前為止, 雖然注冊上了,但是它不關心任何事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
新增內部接口
AbstractNioChannel
新添加了一個內部接口,作為原Channel的擴展,源碼如下, 我們着重關心的就是這個新接口的 read()
方法, 它的作用是從channel去讀取IO數據,作為接口的抽象方法,它規范服務端和客戶端根據自己需求去不同的實現這個read()
怎么特化實現這個read方法呢? 若是服務端,它read的結果就是一個新的客戶端的連接, 如果是客戶端,它read的結果就是 客戶端發送過來的數據,所以這個read()
很有必要去特化
/**
* Read from underlying {@link SelectableChannel}
*/
// todo 兩個實現類, NioByteUnsafe , 處理關於客戶端發來的信息
// todo NioMessageUnsafe 處理客戶端新進來的連接
void read();
/**
* Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
*/
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
void forceFlush();
}
AbstractNioChannel
的抽象內部內同類時繼承了它父類的AbstractUnsafe
實現了當前的NioUnsafe
, 再往后看, 問題來了, 服務端和客戶端在的針對read的特化實現在哪里呢? 想想看肯定在它子類的unsafe內部類中,如下圖,紫框框
一會再具體看這兩個 內部類是如何特化read的 注意啊,不再是抽象的了
再進一步 AbstractNioMessageChannel
它的構造函數如下, 只是調用父類的構造函數,傳遞參數
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// todo 在進去
// todo null ServerSocketChannel accept
super(parent, ch, readInterestOp);
}
AbstractNioMessageChannel
的MessageNioUnsafe
對read()
特化實現
在read方法中,我們可以看到,他調用是本類的抽象方法doReadMessages(List<Object> buf)
, 方法的實現類是繼承體系的最底層的NioServerSocketChannel
, 因為他就是那個特化的服務端channel
當然如果我們一開始跟進read()時,來到的客戶端的AbstractNioByteChannel
,現在我們找到的doReadMessage()
就是由 客戶端的channelNioSocketChannel
完成的doReadBytes()
// todo 用於處理新鏈接進來的內部類
private final class NioMessageUnsafe extends AbstractNioUnsafe {
// todo 這個容器用於存放臨時讀到的連接
private final List<Object> readBuf = new ArrayList<Object>();
// todo 接受新鏈接的 read來到這里
@Override
public void read() {
...
doBeginRead(buf);
...
}
// todo 處理新的連接 是在 NioServerSocketChannel中實現的, 進入查看
protected abstract int doReadMessages(List<Object> buf) throws Exception;
最終,特化的channel實現
現在我們就來到了最底層,整張繼承圖就全部展現在眼前了,下面就去看看,特化的服務端Channel NioServerSocketChannel
和NioSocketChannel
對 doReadMessages()
和doReadBytes()
的各自實現
服務端, 我們看到了,它的特化read()
是在創建新的 Jdk遠程channel, 因為它在創建新的連接chanel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// todo java Nio底層在這里 創建jdk底層的 原生channel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// todo 把java原生的channel, 封裝成 Netty自定義的封裝的channel , 這里的buf是list集合對象,由上一層傳遞過來的
// todo this -- NioServerSocketChannel
// todo ch -- SocketChnnel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
...
客戶端, 讀取客戶端發送過來的IO數據
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
小結:
Netty的channel繼承體系,到現在就完成了, 相信,當我們現在再從 NioServerEventLoop
入手,看他的初始化過程應該很簡單了, 其中我希望自己可以牢記幾個點
AbstractChannel
維護NioChannel
的EventLoop
AbstractNioChannel
維護jdk原生channel
AbstractChannel
中的AbstractUnsafe
主要是定義了一套模板,給子類提供了填空題,下面的三個填空- 注冊 把chanel注冊進Selector
- 綁定 把chanel綁定上端口
- 添加感興趣的事件, 給創建出來的channel二次注冊上netty可以處理的感興趣的事件
- channel的io操作是unsafe內部類完成的
- 服務端從channel,讀取出新連接
NioMessageUnsafe
- 客戶端從channel,讀取出數據
NioByteUnsafe
- 服務端從channel,讀取出新連接