【死磕NIO】— 探索 SocketChannel 的核心原理


大家好,我是大明哥,一個專注於【死磕 Java】系列創作的程序員。
死磕 Java 】系列為作者「chenssy」 傾情打造的 Java 系列文章,深入分析 Java 相關技術核心原理及源碼。
死磕 Java :https://www.cmsblogs.com/group/1420041599311810560

前兩篇文章我們分析了 Channel 及 FileChannel,這篇文章我們探究 SocketChannel的核心原理,畢竟下一個系列就是 【死磕 Netty】了。

聊聊Socket

要想掌握 SocketChannel,我們就必須先了解什么是 Socket。要想解釋清楚 Socket,就需要了解下 TCP/IP。

注:本文重點在 SocketChannel,所以對 TCP和 Socket僅僅只做相關介紹,有興趣的同學,麻煩自查專業資料

TCP/IP 體系結構

學過計算機網絡的小伙伴知道,計算機網絡是分層的,每層專注於一類事情。OSI 網路模型分為七層,如下:

OSI 模型是理論中的模型,在實際應用中我們使用的是 TCP/IP 四層模型,它對OSI模型重新進行了划分和規整,如下:

網絡層次划分清楚了,那怎么傳輸數據呢?如下圖:

計算機A首先在應用層將要發送的數據准備好,然后給傳輸層, 傳輸層的主要作用就是為發送端和接收端提供可靠的連接服務,傳輸層將數據處理完成后給網絡層, 網絡層的一個核心功能就是數據傳輸路徑的選擇。計算機A到計算機B有很多條路,網絡層的作用就是負責管理下一步數據應該到那個路由器,選擇好路徑后,數據就到了網絡接入層,該層主要負責將數據從一個路由器發送到另一個路由器。

上圖是一個非常清晰的傳輸過程。但是我們思考兩個個問題:

  1. 計算機A是怎么知道計算機B的具體位置的呢?
  2. 它又怎么知道將該數據包發送給哪個應用程序呢?

TCP/IP協議族已經幫我們解決了這個問題: IP地址+協議+端口

  • 網絡層的“IP地址”唯一標識了網絡中的主機:這樣就可以找到要將數據發送給哪台主機了。
  • 傳輸層的“協議 + 端口”唯一標識主機中的應用程序:這樣就可以找到要將數據發給那個應該程序了。

利用三元組(IP地址、協議、端口)就可以讓計算機A確定將數據包發送給計算機B的應用程序了。

使用TCP/IP 協議的應用程序通常采用編程接口:UNIX BSD的套接字(socket)和UNIX System V的TLI(已經被淘汰),來實現網絡進程之間的通信。就目前而言, 幾乎所有的應用程序都是采用的 Socket

Socket

上面提到就目前而言,幾乎所有的應用程序都是采用 Socket 來完成網絡通信的。那什么是Socket呢?百度百科是這樣定義的:

套接字(socket)是一個抽象層,應用程序可以通過它發送或接收數據,可對其進行像對文件一樣的打開、讀寫和關閉等操作。套接字允許應用程序將I/O插入到網絡中,並與網絡中的其他應用程序進行通信。網絡套接字是IP地址與端口的組合。

在TCP/IP四層模型中,我們並沒有看到 Socket 影子,那它到底在哪里呢? 又扮演什么角色呢?

Socket 並不是屬於 TCP/IP 模型中的任何一層,它的存在只是為了讓應用層能夠更加簡便地將數據傳輸給傳輸層,應用層不需要關注TCP/IP 協議的復雜內容。我們可以將其理解成一個接口,一個把復雜的TCP/IP協議族隱藏起來的接口,對於應用層而言,他們只需要簡單地調用 Socket 接口就可以實現復雜的TCP/IP 協議,就像設計模式中的門面模式( 將復雜的TCP\IP 協議族隱藏起來,對外提供統一的接口,是應用層能夠更加容易地使用)。簡單地說就是簡單來說可以把 Socket理解成是應用層與TCP/IP協議族通信的抽象層、函數庫

下圖是 Socket一次完整的通信流程圖:

上圖設計到的Socket 相關函數:

  • socket():返回套接字描述符
  • connect():建立連接
  • bind():一個本地協議地址賦予一個套接字
  • linsten():服務器監聽端口連接
  • accept():應用程序接受完成3次握手的客戶端連接
  • send()recv()write()read():服務端與客戶端互相發送數據
  • colse():關閉連接

探究SocketChannel

SocketChannel 是一個連接 TCP 網絡Socket 的 Channel,我們可以認為它是對傳統 Java Socket API的改進。它支持了非阻塞的讀寫。

SocketChannel具有如下特點

  1. 對於已經存在的socket不能創建SocketChannel。
  2. SocketChannel中提供的open接口創建的Channel並沒有進行網絡級聯,需要使用connect接口連接到指定地址。
  3. 未進行連接的SocketChannle執行I/O操作時,會拋出NotYetConnectedException
  4. SocketChannel支持兩種I/O模式:阻塞式和非阻塞式。
  5. SocketChannel支持異步關閉。如果SocketChannel在一個線程上read阻塞,另一個線程對該SocketChannel調用shutdownInput,則讀阻塞的線程將返回-1表示沒有讀取任何數據;如果SocketChannel在一個線程上write阻塞,另一個線程對該SocketChannel調用shutdownWrite,則寫阻塞的線程將拋出AsynchronousCloseException

SocketChannel 的使用

1. 創建SocketChannel

要想使用 SocketChannel我們首先得創建它。創建SocketChannel的方式有兩種:

// 方式 1
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 80));

// 方式 2
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("www.baidu.com", 80));

2、連接校驗

使用的SocketChannel必須是已連接的,如果使用一個未連接的SocketChannel,則會拋出 NotYetConnectedException。SocketChannel提供了四個方法來校驗連接。

// 測試SocketChannel是否為open狀態
socketChannel.isOpen();
// 測試SocketChannel是否已經被連接   
socketChannel.isConnected();
// 測試SocketChannel是否正在進行連接
socketChannel.isConnectionPending();
// 校驗正在進行套接字連接的SocketChannel是否已經完成連接
socketChannel.finishConnect(); 

3、讀操作

SocketChannel 提供了 read()方法用於讀取數據:

public abstract int read(ByteBuffer dst) throws IOException;

public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

public final long read(ByteBuffer[] dsts) throws IOException {
  return read(dsts, 0, dsts.length);
}

首先我們需要先分配一個 ByteBuffer,然后調用 read()方法,該方法會將數據從SocketChannel讀入到 ByteBuffer中。

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);

read()方法會返回一個 int 值,該值表示讀取了多少數據到 Buffer 中,如果返回 -1,則表示已經讀到了流的末尾。

4、寫操作

調用 SocketChannel的write()方法,可以向 SocketChannel 中寫數據。

public abstract int write(ByteBuffer src) throws IOException;

public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

public final long write(ByteBuffer[] srcs) throws IOException {
    return write(srcs, 0, srcs.length);
}

5、設置 I/O 模式

SocketChannel 支持阻塞和非阻塞兩種 I/O 模式,調用 configureBlocking()方法即可:

socketChannel.configureBlocking(false);

false 表示非阻塞,true 表示阻塞。

6、關閉

當使用完 SocketChannel 后需要將其關閉,SocketChannel 提供了 close()來關閉 SocketChannel 。

socketChannel.close();

SocketChannel 源碼分析

上面簡單介紹了 SocketChannel 的使用,下面我們再來詳細分析 SocketChannel 的源碼。SocketChannel 實現 Channel 接口,它有一個核心子類 SocketChannel,該類實現了 SocketChannel 的大部分功能。如下(圖有刪減)

創建 SocketChannel

上面提到通過調用 open()方法就可以一個 SocketChannel 實例。

    public static SocketChannel open() throws IOException {
        return SelectorProvider.provider().openSocketChannel();
    }

我們看到它是通過 SelectorProvider 來創建 SocketChannel 的,provider() 方法會創建一個 SelectorProvider 實例,SelectorProvider 是 Selector 和 Channel 實例的提供者,它提供了創建 Selector、SocketChannel、ServerSocketChannel 實例的方法,采用 SPI 的方式實現。 SelectorProvider 我們在講解 Selector 的時候在闡述。

provider 創建完成后調用 openSocketChannel() 來創建 SocketChannel。

    public SocketChannel openSocketChannel() throws IOException {
        return new SocketChannelImpl(this);
    }

從這了就可以看出 SocketChannelImpl 為 SocketChannel 的實現者。調用 SocketChannelImpl 的構造函數實例化一個 SocketChannel 對象。

    SocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        // 創建 Socket 並創建一個文件描述符與其關聯
        this.fd = Net.socket(true);
        // 在注冊 selector 的時候需要獲取到文件描述符的值
        this.fdVal = IOUtil.fdVal(fd);
        // 設置狀態為未連接
        this.state = ST_UNCONNECTED;
    }

fd:文件夾描述符對象。

fdVal:fd 的 value。

文件描述符簡稱 fd,它是一個抽象概念,在 C 庫編程中可以叫做文件流或文件流指針,在其它語言中也可以叫做文件句柄(handler),而且這些不同名詞的隱含意義可能是不完全相同的。不過在系統層,我們統一把它叫做文件描述符。

state:狀態,設置為未連接。它有如下 6 個值

private static final int ST_UNINITIALIZED = -1;
private static final int ST_UNCONNECTED = 0;
private static final int ST_PENDING = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_KILLPENDING = 3;
private static final int ST_KILLED = 4;

連接服務器:connect()

調用 Connect() 方法可以鏈接遠程服務器。

    public boolean connect(SocketAddress sa) throws IOException {
        int localPort = 0;
        
        // 注意這里的加鎖
        synchronized (readLock) {
            synchronized (writeLock) {
               // 確保當前 SocketChannel 是打開且未連接的
                ensureOpenAndUnconnected();
                InetSocketAddress isa = Net.checkAddress(sa);
                SecurityManager sm = System.getSecurityManager();
                if (sm != null)
                    sm.checkConnect(isa.getAddress().getHostAddress(),
                                    isa.getPort());
                // 這里的鎖是注冊和阻塞配置的鎖
                synchronized (blockingLock()) {
                    int n = 0;
                    try {
                        try {
                            // 支持線程中斷,通過設置當前線程的Interruptible blocker屬性實現
                            begin();
                            // 
                            synchronized (stateLock) {
                               // 默認為 open, 除非調用了 close()
                                if (!isOpen()) {
                                    return false;
                                }
                                // 只有未綁定本地地址也就是說未調用bind方法才執行
                                if (localAddress == null) {
                                    NetHooks.beforeTcpConnect(fd,
                                                           isa.getAddress(),
                                                           isa.getPort());
                                }
                                // 記錄當前線程
                                readerThread = NativeThread.current();
                            }
                            for (;;) {
                                InetAddress ia = isa.getAddress();
                                if (ia.isAnyLocalAddress())
                                    ia = InetAddress.getLocalHost();
                                // 調用 Linux 的 connect 函數實現
                                // 如果采用堵塞模式,會一直等待,直到成功或出現異常
                                n = Net.connect(fd,
                                                ia,
                                                isa.getPort());
                                if (  (n == IOStatus.INTERRUPTED)
                                      && isOpen())
                                    continue;
                                break;
                            }

                        } finally {
                            readerCleanup();
                            end((n > 0) || (n == IOStatus.UNAVAILABLE));
                            assert IOStatus.check(n);
                        }
                    } catch (IOException x) {
                        // 出現異常,關閉 Channel
                        close();
                        throw x;
                    }
                    synchronized (stateLock) {
                        remoteAddress = isa;
                        if (n > 0) {
                            // n > 0,表示連接成功
                            // 連接成功,更新狀態為ST_CONNECTED
                            state = ST_CONNECTED;
                            if (isOpen())
                                
                                localAddress = Net.localAddress(fd);
                            return true;
                        }
                        // 如果是非堵塞模式,而且未立即返回成功,更新狀態為ST_PENDING
                        // 由此可見,該狀態只有非堵塞時才會存在
                        if (!isBlocking())
                            state = ST_PENDING;
                        else
                            assert false;
                    }
                }
                return false;
            }
        }
    }

該方法的核心方法就在於 n = Net.connect(fd,ia,isa.getPort()); 該方法會一直調用到 native 方法去:

JNIEXPORT jint JNICALL
Java_sun_nio_ch_Net_connect0(JNIEnv *env, jclass clazz, jboolean preferIPv6,
                             jobject fdo, jobject iao, jint port)
{
    SOCKADDR sa;
    int sa_len = SOCKADDR_LEN;
    int rv;
    //地址轉換為struct sockaddr格式
    if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *) &sa,
                                  &sa_len, preferIPv6) != 0)
    {
      return IOS_THROWN;
    }
   //傳入 fd 和 sockaddr,與遠程服務器建立連接,一般就是 TCP 三次握手
   //如果設置了 configureBlocking(false), 不會堵塞,否則會堵塞一直到超時或出現異常
    rv = connect(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
    if (rv != 0) { 
        // 0 表示連接成功,失敗時通過 errno 獲取具體原因
        if (errno == EINPROGRESS) {  //非堵塞,連接還未建立(-2)
            return IOS_UNAVAILABLE;
        } else if (errno == EINTR) {  //中斷(-3)
            return IOS_INTERRUPTED;
        }
        return handleSocketError(env, errno); //出錯
    }
    return 1; //連接建立,一般TCP連接連接都需要時間,因此除非是本地網絡,一般情況下非堵塞模式返回IOS_UNAVAILABLE比較多;
}

讀數據:read()

SocketChannel 提供 read() 方法讀取數據。

   public int read(ByteBuffer buf) throws IOException {
        synchronized (readLock) {
            // ...
            try {
                // ...
                for (;;) {
                    n = IOUtil.read(fd, buf, -1, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen()) {
                        continue;
                    }
                    return IOStatus.normalize(n);
                }

            } finally {
                // ...
            }
        }
    }

核心方法就在於 IOUtil.read(fd, buf, -1, nd)

    static int read(FileDescriptor fd, ByteBuffer dst, long position,NativeDispatcher nd)
        throws IOException
    {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if (dst instanceof DirectBuffer)
            // 使用直接緩沖區讀取數據
            return readIntoNativeBuffer(fd, dst, position, nd);

        // 當不是使用直接內存時,則從線程本地緩沖獲取一塊臨時的直接緩沖區存放待讀取的數據
        ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
        try {
            int n = readIntoNativeBuffer(fd, bb, position, nd);
            bb.flip();
            if (n > 0)
                // 將直接緩沖區的數據寫入到堆緩沖區中
                dst.put(bb);
            return n;
        } finally {
            // 使用完成后釋放緩沖
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }

這里我們看到如果 ByteBuffer 是 DirectBuffer,則調用 readIntoNativeBuffer() 讀取數據,如果不是則通過 getTemporaryDirectBuffer() 獲取一個臨時的直接緩沖區,然后調用 readIntoNativeBuffer()獲取數據,然后將獲取的數據寫入 ByteBuffer 中。

    private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,long position, NativeDispatcher nd)
        throws IOException
    {
        int pos = bb.position();
        int lim = bb.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0);

        if (rem == 0)
            return 0;
        int n = 0;
        if (position != -1) {
            n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,rem, position);
        } else {
            n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
        }
        if (n > 0)
            bb.position(pos + n);
        return n;
    }

寫數據 write()方法和 read()方法大致一樣,大明哥這里就不在闡述了,有興趣的小伙伴自己去研究下。

ServerSocketChannel 與 SocketChannel 原理大同小異,這里就不展開講述了,下篇文章我們開始研究第三個組件: Selector

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM