Java NIO系列(三) - Channel


前言

上文講到Java NIO一些基本概念。在標准的IO中,都是基於字節流/字符流進行數據操作的,而在NIO中則是是基於ChannelBuffer進行操作,其中的Channel的雖然模擬了的概念,實則大不相同。

本文將詳細闡述NIO中的通道Channel的概念和具體的用法。

Channel和Stream的區別

區別 Stream Channel
是否支持異步 不支持 支持
是否支持雙向數據傳輸 不支持,只能單向 支持,既可以從通道讀取數據,也可以向通道寫入數據
是否結合Buffer使用 必須結合Buffer使用
性能 較低 較高

Channel用於在字節緩沖區和位於通道另一側的服務(通常是文件或者套接字)之間以便有效的進行數據傳輸。借助通道,可以用最小的總開銷來訪問操作系統本身的I/O服務。

需要注意的是Channel必須結合Buffer使用,應用程序不能直接向通道中讀/寫數據,也就是緩沖區充當着應用程序和通道數據流動的轉換的角色。

正文

Channel的源碼

查看Channel的源碼。所有的接口都實現於Channel接口,從接口上來看,所有的通道都有這兩種操作:檢查通道的開啟狀態關閉通道

1
2
3
4
5
public interface Channel extends Closeable {
public boolean isOpen();

public void close() throws IOException;
}

Channel的分類

廣義上來說通道可以被分為兩類:文件I/O和網絡I/O,也就是文件通道套接字通道。如果分的更細致一點則是:

  • FileChannel:從文件讀寫數據;
  • SocketChannel:通過TCP讀寫網絡數據;
  • ServerSocketChannel:可以監聽新進來的TCP連接,並對每個鏈接創建對應的SocketChannel
  • DatagramChannel:通過UDP讀寫網絡中的數據。

Channel的特性

單向or雙向

通道既可以是單向的也可以是雙向的。只實現ReadableByteChannel接口中的read()方法或者只實現WriteableByteChannel接口中的write()方法的通道皆為單向通道,同時實現ReadableByteChannelWriteableByteChannel雙向通道,比如ByteChannel

1
2
public interface ByteChannel extends ReadableByteChannel, WritableByteChannel {
}

對於Socket通道來說,它們一直是雙向的,而對於FileChannel來說,它同樣實現了ByteChannel,但是通過FileInputStreamgetChannel()獲取的FileChannel只具有文件的只讀權限

注意:調用FileChannel的write()方法會拋出了NonWriteChannelException異常。

阻塞or非阻塞

通道的工作模式有兩種:阻塞或非阻塞。在非阻塞模式下,調用的線程不會休眠,請求的操作會立刻返回結果;在阻塞模式下,調用的線程會產生休眠。

FileChannel不能運行在非阻塞模式下,其余的通道都可阻塞運行也可以以非阻塞的方式運行。

另外從SelectableChannel引申出的類可以和支持有條件選擇的Selector結合使用,進而充分利用多路復用I/O(Multiplexed I/O)來提高性能

SelectableChannel的源碼中有以下幾個抽象方法,可以看出支持配置兩種工作模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
/**
* 配置是否為Channel阻塞模式
*/
public abstract SelectableChannel configureBlocking(boolean block) throws IOException;
/**
* 判斷是否為Channel阻塞模式
*/
public abstract boolean isBlocking();
/**
* 獲取阻塞的鎖對象
*/
public abstract Object blockingLock();
}

 

對於Socket通道類來說,通常與Selector共同使用以提高性能。需要注意的是通道不能被同時使用,一個打開的通道代表着與一個特定I/O服務進行連接並封裝了該連接的狀態,通道一旦關閉,該連接便會斷開

通道的close()比較特殊,無論在通道時在阻塞模式下還是非阻塞模式下,由於close()方法的調用而導致底層I/O關閉都可能會造成線程的暫時阻塞。在一個已關閉的通道上調用close()並沒有任何意義,只會立即返回。

Channel的實戰

對於Socket通道來說存在直接創建新Socket通道的方法,而對於文件通道來說,升級之后的FileInputStream、FileOutputStream和RandomAccessFile提供了getChannel()方法來獲取通道。

FileChannel

Java NIO中的FileChannel是一個連接到文件的通道,可以通過文件通道讀寫文件。文件通道總是阻塞式的,因此FileChannel無法設置為非阻塞模式

文件讀寫

(一). 文件寫操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void testWriteOnFileChannel() {
try {
RandomAccessFile randomAccess = new RandomAccessFile("D://test.txt", "rw");
FileChannel fileChannel = randomAccess.getChannel();

byte[] bytes = new String("Java Non-blocking IO").getBytes();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);

// 將緩沖區中的字節寫入文件通道中
fileChannel.write(byteBuffer);
// 強制將通道中未寫入磁盤的數據立刻寫入到磁盤
fileChannel.force(true);
// 清空緩沖區,釋放內存
byteBuffer.clear();
fileChannel.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

(二). 文件讀操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void testReadOnFileChannel() {
try {
FileInputStream inputStream = new FileInputStream(new File("D://test.txt"));
FileChannel fileChannel = inputStream.getChannel();

ByteBuffer byteBuffer = ByteBuffer.allocate(10);
// 不斷地寫入緩沖區,寫一次讀一次
while (fileChannel.read(byteBuffer) != -1) {
// 緩沖區從寫模式切換為讀模式
byteBuffer.flip();
// 開始讀取
while (byteBuffer.hasRemaining()) {
// 一個字節一個字節地讀取,並向后移動position地位置
System.out.print((char) byteBuffer.get());
}
// 緩沖區不會被自動覆蓋,需要主動調用該方法(實際上還是覆蓋)
byteBuffer.clear();
}
fileChannel.close();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

文件讀寫測試:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
System.out.println("Start to write");
// 通過FileChannel寫入數據
testWriteOnFileChannel();

System.out.println("Start to read");
// 通過FileChannel讀取數據
testReadOnFileChannel();
}

 

測試結果:

transferFrom和transferTo

(一). transferFrom()的使用

FileChanneltransferFrom()方法可以將數據從源通道傳輸到FileChannel中。下面是一個簡單的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static void testTransferFrom(){
try {
RandomAccessFile fromFile = new RandomAccessFile("D://file1.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("D://file2.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}

(二). transferTo()的使用

transferTo()方法將數據從FileChannel傳輸到目標channel中。下面是一個簡單的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void testTransferTo() {
try {
RandomAccessFile fromFile = new RandomAccessFile("D://file1.txt", "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile("D://file3.txt", "rw");
FileChannel toChannel = toFile.getChannel();

long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

ServerSocketChannel

Java NIO中的ServerSocketChannel是一個可以監聽新進來的TCP連接的通道。它類似ServerSocket,要注意的是和DatagramChannelSocketChannel不同,ServerSocketChannel本身不具備傳輸數據的能力,而只是負責監聽傳入的連接和創建新的SocketChannel

ServerSocketChannel的用法

(一). 創建ServerSocketChannel

通過ServerSocketChannel.open()方法來創建一個新的ServerSocketChannel對象,該對象關聯了一個未綁定ServerSocket通道。通過調用該對象上的socket()方法可以獲取與之關聯的ServerSocket

1
ServerSocketChannel socketChannel = ServerSocketChannel.open();

(二). 為ServerSocketChannel綁定監聽端口號

JDK 1.7之前,ServerSocketChannel沒有bind()方法,因此需要通過他關聯的的socket對象的socket()來綁定。

1
2
// JDK1.7之前
serverSocketChannel.socket().bind(new InetSocketAddress(25000));

JDK1.7及以后,可以直接通過ServerSocketChannelbind()方法來綁定端口號

1
2
// JDK1.7之后
serverSocketChannel.bind(new InetSocketAddress(25000));

(三). 設置ServerSocketChannel的工作模式

ServerSocketChannel底層默認采用阻塞的工作模式,它提供了一個configureBlocking()方法,允許配置ServerSocketChannel非阻塞方式運行。

1
2
// 設置為非阻塞模式
serverSocketChannel.configureBlocking(false);

進一步查看configureBlocking源碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public final SelectableChannel configureBlocking(boolean block) throws IOException {
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if (blocking == block)
return this;
if (block && haveValidKeys())
throw new IllegalBlockingModeException();
implConfigureBlocking(block);
blocking = block;
}
return this;
}

Javadoc解釋configureBlocking()方法用於調整底層通道的工作模式,即阻塞和非阻塞,默認是阻塞工作模式。

如果block設置為true,直接返回當前的阻塞式的通道;如果block設置為false,configureBlocking()方法會調用implConfigureBlocking()方法。這里implConfigureBlocking()是由ServerSocketChannelImpl實現,最終調用了IOUtil中的native方法configureBlocking()。

(四). 監聽新進來的連接

通過ServerSocketChannel.accept()方法監聽新進來的連接,這里需要根據configureBlocking()的配置區分兩種工作模式的使用:

  • 阻塞模式下,當accept()方法返回的時候,它返回一個包含新連接SocketChannel,否則accept()方法會一直阻塞到有新連接到達。
  • 非阻塞模式下,在沒有新連接的情況下,accept()會立即返回null,該模式下通常不會僅僅監聽一個連接,因此需在while循環中調用accept()方法.

阻塞模式:

1
2
3
4
5
6
while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();
// 新連接沒到達之前,后面的程序無法繼續執行
InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
// 其他操作
}

非阻塞模式:

1
2
3
4
5
6
7
8
while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();
// 新連接沒到達之前,后面程序一直循環,直到檢測到socketChannel不為null時進入真正的執行邏輯
if(socketChannel != null) {
InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
// 其他操作
}
}

(五). 關閉ServerSocketChannel

通過調用ServerSocketChannel.close()方法來關閉ServerSocketChannel

1
serverSocketChannel.close();

ServerSocketChannel的完整示例

(一). 阻塞模式

代碼示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void blockingTest() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(25000));

System.out.println("ServerSocketChannel listening on 25000...");

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

while(true) {
SocketChannel socketChannel = serverSocketChannel.accept();
InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
System.out.println("Remote address: " + remoteAddress.getHostString());

while (socketChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
byteBuffer.clear();
}
}
}

運行結果:

(二). 非阻塞模式

代碼示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void nonBlockingTest() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(25001));
System.out.println("ServerSocketChannel listening on 25001...");

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("SocketChannel: " + socketChannel);
if (socketChannel != null) {
InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress();
System.out.println("Remote address: " + remoteAddress.getHostString());

while (socketChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
byteBuffer.clear();
}
}
}
}

運行結果:


SocketChannel

Java NIO中的SocketChannel是一個連接到TCP網絡套接字通道,它是Socket類的對等類。

通常SocketChannel客戶端服務器發起連接請求,每個SocketChannel對象創建時都關聯一個對等的Socket對象。同樣SocketChannel也可以運行在非阻塞模式下。

SocketChannel的用法

SocketChannel創建的方式有兩種:

  • 客戶端主動創建:客戶端打開一個SocketChannel並連接到某台服務器上;
  • 服務端被動創建:一個新連接到達ServerSocketChannel時,服務端會創建一個SocketChannel

(一). 創建SocketChannel

通過SocketChannel的靜態方法open()創建SocketChannel對象。此時通道雖然打開,但並未建立連接。此時如果進行I/O操作會拋出NotYetConnectedException異常。

1
SocketChannel socketChannel = SocketChannel.open();

(二). 連接指定服務器

通過SocketChannel對象的connect()連接指定地址。該通道一旦連接,將保持連接狀態直到被關閉。可通過isConnected()來確定某個SocketChannel當前是否已連接。

  • 阻塞模式

如果在客戶端SocketChannel阻塞模式下,即服務器端ServerSocketChannel也為阻塞模式

1
2
3
socketChannel.connect(new InetSocketAddress("127.0.0.1", 25000));
// connect()方法調用以后,socketChannel底層的連接創建完成后,才會執行后面的打印語句
System.out.println("連接創建完成...");
  • 非阻塞模式

兩點需要注意:其一,SocketChannel需要通過configureBlocking()設置為非阻塞模式;其二,非阻塞模式下,connect()方法調用后會異步返回,為了確定連接是否建立,需要調用finishConnect()的方法。

1
2
3
4
5
6
7
8
9
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 25001));
// connect()方法調用以后,異步返回,需要手動調用finishConnect確保連接創建

while(!socketChannel.finishConnect()){
// 檢測到還未創建成功則睡眠10ms
TimeUnit.MILLISECONDS.sleep(10);
}
System.out.println("連接創建完成...");

(三). 從SocketChannel讀數據

利用SocketChannel對象的read()方法將數據從SocketChannel讀取Buffer

1
2
3
4
5
6
7
8
9
10
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

// 非阻塞模式下,read()方法在尚未讀取到任何數據時可能就返回了,所以需要關注它的int返回值。
while (socketChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
System.out.println((char) byteBuffer.get());
}
byteBuffer.clear();
}

(四). 向SocketChannel寫數據

利用SocketChannel對象的write()Buffer的數據寫入SocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("Client Blocking SocketChannel".getBytes());
// byteBuffer.put("Client Non-Blocking SocketChannel".getBytes());
byteBuffer.flip();

// 非阻塞模式下,write()方法在尚未寫出任何內容時可能就返回了。所以需要在循環中調用write()
while (byteBuffer.hasRemaining()) {
socketChannel.write(byteBuffer);
}

// 保持睡眠,觀察控制台輸出
TimeUnit.SECONDS.sleep(20000);
socketChannel.close();

(五). 關閉SocketChannel

利用SocketChannel對象的close()方法關閉SocketChannel

1
socketChannel.close();

SocketChannel的完整示例

(一). 阻塞模式

代碼示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void blockingWrite() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 25000));

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("Client Blocking SocketChannel".getBytes());
byteBuffer.flip();

while (byteBuffer.hasRemaining()) {
socketChannel.write(byteBuffer);
}

TimeUnit.SECONDS.sleep(20000);
socketChannel.close();
}

服務端打印結果:

(一). 非阻塞模式

代碼示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void nonBlockingWrite() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1", 25001));

while(!socketChannel.finishConnect()){
TimeUnit.MILLISECONDS.sleep(10);
}

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put("Client Non-Blocking SocketChannel".getBytes());
byteBuffer.flip();
while (byteBuffer.hasRemaining()) {
socketChannel.write(byteBuffer);
}

TimeUnit.SECONDS.sleep(20000);
socketChannel.close();
}

服務端打印結果:


DatagramChannel

Java NIO中的DatagramChannel是一個能收發UDP的通道,其底層實現為DatagramSocket + SelectorDatagramChannel可以調用socket()方法獲取對等DatagramSocket對象。
DatagramChannel對象既可以充當服務端(監聽者),也可以充當客戶端(發送者)。如果需要新創建的通道負責監聽,那么該通道必須綁定一個端口(或端口組):

DatagramChannel的完整示例

數據報發送方:

1
2
3
4
5
6
public static void main(String[] args) throws Exception {
DatagramChannel datagramChannel = DatagramChannel.open();
ByteBuffer byteBuffer = ByteBuffer.wrap("DatagramChannel Sender".getBytes());
int byteSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 50020));
System.out.println("Byte sent is: " + byteSent);
}

數據報接收方:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws Exception {
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.socket().bind(new InetSocketAddress(50020));

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
datagramChannel.receive(byteBuffer);
byteBuffer.flip();

while (byteBuffer.hasRemaining()) {
System.out.print((char) byteBuffer.get());
}
}

先運行DatagramChannelReceiveTest,再運行DatagramChannelSendTest,觀察控制台輸出:

數據報發送方:

數據報接收方:


工具類Channels

NIO通道提供了一個便捷的通道類Channels,其中定義了幾種靜態的工廠方法以簡化通道轉換。其中常用的方法如下:

方法 返回 描述
newChannel(InputStream in) ReadableByteChannel 返回一個將從給定的輸入流讀取數據的通道。
newChannel(OutputStream out) WritableByteChannel 返回一個將向給定的輸出流寫入數據的通道。
newInputStream(ReadableByteChannel ch) InputStream 返回一個將從給定的通道讀取字節的流。
newOutputStream(WritableByteChannel ch) OutputStream 返回一個將向給定的通道寫入字節的流。
newReader(ReadableByteChannel ch, CharsetDecoder dec, int minBufferCap) Reader 返回一個reader,它將從給定的通道讀取字節並依據提供的字符集名稱對讀取到的字節進行解碼。
newReader(ReadableByteChannel ch, String csName) Reader 返回一個reader,它將從給定的通道讀取字節並依據提供的字符集名稱將讀取到的字節解碼成字符。
newWriter(WritableByteChannel ch, CharsetEncoder dec, int minBufferCap) Writer 返回一個writer,它將使用提供的字符集名稱對字符編碼並寫到給定的通道中。
newWriter(WritableByteChannel ch, String csName) Writer 返回一個writer,它將依據提供的字符集名稱對字符編碼並寫到給定的通道中。

總結

本文針對NIO中的通道的做了詳細的介紹,對於文件通道FileChannel網絡通道SocketChannelServerSocketChannelDatagramChannel進行了實戰演示。

篇幅較長,可見NIO提供的原生的通道API在使用上並不是太容易。


歡迎掃碼關注我的個人技術公眾號: 零壹技術棧

image

本帳號將持續分享后端技術干貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分布式和微服務,架構學習和進階等學習資料和文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM