本章主要介紹Socket的基本概念,傳統的同步阻塞式I/O編程,偽異步IO實現,學習NIO的同步非阻塞編程和NIO2.0(AIO)異步非阻塞編程。
目前為止,Java共支持3種網絡編程模型:BIO、NIO、AIO: Java BIO : 同步並阻塞,服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善。 Java NIO : 同步非阻塞,服務器實現模式為一個請求一個線程,即客戶端發送的連接請求都會注冊到多路復用器上,多路復用器輪詢到連接有I/O請求時才啟動一個線程進行處理。 Java AIO(NIO.2) : 異步非阻塞,服務器實現模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啟動線程進行處理。 BIO、NIO、AIO適用場景分析: BIO方式適用於連接數目比較小且固定的架構,這種方式對服務器資源要求比較高,並發局限於應用中,JDK1.4以前的唯一選擇,但程序直觀簡單易理解。 NIO方式適用於連接數目多且連接比較短(輕操作)的架構,比如聊天服務器,並發局限於應用中,編程比較復雜,JDK1.4開始支持。 AIO方式使用於連接數目多且連接比較長(重操作)的架構,比如相冊服務器,充分調用OS參與並發操作,編程比較復雜,JDK7開始支持。
一:基本概念
Socket又被稱為 "套接字" ,應用程序通常都是通過 "套接字" 向網絡發出請求和接收請求。Socket和serverSocket類位於java.net包中。ServerSocket用於(Server)服務端,Socket用於
(Client)客戶端。當服務端和客戶端建立連接后。兩端都會產生一個Socket實例,並且是平等的。不管是Socket還是ServerSocket。都是通過操作SocketImpl和其子類完成相關功能。
連接過程四步驟: 1:服務器監聽 2:客戶端請求 3:服務端連接確認 4:客戶端連接確認
二:傳統同步阻塞IO實現
服務端ServerSocket:
1 final static int PROT = 8765; 2 3 ServerSocket server = null; 4 5 server = new ServerSocket(PROT); 6 7 Socket socket = server.accept(); //進行阻塞 8 9 new Thread(new ServerHandler(socket)).start(); //服務端運行,等待客戶端連接
客戶端Socket:
1 final static String ADDRESS = "127.0.0.1"; 2 3 final static int PORT = 8765; 4 5 Socket socket = null; 6 7 socket = new Socket(ADDRESS, PORT); //進行連接
服務端處理器ServerHandler:
1 // 實現Runnable 2 3 private Socket socket ; 4 5 public ServerHandler(Socket socket){ 6 this.socket = socket; 7 } 8 9 //重寫run方法: 10 11 @Override 12 public void run() { 13 BufferedReader in = null; 14 PrintWriter out = null; 15 try { 16 in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); 17 out = new PrintWriter(this.socket.getOutputStream(), true); 18 String body = null; 19 while(true){ 20 body = in.readLine(); 21 if(body == null) break; 22 System.out.println("Server :" + body); 23 out.println("服務器端回送響的應數據."); 24 } 25 } catch (Exception e) { 26 e.printStackTrace(); 27 28 }
三:偽異步實現:
原理:傳統的是直接new Thread()來進行運行任務,現在我們直接通過自定義線程池來實現偽異步。
1 //之前服務端運行: 2 3 //新建一個線程執行客戶端的任務 4 new Thread(new ServerHandler(socket)).start();
1 // 現在偽異步: 2 3 HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000); 4 while(true){ 5 socket = server.accept(); 6 executorPool.execute(new ServerHandler(socket)); 7 }
自定義線程池:HandlerExecutorPool
1 public class HandlerExecutorPool { 2 3 private ExecutorService executor; 4 public HandlerExecutorPool(int maxPoolSize, int queueSize){ 5 this.executor = new ThreadPoolExecutor( 6 Runtime.getRuntime().availableProcessors(), 7 maxPoolSize, 8 120L, 9 TimeUnit.SECONDS, 10 new ArrayBlockingQueue<Runnable>(queueSize)); 11 } 12 13 public void execute(Runnable task){ 14 this.executor.execute(task); 15 } 16 17 }
四:NIO(非阻塞編程)
傳統IO和NIO的差異:IO是同步阻塞 NIO是同步非阻塞。 在jdk1.7以后,NIO升級(NIO2.0)AIO,實現了異步非阻塞
傳統的IO(BIO)阻塞:在網絡應用程序獲取網絡數據時,如果網絡傳輸數據很慢,那么程序就一直等着,直到傳輸完畢為止。
NIO:無需等待,直接獲取數據,在數據沒有傳輸完畢時,不獲取數據,數據暫時放在緩沖區,等傳輸完畢以后,緩沖區發出通知,客戶端獲取數據,實現不等待。
基本概念:
Buffer(緩沖區) channel(管道、通道) Selector(選擇器,多路復用器)
Buffer注意事項:每次在put(),for循環 之后都要進行flip()復位。要復位下標
Buffer常用方法:
flip()復位:因為buffer和游標類似,每次新增數據之后,它的下標都會自增,如果用for循環遍歷時,他只會遍歷沒有填充的下標的值,所以要用filp()方法復
位。
wrap(數組):wrap方法會包裹一個數組: 一般這種用法不會先初始化緩存對象的長度,因為沒有意義,最后還會被wrap所包裹的數組覆蓋掉
duplicate(): buffer復制的方法 。一個buffer數據復制給另外一個buffer數組
position(index):設置buffer可讀的下標的位置
remaining() :返回buffer可讀的長度
get(數組):把buffer數據復制給數組
Channel管道:雙向
兩大類: 1:網絡讀寫類(SelectableChannel) 2:文件操作類(FileChannel)
我們要使用的SocketChannel和ServerSocketChannel就在SelectableChannel類里面
Selector:選擇器(多路復用器)
原理:Selector不斷的注冊輪詢注冊在其上的通道(SocketChannel),如果某一個通道發生了讀寫操作,這個通道就處於就緒狀態,會被Selector輪詢出
來。然后通過SelectionKey就可以獲取到就緒的Channel集合,從而進行后續操作。
四大狀態:連接狀態 阻塞狀態 可讀狀態 可寫狀態
下面來看一下程序中是怎么通過這些類庫實現Socket功能。
首先介紹一下幾個輔助類
輔助類SerializableUtil,這個類用來把java對象序列化成字節數組,或者把字節數組反序列化成java對象。
輔助類MyRequestObject和MyResponseObject,這兩個類是普通的java對象,實現了Serializable接口。MyRequestObject類是Client發出的請求,MyResponseObject是Server端作出的響應。
下面主要看一下Server端的代碼,其中有一些英文注釋對理解代碼很有幫助,注釋主要是來源jdk的文檔和例子,這里就沒有再翻譯
下面是Client的代碼,代碼比較簡單就是啟動了100個線程來訪問Server
最后測試上面的代碼,首先運行Server類,然后運行Client類,就可以分別在Server端和Client端控制台看到發送或接收到的MyRequestObject或MyResponseObject對象了。
代碼實現:
注:轉自http://blog.csdn.net/kongxx/article/details/7288896
五:NIO2.0(AIO) 異步非阻塞
AIO編程:在NIO基礎上引入異步的通到的概念,實現了異步文件和異步套字節,jdk1.7以后升級。
基本概念
1 AsynchronousChannel:支持異步通道,包括服務端AsynchronousServerSocketChannel和客戶端AsynchronousSocketChannel等實現。 2 CompletionHandler:用戶處理器。定義了一個用戶處理就緒事件的接口,由用戶自己實現,異步io的數據就緒后回調該處理器消費或處理數據。 3 AsynchronousChannelGroup:一個用於資源共享的異步通道集合。處理IO事件和分配給CompletionHandler。(具體這塊還沒細看代碼,后續再分析這塊)
所謂AIO,就是異步非阻塞IO,是NIO的升級版本,也就是NIO2.0版本,但是與NIO不同,當進行讀寫操作時,只須直接調用API的read或write方法即可。這兩種方法均為異步
的,對於讀操作而言,當有流可讀取時,操作系統會將可讀的流傳入read方法的緩沖區,並通知應用程序;對於寫操作而言,當操作系統將write方法傳遞的流寫入完畢時,操作
系統主動通知應用程序。 即可以理解為,read/write方法都是異步的,完成后會主動調用回調函數。
具體代碼實現:
1 // Server類: 2 3 4 5 /** 6 * 7 *類描述:AIO 服務端 8 *@author: 豪 9 *@date: 日期:2017-5-24 時間:上午10:48:12 10 *@version 1.0 11 */ 12 public class Server { 13 //線程池 14 private ExecutorService executorService; 15 //線程組 16 private AsynchronousChannelGroup threadGroup; 17 //服務器通道 18 public AsynchronousServerSocketChannel assc; 19 20 public Server(int port){ 21 try { 22 //創建一個緩存池 23 executorService = Executors.newCachedThreadPool(); 24 //創建線程組 25 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); 26 //創建服務器通道 27 assc = AsynchronousServerSocketChannel.open(threadGroup); 28 //進行綁定 29 assc.bind(new InetSocketAddress(port)); 30 31 System.out.println("server start , port : " + port); 32 //進行阻塞 33 assc.accept(this, new ServerCompletionHandler()); 34 //一直阻塞 不讓服務器停止 35 Thread.sleep(Integer.MAX_VALUE); 36 37 } catch (Exception e) { 38 e.printStackTrace(); 39 } 40 } 41 42 public static void main(String[] args) { 43 Server server = new Server(8765); 44 } 45 46 }
1 //ServerCompletionHandler類 2 3 4 /** 5 * 6 *類描述:服務端處理類 所有的處理都在此類進行 7 *@author: 豪 8 *@date: 日期:2017-5-24 時間:上午10:47:45 9 *@version 1.0 10 */ 11 public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> { 12 13 @Override 14 public void completed(AsynchronousSocketChannel asc, Server attachment) { 15 //當有下一個客戶端接入的時候 直接調用Server的accept方法,這樣反復執行下去,保證多個客戶端都可以阻塞 16 attachment.assc.accept(attachment, this); 17 read(asc); 18 } 19 20 private void read(final AsynchronousSocketChannel asc) { 21 //讀取數據 22 ByteBuffer buf = ByteBuffer.allocate(1024); 23 asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { 24 @Override 25 public void completed(Integer resultSize, ByteBuffer attachment) { 26 //進行讀取之后,重置標識位 27 attachment.flip(); 28 //獲得讀取的字節數 29 System.out.println("Server -> " + "收到客戶端的數據長度為:" + resultSize); 30 //獲取讀取的數據 31 String resultData = new String(attachment.array()).trim(); 32 System.out.println("Server -> " + "收到客戶端的數據信息為:" + resultData); 33 String response = "服務器響應, 收到了客戶端發來的數據: " + resultData; 34 write(asc, response); 35 } 36 @Override 37 public void failed(Throwable exc, ByteBuffer attachment) { 38 exc.printStackTrace(); 39 } 40 }); 41 } 42 43 private void write(AsynchronousSocketChannel asc, String response) { 44 try { 45 ByteBuffer buf = ByteBuffer.allocate(1024); 46 buf.put(response.getBytes()); 47 buf.flip(); 48 asc.write(buf).get(); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } catch (ExecutionException e) { 52 e.printStackTrace(); 53 } 54 } 55 56 @Override 57 public void failed(Throwable exc, Server attachment) { 58 exc.printStackTrace(); 59 } 60 61 }
1 //Clinet類: 2 3 /** 4 * 5 *類描述:AIO客戶端 6 *@author: 豪 7 *@date: 日期:2017-5-24 時間:上午10:47:23 8 *@version 1.0 9 */ 10 public class Client implements Runnable{ 11 12 private AsynchronousSocketChannel asc ; 13 14 public Client() throws Exception { 15 asc = AsynchronousSocketChannel.open(); 16 } 17 18 public void connect(){ 19 asc.connect(new InetSocketAddress("127.0.0.1", 8765)); 20 } 21 22 public void write(String request){ 23 try { 24 asc.write(ByteBuffer.wrap(request.getBytes())).get(); 25 read(); 26 } catch (Exception e) { 27 e.printStackTrace(); 28 } 29 } 30 31 private void read() { 32 ByteBuffer buf = ByteBuffer.allocate(1024); 33 try { 34 asc.read(buf).get(); 35 buf.flip(); 36 byte[] respByte = new byte[buf.remaining()]; 37 buf.get(respByte); 38 System.out.println(new String(respByte,"utf-8").trim()); 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 } catch (ExecutionException e) { 42 e.printStackTrace(); 43 } catch (UnsupportedEncodingException e) { 44 e.printStackTrace(); 45 } 46 } 47 48 @Override 49 public void run() { 50 while(true){ 51 52 } 53 } 54 55 public static void main(String[] args) throws Exception { 56 Client c1 = new Client(); 57 c1.connect(); 58 59 Client c2 = new Client(); 60 c2.connect(); 61 62 Client c3 = new Client(); 63 c3.connect(); 64 65 new Thread(c1, "c1").start(); 66 new Thread(c2, "c2").start(); 67 new Thread(c3, "c3").start(); 68 69 Thread.sleep(1000); 70 71 c1.write("c1 aaa"); 72 c2.write("c2 bbbb"); 73 c3.write("c3 ccccc"); 74 } 75 76 }