最近在研究Java NIO和netty,曾經一度感覺很吃力,根本原因還是對操作系統、TCP/IP、socket編程的理解不到位。
不禁感嘆,還是當初逃的課太多。
假如上天給我一次機會,能夠再回到意氣風發的校園時代,我想那些逃過的課,應該還是會逃。
畢竟在那個躁動的年紀,有很多的事情都比上課有意思。
不扯閑篇了,進入正題。
先重新理解一下socket編程,主要是基於TCP協議。上一張我從《Unix網絡編程》里面截取的一張圖

通過這張圖,能夠大概理解socket編程的幾個函數功能和調用順序,更為關鍵的是可以看出TCP協議的3次握手發生的時機。
但是這張圖並沒有很好的揭示socket是怎樣體現插座、插口的含義,所以我自己斗膽畫了一張圖,請多多指教。

借着這張圖,說幾個要點:
1、剛創建出來的socket,其實並沒有server和client之分,只是socket調用了listen方法之后,角色才改變,處理邏輯也隨之改變
2、client端的socket發送連接請求,server端的socket接收請求后,再創建一個socket與client端的socket傳遞數據,就像兩個插口在通信
3、每個socket都有發送緩存和接收緩存,操作系統可以根據這些緩存來判斷socket可讀、可寫、異常等狀態
4、server端的socket保存着2種連接隊列,后面還會說到
5、每個socket還會關聯一個文件描述符(文件句柄),操作系統通過這個文件描述符(文件句柄)操作socket。圖中並未畫出。
再來說說Linux的IO多路復用。
Linux的多種IO模型以及select、poll、epoll等的詳細介紹,我這里不贅述,主要也是因為段位不夠。
我比較關注的是IO多路復用的那些IO事件。先看看jdk里面SelectionKey類里面的幾個方法
public final boolean isReadable() { return (readyOps() & OP_READ) != 0; } public final boolean isWritable() { return (readyOps() & OP_WRITE) != 0; } public final boolean isConnectable() { return (readyOps() & OP_CONNECT) != 0; } public final boolean isAcceptable() { return (readyOps() & OP_ACCEPT) != 0; }
方法名很簡單:可讀、可寫、可連接、可接收。從socket的緩存判斷可讀、可寫倒是很好理解;可是什么時候socket是可連接或者可接收呢???
於是硬着頭皮慢慢啃《TCP-IP詳解:卷2》,終於找到了一些端倪。不得不說,欠的債遲早是要還的。

下面再引入書中的一段文字:
圖 1 6 - 5 2 顯 示 了 插 口 的 讀 、 寫 和 例 外 情 況 。 我 們 將 看 到 s o o _ s e l e c t 使用了 s o r e a d a b l e 和 s o w r i t e a b l e 宏 , 這 些 宏 在 s y s / s o c k e t v a r . h 中定義。
1. 插口可讀嗎
1 1 3 - 1 2 0 s o r e a d a b l e 宏的定義如下:
#define soreadable(so) \
((so)->so_rcv.sb_cc >= (so)->so_rcv.sb_lowat || \
((so)->so_state & SS_CANTRCVMORE) || \
(so)->so_qlen || (so)->so_error)
因為 U D P 和 T C P 的 接 收 下 限 默 認 值 為 1 ( 圖 1 6 - 4 ) , 下 列 情 況 表 示 插 口 是 可 讀 的 : 接 收 緩 存 中有數據,連接的讀通道被關閉,可以接受任何連接或有掛起的差錯。
2. 插口可寫嗎
1 2 1 - 1 2 8 s o w r i t e a b l e 宏的定義如下:
#define sowriteable(so) \
(sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \
(((so)->so_state&SS_ISCONNECTED) || \
((so)->so_proto->pr_flags&PR_CONNREQUIRED)==0) || \
((so)->so_state & SS_CANTSENDMORE) || \
(so)->so_error)
T C P 和 U D P 默 認 的 發 送 低 水 位 標 記 是 2 0 4 8 。對於 U D P 而言, s o w r i t e a b l e 總 是 為 真 , 因 為 s b s p a c e 總是等於 s b _ h i w a t , 當 然 也 總 是 大 於 或 等 於 s o _ l o w a t , 且 不 要 求 連 接 。對於 T C P 而 言 , 當 發 送 緩 存 中 的 可 用 空 間 小 於 2 0 4 8 個 字 節 時 , 插 口 不 可 寫 。 其 他 的 情 況在圖 1 6 - 5 2 中討論。
3. 還有掛起的例外情況嗎
1 2 9 - 1 4 0 對於例外情況,需檢查標志 s o _ o o b m a r k 和 S S _ R E C V A T M A R K 。 直 到 進 程 讀 完 數 據流中的同步標記后,例外情況才可能存在。
原來,select調用的底層實現里面,把很多個事件都只是歸並進了可讀和可寫這兩種狀態。比如在我之前看來,server端的socket已經將連接排隊,就代表可連接狀態,可是在select看來,這就是可讀狀態。
有了前面的一些基礎,現在上一段Java NIO的代碼
// 創建一個selector Selector selector = Selector.open(); // 創建一個ServerSocketChannel ServerSocketChannel servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); // 綁定端口號 servChannel.socket().bind(new InetSocketAddress(8080), 1024); // 注冊感興趣事件 servChannel.register(selector, SelectionKey.OP_ACCEPT); // select系統調用 selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); if (key.isValid()) { // 處理新接入的請求消息 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 接收客戶端的連接,並創建一個SocketChannel SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // 將SocketChannel和感興趣事件注冊到selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // 讀數據的處理 } } }
分析這段代碼之前,先搞清楚selector、SelectionKey、pollArray等幾個數據結構以及相互持有關系。

pollArray干的是數組的活,但是並不是一個直接的數組。
selector誕生的時候,隨之關聯了一塊內存(pollArray),然后用unsafe類來小心翼翼的按字節順序寫入數據,最終實現了數組結構的功能。這種看似怪異的實現方式,應該是處於效率的考慮吧。
selector並沒有直接持有pollArray,而是持有一個pollArray的封裝類PollArrayWrapper的引用。
// The poll fd array PollArrayWrapper pollWrapper; // 在selector的父類里面 // The set of keys with data ready for an operation protected Set<SelectionKey> selectedKeys;
selectedKeys是一個集合,代表poll系統調用后返回的所有就緒事件,里面存放的數據結構是SelectionKey。
final SelChImpl channel; // package-private public final SelectorImpl selector; // Index for a pollfd array in Selector that this key is registered with private int index; // pollArray里面的索引值,保存在這里是方便實現數組操作 private volatile int interestOps; // 注冊的感興趣事件掩碼 private int readyOps; // 就緒事件掩碼
SelectionKey不但持有channel,還持有selector;interestOps、readyOps與pollArray里面的eventOps、reventOps對應。
Java定義了一些針對文件描述符的事件,其實也是對底層操作系統poll定義的事件的一個映射。事件用掩碼來表示,非常方便進行位操作。如下:
public static final short POLLIN = 0x0001; // 文件描述符可讀 public static final short POLLOUT = 0x0004; // 文件描述符可寫 public static final short POLLERR = 0x0008; // 文件描述符出現錯誤 public static final short POLLHUP = 0x0010; // 文件描述符掛斷 public static final short POLLNVAL = 0x0020; // 文件描述符不對 public static final short POLLREMOVE = 0x0800; // 文件描述符移除 @Native static final short POLLCONN = 0x0002; // 可連接
我記得POLLCONN在之前的版本中直接被賦值成POLLOUT,這里改成了0x0002,這里我是真不知道為什么。希望高手來回復一下。
最終這些事件都會傳遞到內核的poll系統調用,去監控所有傳遞給poll的文件描述符。
回到之前的NIO代碼
1、先看看 servChannel.register(selector, SelectionKey.OP_ACCEPT) 是如何實現注冊的
一路調用后,會到一個關鍵方法
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); // 這一步把channel的文件描述符fd添加到pollArray(見上圖) } k.interestOps(ops); // 這一步把感興趣事件eventOps添加到pollArray(見上圖) return k; }
具體的邏輯肯定比注釋要復雜。接下來看看pollArray的內存操作,以添加文件描述符fd為例
void putDescriptor(int i, int fd) { int offset = SIZE_POLLFD * i + FD_OFFSET; pollArray.putInt(offset, fd); } final void putInt(int offset, int value) { unsafe.putInt(offset + address, value); }
最終還是用unsafe直接修改內存
2、再看看最核心的selector.select(1000)。次方法最終調用doSelect方法,而doSelect方法的實現有多種,我們就以poll版本進行探秘
// 做了很多刪減 protected int doSelect(long timeout) throws IOException { // 執行最核心的poll系統調用 pollWrapper.poll(totalChannels, 0, timeout); // 將到來的就緒事件更新保存 int numKeysUpdated = updateSelectedKeys(); return numKeysUpdated; }
poll系統調用會把用戶空間的線程掛起,也就是阻塞調用,timeout指定多長時間后必須返回。
updateSelectedKeys方法根據poll返回的channel就緒事件,去更新pollArray對應fd的reventOps(見上圖),以及selector的selectedKeys。
/** * Copy the information in the pollfd structs into the opss * of the corresponding Channels. Add the ready keys to the * ready queue. */ protected int updateSelectedKeys() { int numKeysUpdated = 0; // Skip zeroth entry; it is for interrupts only for (int i=channelOffset; i<totalChannels; i++) { // 得到就緒事件的掩碼 int rOps = pollWrapper.getReventOps(i); if (rOps != 0) { SelectionKeyImpl sk = channelArray[i]; pollWrapper.putReventOps(i, 0); // 重置為0,即為未就緒 if (selectedKeys.contains(sk)) { // 把事件的掩碼翻譯成SelectionKey中定義的操作(OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT) if (sk.channel.translateAndSetReadyOps(rOps, sk)) { numKeysUpdated++; } } else { sk.channel.translateAndSetReadyOps(rOps, sk); if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) { // 更新selectedKeys selectedKeys.add(sk); numKeysUpdated++; } } } } return numKeysUpdated; }
把就緒事件的掩碼進行翻譯,感覺就像是Java做的一層適配,讓我們用戶不用去關注事件掩碼等細節
看一下實現這一邏輯的一段代碼,在ServerSocketChannel類里面:
/** * Translates native poll revent set into a ready operation set */ public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) { int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes int oldOps = sk.nioReadyOps(); int newOps = initialOps; if ((ops & PollArrayWrapper.POLLNVAL) != 0) { // This should only happen if this channel is pre-closed while a // selection operation is in progress // ## Throw an error if this channel has not been pre-closed return false; } if ((ops & (PollArrayWrapper.POLLERR | PollArrayWrapper.POLLHUP)) != 0) { newOps = intOps; sk.nioReadyOps(newOps); return (newOps & ~oldOps) != 0; } // 這里將可連接當作可讀來看待的 if (((ops & PollArrayWrapper.POLLIN) != 0) && ((intOps & SelectionKey.OP_ACCEPT) != 0)) newOps |= SelectionKey.OP_ACCEPT; sk.nioReadyOps(newOps); return (newOps & ~oldOps) != 0; }
通過上面的分析,大概有了一個清晰的思路:
Java NIO主要是基於底層操作系統提供的的IO多路復用功能,比如Linux下的select/poll、epoll等系統調用。Java層面為每個selector開辟了一塊內存,用來保存用戶注冊的所有channel、所有感興趣事件,並最終當作參數傳遞給底層的系統調用,最后將內核返回的結果封裝成selectedKeys等數據結構。
