NIO-FileChannel源碼分析




NIO-FileChannel源碼分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析
NIO-FileChannel源碼分析
NIO-Selector源碼分析
NIO-WindowsSelectorImpl源碼分析
NIO-EPollSelectorIpml源碼分析

前言

本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析源碼理解NIO的設計原理。

本系列文章針對的是JDK1.8.0.161的源碼。

上一篇對SocketChannel的源碼進行了分析,本篇繼續對FileChannel的源碼進行解析。

RandomAccessFile

我們可以通過使用RandomAccessFile讀寫數據。也可以通過FileInputStream讀數據或通過FileOutputStream寫數據。但實際這三個類內部實際是一樣的,我們就以RandomAccessFile為例子說明FileChannelImpl的實現。

接口

RandomAccessFile實現了DataInputDataOutput兩個接口,即數據輸入和輸出接口。

public class RandomAccessFile implements DataOutput, DataInput, Closeable {

}

DataInput定義了一些基本的讀取方法

  • 讀取指定長度的字節數據
  • 讀取數據並轉換為基元類型。
  • 讀取一行數據。讀取到\r會丟棄,讀取到\n會丟棄並停止繼續讀取。
  • 用UTF-8編碼讀取一個string

public interface DataInput {
    void readFully(byte b[]) throws IOException;
    void readFully(byte b[], int off, int len) throws IOException;
    int skipBytes(int n) throws IOException;
    XXX readXXX() throws IOException;
    String readLine() throws IOException;
    String readUTF() throws IOException;
}

DataOutput定義了一些基本的寫方法

  • 寫入指定長度字節數據到文件。
  • 將基元類型寫入文件。
  • 使用UTF-8編碼寫入一個string到文件。
public interface DataOutput {
    void write(int b) throws IOException;
    void write(byte b[]) throws IOException;
    void write(byte b[], int off, int len) throws IOException;
    void writeXXX(XXX v) throws IOException;
    void writeUTF(String s) throws IOException;
}

創建實例

在創建RandomAccessFile我們需要傳入兩個參數:第一個是文件路徑,第二個是文件訪問方式。

public RandomAccessFile(String name, String mode)
    throws FileNotFoundException
{
    this(name != null ? new File(name) : null, mode);
}

public RandomAccessFile(File file, String mode)
    throws FileNotFoundException
{
    //File用於檢查文件路徑是否有效
    String name = (file != null ? file.getPath() : null);
    int imode = -1;
    //判斷文件訪問方式
    if (mode.equals("r"))
        imode = O_RDONLY;
    else if (mode.startsWith("rw")) {
        imode = O_RDWR;
        rw = true;
        if (mode.length() > 2) {
            if (mode.equals("rws"))
                imode |= O_SYNC;
            else if (mode.equals("rwd"))
                imode |= O_DSYNC;
            else
                imode = -1;
        }
    }
    if (imode < 0)
        throw new IllegalArgumentException("Illegal mode \"" + mode + "\" must be one of \"r\", \"rw\", \"rws\", or \"rwd\"");
    //檢查讀寫權限
    SecurityManager security = System.getSecurityManager();
    if (security != null) {
        security.checkRead(name);
        if (rw) {
            security.checkWrite(name);
        }
    }
    if (name == null) {
        throw new NullPointerException();
    }
    if (file.isInvalid()) {
        throw new FileNotFoundException("Invalid file path");
    }
    fd = new FileDescriptor();
    fd.attach(this);
    path = name;
    open(name, imode);
}
  • 首先會創建一個File對象,用於檢查文件路徑是否合法。目前僅檢查文件路徑是否含有Nul(/u0000)。
  • 檢查文件操作方式,文件有四種操作方式
模式 說明
r 以只讀方式打開。調用結果對象的任何 write 方法都將導致拋出 IOException。
rw 打開以便讀取和寫入。如果該文件尚不存在,則嘗試創建該文件。
rwd 打開以便讀取和寫入,這點和rw的操作完全一致,但是只會在cache滿或者調用RandomAccessFile.close()的時候才會執行內容同步操作。
rws 在"rwd"的基礎上對內容同步的要求更加嚴苛,每write修改一個byte都會直接修改到磁盤中。
  • 創建SecurityManager檢查讀寫文件權限
  • 創建文件描述符
  • 打開文件

獲取文件通道

通過getChannel可以獲取文件通道,進行文件讀寫。

public final FileChannel getChannel() {
    synchronized (this) {
        if (channel == null) {
            channel = FileChannelImpl.open(fd, path, true, rw, this);
        }
        return channel;
    }
}

通過FileChannelImpl.open創建一個FileChannelImpl實例。

FileChannelImpl

20191219111345.png

創建

在FileDispatcherImpl靜態構造函數中會調用IOUtil.load(),在上一章詳細介紹過。

static {
    IOUtil.load();
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable, boolean writable, boolean append, Object parent)
{
    this.fd = fd;
    this.readable = readable;
    this.writable = writable;
    this.append = append;
    this.parent = parent;
    this.path = path;
    //創建nd用於調用native方法進行讀寫
    this.nd = new FileDispatcherImpl(append);
}

寫文件


public int write(ByteBuffer src) throws IOException {
    ensureOpen();
    if (!writable)
        throw new NonWritableChannelException();
    synchronized (positionLock) {
        int n = 0;
        int ti = -1;
        try {
            begin();
            //將當前線程加入到線程集合中,當Channel關閉時,可以發送信號給線程,避免線程被I/O阻塞住
            ti = threads.add();
            if (!isOpen())
                return 0;
            do {
                //寫數據
                n = IOUtil.write(fd, src, -1, nd);
            } while ((n == IOStatus.INTERRUPTED) && isOpen());
            return IOStatus.normalize(n);
        } finally {
            //I/O完成移除線程
            threads.remove(ti);
            end(n > 0);
            assert IOStatus.check(n);
        }
    }
}
  • 首先校驗一些必要的狀態,如文件是否打開,是否可寫等。
  • 調用begin開始I/O操作
  • 將當前線程加入到線程集合中。
  • 調用IOUtil.write將buffer數據寫入到文件中
  • I/O完成調用end收尾工作
  • 將線程移除線程集合中

關於begin和end操作可以看《NIO-Channel接口分析

threads是一個NativeThreadSet類型,它用於存放native線程的唯一token。


class NativeThreadSet {
    private long[] elts;
    ...
    int add() {
        long th = NativeThread.current();
        ...
        //數組不夠長會先擴容
        ...
        for (int i = start; i < elts.length; i++) {
            //未使用過,則設置當前的線程token值
            if (elts[i] == 0) {
                elts[i] = th;
                used++;
                return i;
            }
        }
        ...
    }

    void remove(int i) {
        synchronized (this) {
            //清空
            elts[i] = 0;
            used--;
            //當調用了signalAndWait等待時會設置為true。此時會激活每個線程,並清理,每個線程都會被移除。當全部移除后激活調用signalAndWait的線程
            if (used == 0 && waitingToEmpty)
            //通知
                notifyAll();
        }
    }
    //通知並等待
    synchronized void signalAndWait() {
        boolean interrupted = false;
        while (used > 0) {
            int u = used;
            int n = elts.length;
            for (int i = 0; i < n; i++) {
                long th = elts[i];
                ...
                //激活線程
                NativeThread.signal(th);
                ...
            }
            //是否等待所有線程被移除
            waitingToEmpty = true;
            try {
                //等待線程被清理,所有線程被移除時會激活。最多等待50ms,防止線程被阻塞。
                wait(50);
            } catch (InterruptedException e) {
                interrupted = true;
            } finally {
                waitingToEmpty = false;
            }
        }
        //線程中斷則調用中斷處理操作
        if (interrupted)
            Thread.currentThread().interrupt();
    }
}

