NIO-Channel接口分析




NIO-Channel接口分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析
NIO-FileChannel源碼分析
NIO-Selector源碼分析
NIO-WindowsSelectorImpl源碼分析
NIO-EPollSelectorIpml源碼分析

前言

本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析源碼理解NIO的設計原理。

本系列文章針對的是JDK1.8.0.161的源碼。

上一篇介紹了Channel的基本使用,下面對Channel的接口進行分析。

接口

SCTP協議

SCTP(Stream Control Transmission Protocol)是一種傳輸協議,在TCP/IP協議棧中所處的位置和TCP、UDP類似,兼有TCP/UDP兩者特征。

對於SCTP協議這里不詳細描述,想了解的同學可以看下這篇文章
SCTP協議平時用的不多,這里不做具體討論。

UDP協議

NIO使用DatagrmChannel實現了UDP協議的網絡通訊。

20191207210432.png

下面我們對各個接口進行分析。

AutoCloseableCloseable分別是自動關閉和主動關閉接口。當資源(如句柄或文件等)需要釋放時,則需要調用close方法釋放資源。

public interface AutoCloseable {
    void close() throws Exception;
}

public interface Closeable extends AutoCloseable {
    void close() throws IOException;
}

Channel是通道接口,針對於I/O相關的操作,需要打開和關閉操作。

public interface Channel extends Closeable {
    boolean isOpen();

    void close() throws IOException;
}

InterruptibleChannel是支持異步關閉和中斷的通道接口。為了支持Thead的interrupt模型,當線程中斷時,可以執行中斷處理對象的回調,從而關閉釋放Channel。

public interface InterruptibleChannel extends Channel {
    void close() throws IOException;
}

關於InterruptibleChannel可中斷I/O詳細解析可以看一下《JDK源碼閱讀-InterruptibleChannel與可中斷IO》

Interruptible是線程中斷接口,即上面提的Thead的interrupt模型。當線程中斷時,則會調用中斷操作。

public abstract interface Interruptible {
    public abstract void interrupt(java.lang.Thread t);
}
public class Thread implements Runnable {
    ...
    public void interrupt() {
        if (this != Thread.currentThread())
            checkAccess();

        synchronized (blockerLock) {
            Interruptible b = blocker;
            if (b != null) {
                interrupt0();           // Just to set the interrupt flag
                b.interrupt(this);
                return;
            }
        }
        interrupt0();
    }
    ...
}

AbstractInterruptibleChannel實現了ChannelInterruptibleChannel接口。

20191208013708.png

  • closeLock是關閉時的鎖
  • open表示channle是否打開
  • interuptorInterruptible中斷回調
  • interrupted為I/O執行時的線程
public abstract class AbstractInterruptibleChannel implements Channel, InterruptibleChannel {
    ...
    public final void close() throws IOException {
        synchronized(this.closeLock) {
            if (this.open) {
                this.open = false;
                this.implCloseChannel();
            }
        }
    }
    //具體的Channel實現關閉
    protected abstract void implCloseChannel() throws IOException;

    protected final void begin() {
        if (this.interruptor == null) {
            this.interruptor = new Interruptible() {
                //線程中斷時,則會調用該接口關閉Channel
                public void interrupt(Thread target) {
                    synchronized(AbstractInterruptibleChannel.this.closeLock) {
                        if (AbstractInterruptibleChannel.this.open) {
                            AbstractInterruptibleChannel.this.open = false;
                            AbstractInterruptibleChannel.this.interrupted = target;

                            try {
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) {
                            }

                        }
                    }
                }
            };
        }
        //將線程的blockOn設置為當前interruptor,從而使得線程關閉時能關閉channel
        blockedOn(this.interruptor);
        Thread me = Thread.currentThread();
        if (me.isInterrupted()) {
            this.interruptor.interrupt(me);
        }

    }

    protected final void end(boolean completed)
        throws AsynchronousCloseException
    {
        //I/O結束,清除線程blocker
        blockedOn(null);
        Thread interrupted = this.interrupted;
        if (interrupted != null && interrupted == Thread.currentThread()) {
            interrupted = null;
            throw new ClosedByInterruptException();
        }
        if (!completed && !open)
            throw new AsynchronousCloseException();
    }

    static void blockedOn(Interruptible intr) {
        SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(), intr);
    }
}

AbstractInterruptibleChannel添加了beginend方法。 在I/O操作開始時會調用begin,在I/O操作結束時會調用end。在begin方法內將中斷操作加入到當前線程中。最終會調用到線程的blockOn方法,它會將該中斷接口注入到線程中,使得線程中斷時可以調用到Channel並釋放相關資源。

public void blockedOn(Thread t, Interruptible b) {
    t.blockedOn(b);
}

SelectableChannel接口聲明了Channel是可以被選擇的,在Windows平台通過WindowsSelectorImpl實現,Linux通過EPollSelectorImpl實現。此外還有KQueue等實現,關於Selector具體細節在《NIO-Selector》一文中會介紹。

AbstractSelectableChannel實現了SelectableChannel接口。

NetworkChannel適用於網絡傳輸的接口。

public interface NetworkChannel extends Channel {
    //綁定地址
    NetworkChannel bind(SocketAddress var1) throws IOException;

    //獲取本地地址
    SocketAddress getLocalAddress() throws IOException;

    //設置socket選項
    <T> NetworkChannel setOption(SocketOption<T> var1, T var2) throws IOException;
    //獲取socket選項
    <T> T getOption(SocketOption<T> var1) throws IOException;
    //當前通道支持的socket選項
    Set<SocketOption<?>> supportedOptions();
}

MulticastChannel是支持組播接口。

public interface MulticastChannel extends NetworkChannel {
    void close() throws IOException;

    MembershipKey join(InetAddress group, NetworkInterface interf) throws IOException;

    MembershipKey join(InetAddress group, NetworkInterface interf, InetAddress source) throws IOException;
}

SelChImpl接口用於將底層的I/O就緒狀態更新為就緒事件。

public interface SelChImpl extends Channel {

    FileDescriptor getFD();
    int getFDVal();
    //更新就緒事件
    public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk);
    //設置就緒事件
    public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk);
    //將底層的輪詢操作轉換為事件
    void translateAndSetInterestOps(int ops, SelectionKeyImpl sk);
    //返回channle支持的操作,比如讀操作、寫操作等
    int validOps();
    void kill() throws IOException;
}

由於UDP支持讀寫數據,因此還實現了ReadableByteChannelWritableByteChannel接口

public interface ReadableByteChannel extends Channel {
    int read(ByteBuffer dst) throws IOException;
}   

public interface WritableByteChannel extends Channel {
    int write(ByteBuffer src) throws IOException;
}

ByteChannel是支持讀寫的通道。

public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {
}

ScatteringByteChannel則支持根據傳入偏移量讀,支持根據傳入偏移量寫GatheringByteChannel

public interface ScatteringByteChannel extends ReadableByteChannel {
    long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
    long read(ByteBuffer[] dsts) throws IOException;}

public interface GatheringByteChannel extends WritableByteChannel {
    long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
    long write(ByteBuffer[] srcs) throws IOException;
}

TCP協議

客戶端

20191209111550.png

TCP協議除了不支持組播,其他和UDP是一樣的,不再重復介紹。

服務端

20191209112800.png

服務端無需數據讀寫,僅需要接收連接,數據讀寫是SocketChannel干的事。因此沒有ReadableByteChannelWriteableByteChannel等讀寫接口

文件

20191209112005.png

文件比網絡協議少了NetworkChannelSelChImplSelectableChannelSelChImplSelectableChannel主要是用於支持選擇器的,由於網絡傳輸大多數連接時空閑的,而且數據何時會到來並不知曉,同時需要支持高並發來連接,因此支持多路復用技術可以顯著的提高性能,而磁盤讀寫則沒有該需求,因此無需選擇器。

SeekableByteChannel可以通過修改position支持從指定位置讀寫數據。

public interface SeekableByteChannel extends ByteChannel {
    int read(ByteBuffer dst) throws IOException;

    int write(ByteBuffer src) throws IOException;
    
    long position() throws IOException;
    //設置偏移量
    SeekableByteChannel position(long newPosition) throws IOException;

    long size() throws IOException;
    //截取指定大小
    SeekableByteChannel truncate(long size) throws IOException;
}

總結

由於文章篇幅比較長,因此還是將接口分析和實現分析分開。本篇文章對Channel的接口進行說明,下一篇將對具體的實現進行分析。

相關文獻

  1. SCTP協議詳解
  2. 史上最強Java NIO入門:擔心從入門到放棄的,請讀這篇!
  3. Java NIO系列教程
  4. 為什么SCTP沒有被大量使用/知道
  5. JDK源碼閱讀-InterruptibleChannel與可中斷IO
  6. 廣播和組播
  7. 關於AccessController.doPrivileged
  8. ServiceLoader源碼分析
  9. 基於Java的RDMA高性能通信庫(六):SDP - Java Socket Direct Protocol

20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12040082.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及鏈接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM