1. BIO
JDK5之前, JDK的IO模式只有BIO(同步阻塞)
問題: 因為阻塞的存在, 需對每個請求開啟一個線程. 過多的線程切換影響操作系統性能
解決: 使用線程池, 處理不過來的放入隊列, 再處理不過來的會觸發其他機制
問題: 超過線程池數量的請求需要等待
public class Client { final static String ADDRESS = "127.0.0.1"; final static int PORT = 8765; public static void main(String[] args) throws IOException { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(ADDRESS, PORT); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); // true自動flush //向服務器端發送數據 out.println("來自客戶端的請求"); //從服務端接收數據 String response = in.readLine(); // 阻塞 System.out.println("Client獲取數據: " + response); } catch (Exception e) { e.printStackTrace(); } finally { out.close(); in.close(); socket.close(); } } }
服務端1: 一個請求~一個線程
public class Server { final static int PROT = 8765; public static void main(String[] args) throws IOException { ServerSocket server = null; try { server = new ServerSocket(PROT); System.out.println("server start"); while(true){ Socket socket = server.accept(); //監聽 阻塞 , socket底層會新建線程處理與客戶端的三次握手 //建立線程處理獲取的 socket new Thread(new ServerHandler(socket)).start(); } } catch (Exception e) { e.printStackTrace(); } finally { server.close(); } } } class ServerHandler implements Runnable { private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); // 阻塞 if (body == null) break; System.out.println("Server獲取的請求: " + body); out.println("來自服務器的響應"); } } catch (Exception e) { e.printStackTrace(); } finally { try { out.close(); in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
服務端2: 用線程池處理請求
public class Server { final static int PORT = 8765; public static void main(String[] args) throws IOException { ServerSocket server = null; try { server = new ServerSocket(PORT); System.out.println("server start"); HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000); while(true){ Socket socket = server.accept(); executorPool.execute(new ServerHandler(socket)); } } catch (Exception e) { e.printStackTrace(); } finally { server.close(); } } } class HandlerExecutorPool { private ExecutorService executor; public HandlerExecutorPool(int maxPoolSize, int queueSize){ this.executor = new ThreadPoolExecutor( // 帶阻塞隊列的線程池 Runtime.getRuntime().availableProcessors(), // 初始線程數 maxPoolSize, // 線程數上限 如果要處理請求的Runnable對象裝滿了隊列, 則提高現有線程數 120L, // 如在120個時間顆粒內某線程是空閑的, 將被回收 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize) // 存放處理請求的Runnable對象 ); } public void execute(Runnable task){ this.executor.execute(task); } } class ServerHandler implements Runnable { private Socket socket; public ServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String body = null; while (true) { body = in.readLine(); if (body == null) break; System.out.println("Server獲取的請求: " + body); // 阻塞 out.println("來自服務器的響應"); } } catch (Exception e) { e.printStackTrace(); } finally { try { out.close(); in.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
2.NIO1.0
JDK5以后引入了NIO1.0(多路復用機制)
伴隨多路復用在程序中引入了如下概念:
Channel(通道):TCP連接的抽象,一個TCP連接對應多個Channel,這樣減少TCP的連接次數。
通道與BIO中socket類似
通道與BIO中的流類似, 不過channel是雙向的而流是單向的
channel有多種狀態位, 能被selector識別
Buffer(緩沖區):
緩沖區是一塊內存區域(數組), 在NIO中被包裝成Buffer對象. Buffer提供方法用來訪問該內存。
BIO中,數據存儲在流中,而NIO中,數據存儲在緩沖區中。
除了boolean的其他java七種基本類型都有相應的Buffer類. 最常使用的是ByteBuffer
Selector(多路復用器):負責輪詢所有注冊通道,根據通道狀態執行相關操作。狀態包括:Connect,Accept,Read,Write。
在"四種常用IO模型"里提過用select系統調用實現IO多路復用. 除select外Linux還提供了poll/epoll函數, 其中select/poll函數按順序掃描文件句柄是否就緒,支持的文件句柄數有限; 而epoll使用基於事件驅動方式替代順序掃描,性能更高, 對文件句柄數沒有數量限制. JDK的Selector使用了epoll, 只需要一個線程輪詢, 就可以接入大量的客戶端.
public class Client { public static void main(String[] args) throws IOException { SocketChannel sc = null; ByteBuffer writeBuf = ByteBuffer.allocate(1024); ByteBuffer readBuf = ByteBuffer.allocate(1024); try { //創建通道 sc = SocketChannel.open(); //進行連接 sc.connect(new InetSocketAddress("127.0.0.1", 8765)); // 下面步驟可以用selector輪詢代替 while(true){ //定義一個字節數組,然后使用系統錄入功能: byte[] bytes1 = new byte[1024]; System.in.read(bytes1); //阻塞 //把數據放到緩沖區中 writeBuf.put(bytes1); //對緩沖區進行復位 writeBuf.flip(); //寫出數據 sc.write(writeBuf); //清空緩沖區 writeBuf.clear(); // 接收服務端響應 sc.read(readBuf); readBuf.flip(); byte[] bytes2 = new byte[readBuf.remaining()]; readBuf.get(bytes2); readBuf.clear(); String body = new String(bytes2); System.out.println("Client獲取數據: " + body); } } catch (IOException e) { e.printStackTrace(); } finally { sc.close(); } } }
通過改變Selector監聽Channel的狀態位, 控制與客戶端讀寫的先后順序
public class Server implements Runnable{ private Selector seletor; private ByteBuffer readBuf = ByteBuffer.allocate(1024); private ByteBuffer writeBuf = ByteBuffer.allocate(1024); public Server(int port){ try { //1 創建多路復用器selector this.seletor = Selector.open(); //2 創建ServerSocket通道 ServerSocketChannel ssc = ServerSocketChannel.open(); //3 設置通道是否阻塞, 決定了通道了read/write/accept/connect方法是否阻塞 ssc.configureBlocking(false); //4 設置通道地址 ssc.bind(new InetSocketAddress(port)); //5 將ServerSocket通道注冊到selector上, 指定監聽其accept事件 ssc.register(this.seletor, SelectionKey.OP_ACCEPT); System.out.println("Server start"); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { while(true){ try { // select阻塞, 監聽相關事件 this.seletor.select(); // 解除阻塞, 返回選擇key, key含有通道, 狀態等信息 Iterator<SelectionKey> keysIter = this.seletor.selectedKeys().iterator(); // 進行遍歷 while(keysIter.hasNext()){ SelectionKey key = keysIter.next(); keysIter.remove(); if (key.isValid()) { // 等待接收連接狀態 if (key.isAcceptable()) { accept(key); } // 可讀狀態 if (key.isReadable()) { read(key); } if (key.isWritable()) { write(key); } } } } catch (IOException e) { e.printStackTrace(); } } } private void write(SelectionKey key) { try { // 獲取通道 SocketChannel sc = (SocketChannel) key.channel(); // 寫回給客戶端數據 writeBuf.put("來自服務器的響應".getBytes()); writeBuf.flip(); sc.write(writeBuf); writeBuf.clear(); // 修改監聽的狀態位, 如果保持OP_WRITE會導致重復寫 key.interestOps(SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } private void read(SelectionKey key) { try { // 獲取通道 SocketChannel sc = (SocketChannel) key.channel(); // 讀取數據, 讀到buffer. 按程序運行順序, 這里sc是否設置為阻塞效果都一樣 int count = sc.read(this.readBuf); // readBuf寫時會改變position的值 if (count == -1) { key.channel().close(); key.cancel(); //取消該通道在selector的注冊, 之后不會被select輪詢到 return; } // 有數據則進行讀取. 讀取前需要將position和limit進行復位 readBuf.flip(); // 根據緩沖區的數據長度創建相應大小的byte數組, 接收緩沖區的數據 byte[] bytes = new byte[this.readBuf.remaining()]; // 接收緩沖區數據 readBuf.get(bytes); readBuf.clear(); String body = new String(bytes).trim(); System.out.println("Server獲取的請求: " + body); // 如果保持OP_READ會導致重復讀 sc.register(this.seletor, SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } } private void accept(SelectionKey key) { try { // 獲取服務通道 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 獲取客戶端通道. SocketChannel sc = ssc.accept(); // 設置非阻塞模式 sc.configureBlocking(false); // 將客戶端通道注冊到多路復用器上,指定監聽事件 sc.register(this.seletor, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { new Thread(new Server(8765)).start();; } }
BIO客戶端與NIO服務端通信需注意的:
BIO服務端, 一次IO有明確的結束點, 客戶端再次read會返回-1
NIO服務端一次IO結束后, 沒有關閉通道, 它可能把通道從讀狀態轉為寫狀態. 於是selector不監聽讀了, 客戶端再次read什么都沒返回, 就會阻塞.
3.NIO2.0
JDK7引入了NIO2.0(即AIO)
NIO1.0中, IO過程沒有阻塞, 阻塞被轉移到了Selector輪詢上. Selector管理所有的Channel, 因此能把總阻塞時間縮到最短.
NIO2.0中, 供我們調用的IO API都是非阻塞的, 背后復雜的實現過程(肯定有阻塞)被轉移到了JDK底層和操作系統上. 我們的程序的IO調用可以做到立即返回.
同樣有Channel和Buffer, 但沒有Selector
public class Server { //線程池 private ExecutorService executorService; //異步通道線程組 private AsynchronousChannelGroup threadGroup; //服務器通道 public AsynchronousServerSocketChannel assc; public Server(int port){ try { //創建一個線程池 executorService = Executors.newCachedThreadPool(); //使用線程池創建異步通道線程組, 該線程組在底層支持着我們的異步操作 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); //使用 異步通道線程組 創建服務器通道 assc = AsynchronousServerSocketChannel.open(threadGroup); //給通道綁定端口 assc.bind(new InetSocketAddress(port)); System.out.println("server start"); // 下面的accept不會阻塞 , 一個accept只能接收一個連接請求 // accept第一個參數: 被綁定到IO操作的關聯對象(子類), 第二個參數 CompletionHandler<AsynchronousSocketChannel, 關聯對象(父類)>, 操作成功后執行的回調句柄 // 如果接受了一個新的連接, 其結果AsynchronousSocketChannel會被綁定與assc通道到相同的AsynchronousChannelGroup assc.accept(this, new ServerCompletionHandler()); // 這里為了避免程序結束, 異步通道線程組結束就不會執行回調了 Thread.sleep(Integer.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { new Server(8765); } }
//第一個參數: IO操作結果; 第二個參數: 被綁定到IO操作的關聯對象 public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> { // 以下兩個重載參數與CompletionHander的模板參數一致, 回調時被傳入IO結果和IO操作時設置的關聯對象 @Override public void completed(AsynchronousSocketChannel asc, Server attachment) { // 完成當前連接時, 首先, 為下一個客戶端能接入再次調用accept異步方法 attachment.assc.accept(attachment, this); // 其次, 執行下一步的讀操作 read(asc); } @Override public void failed(Throwable exc, Server attachment) { exc.printStackTrace(); } private void read(final AsynchronousSocketChannel asc) { //讀取數據 ByteBuffer buf = ByteBuffer.allocate(1024); // 第一個參數: 讀操作的Buffer, 第二個參數: IO關聯對象, 第三個參數:CompletionHandler<Integer, IO管理對象父類> asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer resultSize, ByteBuffer attachment) { //進行讀取之后,重置標識位 attachment.flip(); //獲得讀取的字節數 System.out.println("Server端" + "收到客戶端的數據長度為:" + resultSize); //獲取讀取的數據 String resultData = new String(attachment.array()).trim(); System.out.println("Server端" + "收到客戶端的數據信息為:" + resultData); String response = "From服務端To客戶端: 於" + new Date() + "收到了請求數據"+ resultData; write(asc, response); } @Override public void failed(Throwable exc, ByteBuffer attachment) { exc.printStackTrace(); } }); } private void write(AsynchronousSocketChannel asc, String response) { try { ByteBuffer buf = ByteBuffer.allocate(1024); buf.put(response.getBytes()); buf.flip(); // 寫操作, 異步 Future<Integer> future = asc.write(buf); // 阻塞等待結果 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
public class Client { private AsynchronousSocketChannel asc ; public Client() throws Exception { asc = AsynchronousSocketChannel.open(); } public void connect() throws InterruptedException, ExecutionException{ // get()阻塞 asc.connect(new InetSocketAddress("127.0.0.1", 8765)).get(); } public void write(String request){ try { // get()阻塞 asc.write(ByteBuffer.wrap(request.getBytes())).get(); read(); } catch (Exception e) { e.printStackTrace(); } } private void read() throws IOException { ByteBuffer buf = ByteBuffer.allocate(1024); try { // get()阻塞 asc.read(buf).get(); buf.flip(); byte[] respByte = new byte[buf.remaining()]; buf.get(respByte); System.out.println(new String(respByte,"utf-8").trim()); // 關閉 asc.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { Client c1 = new Client(); Client c2 = new Client(); c1.connect(); c2.connect(); c1.write("aa"); c2.write("bbb"); } }