native線程被定義為NativeThread類型的對象,主要由於在linux等操作系統當線程被I/O操作阻塞時,channel釋放並不會激活該線程,因此需要通過一種通知的機制,在channel關閉時對線程進行通知,以便激活線程。

threads.add();會獲取當前native的線程token,並加入待NativeThreadSet的token數組中(若數組長度不夠,則會進行擴容。)

上一章詳細將結果IOUtil.write這里就不重復說明了

讀文件

和寫文件步驟類似,調用IOUtil.read讀取數據


public int read(ByteBuffer dst) throws IOException {
    ...
    n = IOUtil.read(fd, dst, -1, nd);
    ...
}

修改起始位置


public FileChannel position(long newPosition) throws IOException {
    ...
    p  = position0(fd, newPosition);
    ...
}

獲取文件長度

public long size() throws IOException {
    ...
    s = nd.size(fd);
    ...
}

截取長度

截取文件的長度,超過的文件內容會被刪除。

public FileChannel truncate(long newSize) throws IOException {
    
    ...
    int rv = -1;
    long p = -1;
    int ti = -1;
    ...
    // 獲取當前長度
    long size  = nd.size(fd);
    ...
    // 獲取當前位置
    p = position0(fd, -1);
    ...
    // 若當前長度大於截取的長度,則截取
    if (newSize < size) {
        ...
        rv = nd.truncate(fd, newSize);
        ...
    }
    //若當前位置大於截取的長度則修改當前位置
    if (p > newSize)
        p = newSize;
    ...
    rv = (int)position0(fd, p);
    ...
}

寫入磁盤

寫文件若沒有采用直接緩沖區,則會先寫入到頁緩沖區中,通過force可以將尚未寫入磁盤的數據強制寫道磁盤上。

public void force(boolean metaData) throws IOException {
    ...
    rv = nd.force(fd, metaData);
    ...
}

通道之間數據傳輸

若需要將一個通道的數據寫入到另一個通道,則可以使用transferTotransferFrom

transferTo

若當前通道是FileChannel,則可以將當前通道數據通過transferTo寫入到其他通道

public long transferTo(long position, long count, WritableByteChannel target) throws IOException
{
    ...
    //當前文件大小
    long sz = size();
    if (position > sz)
        return 0;
    int icount = (int)Math.min(count, Integer.MAX_VALUE);
    //可傳大小修正
    if ((sz - position) < icount)
        icount = (int)(sz - position);
    long n;
    // 若內核支持則使用直接傳輸
    if ((n = transferToDirectly(position, icount, target)) >= 0)
        return n;
    // 嘗試內存映射文件傳輸
    if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
        return n;
    // 慢速傳輸
    return transferToArbitraryChannel(position, icount, target);
}

通常情況下我們要將一個通道的數據傳到另一個通道。舉個例子,從一個文件讀取數據通過socket通道進行發送。比如通過http協議讀取服務器上的一個靜態文件。

  • 文件從硬盤讀取(拷貝)頁緩沖區
  • 從頁緩沖區讀取(拷貝)數據到用戶緩沖區
  • 用戶緩沖區的數據寫入(拷貝)到socket內核緩沖區,最終再將socket內核緩沖區的數據寫入(拷貝)到網卡中。
    可以看到這中間發生了四次內存拷貝。

當我們通過transferTo在通道之間數據傳輸時,若內核支持,則會使用零拷貝的方式傳輸數據。

通過零拷貝技術可以避免將數據拷貝到用戶空間中。

直接傳輸

若底層硬件支持的話可以將讀取到的內核緩沖區的文件描述符加到socket緩沖區中,就可以省去了內核中將數據拷貝到socket緩沖區這一個內存拷貝動作。

使用直接傳輸時,只能從文件通道傳輸到網絡通道。

private long transferToDirectly(long position, int icount, WritableByteChannel target) throws IOException
{
    if (!transferSupported)
        return IOStatus.UNSUPPORTED;
    //做一些校驗,當前和目標通道是否都支持直接傳輸
    ...
    targetFD = ((SelChImpl)target).getFD();
    ...
    int thisFDVal = IOUtil.fdVal(fd);
    int targetFDVal = IOUtil.fdVal(targetFD);
    //調用native方法直接傳輸,若不支持會返回不支持的錯誤碼
    n = transferTo0(thisFDVal, position, icount, targetFDVal);
    ...
}

windows不支持transferTo0

以linux為例,linux會調用sendfile64在兩個文件描述符之間傳遞數據。

Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jint srcFD,
                                            jlong position, jlong count,
                                            jint dstFD)
{
#if defined(__linux__)
    off64_t offset = (off64_t)position;
    jlong n = sendfile64(dstFD, srcFD, &offset, (size_t)count);
    ...
    return n;
#elif defined (__solaris__)
...
#elif defined(__APPLE__)
...
#elif defined(_AIX)
...
#else
    return IOS_UNSUPPORTED_CASE;
#endif
}

sendfile64只支持將文件傳輸到socket

內存映射文件

若內核不支持上述方式則會嘗試使用mmap(內存映射文件)的方式傳輸。

應用程序調用mmap(),磁盤上的數據會通過DMA被拷貝的頁緩沖區,接着操作系統會把這段頁緩沖區與應用程序共享,這樣就不需要把頁緩沖區的內容復制到用戶空間了。應用程序再調用write(),操作系統直接將頁緩沖區的內容拷貝到socket緩沖區中,這一切都發生在內核空間,最后,socket緩沖區再把數據發到網卡去。

private long transferToTrustedChannel(long position, long count, WritableByteChannel target) throws IOException
{
    ...
    //內存映射文件
    MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
    //有個bug,若在內存映射文件寫入到目標通道時,關閉了channel,並不能中斷此次寫操作。
    int n = target.write(dbb);
    ...
    unmap(dbb);
    ...
}

MapMode有三種方式,只讀(READ_ONLY)、可讀寫(READ_WRITE)、寫時復制(PRIVATE)。

當多個進程的虛擬內存映射到同一塊物理內存時,若不采用寫時復制,則由於共用一塊物理內存,會相互影響。當使用了寫時復制的技術后,一旦一個進程要修改頁面時,就會復制一個副本,因此不會影響其他進程。

map(MapMode.READ_ONLY, position, size);
public MappedByteBuffer map(MapMode mode, long position, long size) throws IOException
{
    //一些基本校驗 
    ...
    //獲取文件大小
    long filesize = nd.size(fd);
    ...
    //根據文件大小設置文件描述符的結束未知
    rv = nd.truncate(fd, position + size);
    ...
    int pagePosition = (int)(position % allocationGranularity);
    long mapPosition = position - pagePosition;
    long mapSize = size + pagePosition;
    //調用native進行映射,若此時發生內存溢出,則強制回收一次GC,並重新嘗試映射,若還是發生內存溢出則拋出異常
    ...
    addr = map0(imode, mapPosition, mapSize);
    ...
    //創建一個內存映射的文件描述符,指向當前的native文件描述符
    FileDescriptor mfd = nd.duplicateForMapping(fd);

    int isize = (int)size;
    //Unmapper是卸載內存映射文件用的
    Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
    if ((!writable) || (imode == MAP_RO)) {
        return Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um);
    } else {
        //創建一個MappedByteBuffer
        return Util.newMappedByteBuffer(isize, addr + pagePosition, mfd, um);
    }
    ...
}

Unmapper適用於卸載內存映射文件用的。它實現了Runnable接口,以便於線程可以執行移除內存映射以及一些清理工作。

private static class Unmapper implements Runnable
{
    ...
    public void run() {
        if (address == 0)
            return;
        //移除內存映射
        unmap0(address, size);
        address = 0;
        ...
        // 關閉文件描述符
        nd.close(fd);
        ...
    }

那么什么時候會進行清理呢,我們可以看到實際通過Util.newMappedByteBuffer創建了一個MapperByteBuffer,並將Unmapper對象進行傳遞。

接下來看如何創建MapperByteBuffer

static MappedByteBuffer newMappedByteBuffer(int size, long addr, FileDescriptor fd, Runnable unmapper)
{
    MappedByteBuffer dbb;
    if (directByteBufferConstructor == null)
        initDBBConstructor();
    try {
        dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
            new Object[] { new Integer(size), new Long(addr), fd, unmapper });
    } catch (InstantiationException |
                IllegalAccessException |
                InvocationTargetException e) {
        throw new InternalError(e);
    }
    return dbb;
}

首先通過directByteBufferConstructor創建一個MapperByteBuffer,從命名可以看出來這是一個DirectByteBuffer構造器。

通過反射獲取了DirectByteBuffer的構造函數。


private static void initDBBConstructor() {
    ...
    Class<?> cl = Class.forName("java.nio.DirectByteBuffer");
    Constructor<?> ctor = cl.getDeclaredConstructor(
        new Class<?>[] { int.class, long.class, FileDescriptor.class, Runnable.class });
    ctor.setAccessible(true);
    directByteBufferConstructor = ctor;
    ...
}

構造函數傳遞的第五個參數為Unmapper對象,它被傳遞到了Cleaner中,由此可知,當MapperByteBuffer被釋放時,Cleaner可以保證內存映射被卸載。

protected DirectByteBuffer(int cap, long addr, FileDescriptor fd, Runnable unmapper)
{
    super(-1, 0, cap, cap, fd);
    address = addr;
    cleaner = Cleaner.create(this, unmapper);
    att = null;
}

當內存映射完成時,就可以通過write進行數據傳輸,傳輸完成通過ummap卸載內存映射。

private static void unmap(MappedByteBuffer bb) {
    Cleaner cl = ((DirectBuffer)bb).cleaner();
    if (cl != null)
        cl.clean();
}
常規傳輸

常規傳輸需要多次內存拷貝以及在用戶模式和內核模式切換。

private long transferToArbitraryChannel(long position, int icount, WritableByteChannel target) throws IOException
{
    ...
    //獲取臨時直接緩沖區
    ByteBuffer bb = Util.getTemporaryDirectBuffer(c);
    ...
    //讀到bb中
    int nr = read(bb, pos);
    ...
    //轉換為讀模式
    bb.flip();
    //寫入到目標通道
    int nw = target.write(bb);
    ...
    //釋放臨時直接緩沖區
    Util.releaseTemporaryDirectBuffer(bb);
}

transferFrom

若要將其他通道的數據傳輸到文件通道中,可以通過transferFrom傳輸。

若原通道是文件,則可以通過內存映射文件的方式提高性能。否則使用常規傳輸方式,需要將數據拷貝到用戶空間。

public long transferFrom(ReadableByteChannel src, long position, long count) throws IOException
{
    ...
    if (src instanceof FileChannelImpl)
        return transferFromFileChannel((FileChannelImpl)src, position, count);

    return transferFromArbitraryChannel(src, position, count);
}
內存映射文件
private long transferFromFileChannel(FileChannelImpl src, long position, long count) throws IOException
{
    ...
    MappedByteBuffer bb = src.map(MapMode.READ_ONLY, p, size);
    //寫入到文件
    long n = write(bb, position);
    ...
    //釋放內存映射
    unmap(bb);
    ...
}
常規傳輸
private long transferFromArbitraryChannel(ReadableByteChannel src, long position, long count) throws IOException
{
    int c = (int)Math.min(count, TRANSFER_SIZE);
    //獲取臨時直接緩沖區
    ByteBuffer bb = Util.getTemporaryDirectBuffer(c);
    ...
    //將src寫入到臨時直接緩存
    int nr = src.read(bb);
    ...
    //轉換為讀模式
    bb.flip();
    //寫入到文件
    int nw = write(bb, pos);
    ...
    //釋放臨時直接緩沖區
    Util.releaseTemporaryDirectBuffer(bb);
}

文件鎖

在NIO中引入了FileLock實現文件鎖,可以實現文件進程鎖。它支持獨占鎖和共享鎖。

使用獨占鎖時,只允許一個線程獨占文件,其他線程必須等待獨占的線程釋放文件鎖后才可以占用。使用共享鎖時只支持讀模式共享文件占用。關於文件鎖的使用可以看下《JAVA 文件鎖 FileLock》

public FileLock lock(long position, long size, boolean shared) throws IOException
{
    ...
    //寫模式不能共享鎖
    if (shared && !readable)
        throw new NonReadableChannelException();
    //讀模式不能獨占鎖
    if (!shared && !writable)
        throw new NonWritableChannelException();
    //創建一個文件鎖實例
    FileLockImpl fli = new FileLockImpl(this, position, size, shared);
    //獲取文件鎖表
    FileLockTable flt = fileLockTable();
    flt.add(fli);
    boolean completed = false;
    int ti = -1;
    try {
        ...
        //調用native方法加鎖
        n = nd.lock(fd, true, position, size, shared);
        if (isOpen()) {
            //部分操作系統不支持共享鎖,若獲取到的是獨占鎖,則更新當前FileLockImpl為獨占鎖
            if (n == FileDispatcher.RET_EX_LOCK) {
                //若獲取到鎖,則重新獲取一個非共享鎖實例
                FileLockImpl fli2 = new FileLockImpl(this, position, size, false);
                flt.replace(fli, fli2);
                fli = fli2;
            }
            completed = true;
        }
    } finally {
        if (!completed)
            //加鎖失敗,移除鎖
            flt.remove(fli);
        threads.remove(ti);
        ...
    }
    return fli;
}

關閉

關閉文件通道時需要釋放所有鎖和文件流

protected void implCloseChannel() throws IOException {
    // 釋放文件鎖
    if (fileLockTable != null) {
        for (FileLock fl: fileLockTable.removeAll()) {
            synchronized (fl) {
                if (fl.isValid()) {
                    //釋放鎖
                    nd.release(fd, fl.position(), fl.size());
                    ((FileLockImpl)fl).invalidate();
                }
            }
        }
    }
    // 通知當前通道所有被阻塞線程
    threads.signalAndWait();
    if (parent != null) {
        ((java.io.Closeable)parent).close();
    } else {
        nd.close(fd);
    }
}

在創建channel的時候會將RandomAccessFileFileInputStreamFileOutputStream等對象設置為channel的parent。從而使得channel關閉的時候可以釋放parent資源。

((java.io.Closeable)parent).close();
public void close() throws IOException {
    synchronized (closeLock) {
        if (closed) {
            return;
        }
        closed = true;
    }
    if (channel != null) {
        channel.close();
    }
    //關閉文件描述符
    fd.closeAll(new Closeable() {
        public void close() throws IOException {
            close0();
        }
    });
}

關閉FileDescriptor時會關閉RandomAccessFileFileInputStreamFileOutputStream等資源。在創建RandomAccessFile時會通過FileDescriptor.attach將RandomAccessFile添加到FileDescriptor的otherParents中

synchronized void closeAll(Closeable releaser) throws IOException {
    if (!closed) {
        closed = true;
        IOException ioe = null;
        //在try執行完后調用releaser的close方法
        try (Closeable c = releaser) {
            //在創建RandomAccessFile時會把RandomAccessFile對象添加到otherParents中
            if (otherParents != null) {
                for (Closeable referent : otherParents) { 
                    ...
                    referent.close();
                    ...
            }
        } 
        ...
    }
}

總結

本篇對文件通道常用的操作源碼進行解析,對linux下的零拷貝進行簡要說明。

相關文獻

  1. 史上最強Java NIO入門:擔心從入門到放棄的,請讀這篇!
  2. Java NIO系列教程
  3. 淺析Linux中的零拷貝技術
  4. Linux系統編程——內存映射與寫時復制
  5. 支撐百萬並發的“零拷貝”技術,你了解嗎?
  6. java NIO之MappedByteBuffer
  7. JAVA 文件鎖 FileLock

20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12078767.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及鏈接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM