首先來看一個傳統簡單的網絡通信案例,該案例是基於同步阻塞的I/O,服務端代碼如下
public class Server extends Thread{
private ServerSocket serverSocket;
public Server(int port) throws IOException
{
serverSocket = new ServerSocket(port, 1000); //端口號,以及運行連接可以保存的最長隊列
serverSocket.setSoTimeout(1000000);
}
public void run()
{
while(true)
{
try
{
System.out.println("等待遠程連接,端口號為:" + serverSocket.getLocalPort() + "...");
Socket server = serverSocket.accept();
System.out.println("遠程主機地址:" + server.getRemoteSocketAddress());
DataInputStream in = new DataInputStream(server.getInputStream());
Thread.sleep(2000);
System.out.println(in.readUTF());
DataOutputStream out = new DataOutputStream(server.getOutputStream());
out.writeUTF("0101, 主機收到:" + server.getLocalSocketAddress() + "\nGoodbye!");
server.close();
}catch(SocketTimeoutException s)
{
System.out.println("Socket timed out!");
break;
}catch(IOException e)
{
e.printStackTrace();
break;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String [] args) throws IOException {
Thread t = new Server(6666);
t.run();
}
}
客戶端代碼如下:
public class Client implements Runnable{
private int id;
public Client(int id){
this.id = id;
}
public static void main(String[] args) throws InterruptedException, IOException {
ExecutorService es = Executors.newFixedThreadPool(100);
for (int i = 0; i < 100; i++) {
es.execute(new Client(i+1));
}
es.shutdown();
}
@Override
public void run() {
Socket client = null;
try {
client = new Socket("127.0.0.1", 6666);
OutputStream outToServer = client.getOutputStream();
DataOutputStream out = new DataOutputStream(outToServer);
out.writeUTF("Hello, I am the " + id + "-client and I come from " + client.getLocalSocketAddress());
InputStream inFromServer = client.getInputStream();
DataInputStream in = new DataInputStream(inFromServer);
System.out.println("client-" + id + " : response : " + in.readUTF());
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
看到當假設100個客戶端同時連接服務器的時候,單線程下服務端對接收的請求只會一個一個去處理,導致很多客戶端請求被阻塞,處於等待情況,這個時候,通常的服務端優化的解決辦法是開啟利用線程池開啟多個線程去處理。如下:
public class BlockServer implements Runnable{
private Socket server;
public BlockServer(Socket server){
this.server = server;
}
@Override
public void run() {
DataInputStream in = null;
DataOutputStream out = null;
try {
in = new DataInputStream(server.getInputStream());
System.out.println(server.getInetAddress() + ":" + in.readUTF());
out = new DataOutputStream(server.getOutputStream());
Thread.sleep(2000);
out.writeUTF("server receive your message." );
in.close();
out.close();
server.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
ExecutorService es = Executors.newFixedThreadPool(100);
ServerSocket serverSocket = new ServerSocket(6666, 1000);
System.out.println("等待遠程連接,端口號為:" + serverSocket.getLocalPort() + "...");
while (!Thread.currentThread().isInterrupted()){
Socket socket = serverSocket.accept();
es.execute(new BlockServer(socket));
}
es.shutdown();
}
}
兩種結果的輸出可以看出基於多線程的網絡通信效率遠遠高於單線程。不過多線程通信有一個很大的缺陷——嚴重依賴線程,通常在Linux環境下並沒有線程的概念,此時,線程的本質就是進程了,此時線程的創建銷毀,以及線程(上下文)的切換將導致很大的開銷,因此,基於這些原因,導致了線程資源不能隨便的使用,當我們面對大量的客戶端連接服務器的時候,並不能一味的去瘋狂創建線程。此時,NIO就可以幫助我們解決此類問題。
2. 多路復用的NIO(New IO)——同步非阻塞
BIO模型中,因為在進行IO操作的時候,程序無法知道數據到底准備好沒有,能否可讀,只能一直干等着,而且即便我們可以猜到什么時候數據准備好了,但我們也沒有辦法通過socket.read()或者socket.write()函數去返回,而NIO卻可以通過I/O復用技術把這些連接請求注冊到多路復用器Selector中去,用一個線程去監聽和處理多個SocketChannel上的事件。
BufferByte和Channel
在NIO中並不是以流的方式來處理數據的,而是以buffer緩沖區和Channel管道(全雙工)配合使用來處理數據。這里可以用鐵路交通來類比兩者的關系,假設現在有一批貨物要從北京運到上海且用鐵路運輸,則要有一條從北京到上海的鐵路,以及一列運輸貨物的火車,這里貨物就是客戶端和服務端的交流的信息,Channel管道則是從北京到上海的鐵路,而buffer緩沖區則是這列運輸火車。 其中Channel分為四類:
-
FileChannel: 用於文件IO,支持阻塞模式。可以通過InputStream/OutputStream/RandomAccssFile去獲取該對象。該Channel的用法在后面的文件傳輸示例代碼中有展示,
-
DatagramChannel: 用於UDP通信。
-
SocketChannel: 用於TCP的客戶端通信。客戶端通過SocketChannel.open()獲得該對象。
-
ServerSocketChannel: 用於TCP的服務端通信。服務端通過ServerSocketChannel.open()獲得該對象。
服務端ServerSocketChannel可以通過調用accept方法返回新建立的SocketChannel對象,通過該對象調用wriet/read(ByteBuffer)來將數據寫入通道或從通道中讀取數據。而ByteBuffer的用法,主要涉及到幾個變量:capacity,position,limit和mark,具體含義如下代碼所示,如果要讀取buffer中的數據必須調用flip方法,通過改變position和limit的值,來讀取兩個下標之間數據。如下所示:
public class Test1 {
public static void main(String[] args) {
// 創建一個緩沖區
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 看一下初始時4個核心變量的值
//limit 緩沖區里的數據的總數
System.out.println("初始時-->limit--->"+byteBuffer.limit());
//position 下一個要被讀或寫的元素的位置
System.out.println("初始時-->position--->"+byteBuffer.position());
//capacity 緩沖區能夠容納的數據元素的最大數量。
System.out.println("初始時-->capacity--->"+byteBuffer.capacity());
//mark 一個備忘位置。用於記錄上一次讀寫的位置。
System.out.println("初始時-->mark--->" + byteBuffer.mark());
System.out.println("--------------------------------------");
// 添加一些數據到緩沖區中
String s = "testing.....";
byteBuffer.put(s.getBytes());
// 看一下初始時4個核心變量的值
System.out.println("put完之后-->limit--->"+byteBuffer.limit());
System.out.println("put完之后-->position--->"+byteBuffer.position());
System.out.println("put完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("put完之后-->mark--->" + byteBuffer.mark());
//讀數據前要調用,可以指示讀數據的操作從position讀到limit之間的數據
byteBuffer.flip();
System.out.println("--------------------------------------");
System.out.println("flip完之后-->limit--->"+byteBuffer.limit());
System.out.println("flip完之后-->position--->"+byteBuffer.position());
System.out.println("flip完之后-->capacity--->"+byteBuffer.capacity());
System.out.println("flip完之后-->mark--->" + byteBuffer.mark());
// 創建一個limit()大小的字節數組(因為就只有limit這么多個數據可讀)
byte[] bytes = new byte[byteBuffer.limit()];
// 將讀取的數據裝進我們的字節數組中
byteBuffer.get(bytes);
// 輸出數據
System.out.println(new String(bytes, 0, bytes.length));
}
}
/*output
初始時-->limit--->1024
初始時-->position--->0
初始時-->capacity--->1024
初始時-->mark--->java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024]
--------------------------------------
put完之后-->limit--->1024
put完之后-->position--->12
put完之后-->capacity--->1024
put完之后-->mark--->java.nio.HeapByteBuffer[pos=12 lim=1024 cap=1024]
--------------------------------------
flip完之后-->limit--->12
flip完之后-->position--->0
flip完之后-->capacity--->1024
flip完之后-->mark--->java.nio.HeapByteBuffer[pos=0 lim=12 cap=1024]
testing.....
*/
一些用NIO模型實現的簡單demo,可以查看[github地址],有文件傳輸以及多客戶端廣播的demo。
NIO是Java SE 1.4版,為了提升網絡傳輸性能而設計的新版本的IO,注意,這里的優化主要針對的是網絡通信方面的socket的優化。如下程序可以測試針對本地文件IO,兩者的異同。
public class FileTransformCompare {
//傳統方式
private long transferFile(File source, File dest) throws IOException {
long startTime = System.currentTimeMillis();
if(!dest.exists())
dest.createNewFile();
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(source));
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dest));
//將數據從源讀到目的文件
byte[] bytes = new byte[1024];
int len = 0;
while ((len = bis.read(bytes))>0){
bos.write(bytes, 0, len);
}
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
//NIO方式
private long transferFileFileWithNio(File source, File dest) throws IOException {
long startTime = System.currentTimeMillis();
if(!dest.exists())
dest.createNewFile();
RandomAccessFile sourceRAF = new RandomAccessFile(source, "rw");
RandomAccessFile destRAF = new RandomAccessFile(dest, "rw");
FileChannel readChannel = sourceRAF.getChannel();
FileChannel writeChannel = destRAF.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024*1024); //1M緩沖區
while (readChannel.read(byteBuffer) > 0){
byteBuffer.flip();
writeChannel.write(byteBuffer);
byteBuffer.clear();
}
writeChannel.close();
readChannel.close();
long endTime = System.currentTimeMillis();
return endTime - startTime;
}
public static void main(String[] args) throws IOException {
FileTransformCompare ftc = new FileTransformCompare();
// File source = new File("F:\\apache-maven-3.6.2-bin.tar.gz");
// File dest1 = new File("G:\\迅雷下載\\apache1.tar.gz");
// File dest2 = new File("G:\\迅雷下載\\apache2.tar.gz");
File source = new File("G:\\迅雷下載\\影視\\戰爭之王.BD1280超清國英雙語中英雙字.mp4");
File dest1 = new File("G:\\迅雷下載\\test1.mp4");
File dest2 = new File("G:\\迅雷下載\\test2.mp4");
long time = ftc.transferFile(source, dest1);
System.out.println("普通字節流時間: " + time);
long timeNio = ftc.transferFileFileWithNio(source, dest2);
System.out.println("NIO時間: " + timeNio);
}
}
/*
當文件的大小較小的時候,NIO會比傳統IO好一點,但是文件較大的時候,則NIO不如傳統IO
下面結果是復制一部2.6G的電影的結果:
普通字節流時間: 79745
NIO時間: 80160
*/
也就是說,通常談到NIO的時候,只會針對網絡編程來說。
3. AIO 異步非阻塞I/O
NIO的非阻塞模式采用多路復用器(Selector),用一個線程不斷的去輪詢所有的通道,一旦某個通道有數據可讀(或可寫),則表示該通道數據以及准備好(通道可寫),那么這個通道就會被選擇出來,對它進行讀寫操作,但是要注意的是在執行讀寫操作的線程本身就是堵塞的,要等待該對該通道的數據操作完成,線程才可以去操作其他通道。
而AIO(Asynchronous IO)則是由操作系統在IO操作完成之后再去通知調用者,這就意味着執行程序的線程再發起讀寫操作的時候總是立即返回的,這個時候可以去做其他的事情,當底層讀寫操作完成的時候,將由操作系統通過調用相應的回調函數將已經讀到的函數交給程序進行處理(寫入過程一樣)。正因如此,會導致不同的操作系統上的性能表現會不同,在Linux系統中AIO的底層系統實現是epoll函數(NIO的底層實現是select函數或者poll函數——兩者的區別在於能存儲文件描述符的數量有關,因為select存放文件描述符的載體是一個數組,而poll則是用鏈表去存儲)
AIO主要針對一些異步的IO操作,操作系統執行完讀寫事件后就會調用程序的回調函數—— java.util.concurrent.Future對象和java.nio.channels.CompletionHandler,而Future是基於CompletionHandler的封裝。因為該過數據的讀寫都是由操作系統負責,則回調函數只需要負責准備發送數據或者解析讀取的數據即可。
主要的API如下
1. AsynchronousChannelGroup——異步通信組,異步通道在處理 I/O請求時,需要使用一個AsynchronousChannelGroup類,該類的對象表示的是一個異步通道的分組,每一個分組都有一個線程池與之對應,需要使用AsynchronousChannelGroup類的靜態工廠方法withThreadPool(ExectorService es); withFixedThreadPool();withCachedThreadPool()設置線程池。
AsynchronousServerSocketChannel: 異步版的ServerSocketChannel,其accpet方法有兩種:
//第一種
AsynchronousServerSocketChannel server
= AsynchronousServerSocketChannel.open().bind(null);
Future<AsynchronousSocketChannel> future = server.accept();
future.isDone(); //返回對象來查詢操作的狀態
future.isCancelled(); //明確檢查操作是否被取消,如果操作在正常完成之前被取消,則它返回true
future.cancel(true); //取消操作
AsynchronousSocketChannel client= future.get(); //使用get()方法,該方法將阻塞等待結果的返回:
AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS); //也可以設置阻塞時間
//第二種
AsynchronousServerSocketChannel listener
= AsynchronousServerSocketChannel.open().bind(null);
listener.accept(
attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
public void completed(
AsynchronousSocketChannel client, Object attachment) {
// do whatever with client
}
public void failed(Throwable exc, Object attachment) {
// handle failure
}
});
2.AsynchronousSocketChannel異步版的SocketChannel,提供了兩種的read()和write()方法。
-
-
void read(ByteBuffer buffer, A attachment, CompletionHandler handler);
-
void write(ByteBuffer buffer, A attachment, CompletionHandler handler);
-
Future<Integer> read(ByteBuffer buffer);
-
Future<Integer> write(ByteBuffer buffer);
-
3. CompletionHandler的回調接口,當IO操作完成的時候,即會調用這兩個方法:
-
void complete(V result, A attachment)
當IO操作順利完成的時候被調用,對於accept方法返回Socket通道,對於read/write操作,則返回本次寫入或讀取的字節數。
-
void failed(Throwable exe, A attachment)
當IO操作失敗的時候被調用,建議在此方法中對連接等資源進行關閉和釋放。
關於AIO的demo可以參照github地址上的代碼,實現一個前台輸入表達式,后端計算后返回結果的功能。
