NIO-SocketChannel源碼分析




NIO-SocketChannel源碼分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析
NIO-FileChannel源碼分析
NIO-Selector源碼分析
NIO-WindowsSelectorImpl源碼分析
NIO-EPollSelectorIpml源碼分析

前言

本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析源碼理解NIO的設計原理。

本系列文章針對的是JDK1.8.0.161的源碼。

上一篇介紹了Channel的接口,本篇對SocektChannel的源碼進行解析。

ServerSocketChannelImpl

創建ServerSocketChannel

我們通過ServerSocketChannel.open()創建一個ServerSocketChannel,它實際通過provider創建。

public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel
{
    protected ServerSocketChannel(SelectorProvider provider) {
        super(provider);
    }

    public static ServerSocketChannel open() throws IOException {
        return SelectorProvider.provider().openServerSocketChannel();
    }
    ...
}

在首次創建時,創建初始化SelectorProvider對象。

public static SelectorProvider provider() {
        synchronized(lock) {
            return provider != null ? provider : (SelectorProvider)AccessController.doPrivileged(new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                    if (SelectorProvider.loadProviderFromProperty()) {
                        return SelectorProvider.provider;
                    } else if (SelectorProvider.loadProviderAsService()) {
                        return SelectorProvider.provider;
                    } else {
                        SelectorProvider.provider = DefaultSelectorProvider.create();
                        return SelectorProvider.provider;
                    }
                }
            });
        }
    }

具體SelcetProvider實現在《NIO-Selector》一節講解。

privoder創建完成后通過openServerSocketChannel創建ServiceSocketChannel

public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

ServerSocketChannelImpl靜態構造函數中會進行一些初始化操作。

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
    private static NativeDispatcher nd;
    static {
        IOUtil.load();
        initIDs();
        nd = new SocketDispatcher();
    }
    private static native void initIDs();
    ...
}

初始化

通過靜態構造函數在首次創建時通過IOUtil.load()初始化一些必要的參數。IOUtil的靜態構造方法來加載net和nio的類庫。


public static void load() { }
static {
    java.security.AccessController.doPrivileged(
            new java.security.PrivilegedAction<Void>() {
                public Void run() {
                    //加載net和nio的庫
                    //net主要是應用層的一些協議實現,如FTP,Http
                    System.loadLibrary("net");
                    System.loadLibrary("nio");
                    return null;
                }
            });
    initIDs();
    IOV_MAX = iovMax();
}

通過System.loadLibrary加載后就可以調用該類的native方法。

initIDs是一個native方法,在網上搜了下也沒有找到相關資料,因此只能到native的源碼中看。這里對windows下的native源碼進行說明,在native\sun\nio\ch\ServerSocketChannelImpl.c可以看到ServerSocketChannelImpl的initIDs的代碼

Java_sun_nio_ch_ServerSocketChannelImpl_initIDs(JNIEnv *env, jclass cls)
{
    //查找類型FileDescriptor
    cls = (*env)->FindClass(env, "java/io/FileDescriptor");
    //獲取class的字段fd,I是int類型的意思,將字段id保存到fd_fdID,以后要獲取該字段就可以直接用fd_fdID去獲取
    fd_fdID = (*env)->GetFieldID(env, cls, "fd", "I");
    //查找InetSocketAddress類型
    cls = (*env)->FindClass(env, "java/net/InetSocketAddress");
    //創建一個引用指向cls,后面可以通過cls操作這個java對象
    isa_class = (*env)->NewGlobalRef(env, cls);
    //獲取構造函數的方法id,(Ljava/net/InetAddress;I)V 是反匯編后的構造函數簽名
    isa_ctorID = (*env)->GetMethodID(env, cls, "<init>",
                                     "(Ljava/net/InetAddress;I)V");
}

fd_fdIDisa_ctorIDisa_class保存后保存就省的在查找了。

  • FindClass:是查找指定的類型
  • GetFieldID:獲取類型字段id
  • NewGlobalRef:創建一個引用
  • GetMethodID:獲取方法id

關於JNI的字段和方法解釋可以看JNI GetFieldID和GetMethodID函數解釋及方法簽名

我們可以通過javap查看類的方法簽名,通過-p顯示所有類和成員,-l顯示行號和本地變量表,-c對代碼進行反匯編。

C:\Program Files\Java\jdk1.8.0_161\bin>javap
用法: javap <options> <classes>
其中, 可能的選項包括:
  -help  --help  -?        輸出此用法消息
  -version                 版本信息
  -v  -verbose             輸出附加信息
  -l                       輸出行號和本地變量表
  -public                  僅顯示公共類和成員
  -protected               顯示受保護的/公共類和成員
  -package                 顯示程序包/受保護的/公共類
                           和成員 (默認)
  -p  -private             顯示所有類和成員
  -c                       對代碼進行反匯編
  -s                       輸出內部類型簽名
  -sysinfo                 顯示正在處理的類的
                           系統信息 (路徑, 大小, 日期, MD5 散列)
  -constants               顯示最終常量
  -classpath <path>        指定查找用戶類文件的位置
  -cp <path>               指定查找用戶類文件的位置
  -bootclasspath <path>    覆蓋引導類文件的位置

在命令行輸入javap -p -c -l java.net.InetSocketAddress查看構造函數的方法簽名為Method "<init>":(Ljava/net/InetAddress;I)V


C:\Program Files\Java\jdk1.8.0_161\bin>javap -p -c -l java.net.InetSocketAddress
Compiled from "InetSocketAddress.java"
public class java.net.InetSocketAddress extends java.net.SocketAddress {
  private final transient java.net.InetSocketAddress$InetSocketAddressHolder holder;

  private static final long serialVersionUID;

...
public java.net.InetSocketAddress(int);
    Code:
       0: aload_0
       1: invokestatic  #215                // Method java/net/InetAddress.anyLocalAddress:()Ljava/net/InetAddress;
       4: iload_1
       5: invokespecial #219                // Method "<init>":(Ljava/net/InetAddress;I)V
       8: return
    LineNumberTable:
      line 166: 0
      line 167: 8
...

IOV_MAX用於獲取最大的可一次性寫入的緩存個數,當我們通過IOUtils.write一次性向Channel寫入多個Buffer時,會有Buffer最大數量限制的。

IOUtil.load初始化完成,則會創建SocketDispatcher,它提供了Socket的native方法,不同平台對於SocketDispatcher實現不一樣,最終都是調用FileDispatcherImpl執行相關的文件操作。

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
    private final FileDescriptor fd;
    private int fdVal;
    private static final int ST_INUSE = 0;
    private int state = -1;
    ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
        super(sp);
        //首先通過Net.serverSocket(true)創建Socket並創建一個文件描述符與其關聯。
        this.fd =  Net.serverSocket(true);
        //在注冊selector的時候需要獲取到文件描述符的值。
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
    }

    ServerSocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) throws IOException {
        super(sp);
        this.fd =  fd;
        this.fdVal = IOUtil.fdVal(fd);
        this.state = ST_INUSE;
        //已綁定則直接獲取地址
        if (bound)
            localAddress = Net.localAddress(fd);//獲取傳入的文件描述符的socket地址
    }
    ...
}

文件描述符簡稱fd,它是一個抽象概念,在C庫編程中可以叫做文件流或文件流指針,在其它語言中也可以叫做文件句柄(handler),而且這些不同名詞的隱含意義可能是不完全相同的。不過在系統層,我們統一把它叫做文件描述符。

綁定和監聽

我們通過channel.bind可以將socket綁定到一個端口上。

public final ServerSocketChannel bind(SocketAddress local)
        throws IOException
{
    return bind(local, 0);
}
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
    synchronized (lock) {
        ...
        InetSocketAddress isa = (local == null) ? new InetSocketAddress(0) :
            Net.checkAddress(local);
        SecurityManager sm = System.getSecurityManager();
        //檢查是否端口已被監聽
        if (sm != null)
            sm.checkListen(isa.getPort());
        NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
        Net.bind(fd, isa.getAddress(), isa.getPort());
        //默認tcp待連接隊列長度最小為50
        Net.listen(fd, backlog < 1 ? 50 : backlog);
        synchronized (stateLock) {
            //從文件描述符中獲取地址信息
            localAddress = Net.localAddress(fd);
        }
    }
    return this;
}

首先會做一下基本校驗,包括檢查端口是否被占用。最終綁定並監聽端口。

文件描述符可以關聯到一個文件設備,最終可以關聯到socket結構體,通過socket結構體可以提取出地址信息。如何獲取地址信息可以看下根據文件描述符fd獲取socket結構體,socket結構體可以看下struct socket結構體詳解
關於backlog,由於TCP連接時會有3次握手,當服務端接收到SYN包時,會將該請求socket加入到待連接隊列中,然后返回SYN+ACK繼續進行連接握手。若連接並發量很高,而服務端來不及accept導致待連接隊列滿了,則后續的連接請求將會被拒絕。

在建立在綁定地址之前,我們需要調用NetHooks.beforeTcpBind,這個方法是將fd轉換為SDP(Sockets Direct Protocol,Java套接字直接協議) socket。

SDP需要網卡支持InfiniBand高速網絡通信技術,windows不支持該協議。

  • windows下的NetHooks
public final class NetHooks {
    public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
    }

    public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
    }
}
  • Solaris下的NetHooks
public final class NetHooks {

    public static abstract class Provider {
        protected Provider() {}
        public abstract void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
        public abstract void implBeforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException;
    }

    private static final Provider provider = new sun.net.sdp.SdpProvider();

    public static void beforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpBind(fdObj, address, port);
    }
    public static void beforeTcpConnect(FileDescriptor fdObj, InetAddress address, int port) throws IOException
    {
        provider.implBeforeTcpConnect(fdObj, address, port);
    }
}

實際調用了SdpProvider的方法


provider.implBeforeTcpBind(fdObj, address, port);
public void implBeforeTcpBind(FileDescriptor fdObj, InetAddress address, int port) throws IOException
{
    if (enabled)
        convertTcpToSdpIfMatch(fdObj, Action.BIND, address, port);
}
private void convertTcpToSdpIfMatch(FileDescriptor fdObj, Action action, InetAddress address, int port)
    throws IOException
{
    boolean matched = false;
    //匹配規則,一般有PortRangeRule校驗器校驗端口范圍是否符合規則
    for (Rule rule: rules) {
        if (rule.match(action, address, port)) {
            //將fd轉換為socket
            SdpSupport.convertSocket(fdObj);
            matched = true;
            break;
        }
    }
    ...
}

public static void convertSocket(FileDescriptor fd) throws IOException {
    ...
    //獲取fd索引
    int fdVal = fdAccess.get(fd);
    convert0(fdVal);
}
//調用native方法轉換
private static native int create0() throws IOException;

//rules 是在初始化SdpProvider加載的
public SdpProvider() {
    String file = AccessController.doPrivileged(
        new GetPropertyAction("com.sun.sdp.conf"));
    ...
    list = loadRulesFromFile(file);
    ...
    this.rules = list;
    this.log = out;
}

我們除了使用chennel.bind綁定地址以外,還可以通過channel.socket().bind綁定。channel.socket()會創建一個內部的ServerSocket,ServerSocketAdaptor把實現了SocketServer的配置、綁定Socket Server的功能抽出提取到了ServerSocket中。

class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl
{
    ...
    ServerSocket socket;
    public ServerSocket socket() {
        synchronized(this.stateLock) {
            if (this.socket == null) {
                this.socket = ServerSocketAdaptor.create(this);
            }
            return this.socket;
        }
    }
    ...
}

ServerSocketAdaptor實現了SocketServer的綁定,監聽等功能,但實際還是調用ServerSocketChannelImpl的方法。

public class ServerSocketAdaptor
    extends ServerSocket
{
    private final ServerSocketChannelImpl ssc;

    public static ServerSocket create(ServerSocketChannelImpl ssc) {
        try {
            return new ServerSocketAdaptor(ssc);
        } catch (IOException x) {
            throw new Error(x);
        }
    }
    private ServerSocketAdaptor(ServerSocketChannelImpl ssc)
        throws IOException
    {
        this.ssc = ssc;
    }
    
    public void bind(SocketAddress local, int backlog) throws IOException {
        ...
        ssc.bind(local, backlog);
        ...
    }
    public Socket accept() throws IOException {
        synchronized (ssc.blockingLock()) {
            ...
            SocketChannel sc;
            if ((sc = ssc.accept()) != null)
            ...
        }
    }
    public void close() throws IOException {
        ssc.close();
    }

接收

我們可以通過channel.accept接收一個新的通道。

public SocketChannel accept() throws IOException {
    synchronized (lock) {
        //基本的校驗
        if (!isOpen())
            throw new ClosedChannelException();
        if (!isBound())
            throw new NotYetBoundException();
        SocketChannel sc = null;

        int n = 0;
        //創建文件描述符和接收到的客戶端socket進行綁定
        FileDescriptor newfd = new FileDescriptor();
        InetSocketAddress[] isaa = new InetSocketAddress[1];

        try {
            //I/O開始前調用begin
            begin();
            if (!isOpen())
                return null;
            thread = NativeThread.current();
            for (;;) {
                //接收新連接,將給定的文件描述符與socket客戶端綁定,並設置套接字客戶端的地址。
                n = accept(this.fd, newfd, isaa);
                //返回 1 成功
                //若返回IOStaus.INTERRUPTED表示系統調用了中斷操作,繼續等待接收。
                if ((n == IOStatus.INTERRUPTED) && isOpen())
                    continue;
                break;
            }
        } finally {
            thread = 0;
            //I/O結束調用end
            end(n > 0);
            assert IOStatus.check(n);
        }
        //返回 1 成功
        if (n < 1)
            return null;
        //默認都是阻塞模式
        IOUtil.configureBlocking(newfd, true);
        InetSocketAddress isa = isaa[0];
        //初始化客戶端socketchannel
        sc = new SocketChannelImpl(provider(), newfd, isa);
        //檢查權限
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            try {
                sm.checkAccept(isa.getAddress().getHostAddress(),
                                isa.getPort());
            } catch (SecurityException x) {
                sc.close();
                throw x;
            }
        }
        return sc;

    }
}

SecurityManager是安全管理器,用於權限校驗的類,若權限校驗不通過則會拋出AccessControlException異常。

SocketChannelImpl

SocketChannel生命周期

20191209174933.png

  • SocketChannel會有一個state標記當前的狀態,默認為-1表示ST_UNINITIALIZED(未初始化)
  • 在構造函數最后會將state更新為0(ST_UNCONNECTED,未連接)
  • 調用connect連接服務端,連接成功之前更新state為1(ST_PENDING,待連接)
  • 連接成功時會更新state為2(ST_CONNECTED,已連接)
  • 關閉通道時若I/O未完成時會將state更新為3(ST_KILLPENDING,待釋放)
  • 當關閉通道后,且所有I/O已完成,會將state更新為4(ST_KILLED,已釋放)

創建SocketChannel

SocketChannelServerSocketChannel類似,通過SocketChannel.open創建channel,和服務端類似也是通過provider調用SocketChannle的構造函數。

public static SocketChannel open() throws IOException {
    return SelectorProvider.provider().openSocketChannel();
}

連接

我們也可以通過open(SocketAddress remote)傳入連接地址,在創建后直接。

public static SocketChannel open(SocketAddress remote)
    throws IOException
{
    SocketChannel sc = open();
    try {
        sc.connect(remote);
    }
    ...
    return sc;
}

public boolean connect(SocketAddress sa) throws IOException {
    
    ...
    begin();
    if (localAddress == null) {
        NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
    }
    ...
    //建立連接
    n = Net.connect(fd, ia, isa.getPort());
    ...
    end((n > 0) || (n == IOStatus.UNAVAILABLE));
    ...
    //更新狀態
    state = ST_CONNECTED;
    // 非阻塞socket則更新狀態為ST_PENDING
    if (!isBlocking())
        state = ST_PENDING;
    ...
}

NetHooks.beforeTcpConnectNetHooks.implBeforeTcpBind一樣,最終都是調用SdpProvider.convertTcpToSdpIfMatch

寫數據

將數據寫入channel時會調用IOUtil.write


public int write(ByteBuffer buf) throws IOException 
{
    ...
    begin();
    ...
    IOUtil.write(fd, buf, -1, nd);
    ...
    end(n > 0 || (n == IOStatus.UNAVAILABLE));
    ...
}

static int write(FileDescriptor fd, ByteBuffer src, long position, NativeDispatcher nd) throws IOException
{
    //使用直接緩沖區,則直接寫入到緩沖區中
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);

    // 不是直接緩沖區
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    //獲取一個臨時的直接緩沖區地址。
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        //寫入到臨時的直接緩沖區中
        bb.put(src);
        bb.flip();
        src.position(pos);
        //將直接緩沖區數據寫入
        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            //更新實際寫入量
            src.position(pos + n);
        }
        return n;
    } finally {
        //使用完緩沖釋放
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                             long position, NativeDispatcher nd)
    throws IOException
{
    ...
    if (position != -1) {
        //socket不支持,文件支持
        written = nd.pwrite(fd, ((DirectBuffer)bb).address() + pos, rem, position);
    } else {
        //調用native方法
        written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    ...
}
int write(FileDescriptor fd, long address, int len) throws IOException {
        return write0(fd, address, len, append);
}
static native int write0(FileDescriptor fd, long address, int len, boolean append)
    throws IOException;

當我們使用堆緩沖區時,需要從線程本地緩沖區申請一塊臨時的直接緩沖區用於存放臨時數據,會多一次內存復制。

關於為什么需要使用一塊臨時緩沖區做中間處理可以看下《Java NIO為什么需要DirectByteBuffer作為中間緩沖區》

ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
private static ThreadLocal<BufferCache> bufferCache = new ThreadLocal<BufferCache>()
{
    @Override
    protected BufferCache initialValue() {
        return new BufferCache();
    }
};
public static ByteBuffer getTemporaryDirectBuffer(int size) {
    //從線程緩沖區獲取一個緩沖區
    BufferCache cache = bufferCache.get();
    //獲取能容納的下的地址
    ByteBuffer buf = cache.get(size);
    if (buf != null) {
        return buf;
    } else {
        //當沒有合適的緩沖區時,
        if (!cache.isEmpty()) {
            //刪除第一個未用的緩沖區
            buf = cache.removeFirst();
            //釋放
            free(buf);
        }
        //重新分配一個合適大小的緩沖區
        return ByteBuffer.allocateDirect(size);
    }
}

ThreadLocal是利用空間換時間的一種方案性能優化方案,在每個線程會有一個ThreadLocalMap用於存放數據。這樣每個線程就訪問自己的數據,從而避免了非線程安全的數據由於並發問題需要通過加鎖的方式同步帶來的性能損耗。

聚集寫

當我們一次性向Channel寫入多個Buffer時,會有最大Buffer數量限制,也就是在Channel靜態構造函數獲取的IOV_MAX的值。一次性寫入多個Buffer在Linux中稱之為聚集寫,即將內存中分散在的若干緩沖區中的數據按順序寫至文件中。

調用Channel的write(ByteBuffer[] srcs, int offset, int length)一次性寫入多個Buffer。

public long write(ByteBuffer[] srcs, int offset, int length) throws IOException 
{
    ...
    begin();
    ...
    IOUtil.write(fd, srcs, offset, length, nd);
    ...
    end((n > 0) || (n == IOStatus.UNAVAILABLE));
    ...        
}
write(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length, NativeDispatcher nd)
{
    //獲取或創建一個指定長度的IOVecWrapper結構,它可以存放多個Buffer數據。
    IOVecWrapper vec = IOVecWrapper.get(length);
    ...
    while (i < count && iov_len < IOV_MAX) {
        //遍歷每一塊待寫入緩沖區
        ByteBuffer buf = bufs[i];
        //計算buffer的可讀大小存放rem中
        ...
        //將buffer放入IOVecWrapper中
        if (rem > 0) {
            vec.setBuffer(iov_len, buf, pos, rem);  
            if (!(buf instanceof DirectBuffer)) {
                //若buf不是直接緩沖區則需要創建一個臨時的直接緩沖區
                ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
                ...
                vec.setShadow(iov_len, shadow);
                ...
                //更新待寫入緩沖區引用指向直接緩沖區。
                buf = shadow;
            }
            //設置當前緩沖區的起始地址
            vec.putBase(iov_len, ((DirectBuffer)buf).address() + pos);
            //設置當前緩沖區的長度
            vec.putLen(iov_len, rem);
            iov_len++;
        }
        i++;
    }
    //調用writev一次性寫入多個緩沖區的數據。
    long bytesWritten = nd.writev(fd, vec.address, iov_len);
    ...
    long left = bytesWritten;
    //清理工作
    for (int j=0; j<iov_len; j++) {
        if (left > 0) {
            ByteBuffer buf = vec.getBuffer(j);
            int pos = vec.getPosition(j);
            int rem = vec.getRemaining(j);
            int n = (left > rem) ? rem : (int)left;
            buf.position(pos + n);
            left -= n;
        }
        ByteBuffer shadow = vec.getShadow(j);
        if (shadow != null)
        //將已寫入完成的緩沖區放回到臨時直接緩沖區中
            Util.offerLastTemporaryDirectBuffer(shadow);
        // 清楚IOVcMrapper的緩存
        vec.clearRefs(j);
    }
    return bytesWritten;
}

與聚集寫對應的還有readv()稱為分散(散布)讀,即將文件中若干連續的數據塊讀入內存分散的緩沖區中。
關於linux的聚集寫和散步讀具體內容可以閱讀readv()和writev()函數分散讀取與聚集寫入

通過IOVecWrapper結構構建了一個字節緩沖區數組用於存放多個Buffer,其內部實際維護了一個Native數組結構。


class IOVecWrapper {
    ...
    private final AllocatedNativeObject vecArray;
    final long address;
    static int addressSize;
    private IOVecWrapper(int size) {
        ...
        //false表示無需頁面對齊
        this.vecArray  = new AllocatedNativeObject(size * SIZE_IOVEC, false);
        this.address   = vecArray.address();
    }
    ...
    static {
        //獲取本機指針的大小
        addressSize = Util.unsafe().addressSize();
        //保存每個指針偏移量
        LEN_OFFSET = addressSize;
        //用於保存每個AllocatedNativeObject對象的元素的大小
        //每個NativeObject有兩個long屬性,因此需要×2
        SIZE_IOVEC = (short) (addressSize * 2);
    }
}

AllocatedNativeObject在堆外申請一片內存存放本機對象。

class AllocatedNativeObject extends NativeObject
{
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }    
}

class NativeObject {
    // native 分配地址,可能小於基地址,由於頁面大小會對齊,所以實際的及地址時頁面對其后的地址。
    protected long allocationAddress;
    // native 基地址
    private final long address;

}

通過上面我們知道當一次性批量寫入緩存時會調用native的writev聚集寫方法,調用之前會申請一塊地址用於存放可寫入的多塊緩沖區數據。現在我們在回過頭看看IOVecWrapper具體是怎么做的。

class IOVecWrapper {
    ...
    private final AllocatedNativeObject vecArray;
    private final ByteBuffer[] buf;
    private final int[] position;
    private final int[] remaining;
    private final ByteBuffer[] shadow;
    //每個線程保存一份IOVecWrapper緩存
    private static final ThreadLocal<IOVecWrapper> cached =
        new ThreadLocal<IOVecWrapper>();
    //通過get獲取一塊適合大小的空間
    static IOVecWrapper get(int size) {
        IOVecWrapper wrapper = cached.get();
        if (wrapper != null && wrapper.size < size) {
            //若獲取到空間不夠大,則重新初始化一個空間。
            wrapper.vecArray.free();
            wrapper = null;
        }
        if (wrapper == null) {
            wrapper = new IOVecWrapper(size);
            //native資源,當對象釋放時使得操作系統可以釋放內存,在將Buffer的時候關於直接緩沖區提到過相關的知識
            Cleaner.create(wrapper, new Deallocator(wrapper.vecArray));
            cached.set(wrapper);
        }
        return wrapper;
    }
    //將buffer保存起來
    void setBuffer(int i, ByteBuffer buf, int pos, int rem) {
        this.buf[i] = buf;
        this.position[i] = pos;
        this.remaining[i] = rem;
    }
    ...
}

由於IOVecWrapper內部使用的是直接緩沖區,因此將它存放於ThreadLocalMap中復用以提高性能。

讀數據

從channel讀取數據時會調用IOUtil.read

public int read(ByteBuffer buf) throws IOException {
    ...
    begin();
    ...
    n = IOUtil.read(fd, buf, -1, nd);
    ...
    end(n > 0 || (n == IOStatus.UNAVAILABLE));
    ...
}

static int read(FileDescriptor fd, ByteBuffer dst, long position, NativeDispatcher nd) throws IOException
{
    if (dst.isReadOnly())
        throw new IllegalArgumentException("Read-only buffer");
    if (dst instanceof DirectBuffer)
        return readIntoNativeBuffer(fd, dst, position, nd);//使用直接緩沖區

    //當不是使用直接內存時,則從線程本地緩沖獲取一塊臨時的直接緩沖區存放待讀取的數據
    ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
    try {
        //讀取
        int n = readIntoNativeBuffer(fd, bb, position, nd);
        bb.flip();
        if (n > 0)
        //將直接緩沖區的數據寫入到堆緩沖區中
            dst.put(bb);
        return n;
    } finally {
        //釋放臨時的直接緩沖區
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb, long position, NativeDispatcher nd) throws IOException
{
    ...
    if (position != -1) {
        //socket不支持,文件支持
        n = nd.pread(fd, ((DirectBuffer)bb).address() + pos,
                        rem, position);
    } else {
        n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    ...
}

分散讀

前面說到,Channel也支持分散讀取。

調用Channel的read(ByteBuffer[] dsts, int offset, int length)讀取到多個Buffer中。

public long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException
{
    ...
    begin();
    ...
    n = IOUtil.read(fd, dsts, offset, length, nd);
    ...
    end(n > 0 || (n == IOStatus.UNAVAILABLE));
    ...
}

static long read(FileDescriptor fd, ByteBuffer[] bufs, NativeDispatcher nd)
        throws IOException
{
    return read(fd, bufs, 0, bufs.length, nd);
}

static long read(FileDescriptor fd, ByteBuffer[] bufs, int offset, int length,
                    NativeDispatcher nd)
    throws IOException
{
    IOVecWrapper vec = IOVecWrapper.get(length);
    ...
    while (i < count && iov_len < IOV_MAX) {
        ByteBuffer buf = bufs[i];
        ...
        vec.setBuffer(iov_len, buf, pos, rem);

        if (!(buf instanceof DirectBuffer)) {
            ByteBuffer shadow = Util.getTemporaryDirectBuffer(rem);
            vec.setShadow(iov_len, shadow);
        ...
    }
    ...
    long bytesRead = nd.readv(fd, vec.address, iov_len);
    //清理工作
    ...
    return bytesRead;
    //清理工作
    ...
}

分散讀和聚集寫邏輯類似,都需要通過IOVecWrapper進行一個“中轉”。

關閉通道

通過調用channel.close關閉通道,在接口實現里我們看過它實際是AbstractInterruptibleChannel聲明的。

public final void close() throws IOException {
    synchronized(this.closeLock) {
        if (this.open) {
            this.open = false;
            this.implCloseChannel();
        }
    }
}

AbstractSelectableChannel實現了implCloseChannel

protected final void implCloseChannel() throws IOException {
    //關閉當前channel
    implCloseSelectableChannel();
    synchronized (keyLock) {
        int count = (keys == null) ? 0 : keys.length;
        for (int i = 0; i < count; i++) {
            SelectionKey k = keys[i];
            if (k != null)
            //注冊的channel都需要取消
                k.cancel();
        }
    }
}

當channel注冊到selector時會創建SelectionKey保存到keys中。


this.implCloseSelectableChannel();
protected void implCloseSelectableChannel() throws IOException {
    synchronized(this.stateLock) {
        this.isInputOpen = false;
        this.isOutputOpen = false;
        if (this.state != 4) {
            //windows不做處理
            //linux和Solaris需要,關閉前將fd復制到另一個待關閉fd中,以防止被fd回收
            nd.preClose(this.fd);
        }

        if (this.readerThread != 0L) {
            //發送信號給讀線程,將其從阻塞I/O中釋放,避免一直被阻塞。
            NativeThread.signal(this.readerThread);
        }

        if (this.writerThread != 0L) {
            //發送信號給寫線程,將其從阻塞I/O中釋放,避免一直被阻塞。
            NativeThread.signal(this.writerThread);
        }
        //若還有注冊的channel,則不處理,等待key全部注銷后再kill
        //若沒有的話可以直接kill當前channel
        if (!this.isRegistered()) {
            this.kill();
        }
    }
}

kill方法是在具體Channel中實現的,最終調用nd的close方法關閉文件描述符。

//SocketChannel
public void kill() throws IOException {
    synchronized(this.stateLock) {
        if (this.state != 4) {
            if (this.state == -1) {
                this.state = 4;
            } else {
                assert !this.isOpen() && !this.isRegistered();
                //若仍有線程還沒釋放,則等線程I/O執行完后再kill
                if (this.readerThread == 0L && this.writerThread == 0L) {
                    nd.close(this.fd);
                    this.state = 4;
                } else {
                    this.state = 3;
                }

            }
        }
    }
}
//ServerSocketChannel
public void kill() throws IOException {
        synchronized(this.stateLock) {
            if (this.state != 1) {
                if (this.state == -1) {
                    this.state = 1;
                } else {
                    assert !this.isOpen() && !this.isRegistered();

                    nd.close(this.fd);
                    this.state = 1;
                }
            }
        }
    }

ServerSocketChannel的僅有-1(未初始化)、0(使用中)和1(已釋放)三個狀態。

半連接

NIO支持tcp的半連接,由於TCP是全雙工的,即有輸入流和輸出流。在某些時候我們可以中斷其中一個流,而另一個流仍然可以繼續工作。比如作為客戶端我們可以關閉輸出流,但是仍然能繼續接收到服務端發送的數據。

//關閉輸入流
client.socket().shutdownInput();
//關閉輸出流
client.socket().shutdownOutput();

當客戶端關閉了輸出流,實際上會送FIN包到服務端,服務端接收到后響應ACK,若服務端不發送FIN包,關閉服務端的輸出流(客戶端的輸入流)時,則服務端仍然能繼續發送(響應)數據給客戶端,客戶端也仍然可以繼續接收到數據。NIO的Pipe就是通過兩個socket的半連接實現的單項數據傳輸。

總結

本篇對客戶端和服務端的socket,綁定、連接、讀、寫以及關閉等常用操作進行源碼分析,下一篇將繼續對FileChannel源碼進行探究。

相關文獻

  1. 史上最強Java NIO入門:擔心從入門到放棄的,請讀這篇!
  2. Java NIO系列教程
  3. 基於Java的RDMA高性能通信庫(六):SDP - Java Socket Direct Protocol
  4. Java安全管理器SecurityManager
  5. 操作系統內核空間和用戶空間的互訪問
  6. BIO到NIO源碼的一些事兒之NIO中
  7. ThreadLocal
  8. 根據文件描述符fd獲取socket結構體
  9. struct socket結構體詳解
  10. readv()和writev()函數
  11. 分散讀取與聚集寫入
  12. JNI GetFieldID和GetMethodID函數解釋及方法簽名
  13. 通過javap命令分析java匯編指令
  14. 《Java NIO為什么需要DirectByteBuffer作為中間緩沖區》

20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12061595.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及鏈接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM