Java NIO 通道 Channel


Channel 是 NIO 的核心概念,它表示一個打開的連接,這個連接可以連接到 I/O 設備(例如:磁盤文件,Socket)或者一個支持 I/O 訪問的應用程序。Java NIO 使用緩沖區和通道來進行數據傳輸。
Java 應用 IO設備 緩沖區 通道

一個通道在創建的時候被打開,可以調用 isOpen() 來判斷一個通道是否是打開狀態。關閉通道使用 close() 方法,一個通道一旦被關閉,將不能被重新打開。

1. 基於緩沖區 Buffer 和通道 Channel 的數據交互

應用程序可以通過與 I/O 設備建立通道來實現對 I/O 設備的讀寫操作,操作的數據通過緩沖區 Buffer 來進行交互。
Java程序 通道 Channel 緩沖區 Buffer 1.1讀 read() 1.2 填充數據 1.3 返回數據 2.1 填充數據 2.2 寫 write()

從 I/O 設備讀取數據時:
1)應用程序調用通道 Channel 的 read() 方法;
2)通道往緩沖區 Buffer 中填入 I/O 設備中的數據,填充完成之后返回;
3)應用程序從緩沖區 Buffer 中獲取數據。

往 I/O 設備寫數據時:
1)應用程序往緩沖區 Buffer 中填入要寫到 I/O 設備中的數據;
2)調用通道 Channel 的 write() 方法,通道將數據傳輸至 I/O 設備。

2. NIO 中主要的通道類型與操作

這里僅討論磁盤文件和網絡套接字的 I/O 通道,在整個 NIO 的學習中,直接內存映射相關內容一般指的是磁盤文件 I/O,而 I/O 多路復用模型和選擇器則一般指網絡I/O。磁盤文件通道為 FileChannel,網絡套接字通道有 TCP 相關的 SocketChannel,ServerSocketChannel 和 UDP 相關的 DatagramChannel。

2.1 FileChannel

文件通道可以連接一個文件,然后對文件進行讀,寫,映射到直接內存。使用文件通道操作文件的一般流程為:

1)獲取通道。文件通道通過 FileChannel 的靜態方法 open() 來獲取,獲取時需要指定文件路徑和文件打開方式。

FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);// 獲取文件通道

2)創建字節緩沖區。文件相關的字節緩沖區有兩種,一種是基於堆的 HeapByteBuffer,另一種是基於文件映射,放在堆外內存中的 MappedByteBuffer。這里使用前者,后者相關內容可以參考:Java NIO 文件通道 FileChannel 用法

ByteBuffer buf = ByteBuffer.allocate(10); // 分配字節緩存

3)讀寫操作。

讀取數據。一般需要一個循環結構來讀取數據,讀取數據時需要注意切換 ByteBuffer 的讀寫模式。

while (channel.read(buf) != -1){ // 讀取通道中的數據,並寫入到 buf 中
    buf.flip(); // 緩存區切換到讀模式
    while (buf.position() < buf.limit()){ // 讀取 buf 中的數據
        text.append((char)buf.get());
    }
    buf.clear(); // 清空 buffer,緩存區切換到寫模式
}

寫入數據。

for (int i = 0; i < text.length(); i++) {
    buf.put((byte)text.charAt(i)); // 填充緩沖區,需要將 2 字節的 char 強轉為 1 自己的 byte
    if (buf.position() == buf.limit() || i == text.length() - 1) { // 緩存區已滿或者已經遍歷到最后一個字符
        buf.flip(); // 將緩沖區由寫模式置為讀模式
        channel.write(buf); // 將緩沖區的數據寫到通道
        buf.clear(); // 清空緩存區,將緩沖區置為寫模式,下次才能使用
    }
}

4)將數據刷出到物理磁盤。FileChannel 的 force(boolean metaData) 方法可以確保對文件的操作能夠更新到磁盤。metaData 為 true 表示不僅要刷出數據,還要刷入文件的元數據,如:修改時間。

channel.force(false);

5)關閉通道。

channel.close();

下面給出一個文件通道的具體示例。示例中 writeText() 將字符串寫入到文件當中,然后 readText() 再將內容讀出來。這里為了簡單起見,示例代碼中字符串只能包含 ASCII 字符,而不能包含中文字或其它特殊字符;否則會亂碼。

public class FileChannelReadWrite {
    public static void main(String[] args) throws IOException {
        String fileName = "data.txt";
        String text = "Hello, welcome to Robothy's blog.";
        writeText(fileName, text);
        System.out.println(readText(fileName));
    }

    static String readText(String fileName) throws IOException {
        FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.READ);// 獲取文件通道
        ByteBuffer buf = ByteBuffer.allocate(10); // 分配字節緩存
        StringBuilder text = new StringBuilder();
        while (channel.read(buf) != -1){ // 讀取通道中的數據,並寫入到 buf 中
            buf.flip(); // 緩存區切換到讀模式
            while (buf.position() < buf.limit()){ // 讀取 buf 中的數據
                text.append((char)buf.get());
            }
            buf.clear(); // 清空 buffer,緩存區切換到寫模式
        }
        channel.close(); // 關閉通道
        return text.toString();
    }

    static void writeText(String fileName, String text) throws IOException {
        // 獲取文件通道
        FileChannel channel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); 
        ByteBuffer buf = ByteBuffer.allocate(10); // 創建字節緩沖區
        for (int i = 0; i < text.length(); i++) {
            buf.put((byte)text.charAt(i)); // 填充緩沖區,需要將 2 字節的 char 強轉為 1 自己的 byte
            if (buf.position() == buf.limit() || i == text.length() - 1) { // 緩存區已滿或者已經遍歷到最后一個字符
                buf.flip(); // 將緩沖區由寫模式置為讀模式
                channel.write(buf); // 將緩沖區的數據寫到通道
                buf.clear(); // 清空緩存區,將緩沖區置為寫模式,下次才能使用
            }
        }
        channel.force(false); // 將數據刷出到磁盤,不刷出文件元數據
        channel.close(); // 關閉通道
    }
}

關於 FileChannel 的更多詳細用法:Java NIO 文件通道 FileChannel 用法

2.2 SocketChannel

SocketChannel 負責 TCP 套接字的連接和數據傳輸,客戶端和服務端都需要用到。SocketChannel 是線程安全的,支持多線程訪問。

SocketChannel 有阻塞連接和非阻塞連接兩種模式。對於阻塞連接,讀取數據時會阻塞,直到有數據過來或者連接被關閉;對於非阻塞連接,調用 read() 方法時無論是否有數據都會立即返回。可以調用 configureBlocking(boolean block) 來配置為阻塞通道或非阻塞通道。

SocketChannel 可以由服務端或者客戶端發起關閉。假設客戶端在寫數據時,服務端關閉了連接,客戶端 write() 方法會拋出 AsynchronousCloseException;假設客戶端在讀取數據時,服務端關閉了連接,read() 方法會立即返回 -1,此時緩沖區中沒有內容。

TCP 客戶端使用 SocketChannel 與服務端進行交互的流程為:

1)打開通道,連接到服務端。

SocketChannel channel = SocketChannel.open(); // 打開通道,此時還沒有打開 TCP 連接
channel.connect(new InetSocketAddress("localhost", 9090)); // 連接到服務端

這兩句也可以合並起來寫。

SocketChannel channel = SocketChannel.open(new InetSocketAddress("localhost", 9090));

2)分配緩沖區。

ByteBuffer buf = ByteBuffer.allocate(10); // 分配一個 10 字節的緩沖區,不實用,容量太小

3)配置是否為阻塞方式。(默認為阻塞方式)

channel.configureBlocking(false); // 配置通道為非阻塞模式

如果配置了非阻塞模式,還需要調用 SocketChannel.finishConnect() 方法確保連接已經完成。

while (!channel.finishConnect()){// 不斷檢查是否完成了連接
    Thread.sleep(10);
}

4)與服務端進行數據交互。

5)關閉連接。

在關閉連接時,如果客戶端是寫數據的一方,完成寫入之后應該先調用一下 SocketChannel.shutdownOutput() ,此時讀的一端能夠檢測到 read() 返回的 -1。然后調用 clser() 方法關閉通道。

channel.shutdownOutput(); // 關閉 TCP 輸出,此時客戶端會發送 -1 給服務端
channel.close();          // 關閉通道

服務端在客戶端由連接過來時會創建一個 SocketChannel,不需要手動創建,后續步驟和客戶端一樣。下面有完整的示例。

2.3 ServerSocketChannel

ServerSocketChannel 負責監聽連接,服務端使用,在監聽到 TCP 連接時會產生一個 SocketChannel 實例與客戶端進行連接和數據交互。一般為了支持並發,服務端在產生 SocketChannel 之后可以通道實例放到一個隊列中,用一個線程池去處理隊列中的通道。不過這種方式並不能支持高並發,要支持高並發應該使用基於多路復用 I/O 模型的選擇器

1)打開一個 ServerSocketChannel 通道, 綁定端口。

ServerSocketChannel server = ServerSocketChannel.open(); // 打開通道

2)綁定端口

server.bind(new InetSocketAddress(9090)); // 綁定端口

3)阻塞等待連接到來。有新連接時會創建一個 SocketChannel 通道,服務端可以通過這個通道與連接過來的客戶端進行通信。等待連接到來的代碼一般放在一個循環結構中。

SocketChannel client = server.accept(); // 阻塞,直到有連接過來

4)通過 SocketChannel 與客戶端進行數據交互

5)關閉 SocketChannel

client.close();

2.4 基於套接字通道的 TCP 通信完整示例

用戶在客戶端控制台數據要發送的內容,服務端接收內容並打印在控制台。客戶端輸入 "Bye" 之后,斷開與服務端的連接。

TCP 客戶端代碼:

public class SocketChannelWrite {

    public static void main(String[] args) throws IOException, InterruptedException {
        SocketChannel channel = SocketChannel.open(); // 打開通道,此時還沒有打開 TCP 連接
        channel.connect(new InetSocketAddress("localhost", 9090)); // 連接到服務端
        ByteBuffer buf = ByteBuffer.allocate(10); // 分配一個 10 字節的緩沖區,不實用,容量太小
        Scanner scanner = new Scanner(System.in); // 掃描控制台輸入
        scanner.useDelimiter("\n");
        while(true){
            String msg = scanner.next() + "\r\n"; // 讀取控制台輸入的消息,再拼接上換行符
            for(int i=0; i<msg.length(); i++){    // 逐個字符遍歷輸入的內容
                buf.put((byte)msg.charAt(i));     // 將字符逐個放入緩沖區
                if(buf.position() == buf.limit() || i == msg.length()-1){ // 緩沖區已滿或者
                    buf.flip();         // 緩沖區切換到讀模式
                    channel.write(buf); // 往通道寫入數據
                    buf.clear();        // 清空緩沖區,緩沖區切換到寫入模式
                }
            }
            if("Bye\r\n".equals(msg)){
                channel.shutdownOutput(); // 關閉 TCP 輸出,此時客戶端會發送 -1 給服務端
                channel.close();          // 關閉通道
                break;
            }
        }
    }
}

TCP 服務端代碼:

public class ServerSocketChannelRead {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open(); // 打開通道
        server.bind(new InetSocketAddress(9090));   // 綁定端口
        ByteBuffer buff = ByteBuffer.allocate(10);  // 為了代碼演示,只分配容量為 10 字節的緩沖區
        while (true) {
            SocketChannel client = server.accept(); // 阻塞,直到有連接過來
            System.out.println("Client connected.");
            while (true) {                          // 循環讀取客戶端發送過來的數據
                if(client.read(buff) == -1){        // 客戶端關閉了輸出之后,阻塞的 client.read(buf) 會立即返回 -1,此時 buf 中沒有內容
                    client.close();                 // 關閉通道
                    System.out.println("Client closed the connection.");
                    break;
                }
                buff.flip();    // 切換到讀模式
                while (buff.position() < buff.limit()) {
                    System.out.print((char) buff.get()); // 一個字符一個字符打印出來
                }
                buff.clear();   // 切換到寫模式
            }
        }
    }
}

2.5 DatagramChannel

數據報通道 DatagramChannel 表示 UDP 通道。UDP 是無連接協議,在收發數據時不需要進行連接。與 FileChannel 和 SocketChannel 使用 read()/write() 不同,DatagramChannel 通常使用 receive()/send() 來收發數據。receive() 在接收數據之后會返回發送方的地址,send() 方法在發送數據的時候需要指定接收方的地址。

DatagramChannel 支持阻塞模式和非阻塞模式。非阻塞模式時,receive(ByteBuffer dst) 方法會立即返回,如果有數據,則會返回發送方的地址;如果沒有數據,則返回 null。類似地,非阻塞模式下 send(ByteBuffer src, SocketAddress) 也會立即返回,返回的結果為發送的字節數。

DatagramChannel 作為客戶端操作流程:

1)打開通道

DatagramChannel channel = DatagramChannel.open();

2)配置阻塞模式

channel.configureBlocking(false); // 非阻塞模式

3)分配緩沖區

ByteBuffer buf = ByteBuffer.allocate(1024);     // 分配 1024 字節的緩沖區

4)數據交互

數據報通道 DatagramChannel 通過 receive()/send() 方法來進行數據的交互。需要注意的是,發送數據時,每次最多發送一個 UDP 數據報的大小(理論上是 65535-8 字節);因此,當緩沖區過大時,需要考慮多次發送。發送數據的時候需要指定地址。

另外,DatagramChannel 指定了 connect(SocketAddress remote) 方法,傳入通信對方的地址。如果調用了此方法,則該通道只能和指定的地址進行數據交互,即使 send() 指定了其它的地址也沒有。事實上,DatagramChannel 提供了 read()/write() 方法,這兩個方法只有在 connect 指定了地址的情況下才能夠使用,否則數據將被丟棄。

SocketAddress address = channel.receive(buf);
channel.send(buf, address);

5)關閉通道

channel.close();

DatagramChannel 作為服務端操作流程:

1)打開通道
與客戶端打開通道的方式一樣。

2)綁定要監聽的端口

channel.bind(new InetSocketAddress(9090));   // 綁定要監聽的端口

3)配置阻塞模式

4)分配緩沖區

5)接收客戶端發送過來的數據

下面提供基於 DatagramChannel 進行 UDP 通信的完整示例代碼。

2.6 基於 DatagrapChannel 的 UDP 通訊實例

服務端接收客戶端發送過來的數據報,然后打印其內容,再向客戶端發送一條消息,表示接收到的消息的大小。

public class DatagramChannelRead {

    public static void main(String[] args) throws IOException {
        DatagramChannel channel = DatagramChannel.open(); // 打開通道
        channel.bind(new InetSocketAddress(9090));   // 綁定要監聽的端口
        ByteBuffer buf = ByteBuffer.allocate(1024);       // 分配緩沖區

        while (true){
            SocketAddress address = channel.receive(buf);  // 接收數據,獲取發送方地址
            buf.flip(); // 緩沖區切換為讀模式
            int len = buf.limit(); // 獲取 buff 中數據的長度
            System.out.println("Client -> " + new String(buf.array(), 0, len, StandardCharsets.UTF_8)); // 打印 buf 中的內容
            buf.clear(); // 清空緩沖區,切換到寫模式

            buf.put(String.format("Received %4d bytes.", len).getBytes()); // 將要返回給發送端的消息填入緩沖區
            buf.flip();
            channel.send(buf, address); // send 一次性最多只能發送 65535 - 8 字節的數據,如果 buf 很大的話需要用一個循環去發送。
            buf.clear();
        }
   

客戶端有2個線程, sender 線程接收用戶在控制台輸入的內容,接收一行輸入的內容就發送給服務端;receiver 線程接收服務端返回的消息並打印在控制台。當用戶輸入 "Bye" 時,客戶端退出。

public class DatagramChannelWrite {

    public static void main(String[] args) throws IOException, InterruptedException {
        DatagramChannel channel = DatagramChannel.open(); // 打開通道

        InetSocketAddress serverAddress = new InetSocketAddress("localhost", 9090); // 聲明服務端的地址

        channel.configureBlocking(false); // 非阻塞模式

        // 用於接收服務端發送過來的消息
        Thread receiver = new Thread(()->{
            ByteBuffer buf = ByteBuffer.allocate(1024);     // 分配 1024 字節的緩沖區
            while(!Thread.currentThread().isInterrupted()){ // 檢查中斷標志,如果被中斷,則結束線程
                try {
                    while (null == channel.receive(buf)) {  // 循環接收數據
                        Thread.sleep(10);             // 沒有消息則 sleep 10ms
                    }
                    buf.flip();
                    System.out.println("Server -> " + new String(buf.array(), 0, buf.limit()));
                    buf.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread sender = new Thread(()->{
            try {
                ByteBuffer buf = ByteBuffer.allocate(1024);
                Scanner scanner = new Scanner(System.in);
                while (true){
                    String msg = scanner.nextLine();
                    if(msg.equals("Bye")) {
                        receiver.interrupt();
                        break;
                    }
                    buf.put(msg.getBytes(StandardCharsets.UTF_8));
                    buf.flip();
                    channel.send(buf, serverAddress);
                    buf.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        sender.start();     // 啟動 sender 線程
        receiver.start();   // 啟動 receiver線程
        receiver.join();    // 等待 receiver
        channel.close();    // 關閉通道
    }
}

3. 小結

1)Java NIO 中的通道結合緩沖區,提供了一種與流不一樣的操作模式。通道是應用程序到 I/O 設備的一個打開的連接,應用程序可以往通道中寫入數據或者從通道中讀取數據。

2)NIO 中主要的通道有四種,磁盤文件 I/O 相關的 FileChannel,網絡 I/O 相關的 SocketChannel, ServerSocketChannel 和 DatagramChannel。其中文件相關的通道只能以阻塞的方式進行 I/O 操作,而網絡相關通道則可以通過阻塞方式和非阻塞方式進行通信。

以上是關於通道的一些基本概念和用法,就這些內容上來看,NIO 相對於普通的 I/O 並沒有太大的優勢(非阻塞網絡 I/O除外);普通 I/O 流中的 BufferedInputStream, BufferedOutputSteram 能夠起到和通道幾乎一樣的作用。事實上,基於內存映射技術的直接內存緩存提供了比普通 I/O 更加高效的訪問磁盤文件方式;而 NIO 為網絡 I/O 提供了非阻塞訪問模型的接口,配合選擇器 Selector,極大提高了 Java 程序所能夠支持的並發數。

4. 參考

[1] Java API Specification
[2] Java NIO Tutorial


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM