目錄:
《Java NIO系列教程(三) Channel之Socket通道》
在《Java NIO系列教程(二) Channel》介紹了FileChannel,本章節介紹socket通道類。
一、Socket通道
新的socket通道類可以運行非阻塞模式並且是可選擇的。這兩個性能可以激活大程序(如網絡服務器和中間件組件)巨大的可伸縮性和靈活性。本節中我們會看到,再也沒有為每個socket連接使用一個線程的必要了,也避免了管理大量線程所需的上下文交換總開銷。借助新的NIO類,一個或幾個線程就可以管理成百上千的活動socket連接了並且只有很少甚至可能沒有性能損失。所有的socket通道類(DatagramChannel、SocketChannel和ServerSocketChannel)都繼承了位於java.nio.channels.spi包中的AbstractSelectableChannel。這意味着我們可以用一個Selector對象來執行socket通道的就緒選擇(readiness selection)。
請注意DatagramChannel和SocketChannel實現定義讀和寫功能的接口而ServerSocketChannel不實現。ServerSocketChannel負責監聽傳入的連接和創建新的SocketChannel對象,它本身從不傳輸數據。
在我們具體討論每一種socket通道前,您應該了解socket和socket通道之間的關系。之前的章節中有寫道,通道是一個連接I/O服務導管並提供與該服務交互的方法。就某個socket而言,它不會再次實現與之對應的socket通道類中的socket協議API,而java.net中已經存在的socket通道都可以被大多數協議操作重復使用。
全部socket通道類(DatagramChannel、SocketChannel和ServerSocketChannel)在被實例化時都會創建一個對等socket對象。這些是我們所熟悉的來自java.net的類(Socket、ServerSocket和DatagramSocket),它們已經被更新以識別通道。對等socket可以通過調用socket( )方法從一個通道上獲取。此外,這三個java.net類現在都有getChannel( )方法。
Socket通道將與通信協議相關的操作委托給相應的socket對象。socket的方法看起來好像在通道類中重復了一遍,但實際上通道類上的方法會有一些新的或者不同的行為。
要把一個socket通道置於非阻塞模式,我們要依靠所有socket通道類的公有超級類:SelectableChannel。就緒選擇(readiness selection)是一種可以用來查詢通道的機制,該查詢可以判斷通道是否准備好執行一個目標操作,如讀或寫。非阻塞I/O和可選擇性是緊密相連的,那也正是管理阻塞模式的API代碼要在SelectableChannel超級類中定義的原因。
設置或重新設置一個通道的阻塞模式是很簡單的,只要調用configureBlocking( )方法即可,傳遞參數值為true則設為阻塞模式,參數值為false值設為非阻塞模式。真的,就這么簡單!您可以通過調用isBlocking( )方法來判斷某個socket通道當前處於哪種模式。
AbstractSelectableChannel.java中實現的configureBlocking()方法如下:
public final SelectableChannel configureBlocking(boolean block) throws IOException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if (blocking == block) return this; if (block && haveValidKeys()) throw new IllegalBlockingModeException(); implConfigureBlocking(block); blocking = block; } return this; }
非阻塞socket通常被認為是服務端使用的,因為它們使同時管理很多socket通道變得更容易。但是,在客戶端使用一個或幾個非阻塞模式的socket通道也是有益處的,例如,借助非阻塞socket通道,GUI程序可以專注於用戶請求並且同時維護與一個或多個服務器的會話。在很多程序上,非阻塞模式都是有用的。
偶爾地,我們也會需要防止socket通道的阻塞模式被更改。API中有一個blockingLock( )方法,該方法會返回一個非透明的對象引用。返回的對象是通道實現修改阻塞模式時內部使用的。只有擁有此對象的鎖的線程才能更改通道的阻塞模式。
下面分別介紹這3個通道。
二、 ServerSocketChannel
讓我們從最簡單的ServerSocketChannel來開始對socket通道類的討論。以下是ServerSocketChannel的完整API:
public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open() throws IOException; public abstract ServerSocket socket(); public abstract ServerSocket accept()throws IOException; public final int validOps(); }
ServerSocketChannel是一個基於通道的socket監聽器。它同我們所熟悉的java.net.ServerSocket執行相同的基本任務,不過它增加了通道語義,因此能夠在非阻塞模式下運行。
由於ServerSocketChannel沒有bind()方法,因此有必要取出對等的socket並使用它來綁定到一個端口以開始監聽連接。我們也是使用對等ServerSocket的API來根據需要設置其他的socket選項。
同它的對等體java.net.ServerSocket一樣,ServerSocketChannel也有accept( )方法。一旦您創建了一個ServerSocketChannel並用對等socket綁定了它,然后您就可以在其中一個上調用accept()。如果您選擇在ServerSocket上調用accept( )方法,那么它會同任何其他的ServerSocket表現一樣的行為:總是阻塞並返回一個java.net.Socket對象。如果您選擇在ServerSocketChannel上調用accept( )方法則會返回SocketChannel類型的對象,返回的對象能夠在非阻塞模式下運行。
換句話說:
- ServerSocketChannel的accept()方法會返回SocketChannel類型對象,SocketChannel可以在非阻塞模式下運行。
- 其它Socket的accept()方法會阻塞返回一個Socket對象。
如果ServerSocketChannel以非阻塞模式被調用,當沒有傳入連接在等待時,ServerSocketChannel.accept( )會立即返回null。正是這種檢查連接而不阻塞的能力實現了可伸縮性並降低了復雜性。可選擇性也因此得到實現。我們可以使用一個選擇器實例來注冊一個ServerSocketChannel對象以實現新連接到達時自動通知的功能。以下代碼演示了如何使用一個非阻塞的accept( )方法:
package com.dxz.springsession.nio.demo2; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.net.InetSocketAddress; public class ChannelAccept { public static final String GREETING = "Hello I must be going.\r\n"; public static void main(String[] argv) throws Exception { int port = 1234; // default if (argv.length > 0) { port = Integer.parseInt(argv[0]); } ByteBuffer buffer = ByteBuffer.wrap(GREETING.getBytes()); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); while (true) { System.out.println("Waiting for connections"); SocketChannel sc = ssc.accept(); if (sc == null) { System.out.println("null"); Thread.sleep(2000); } else { System.out.println("Incoming connection from: " + sc.socket().getRemoteSocketAddress()); buffer.rewind(); sc.write(buffer); sc.close(); } } } }
日志:
2.1、打開 ServerSocketChannel
通過調用 ServerSocketChannel.open() 方法來打開ServerSocketChannel.如:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
2.2、關閉 ServerSocketChannel
通過調用ServerSocketChannel.close() 方法來關閉ServerSocketChannel. 如:
serverSocketChannel.close();
2.3、監聽新進來的連接
通過 ServerSocketChannel.accept() 方法監聽新進來的連接。當 accept()方法返回的時候,它返回一個包含新進來的連接的 SocketChannel。因此, accept()方法會一直阻塞到有新連接到達。
通常不會僅僅只監聽一個連接,在while循環中調用 accept()方法. 如下面的例子:
while(true){ SocketChannel socketChannel = serverSocketChannel.accept(); //... }
2.4、阻塞模式
會在SocketChannel sc = ssc.accept();這里阻塞住進程。
2.5、非阻塞模式
ServerSocketChannel可以設置成非阻塞模式。在非阻塞模式下,accept() 方法會立刻返回,如果還沒有新進來的連接,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null.如:
ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(port)); ssc.configureBlocking(false); while (true) { System.out.println("Waiting for connections"); SocketChannel sc = ssc.accept(); if(sc != null) { }
三、SocketChannel
下面開始學習SocketChannel,它是使用最多的socket通道類:
Java NIO中的SocketChannel是一個連接到TCP網絡套接字的通道。可以通過以下2種方式創建SocketChannel:
- 打開一個SocketChannel並連接到互聯網上的某台服務器。
- 一個新連接到達ServerSocketChannel時,會創建一個SocketChannel。
3.1、打開 SocketChannel
下面是SocketChannel的打開方式:
SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));
3.2、關閉 SocketChannel
當用完SocketChannel之后調用SocketChannel.close()關閉SocketChannel:
socketChannel.close();
3.3、從 SocketChannel 讀取數據
要從SocketChannel中讀取數據,調用一個read()的方法之一。以下是例子:
ByteBuffer buf = ByteBuffer.allocate(48); int bytesRead = socketChannel.read(buf);
首先,分配一個Buffer。從SocketChannel讀取到的數據將會放到這個Buffer中。然后,調用SocketChannel.read()。該方法將數據從SocketChannel 讀到Buffer中。read()方法返回的int值表示讀了多少字節進Buffer里。如果返回的是-1,表示已經讀到了流的末尾(連接關閉了)。
3.4、寫入 SocketChannel
寫數據到SocketChannel用的是SocketChannel.write()方法,該方法以一個Buffer作為參數。示例如下:
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); while(buf.hasRemaining()) { channel.write(buf); }
注意SocketChannel.write()方法的調用是在一個while循環中的。Write()方法無法保證能寫多少字節到SocketChannel。所以,我們重復調用write()直到Buffer沒有要寫的字節為止。
3.5、非阻塞模式
可以設置 SocketChannel 為非阻塞模式(non-blocking mode).設置之后,就可以在異步模式下調用connect(), read() 和write()了。
3.5.1、connect()
如果SocketChannel在非阻塞模式下,此時調用connect(),該方法可能在連接建立之前就返回了。為了確定連接是否建立,可以調用finishConnect()的方法。像這樣:
socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80)); while(! socketChannel.finishConnect() ){ //wait, or do something else... }
3.5.2、write()
非阻塞模式下,write()方法在尚未寫出任何內容時可能就返回了。所以需要在循環中調用write()。前面已經有例子了,這里就不贅述了。
3.5.3、read()
非阻塞模式下,read()方法在尚未讀取到任何數據時可能就返回了。所以需要關注它的int返回值,它會告訴你讀取了多少字節。
3.6、非阻塞模式與選擇器
非阻塞模式與選擇器搭配會工作的更好,通過將一或多個SocketChannel注冊到Selector,可以詢問選擇器哪個通道已經准備好了讀取,寫入等。Selector與SocketChannel的搭配使用會在后面詳講。
四、DatagramChannel
最后一個socket通道是DatagramChannel。正如SocketChannel對應Socket,ServerSocketChannel對應ServerSocket,每一個DatagramChannel對象也有一個關聯的DatagramSocket對象。不過原命名模式在此並未適用:“DatagramSocketChannel”顯得有點笨拙,因此采用了簡潔的“DatagramChannel”名稱。
正如SocketChannel模擬連接導向的流協議(如TCP/IP),DatagramChannel則模擬包導向的無連接協議(如UDP/IP)。
DatagramChannel是無連接的。每個數據報(datagram)都是一個自包含的實體,擁有它自己的目的地址及不依賴其他數據報的數據負載。與面向流的的socket不同,DatagramChannel可以發送單獨的數據報給不同的目的地址。同樣,DatagramChannel對象也可以接收來自任意地址的數據包。每個到達的數據報都含有關於它來自何處的信息(源地址)。
4.1、打開 DatagramChannel
下面是 DatagramChannel 的打開方式:
DatagramChannel channel = DatagramChannel.open(); channel.socket().bind(new InetSocketAddress(9999));
這個例子打開的 DatagramChannel可以在UDP端口9999上接收數據包。
4.2、接收數據
通過receive()方法從DatagramChannel接收數據,如:
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
channel.receive(buf);
receive()方法會將接收到的數據包內容復制到指定的Buffer. 如果Buffer容不下收到的數據,多出的數據將被丟棄。
4.3、發送數據
通過send()方法從DatagramChannel發送數據,如:
String newData = "New String to write to file..." + System.currentTimeMillis(); ByteBuffer buf = ByteBuffer.allocate(48); buf.clear(); buf.put(newData.getBytes()); buf.flip(); int bytesSent = channel.send(buf, new InetSocketAddress("jenkov.com", 80));
這個例子發送一串字符到”jenkov.com”服務器的UDP端口80。 因為服務端並沒有監控這個端口,所以什么也不會發生。也不會通知你發出的數據包是否已收到,因為UDP在數據傳送方面沒有任何保證。
4.4、連接到特定的地址
可以將DatagramChannel“連接”到網絡中的特定地址的。由於UDP是無連接的,連接到特定地址並不會像TCP通道那樣創建一個真正的連接。而是鎖住DatagramChannel ,讓其只能從特定地址收發數據。
這里有個例子:
channel.connect(new InetSocketAddress("jenkov.com", 80));
當連接后,也可以使用read()和write()方法,就像在用傳統的通道一樣。只是在數據傳送方面沒有任何保證。這里有幾個例子:
int bytesRead = channel.read(buf); int bytesWritten = channel.write(but);
完整示例:
package com.dxz.springsession.nio.demo3; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.io.*; import java.util.*; import java.nio.*; public class DatagramChannelServerDemo { // UDP協議服務端 private int port = 9975; DatagramChannel channel; private Charset charset = Charset.forName("UTF-8"); private Selector selector = null; public DatagramChannelServerDemo() throws IOException { try { selector = Selector.open(); channel = DatagramChannel.open(); } catch (Exception e) { selector = null; channel = null; System.out.println("超時"); } System.out.println("服務器啟動"); } /* 編碼過程 */ public ByteBuffer encode(String str) { return charset.encode(str); } /* 解碼過程 */ public String decode(ByteBuffer bb) { return charset.decode(bb).toString(); } /* 服務器服務方法 */ public void service() throws IOException { if (channel == null || selector == null) return; channel.configureBlocking(false); channel.socket().bind(new InetSocketAddress(port)); // channel.write(ByteBuffer.wrap(new String("aaaa").getBytes())); channel.register(selector, SelectionKey.OP_READ); /** 外循環,已經發生了SelectionKey數目 */ while (selector.select() > 0) { System.out.println("有新channel加入"); /* 得到已經被捕獲了的SelectionKey的集合 */ Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = null; try { key = (SelectionKey) iterator.next(); iterator.remove(); if (key.isReadable()) { reveice(key); } if (key.isWritable()) { // send(key); } } catch (IOException e) { e.printStackTrace(); try { if (key != null) { key.cancel(); key.channel().close(); } } catch (ClosedChannelException cex) { e.printStackTrace(); } } } /* 內循環完 */ } /* 外循環完 */ } /* * 接收 用receive()讀IO 作為服務端一般不需要調用connect(),如果未調用<span style= * "font-family: Arial, Helvetica, sans-serif;">connect()時調</span><span * style="font-family: Arial, Helvetica, sans-serif;" * >用read()\write()讀寫,會報java.nio.channels</span> .NotYetConnectedException * 只有調用connect()之后,才能使用read和write. */ synchronized public void reveice(SelectionKey key) throws IOException { if (key == null) return; // ***用channel.receive()獲取客戶端消息***// // :接收時需要考慮字節長度 DatagramChannel sc = (DatagramChannel) key.channel(); String content = ""; // create buffer with capacity of 48 bytes ByteBuffer buf = ByteBuffer.allocate(1024);// java里一個(utf-8)中文3字節,gbk中文占2個字節 buf.clear(); SocketAddress address = sc.receive(buf); // read into buffer. 返回客戶端的地址信息 String clientAddress = address.toString().replace("/", "").split(":")[0]; String clientPost = address.toString().replace("/", "").split(":")[1]; buf.flip(); // make buffer ready for read while (buf.hasRemaining()) { buf.get(new byte[buf.limit()]);// read 1 byte at a time content += new String(buf.array()); } buf.clear(); // make buffer ready for writing System.out.println("接收:" + content.trim()); // 第一次發;udp采用數據報模式,發送多少次,接收多少次 ByteBuffer buf2 = ByteBuffer.allocate(65507); buf2.clear(); buf2.put( "消息推送內容 abc..UDP是一個非連接的協議,傳輸數據之前源端和終端不建立連接,當它想傳送時就簡單地去抓取來自應用程序的數據,並盡可能快地把它扔到網絡上。在發送端UDP是一個非連接的協議,傳輸數據之前源端和終端不建立連接,當它想傳送時就簡單地去抓取來自應用程序的數據,並盡可能快地把它扔到網絡上。在發送端UDP是一個非連接的協議,傳輸數據之前源端和終端不建立連接,當它想傳送時就簡單地去抓取來自應用程序的數據,並盡可能快地把它扔到網絡上。在發送端@Q" .getBytes()); buf2.flip(); channel.send(buf2, new InetSocketAddress(clientAddress, Integer.parseInt(clientPost))); // 將消息回送給客戶端 // 第二次發 ByteBuffer buf3 = ByteBuffer.allocate(65507); buf3.clear(); buf3.put("任務完成".getBytes()); buf3.flip(); channel.send(buf3, new InetSocketAddress(clientAddress, Integer.parseInt(clientPost))); // 將消息回送給客戶端 } int y = 0; public void send(SelectionKey key) { if (key == null) return; // ByteBuffer buff = (ByteBuffer) key.attachment(); DatagramChannel sc = (DatagramChannel) key.channel(); try { sc.write(ByteBuffer.wrap(new String("aaaa").getBytes())); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("send2() " + (++y)); } /* 發送文件 */ public void sendFile(SelectionKey key) { if (key == null) return; ByteBuffer buff = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); String data = decode(buff); if (data.indexOf("get") == -1) return; String subStr = data.substring(data.indexOf(" "), data.length()); System.out.println("截取之后的字符串是 " + subStr); FileInputStream fileInput = null; try { fileInput = new FileInputStream(subStr); FileChannel fileChannel = fileInput.getChannel(); fileChannel.transferTo(0, fileChannel.size(), sc); fileChannel.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { fileInput.close(); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new DatagramChannelServerDemo().service(); } } //客戶端 package com.dxz.springsession.nio.demo3; import java.nio.channels.*; import java.nio.charset.*; import java.net.*; import java.io.*; import java.util.*; import java.nio.*; public class DatagramChannelClientDemo { // UDP協議客戶端 private String serverIp = "127.0.0.1"; private int port = 9975; // private ServerSocketChannel serverSocketChannel; DatagramChannel channel; private Charset charset = Charset.forName("UTF-8"); private Selector selector = null; public DatagramChannelClientDemo() throws IOException { try { selector = Selector.open(); channel = DatagramChannel.open(); } catch (Exception e) { selector = null; channel = null; System.out.println("超時"); } System.out.println("客戶器啟動"); } /* 編碼過程 */ public ByteBuffer encode(String str) { return charset.encode(str); } /* 解碼過程 */ public String decode(ByteBuffer bb) { return charset.decode(bb).toString(); } /* 服務器服務方法 */ public void service() throws IOException { if (channel == null || selector == null) return; channel.configureBlocking(false); channel.connect(new InetSocketAddress(serverIp, port));// 連接服務端 channel.write(ByteBuffer.wrap(new String("客戶端請求獲取消息").getBytes())); channel.register(selector, SelectionKey.OP_READ); /** 外循環,已經發生了SelectionKey數目 */ while (selector.select() > 0) { /* 得到已經被捕獲了的SelectionKey的集合 */ Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = null; try { key = (SelectionKey) iterator.next(); iterator.remove(); if (key.isReadable()) { reveice(key); } if (key.isWritable()) { // send(key); } } catch (IOException e) { e.printStackTrace(); try { if (key != null) { key.cancel(); key.channel().close(); } } catch (ClosedChannelException cex) { e.printStackTrace(); } } } /* 內循環完 */ } /* 外循環完 */ } // /* // * 接收 用read()讀IO // * */ // synchronized public void reveice2(SelectionKey key) throws IOException { // if (key == null) // return; // // ***用channel.read()獲取消息***// // // :接收時需要考慮字節長度 // DatagramChannel sc = (DatagramChannel) key.channel(); // String content = ""; // // create buffer with capacity of 48 bytes // ByteBuffer buf = ByteBuffer.allocate(3);// java里一個(utf-8)中文3字節,gbk中文占2個字節 // int bytesRead = sc.read(buf); //read into buffer. // // while (bytesRead >0) { // buf.flip(); //make buffer ready for read // while(buf.hasRemaining()){ // buf.get(new byte[buf.limit()]); // read 1 byte at a time // content += new String(buf.array()); // } // buf.clear(); //make buffer ready for writing // bytesRead = sc.read(buf); // } // System.out.println("接收:" + content); // } /* 接收 */ synchronized public void reveice(SelectionKey key) throws IOException { String threadName = Thread.currentThread().getName(); if (key == null) return; try { // ***用channel.receive()獲取消息***// // :接收時需要考慮字節長度 DatagramChannel sc = (DatagramChannel) key.channel(); String content = ""; // 第一次接;udp采用數據報模式,發送多少次,接收多少次 ByteBuffer buf = ByteBuffer.allocate(65507);// java里一個(utf-8)中文3字節,gbk中文占2個字節 buf.clear(); SocketAddress address = sc.receive(buf); // read into buffer. String clientAddress = address.toString().replace("/", "").split(":")[0]; String clientPost = address.toString().replace("/", "").split(":")[1]; System.out.println(threadName + "\t" + address.toString()); buf.flip(); // make buffer ready for read while (buf.hasRemaining()) { buf.get(new byte[buf.limit()]);// read 1 byte at a time byte[] tmp = buf.array(); content += new String(tmp); } buf.clear(); // make buffer ready for writing次 System.out.println(threadName + "接收:" + content.trim()); // 第二次接 content = ""; ByteBuffer buf2 = ByteBuffer.allocate(65507);// java里一個(utf-8)中文3字節,gbk中文占2個字節 buf2.clear(); SocketAddress address2 = sc.receive(buf2); // read into buffer. buf2.flip(); // make buffer ready for read while (buf2.hasRemaining()) { buf2.get(new byte[buf2.limit()]);// read 1 byte at a time byte[] tmp = buf2.array(); content += new String(tmp); } buf2.clear(); // make buffer ready for writing次 System.out.println(threadName + "接收2:" + content.trim()); } catch (PortUnreachableException ex) { System.out.println(threadName + "服務端端口未找到!"); } send(2); } boolean flag = false; public void send(int i) { if (flag) return; try { // channel.write(ByteBuffer.wrap(new // String("客戶端請求獲取消息(第"+i+"次)").getBytes())); // channel.register(selector, SelectionKey.OP_READ ); ByteBuffer buf2 = ByteBuffer.allocate(48); buf2.clear(); buf2.put(("客戶端請求獲取消息(第" + i + "次)").getBytes()); buf2.flip(); channel.write(buf2); channel.register(selector, SelectionKey.OP_READ); // int bytesSent = channel.send(buf2, new // InetSocketAddress(serverIp,port)); // 將消息回送給服務端 } catch (IOException e) { e.printStackTrace(); } flag = true; } int y = 0; public void send(SelectionKey key) { if (key == null) return; // ByteBuffer buff = (ByteBuffer) key.attachment(); DatagramChannel sc = (DatagramChannel) key.channel(); try { sc.write(ByteBuffer.wrap(new String("aaaa").getBytes())); } catch (IOException e1) { e1.printStackTrace(); } System.out.println("send2() " + (++y)); } /* 發送文件 */ public void sendFile(SelectionKey key) { if (key == null) return; ByteBuffer buff = (ByteBuffer) key.attachment(); SocketChannel sc = (SocketChannel) key.channel(); String data = decode(buff); if (data.indexOf("get") == -1) return; String subStr = data.substring(data.indexOf(" "), data.length()); System.out.println("截取之后的字符串是 " + subStr); FileInputStream fileInput = null; try { fileInput = new FileInputStream(subStr); FileChannel fileChannel = fileInput.getChannel(); fileChannel.transferTo(0, fileChannel.size(), sc); fileChannel.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { fileInput.close(); } catch (IOException ex) { ex.printStackTrace(); } } } public static void main(String[] args) throws IOException { new Thread(new Runnable() { public void run() { try { new DatagramChannelClientDemo().service(); } catch (IOException e) { e.printStackTrace(); } } }).start(); // new Thread(new Runnable() { // public void run() { // try { // new DatagramChannelClientDemo().service(); // } catch (IOException e) { // e.printStackTrace(); // } // } // }).start(); } }
Java NIO中的Buffer用於和NIO通道進行交互。如你所知,數據是從通道讀入緩沖區,從緩沖區寫入到通道中的。交互圖如下:
緩沖區本質上是一塊可以寫入數據,然后可以從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該塊內存。緩沖區實際上是一個容器對象,更直接的說,其實就是一個數組,在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的; 在寫入數據時,它也是寫入到緩沖區中的;任何時候訪問 NIO 中的數據,都是將它放到緩沖區中。而在面向流I/O系統中,所有數據都是直接寫入或者直接將數據讀取到Stream對象中。
在NIO中,所有的緩沖區類型都繼承於抽象類Buffer,最常用的就是ByteBuffer,對於Java中的基本類型,基本都有一個具體Buffer類型與之相對應,它們之間的繼承關系如下圖所示:
下面是NIO Buffer相關的話題列表:
- Buffer的基本用法
- Buffer的capacity,position和limit
- Buffer的類型
- Buffer的分配
- 向Buffer中寫數據
- flip()方法
- 從Buffer中讀取數據
- clear()與compact()方法
- mark()與reset()方法
- equals()與compareTo()方法
Buffer的基本用法
使用Buffer讀寫數據一般遵循以下四個步驟:
- 寫入數據到Buffer
- 調用
flip()
方法 - 從Buffer中讀取數據
- 調用
clear()
方法或者compact()
方法
當向buffer寫入數據時,buffer會記錄下寫了多少數據。一旦要讀取數據,需要通過flip()方法將Buffer從寫模式切換到讀模式。在讀模式下,可以讀取之前寫入到buffer的所有數據。
一旦讀完了所有的數據,就需要清空緩沖區,讓它可以再次被寫入。有兩種方式能清空緩沖區:調用clear()或compact()方法。clear()方法會清空整個緩沖區。compact()方法只會清除已經讀過的數據。任何未讀的數據都被移到緩沖區的起始處,新寫入的數據將放到緩沖區未讀數據的后面。
下面是一個使用Buffer的例子:
01 |
RandomAccessFile aFile = new RandomAccessFile( "data/nio-data.txt" , "rw" ); |
02 |
FileChannel inChannel = aFile.getChannel(); |
03 |
04 |
//create buffer with capacity of 48 bytes |
05 |
ByteBuffer buf = ByteBuffer.allocate( 48 ); |
06 |
07 |
int bytesRead = inChannel.read(buf); //read into buffer. |
08 |
while (bytesRead != - 1 ) { |
09 |
10 |
buf.flip(); //make buffer ready for read |
11 |
12 |
while (buf.hasRemaining()){ |
13 |
System.out.print(( char ) buf.get()); // read 1 byte at a time |
14 |
} |
15 |
16 |
buf.clear(); //make buffer ready for writing |
17 |
bytesRead = inChannel.read(buf); |
18 |
} |
19 |
aFile.close(); |
示例2:
下面是一個簡單的使用IntBuffer的例子:
package com.dxz.nio; import java.nio.IntBuffer; public class TestIntBuffer { public static void main(String[] args) { // 分配新的int緩沖區,參數為緩沖區容量 // 新緩沖區的當前位置將為零,其界限(限制位置)將為其容量。它將具有一個底層實現數組,其數組偏移量將為零。 IntBuffer buffer = IntBuffer.allocate(8); for (int i = 0; i < buffer.capacity(); ++i) { int j = 2 * (i + 1); // 將給定整數寫入此緩沖區的當前位置,當前位置遞增 buffer.put(j); } // 重設此緩沖區,將限制設置為當前位置,然后將當前位置設置為0 buffer.flip(); // 查看在當前位置和限制位置之間是否有元素 while (buffer.hasRemaining()) { // 讀取此緩沖區當前位置的整數,然后當前位置遞增 int j = buffer.get(); System.out.print(j + " "); } } }
結果:
2 4 6 8 10 12 14 16
Buffer的capacity,position和limit
緩沖區本質上是一塊可以寫入數據,然后可以從中讀取數據的內存。這塊內存被包裝成NIO Buffer對象,並提供了一組方法,用來方便的訪問該塊內存。
為了理解Buffer的工作原理,需要熟悉它的三個屬性:
- capacity
- position
- limit
position和limit的含義取決於Buffer處在讀模式還是寫模式。不管Buffer處在什么模式,capacity的含義總是一樣的。
這里有一個關於capacity,position和limit在讀寫模式中的說明,詳細的解釋在插圖后面。
capacity
作為一個內存塊,Buffer有一個固定的大小值,也叫“capacity”.你只能往里寫capacity個byte、long,char等類型。一旦Buffer滿了,需要將其清空(通過讀數據或者清除數據)才能繼續寫數據往里寫數據。
position
當你寫數據到Buffer中時,position表示當前的位置。初始的position值為0.當一個byte、long等數據寫到Buffer后, position會向前移動到下一個可插入數據的Buffer單元。position最大可為capacity – 1.
當讀取數據時,也是從某個特定位置讀。當將Buffer從寫模式切換到讀模式,position會被重置為0. 當從Buffer的position處讀取數據時,position向前移動到下一個可讀的位置。
limit
在寫模式下,Buffer的limit表示你最多能往Buffer里寫多少數據。 寫模式下,limit等於Buffer的capacity。
當切換Buffer到讀模式時, limit表示你最多能讀到多少數據。因此,當切換Buffer到讀模式時,limit會被設置成寫模式下的position值。換句話說,你能讀到之前寫入的所有數據(limit被設置成已寫數據的數量,這個值在寫模式下就是position)
Buffer的類型
Java NIO 有以下Buffer類型
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
p<>
如你所見,這些Buffer類型代表了不同的數據類型。換句話說,就是可以通過char,short,int,long,float 或 double類型來操作緩沖區中的字節。
MappedByteBuffer 有些特別,在涉及它的專門章節中再講。
Buffer的分配
要想獲得一個Buffer對象首先要進行分配。 每一個Buffer類都有一個allocate方法。下面是一個分配48字節capacity的ByteBuffer的例子。
1 |
ByteBuffer buf = ByteBuffer.allocate( 48 ); |
這是分配一個可存儲1024個字符的CharBuffer:
1 |
CharBuffer buf = CharBuffer.allocate( 1024 ); |
向Buffer中寫數據
寫數據到Buffer有兩種方式:
- 從Channel寫到Buffer。
- 通過Buffer的put()方法寫到Buffer里。
從Channel寫到Buffer的例子
1 |
int bytesRead = inChannel.read(buf); //read into buffer. |
通過put方法寫Buffer的例子:
1 |
buf.put( 127 ); |
put方法有很多版本,允許你以不同的方式把數據寫入到Buffer中。例如, 寫到一個指定的位置,或者把一個字節數組寫入到Buffer。 更多Buffer實現的細節參考JavaDoc。
flip()方法
flip英 [flɪp] 美 [flɪp] 及物動詞 輕彈,輕擊; 按(開關); 快速翻轉; 急揮
flip方法將Buffer從寫模式切換到讀模式。調用flip()方法會將position設回0,並將limit設置成之前position的值。
換句話說,position現在用於標記讀的位置,limit表示之前寫進了多少個byte、char等 —— 現在能讀取多少個byte、char等。
從Buffer中讀取數據
從Buffer中讀取數據有兩種方式:
- 從Buffer讀取數據到Channel。
- 使用get()方法從Buffer中讀取數據。
從Buffer讀取數據到Channel的例子:
//read from buffer into channel. int bytesWritten = inChannel.write(buf);
使用get()方法從Buffer中讀取數據的例子
byte aByte = buf.get();
get方法有很多版本,允許你以不同的方式從Buffer中讀取數據。例如,從指定position讀取,或者從Buffer中讀取數據到字節數組。更多Buffer實現的細節參考JavaDoc。
rewind()方法
Buffer.rewind()將position設回0,所以你可以重讀Buffer中的所有數據。limit保持不變,仍然表示能從Buffer中讀取多少個元素(byte、char等)。
clear()與compact()方法
一旦讀完Buffer中的數據,需要讓Buffer准備好再次被寫入。可以通過clear()或compact()方法來完成。
如果調用的是clear()方法,position將被設回0,limit被設置成 capacity的值。換句話說,Buffer 被清空了。Buffer中的數據並未清除,只是這些標記告訴我們可以從哪里開始往Buffer里寫數據。
如果Buffer中有一些未讀的數據,調用clear()方法,數據將“被遺忘”,意味着不再有任何標記會告訴你哪些數據被讀過,哪些還沒有。
如果Buffer中仍有未讀的數據,且后續還需要這些數據,但是此時想要先先寫些數據,那么使用compact()方法。
compact()方法將所有未讀的數據拷貝到Buffer起始處。然后將position設到最后一個未讀元素正后面。limit屬性依然像clear()方法一樣,設置成capacity。現在Buffer准備好寫數據了,但是不會覆蓋未讀的數據。
mark()與reset()方法
通過調用Buffer.mark()方法,可以標記Buffer中的一個特定position。之后可以通過調用Buffer.reset()方法恢復到這個position。例如:
1 |
buffer.mark(); |
2 |
3 |
//call buffer.get() a couple of times, e.g. during parsing. |
4 |
5 |
buffer.reset(); //set position back to mark. |
equals()與compareTo()方法
可以使用equals()和compareTo()方法兩個Buffer。
equals()
當滿足下列條件時,表示兩個Buffer相等:
- 有相同的類型(byte、char、int等)。
- Buffer中剩余的byte、char等的個數相等。
- Buffer中所有剩余的byte、char等都相同。
如你所見,equals只是比較Buffer的一部分,不是每一個在它里面的元素都比較。實際上,它只比較Buffer中的剩余元素。
compareTo()方法
compareTo()方法比較兩個Buffer的剩余元素(byte、char等), 如果滿足下列條件,則認為一個Buffer“小於”另一個Buffer:
- 第一個不相等的元素小於另一個Buffer中對應的元素 。
- 所有元素都相等,但第一個Buffer比另一個先耗盡(第一個Buffer的元素個數比另一個少)。
緩沖區分片
在NIO中,除了可以分配或者包裝一個緩沖區對象外,還可以根據現有的緩沖區對象來創建一個子緩沖區,即在現有緩沖區上切出一片來作為一個新的緩沖區,但現有的緩沖區與創建的子緩沖區在底層數組層面上是數據共享的,也就是說,子緩沖區相當於是現有緩沖區的一個視圖窗口。調用slice()方法可以創建一個子緩沖區,讓我們通過例子來看一下:
package com.dxz.nio; import java.nio.ByteBuffer; public class BufferDemo1 { static public void main(String args[]) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(10); // 緩沖區中的數據0-9 for (int i = 0; i < buffer.capacity(); ++i) { buffer.put((byte) i); } // 創建子緩沖區 buffer.position(3); buffer.limit(7); ByteBuffer slice = buffer.slice(); // 改變子緩沖區的內容 for (int i = 0; i < slice.capacity(); ++i) { byte b = slice.get(i); b *= 10; slice.put(i, b); } buffer.position(0); buffer.limit(buffer.capacity()); while (buffer.remaining() > 0) { System.out.println(buffer.get()); } } }
結果:
0 1 2 30 40 50 60 7 8 9
只讀緩沖區
只讀緩沖區非常簡單,可以讀取它們,但是不能向它們寫入數據。可以通過調用緩沖區的asReadOnlyBuffer()方法,將任何常規緩沖區轉 換為只讀緩沖區,這個方法返回一個與原緩沖區完全相同的緩沖區,並與原緩沖區共享數據,只不過它是只讀的。如果原緩沖區的內容發生了變化,只讀緩沖區的內容也隨之發生變化:
package com.dxz.nio; import java.nio.ByteBuffer; public class BufferDemo2 { static public void main(String args[]) throws Exception { ByteBuffer buffer = ByteBuffer.allocate(10); // 緩沖區中的數據0-9 for (int i = 0; i < buffer.capacity(); ++i) { buffer.put((byte) i); } // 創建只讀緩沖區 ByteBuffer readonly = buffer.asReadOnlyBuffer(); // 改變原緩沖區的內容 for (int i = 0; i < buffer.capacity(); ++i) { byte b = buffer.get(i); b *= 10; buffer.put(i, b); } readonly.position(0); readonly.limit(buffer.capacity()); // 只讀緩沖區的內容也隨之改變 while (readonly.remaining() > 0) { System.out.println(readonly.get()); } } }
結果:
0 10 20 30 40 50 60 70 80 90
如果嘗試修改只讀緩沖區的內容,則會報ReadOnlyBufferException異常。只讀緩沖區對於保護數據很有用。在將緩沖區傳遞給某個 對象的方法時,無法知道這個方法是否會修改緩沖區中的數據。創建一個只讀的緩沖區可以保證該緩沖區不會被修改。只可以把常規緩沖區轉換為只讀緩沖區,而不能將只讀的緩沖區轉換為可寫的緩沖區。
直接緩沖區
直接緩沖區是為加快I/O速度,使用一種特殊方式為其分配內存的緩沖區,JDK文檔中的描述為:給定一個直接字節緩沖區,Java虛擬機將盡最大努 力直接對它執行本機I/O操作。也就是說,它會在每一次調用底層操作系統的本機I/O操作之前(或之后),嘗試避免將緩沖區的內容拷貝到一個中間緩沖區中 或者從一個中間緩沖區中拷貝數據。要分配直接緩沖區,需要調用allocateDirect()方法,而不是allocate()方法,使用方式與普通緩沖區並無區別,如下面的拷貝文件示例:
package com.dxz.nio; import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.channels.*; public class BufferDemo3 { static public void main(String args[]) throws Exception { String infile = "e:\\logs\\test.txt"; FileInputStream fin = new FileInputStream(infile); FileChannel fcin = fin.getChannel(); String outfile = String.format("e:\\logs\\testcopy.txt"); FileOutputStream fout = new FileOutputStream(outfile); FileChannel fcout = fout.getChannel(); // 使用allocateDirect,而不是allocate ByteBuffer buffer = ByteBuffer.allocateDirect(1024); while (true) { buffer.clear(); int r = fcin.read(buffer); if (r == -1) { break; } buffer.flip(); fcout.write(buffer); } } }
內存映射文件I/O
內存映射文件I/O是一種讀和寫文件數據的方法,它可以比常規的基於流或者基於通道的I/O快的多。內存映射文件I/O是通過使文件中的數據出現為 內存數組的內容來完成的,這其初聽起來似乎不過就是將整個文件讀到內存中,但是事實上並不是這樣。一般來說,只有文件中實際讀取或者寫入的部分才會映射到內存中。如下面的示例代碼:
package com.dxz.nio; import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.*; public class BufferDemo4 { static private final int start = 0; static private final int size = 1024; static public void main(String args[]) throws Exception { RandomAccessFile raf = new RandomAccessFile("e:\\logs\\test.txt", "rw"); FileChannel fc = raf.getChannel(); MappedByteBuffer mbb = fc.map(FileChannel.MapMode.READ_WRITE, start, size); mbb.put(0, (byte) 97); mbb.put(1023, (byte) 122); raf.close(); } }
參考:http://blog.csdn.net/wuxianglong/article/details/6612263