【NIO】Java NIO之通道


一、前言

  前面學習了緩沖區的相關知識點,接下來學習通道。

二、通道

  2.1 層次結構圖

  對於通道的類層次結構如下圖所示。

  

  其中,Channel是所有類的父類,其定義了通道的基本操作。從 Channel 接口引申出的其他接口都是面向字節的子接口,包括 WritableByteChannel和ReadableByteChannel。這也意味着通道只能在字節緩沖區上操作

  2.2 通道基礎

  Channel接口類只定義了兩個方法(isOpen和close),分別表示通道是否打開和關閉通道,具體細節需要子類實現。   

  IO操作可分為File IO和Stream IO,對應通道也有它們是文件( file)通道和套接字( socket)通道 。通道可以有多種方式創建。Socket 通道有可以直接創建新 socket 通道的工廠方法。但File通道不能直接創建,只能通過在一個打開的RandomAccessFile、FileInputStream或FileOutputStream的對象上調用getChannel( )方法來獲取。

  通道將數據傳輸給 ByteBuffer 對象或者從 ByteBuffer 對象獲取數據進行傳輸,通道可以是單向( unidirectional)或者雙向的( bidirectional)。一個 channel 類可能實現定義read( )方法的 ReadableByteChannel 接口,而另一個 channel 類也許實現 WritableByteChannel 接口以提供 write( )方法。實現這兩種接口其中之一的類都是單向的,只能在一個方向上傳輸數據。如果一個類同時實現這兩個接口,那么它是雙向的,可以雙向傳輸數據。如ByteChannel 接口,該接口繼承 ReadableByteChannel 和WritableByteChannel 兩個接口,可雙向傳輸數據。

  值得注意的是,FileInputStream 對象的getChannel( )方法獲取的 FileChannel 對象是只讀的,不過從接口聲明的角度來看卻是雙向的,因為FileChannel 實現 ByteChannel 接口。在這樣一個通道上調用 write( )方法將拋出未經檢查的NonWritableChannelException 異常,因為 FileInputStream 對象總是以 read-only 的權限打開文件。

  通道會連接一個特定 I/O 服務且通道實例( channel instance)的性能受它所連接的 I/O 服務的特征限制。如一個連接到只讀文件的 Channel 實例不能進行寫操作,即使該實例所屬的類可能有 write( )方法。

  通道可以以阻塞( blocking)或非阻塞( nonblocking)模式運行,非阻塞模式的通道永遠不會讓調用的線程休眠。請求的操作要么立即完成,要么返回一個結果表明未進行任何操作。只有面向流的( stream-oriented)的通道,如 sockets 和 pipes 才能使用非阻塞模式。

  通道不能被重復使用,一個打開的通道即代表與一個特定 I/O 服務的特定連接並封裝該連接的狀態。當通道關閉時,連接會丟失,通道將不再連接任何東西

  2.3 Scatter/Gather

  Scatter/Gather是指在多個緩沖區上實現一個簡單的 I/O 操作。

  對於write操作而言,數據是從幾個緩沖區按順序抽取(稱為 gather)並沿着通道發送的,該 gather 過程的效果好比將全部緩沖區的內容連結起來,並在發送數據前存放到一個大的緩沖區中。

  對於read操作而言,從通道讀取的數據會按順序被散布(稱為 scatter)到多個緩沖區,將每個緩沖區填滿直至通道中的數據或者緩沖區的最大空間被消耗完。

  如下代碼片段,假定 channel 連接到一個有 48 字節數據等待讀取的 socket

ByteBuffer header = ByteBuffer.allocateDirect (10);
ByteBuffer body = ByteBuffer.allocateDirect (80);
ByteBuffer [] buffers = { header, body };
int bytesRead = channel.read (buffers);

  此時,bytesRead為48,header 緩沖區將包含前 10 個從通道讀取的字節而 body 緩沖區則包含接下來的 38 個字節,緊接着,下面代碼片段  

body.clear( );
body.put("FOO".getBytes()).flip( ); // "FOO" as bytes
header.clear( );
header.putShort (TYPE_FILE).putLong (body.limit()).flip( );
long bytesWritten = channel.write (buffers);

  則將不同buffer(header、body)中的數據gather起來寫入通道,總共發送13個字節(3 + 2 + 8)。

  Scatter和Gather將讀取到的數據分開存放到多個存儲桶( bucket)或者將不同的數據區塊合並成一個整體

  2.4 文件通道

  FileChannel 類可以實現常用的 read, write 以及 scatter/gather 操作,同時它也提供了很多專用於文件的新方法。並且文件通道總是阻塞式的,因此不能被置於非阻塞模式。對於文件 I/O,最強大之處在於異步 I/O( asynchronous I/O),它允許一個進程可以從操作系統請求一個或多個 I/O 操作而不必等待這些操作的完成,發起請求的進程之后會收到它請求的 I/O 操作已完成的通知。 

  FileChannel不能直接創建,需要使用getChannel方法獲取,並且其是線程安全( thread-safe)的。多個進程可以在同一個實例上並發調用方法而不會引起任何問題,不過並非所有的操作都是多線程的,如影響通道位置或者影響文件大小的操作都是單線程的,如果一個線程在執行會影響通道位置或文件大小時,那么其他嘗試進行此類操作的線程必須等待。

  每個 FileChannel 對象與文件描述符是一對一關系,同底層的文件描述符一樣,每個 FileChannel 都有一個叫"file position"的概念,這個position值決定文件中哪一處的數據接下來將被讀或者寫,這與緩沖非常相似。

  FileChannel 位置( position)是從底層的文件描述符獲得的,該 position 同時被作為通道引用獲取來源的文件對象共享。這也就意味着一個對象對該 position 的更新可以被另一個對象看到,如下代碼片段  

RandomAccessFile randomAccessFile = new RandomAccessFile ("filename", "r");
// Set the file position
randomAccessFile.seek (1000);
// Create a channel from the file
FileChannel fileChannel = randomAccessFile.getChannel( );
// This will print "1000"
System.out.println ("file pos: " + fileChannel.position( ));
// Change the position using the RandomAccessFile object
randomAccessFile.seek (500);
// This will print "500"
System.out.println ("file pos: " + fileChannel.position( ));
// Change the position using the FileChannel object
fileChannel.position (200);
// This will print "200"
System.out.println ("file pos: " + randomAccessFile.getFilePointer( ));

  可以看到,隨着randomAccessFile設置不同的position,fileChannel的position也會相應的跟着改變。

  類似於緩沖區的 get( ) 和 put( )方法,當字節被 read( )或 write( )方法傳輸時,文件 position 會自動更新。如果 position 值達到了文件大小的值(文件大小的值可以通過 size( )方法返回), read( )方法會返回一個文件尾條件值( -1)。可是,不同於緩沖區的是,如果實現 write( )方法時 position前進到超過文件大小的值,該文件會擴展以容納新寫入的字節

  同樣類似於緩沖區,也有帶 position 參數的絕對形式的 read( )和 write( )方法。這種絕對形式的方法在返回值時不會改變當前的文件 position。由於通道的狀態無需更新,因此絕對的讀和寫可能會更加有效率,操作請求可以直接傳到本地代碼。更妙的是,多個線程可以並發訪問同一個文件而不會相互產生干擾。這是因為每次調用都是原子性的( atomic),並不依靠調用之間系統所記住的狀態。

  對於FileChannel實現的文件鎖定模型而言,鎖的對象是文件而不是通道或線程,這意味着文件鎖不適用於判優同一台 Java 虛擬機上的多個線程發起的訪問。如果一個線程在某個文件上獲得了一個獨占鎖,然后第二個線程利用一個單獨打開的通道來請求該文件的獨占鎖,那么第二個線程的請求會拋出OverlappingFileLockException異常。但如果這兩個線程運行在不同的 Java 虛擬機上,那么第二個線程會阻塞,因為鎖最終是由操作系統或文件系統來判優的並且幾乎總是在進程級而非線程級(同一JVM上的線程)上判優。鎖都是與一個文件關聯的,而不是與單個的文件句柄或通道關聯。如下示例展示了同一JVM上的兩個線程使用同一文件鎖。  

import java.io.FileOutputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

/**
 * Created by LEESF on 2017/4/16.
 */
public class FileLockDemo {
    public static void main(String[] args) throws Exception {
        FileOutputStream fileOutputStream = new FileOutputStream("F://test.txt");
        FileChannel fileChannel = fileOutputStream.getChannel();

        Thread thread1 = new Thread(new MyRunnalbe(fileChannel));
        Thread thread2 = new Thread(new MyRunnalbe(fileChannel));

        thread1.start();
        thread2.start();
    }

    static class MyRunnalbe implements Runnable {
        private FileChannel fileChannel;

        public MyRunnalbe(FileChannel fileChannel) {
            this.fileChannel = fileChannel;
        }

        @Override
        public void run() {
            try {
                FileLock fileLock= fileChannel.lock();
                System.out.println(fileLock.isValid());
                Thread.sleep(1000);
            } catch (Exception ex) {
                System.out.println(Thread.currentThread().getName() + " " + ex);
            }
        }
    }
}

  輸出結果  

true
Thread-1 java.nio.channels.OverlappingFileLockException

  可以看到,當thread2獲取鎖后,thread1再獲取鎖時,發出現異常。

  鎖與文件關聯,而不是與通道關聯。我們使用鎖來判優外部進程,而不是判優同一個 Java 虛擬機上的線程

  2.5 內存映射文件

  新的 FileChannel 類提供了一個名為 map( )的方法,該方法可以在一個打開的文件和一個特殊類型的 ByteBuffer 之間建立一個虛擬內存映射,由 map( )方法返回的 MappedByteBuffer 對象的行為類似與基於內存的緩沖區,只不過該對象的數據元素存儲在磁盤上的文件中。通過內存映射機制來訪問一個文件會比使用常規方法讀寫高效得多,甚至比使用通道的效率都高。

  當需要映射整個文件時,可使用如下代碼片段 

buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

  與文件鎖的范圍機制不一樣,映射文件的范圍不應超過文件的實際大小。如果您請求一個超出文件大小的映射,文件會被增大以匹配映射的大小。

  同常規的文件句柄類似,文件映射可以是可寫的或只讀的。前兩種映射模式MapMode.READ_ONLY 和 MapMode.READ_WRITE 意義是很明顯的,它們表示你希望獲取的映射只讀還是允許修改映射的文件。請求的映射模式將受被調用 map( )方法的 FileChannel 對象的訪問權限所限制,如果通道是以只讀的權限打開的而您卻請求 MapMode.READ_WRITE 模式,那么map( )方法會拋出一個 NonWritableChannelException 異常;如果您在一個沒有讀權限的通道上請求MapMode.READ_ONLY 映射模式,那么將產生 NonReadableChannelException 異常。不過在以read/write 權限打開的通道上請求一個 MapMode.READ_ONLY 映射卻是允許的。而MapMode.PRIVATE 模式表示一個寫時拷貝( copy-on-write)的映射,這意味着通過 put( )方法所做的任何修改都會導致產生一個私有的數據拷貝並且該拷貝中的數據只有MappedByteBuffer 實例可以看到。該過程不會對底層文件做任何修改,而且一旦緩沖區被施以垃圾收集動作( garbage collected),那些修改都會丟失。

  FileChannel的transferTo( )和 transferFrom( )方法允許將一個通道交叉連接到另一個通道,而不需要通過一個中間緩沖區來傳遞數據。

  2.6 Socket通道

  Socket 通道有與文件通道不同的特征, 一個或幾個線程就可以管理成百上千的活動 socket 連接了並且只有很少甚至可能沒有性能損失。

  DatagramChannel 和 SocketChannel 實現定義讀和寫功能的接口而 ServerSocketChannel不實現。 ServerSocketChannel 負責監聽傳入的連接和創建新的 SocketChannel 對象,它本身從不傳輸數據。socket 通道類( DatagramChannel、SocketChannel和ServerSocketChannel)在被實例化時都會創建一個對等socket對象。

  所有通道可以在非阻塞情況下運行,這依托於SelectableChannel,使用其configureBlocking方法即可配置是否阻塞。

  ServerSocketChannel 是一個基於通道的 socket 監聽器,能夠在非阻塞模式下運行。

  當需要對一個端口進行監聽時,需要獲取通道對應的 socket,然后使用socket綁定到指定端口進行監聽,常用代碼如下 

ServerSocketChannel ssc = ServerSocketChannel.open( );
ServerSocket serverSocket = ssc.socket( );
// Listen on port 1234
serverSocket.bind (new InetSocketAddress (1234));

  在完成綁定后,可以使用ServerSocketChannel或者ServerSocket的accept方法來接受到達通道的連接,當使用ServerSocketChannel的accept時,會返回 SocketChannel 類型的對象,返回的對象能夠在非阻塞模式下運行。 而當使用ServerSocket時的accept時,總是阻塞並返回一個 java.net.Socket 對象。因此較優的做法是使用ServerSocketChannel的accept方法,下面是監聽1234端口的示例。 

import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.net.InetSocketAddress;

/**
 * Created by LEESF on 2017/4/16.
 */

public class ChannelAccept {
    public static final String GREETING = "Hello I must be going.\r\n";

    public static void main (String [] argv) throws Exception {
        int port = 1234; // default
        if (argv.length > 0) {
            port = Integer.parseInt (argv [0]);
        }
        ByteBuffer buffer = ByteBuffer.wrap (GREETING.getBytes( ));
        ServerSocketChannel ssc = ServerSocketChannel.open( );
        ssc.socket( ).bind (new InetSocketAddress (port));
        ssc.configureBlocking (false);
        while (true) {
            System.out.println ("Waiting for connections");
            SocketChannel sc = ssc.accept( );
            if (sc == null) {
                // no connections, snooze a while
                Thread.sleep (2000);
            } else {
                System.out.println ("Incoming connection from: "
                        + sc.socket().getRemoteSocketAddress( ));
                buffer.rewind( );
                sc.write (buffer);
                sc.close( );
            }
        }
    }
}

   SocketChannel是使用最多的 socket 通道類,Socket 和 SocketChannel 類封裝點對點、有序的網絡連接,類似於 TCP/IP網絡連接,SocketChannel 扮演客戶端,會發起和一個監聽服務器的連接。每個 SocketChannel 對象創建時都會同一個對等的 java.net.Socket 對象對應。

  下面代碼片段會連接指定主機的指定端口。  

SocketChannel socketChannel = SocketChannel.open( );
socketChannel.connect (new InetSocketAddress ("somehost", somePort));

  如果通過Socket的connect方法進行連接,那么線程在連接建立好或超時過期之前都將保持阻塞;而若通過SocketServer的connect方法進行連接,那么會對發起對請求地址的連接並且立即返回值。如果返回值是 true,說明連接立即建立了(這可能是本地環回連接);如果連接不能立即建立, connect( )方法會返回 false 且並發地繼續連接建立過程。

  Socket 通道是線程安全的。並發訪問時無需特別措施來保護發起訪問的多個線程,不過任何時候都只有一個讀操作和一個寫操作在進行中。

  如下示例完成了客戶端向服務端之間的數據傳送,使用ServerSocketChannel充當服務端,SocketChannel充當客戶端,客戶端向服務端發送數據,服務端接收后響應客戶端,然后服務端關閉,客戶端接收到響應數據后,關閉,代碼如下

  服務端  

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Created by LEESF on 2017/4/16.
 */
public class ServerSocketChannelDemo {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocketChannel.configureBlocking(false);
        serverSocket.bind(new InetSocketAddress("localhost", 1234));

        while (true) {
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                socketChannel.read(byteBuffer);
                byteBuffer.flip();
                System.out.println("server received message: " + getString(byteBuffer));
                byteBuffer.clear();
                String message = "server sending message " + System.currentTimeMillis();
                System.out.println("server sends message: " + message);
                byteBuffer.put(message.getBytes());
                byteBuffer.flip();
                socketChannel.write(byteBuffer);
                break;
            }
        }
        try {
            serverSocketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static String getString(ByteBuffer buffer) {
        Charset charset;
        CharsetDecoder decoder;
        CharBuffer charBuffer;
        try {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return "";
        }
    }
}

  客戶端  

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Created by LEESF on 2017/4/16.
 */
public class SocketChannelDemo {
    public static void main(String[] args) throws Exception {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("localhost", 1234));
        String message = "client sending message " + System.currentTimeMillis();
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        byteBuffer.clear();
        System.out.println("client sends message: " + message);
        byteBuffer.put(message.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);

        while (true) {
            byteBuffer.clear();
            int readBytes = socketChannel.read(byteBuffer);
            if (readBytes > 0) {
                byteBuffer.flip();
                System.out.println("client receive message: " + getString(byteBuffer));

                break;
            }
        }

        try {
            socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static String getString(ByteBuffer buffer) {
        Charset charset;
        CharsetDecoder decoder;
        CharBuffer charBuffer;
        try {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return "";
        }
    }
}

  服務端結果如下 

server received message: client sending message 1492334785236
server sends message: server sending message 1492334785276

  客戶端結果如下  

client sends message: client sending message 1492334785236
client receive message: server sending message 1492334785276

  每個DatagramChannel 對象關聯一個DatagramSocket 對象,DatagramChannel 是模擬無連接協議(如 UDP/IP)。DatagramChannel 對象既可以充當服務器(監聽者)也可以充當客戶端(發送者)。如果新創建的通道負責監聽,那么通道必須首先被綁定到一個端口或地址/端口組合上。  

DatagramChannel channel = DatagramChannel.open( );
DatagramSocket socket = channel.socket( );
socket.bind (new InetSocketAddress (portNumber))

  2.7 管道

  管道就是一個用來在兩個實體之間單向傳輸數據的導管,Pipe 類實現一個管道范例,不過它所創建的管道是進程內(在 Java 虛擬機進程內部)而非進程間使用的。Pipe 類創建一對提供環回機制的 Channel 對象。這兩個通道的遠端是連接起來的,以便任何寫在 SinkChannel 對象上的數據都能出現在 SourceChannel 對象上。管道可以被用來僅在同一個 Java 虛擬機內部傳輸數據。

  當向管道中寫入數據時,需要訪問Pipe的sink通道,當從管道中讀取數據時,需要訪問Pipe的source通道,下面示例展示了通道的讀寫操作    

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

/**
 * Created by LEESF on 2017/4/16.
 */

public class PipeDemo {
    public static void main (String [] argv) throws Exception {
        Pipe pipe = Pipe.open();
        Pipe.SinkChannel sinkChannel = pipe.sink();
        String newData = "New String to write to file..." + System.currentTimeMillis();
        ByteBuffer buf = ByteBuffer.allocate(48);
        buf.clear();
        System.out.println("writing data: " + newData);
        buf.put(newData.getBytes());
        buf.flip();
        while (buf.hasRemaining()) {
            sinkChannel.write(buf);
        }

        Pipe.SourceChannel sourceChannel = pipe.source();
        ByteBuffer byteBuffer = ByteBuffer.allocate(48);
        sourceChannel.read(byteBuffer);
        byteBuffer.flip();
        String strs = getString(byteBuffer);
        System.out.println("reading data: " + strs);
    }

    public static String getString(ByteBuffer buffer) {
        Charset charset;
        CharsetDecoder decoder;
        CharBuffer charBuffer;
        try {
            charset = Charset.forName("UTF-8");
            decoder = charset.newDecoder();
            charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
            return charBuffer.toString();
        } catch (Exception ex) {
            ex.printStackTrace();
            return "";
        }
    }
}

  運行結果 

writing data: New String to write to file...1492332436279
reading data: New String to write to file...1492332436279

  NIO還提供了通道工具類共使用,即可以通過java.nio.channels.Channels類創建通道等操作。

三、總結

  本篇博文講解了通道,包括文件通道和套接字通道,以及通道與緩沖之間gather和scatter的操作,更多具體內容,有興趣的讀者可以查閱源代碼進一步學習,也謝謝各位園友的觀看~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM