Java NIO之網絡編程


最近在研究Java NIO和netty,曾經一度感覺很吃力,根本原因還是對操作系統、TCP/IP、socket編程的理解不到位。

不禁感嘆,還是當初逃的課太多。

假如上天給我一次機會,能夠再回到意氣風發的校園時代,我想那些逃過的課,應該還是會逃。

畢竟在那個躁動的年紀,有很多的事情都比上課有意思。

不扯閑篇了,進入正題。

先重新理解一下socket編程,主要是基於TCP協議。上一張我從《Unix網絡編程》里面截取的一張圖

通過這張圖,能夠大概理解socket編程的幾個函數功能和調用順序,更為關鍵的是可以看出TCP協議的3次握手發生的時機。

但是這張圖並沒有很好的揭示socket是怎樣體現插座、插口的含義,所以我自己斗膽畫了一張圖,請多多指教。

借着這張圖,說幾個要點:

1、剛創建出來的socket,其實並沒有server和client之分,只是socket調用了listen方法之后,角色才改變,處理邏輯也隨之改變

2、client端的socket發送連接請求,server端的socket接收請求后,再創建一個socket與client端的socket傳遞數據,就像兩個插口在通信

3、每個socket都有發送緩存和接收緩存,操作系統可以根據這些緩存來判斷socket可讀、可寫、異常等狀態

4、server端的socket保存着2種連接隊列,后面還會說到

5、每個socket還會關聯一個文件描述符(文件句柄),操作系統通過這個文件描述符(文件句柄)操作socket。圖中並未畫出。

 

再來說說Linux的IO多路復用。

Linux的多種IO模型以及select、poll、epoll等的詳細介紹,我這里不贅述,主要也是因為段位不夠。

我比較關注的是IO多路復用的那些IO事件。先看看jdk里面SelectionKey類里面的幾個方法

 

    public final boolean isReadable() {
        return (readyOps() & OP_READ) != 0;
    }

   public final boolean isWritable() {
        return (readyOps() & OP_WRITE) != 0;
    }

    public final boolean isConnectable() {
        return (readyOps() & OP_CONNECT) != 0;
    }

    public final boolean isAcceptable() {
        return (readyOps() & OP_ACCEPT) != 0;
    }

 

方法名很簡單:可讀、可寫、可連接、可接收。從socket的緩存判斷可讀、可寫倒是很好理解;可是什么時候socket是可連接或者可接收呢???

於是硬着頭皮慢慢啃《TCP-IP詳解:卷2》,終於找到了一些端倪。不得不說,欠的債遲早是要還的。

 

下面再引入書中的一段文字:

圖 1 6 - 5 2 顯 示 了 插 口 的 讀 、 寫 和 例 外 情 況 。 我 們 將 看 到 s o o _ s e l e c t 使用了 s o r e a d a b l e 和 s o w r i t e a b l e 宏 , 這 些 宏 在 s y s / s o c k e t v a r . h 中定義。

1. 插口可讀嗎
1 1 3 - 1 2 0 s o r e a d a b l e 宏的定義如下:

    #define soreadable(so) \
        ((so)->so_rcv.sb_cc >= (so)->so_rcv.sb_lowat || \
        ((so)->so_state & SS_CANTRCVMORE) || \
       (so)->so_qlen || (so)->so_error)

因為 U D P 和 T C P 的 接 收 下 限 默 認 值 為 1 ( 圖 1 6 - 4 ) , 下 列 情 況 表 示 插 口 是 可 讀 的 : 接 收 緩 存 中有數據,連接的讀通道被關閉,可以接受任何連接或有掛起的差錯。

2. 插口可寫嗎
1 2 1 - 1 2 8 s o w r i t e a b l e 宏的定義如下:

    #define sowriteable(so) \
       (sbspace(&(so)->so_snd) >= (so)->so_snd.sb_lowat && \
        (((so)->so_state&SS_ISCONNECTED) || \
       ((so)->so_proto->pr_flags&PR_CONNREQUIRED)==0) || \
       ((so)->so_state & SS_CANTSENDMORE) || \
        (so)->so_error)

T C P 和 U D P 默 認 的 發 送 低 水 位 標 記 是 2 0 4 8 。對於 U D P 而言, s o w r i t e a b l e 總 是 為 真 , 因 為 s b s p a c e 總是等於 s b _ h i w a t , 當 然 也 總 是 大 於 或 等 於 s o _ l o w a t , 且 不 要 求 連 接 。對於 T C P 而 言 , 當 發 送 緩 存 中 的 可 用 空 間 小 於 2 0 4 8 個 字 節 時 , 插 口 不 可 寫 。 其 他 的 情 況在圖 1 6 - 5 2 中討論。

3. 還有掛起的例外情況嗎

1 2 9 - 1 4 0 對於例外情況,需檢查標志 s o _ o o b m a r k 和 S S _ R E C V A T M A R K 。 直 到 進 程 讀 完 數 據流中的同步標記后,例外情況才可能存在。 

原來,select調用的底層實現里面,把很多個事件都只是歸並進了可讀和可寫這兩種狀態。比如在我之前看來,server端的socket已經將連接排隊,就代表可連接狀態,可是在select看來,這就是可讀狀態。

 

有了前面的一些基礎,現在上一段Java NIO的代碼

        // 創建一個selector
        Selector selector = Selector.open();
        // 創建一個ServerSocketChannel
        ServerSocketChannel servChannel = ServerSocketChannel.open();
        servChannel.configureBlocking(false);
        // 綁定端口號
        servChannel.socket().bind(new InetSocketAddress(8080), 1024);
        // 注冊感興趣事件
        servChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        // select系統調用
        selector.select(1000);
        
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectedKeys.iterator();
        SelectionKey key = null;
        while (it.hasNext()) {
            key = it.next();
            it.remove();
            if (key.isValid()) {
                // 處理新接入的請求消息
                if (key.isAcceptable()) {
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    // 接收客戶端的連接,並創建一個SocketChannel
                    SocketChannel sc = ssc.accept(); 
                    sc.configureBlocking(false);
                    // 將SocketChannel和感興趣事件注冊到selector
                    sc.register(selector, SelectionKey.OP_READ); 
                }
                if (key.isReadable()) {
                    // 讀數據的處理
                }
            }
        }            

分析這段代碼之前,先搞清楚selector、SelectionKey、pollArray等幾個數據結構以及相互持有關系。

 

pollArray干的是數組的活,但是並不是一個直接的數組。

selector誕生的時候,隨之關聯了一塊內存(pollArray),然后用unsafe類來小心翼翼的按字節順序寫入數據,最終實現了數組結構的功能。這種看似怪異的實現方式,應該是處於效率的考慮吧。

selector並沒有直接持有pollArray,而是持有一個pollArray的封裝類PollArrayWrapper的引用。

    // The poll fd array
    PollArrayWrapper pollWrapper; // 在selector的父類里面

    // The set of keys with data ready for an operation
    protected Set<SelectionKey> selectedKeys;

selectedKeys是一個集合,代表poll系統調用后返回的所有就緒事件,里面存放的數據結構是SelectionKey。

    final SelChImpl channel;                            // package-private
    public final SelectorImpl selector;

    // Index for a pollfd array in Selector that this key is registered with
    private int index; // pollArray里面的索引值,保存在這里是方便實現數組操作

    private volatile int interestOps; // 注冊的感興趣事件掩碼
    private int readyOps; // 就緒事件掩碼

SelectionKey不但持有channel,還持有selector;interestOps、readyOps與pollArray里面的eventOps、reventOps對應。

 

Java定義了一些針對文件描述符的事件,其實也是對底層操作系統poll定義的事件的一個映射。事件用掩碼來表示,非常方便進行位操作。如下:

    public static final short POLLIN       = 0x0001; // 文件描述符可讀
    public static final short POLLOUT      = 0x0004; // 文件描述符可寫
    public static final short POLLERR      = 0x0008; // 文件描述符出現錯誤
    public static final short POLLHUP      = 0x0010; // 文件描述符掛斷
    public static final short POLLNVAL     = 0x0020; // 文件描述符不對
    public static final short POLLREMOVE   = 0x0800; // 文件描述符移除

    @Native static final short POLLCONN   = 0x0002; // 可連接

我記得POLLCONN在之前的版本中直接被賦值成POLLOUT,這里改成了0x0002,這里我是真不知道為什么。希望高手來回復一下。

最終這些事件都會傳遞到內核的poll系統調用,去監控所有傳遞給poll的文件描述符。

 

回到之前的NIO代碼

1、先看看 servChannel.register(selector, SelectionKey.OP_ACCEPT) 是如何實現注冊的

一路調用后,會到一個關鍵方法

    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        synchronized (publicKeys) {
            implRegister(k); // 這一步把channel的文件描述符fd添加到pollArray(見上圖)
        }
        k.interestOps(ops); // 這一步把感興趣事件eventOps添加到pollArray(見上圖)
        return k;
    }

具體的邏輯肯定比注釋要復雜。接下來看看pollArray的內存操作,以添加文件描述符fd為例

    void putDescriptor(int i, int fd) {
        int offset = SIZE_POLLFD * i + FD_OFFSET;
        pollArray.putInt(offset, fd);
    }

    final void putInt(int offset, int value) {
        unsafe.putInt(offset + address, value);
    }

最終還是用unsafe直接修改內存

 

2、再看看最核心的selector.select(1000)。次方法最終調用doSelect方法,而doSelect方法的實現有多種,我們就以poll版本進行探秘

    // 做了很多刪減
    protected int doSelect(long timeout)
        throws IOException
    {
        // 執行最核心的poll系統調用
        pollWrapper.poll(totalChannels, 0, timeout);
        // 將到來的就緒事件更新保存
        int numKeysUpdated = updateSelectedKeys();
        return numKeysUpdated;
    }

poll系統調用會把用戶空間的線程掛起,也就是阻塞調用,timeout指定多長時間后必須返回。

updateSelectedKeys方法根據poll返回的channel就緒事件,去更新pollArray對應fd的reventOps(見上圖),以及selector的selectedKeys。

    /**
     * Copy the information in the pollfd structs into the opss
     * of the corresponding Channels. Add the ready keys to the
     * ready queue.
     */
    protected int updateSelectedKeys() {
        int numKeysUpdated = 0;
        // Skip zeroth entry; it is for interrupts only
        for (int i=channelOffset; i<totalChannels; i++) {
            // 得到就緒事件的掩碼
            int rOps = pollWrapper.getReventOps(i);
            if (rOps != 0) {
                SelectionKeyImpl sk = channelArray[i];
                pollWrapper.putReventOps(i, 0); // 重置為0,即為未就緒
                if (selectedKeys.contains(sk)) {
                    // 把事件的掩碼翻譯成SelectionKey中定義的操作(OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT)
                    if (sk.channel.translateAndSetReadyOps(rOps, sk)) {
                        numKeysUpdated++;
                    }
                } else {
                    sk.channel.translateAndSetReadyOps(rOps, sk);
                    if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
                        // 更新selectedKeys
                        selectedKeys.add(sk);    
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }
            

 

把就緒事件的掩碼進行翻譯,感覺就像是Java做的一層適配,讓我們用戶不用去關注事件掩碼等細節

看一下實現這一邏輯的一段代碼,在ServerSocketChannel類里面:

/**
     * Translates native poll revent set into a ready operation set
     */
    public boolean translateReadyOps(int ops, int initialOps,
                                     SelectionKeyImpl sk) {
        int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
        int oldOps = sk.nioReadyOps();
        int newOps = initialOps;

        if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
            // This should only happen if this channel is pre-closed while a
            // selection operation is in progress
            // ## Throw an error if this channel has not been pre-closed
            return false;
        }

        if ((ops & (PollArrayWrapper.POLLERR
                    | PollArrayWrapper.POLLHUP)) != 0) {
            newOps = intOps;
            sk.nioReadyOps(newOps);
            return (newOps & ~oldOps) != 0;
        }
        // 這里將可連接當作可讀來看待的
        if (((ops & PollArrayWrapper.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_ACCEPT) != 0))
                newOps |= SelectionKey.OP_ACCEPT;

        sk.nioReadyOps(newOps);
        return (newOps & ~oldOps) != 0;
    }

 

通過上面的分析,大概有了一個清晰的思路:

Java NIO主要是基於底層操作系統提供的的IO多路復用功能,比如Linux下的select/poll、epoll等系統調用。Java層面為每個selector開辟了一塊內存,用來保存用戶注冊的所有channel、所有感興趣事件,並最終當作參數傳遞給底層的系統調用,最后將內核返回的結果封裝成selectedKeys等數據結構。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM