BIO編程
最原始BIO
網絡編程的基本模型是C/S模型,即兩個進程間的通信。
服務端提供IP和監聽端口,客戶端通過連接操作想服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。
傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功后,雙方通過輸入和輸出流進行同步阻塞式通信。
最原始BIO通信模型圖:
存在的問題:
- 同一時間,服務器只能接受來自於客戶端A的請求信息;雖然客戶端A和客戶端B的請求是同時進行的,但客戶端B發送的請求信息只能等到服務器接受完A的請求數據后,才能被接受。(acceptor只有在接受完client1的請求后才能接受client2的請求)
- 由於服務器一次只能處理一個客戶端請求,當處理完成並返回后(或者異常時),才能進行第二次請求的處理。很顯然,這樣的處理方式在高並發的情況下,是不能采用的。
一請求一線程BIO
那有沒有方法改進呢? ,答案是有的。改進后BIO通信模型圖:
此種BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理沒處理完成后,通過輸出流返回應答給客戶端,線程銷毀。即典型的一請求一應答通宵模型。
代碼演示
服務端:
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import demo.com.test.io.nio.NioSocketServer; public class BioSocketServer { //默認的端口號 private static int DEFAULT_PORT = 8083; public static void main(String[] args) { ServerSocket serverSocket = null; try { System.out.println("監聽來自於"+DEFAULT_PORT+"的端口信息"); serverSocket = new ServerSocket(DEFAULT_PORT); while(true) { Socket socket = serverSocket.accept(); SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { } finally { if(serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //這個wait不涉及到具體的實驗邏輯,只是為了保證守護線程在啟動所有線程后,進入等待狀態 synchronized (NioSocketServer.class) { try { BioSocketServer.class.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class SocketServerThread implements Runnable { private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面我們收取信息 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用線程,同樣無法解決read方法的阻塞問題, //也就是說read方法處同樣會被阻塞,直到操作系統有數據准備好 int realLen = in.read(contextBytes, 0, maxLen); //讀取信息 String message = new String(contextBytes , 0 , realLen); //下面打印信息 System.out.println("服務器收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { //試圖關閉 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { System.out.println(e.getMessage()); } } } }
客戶端:
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URLDecoder; import java.util.concurrent.CountDownLatch; public class BioSocketClient{ public static void main(String[] args) throws Exception { Integer clientNumber = 20; CountDownLatch countDownLatch = new CountDownLatch(clientNumber); // 分別開始啟動這20個客戶端,並發訪問 for (int index = 0; index < clientNumber; index++, countDownLatch.countDown()) { ClientRequestThread client = new ClientRequestThread(countDownLatch, index); new Thread(client).start(); } // 這個wait不涉及到具體的實驗邏輯,只是為了保證守護線程在啟動所有線程后,進入等待狀態 synchronized (BioSocketClient.class) { BioSocketClient.class.wait(); } } } /** * 一個ClientRequestThread線程模擬一個客戶端請求。 * @author keep_trying */ class ClientRequestThread implements Runnable { private CountDownLatch countDownLatch; /** * 這個線程的編號 * @param countDownLatch */ private Integer clientIndex; /** * countDownLatch是java提供的同步計數器。 * 當計數器數值減為0時,所有受其影響而等待的線程將會被激活。這樣保證模擬並發請求的真實性 * @param countDownLatch */ public ClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) { this.countDownLatch = countDownLatch; this.clientIndex = clientIndex; } @Override public void run() { Socket socket = null; OutputStream clientRequest = null; InputStream clientResponse = null; try { socket = new Socket("localhost",8083); clientRequest = socket.getOutputStream(); clientResponse = socket.getInputStream(); //等待,直到SocketClientDaemon完成所有線程的啟動,然后所有線程一起發送請求 this.countDownLatch.await(); //發送請求信息 clientRequest.write(("這是第" + this.clientIndex + " 個客戶端的請求。 over").getBytes()); clientRequest.flush(); //在這里等待,直到服務器返回信息 System.out.println("第" + this.clientIndex + "個客戶端的請求發送完成,等待服務器返回信息"); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; int realLen; String message = ""; //程序執行到這里,會一直等待服務器返回信息(注意,前提是in和out都不能close,如果close了就收不到服務器的反饋了) while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) { message += new String(contextBytes , 0 , realLen); } //String messageEncode = new String(message , "UTF-8"); message = URLDecoder.decode(message, "UTF-8"); System.out.println("第" + this.clientIndex + "個客戶端接收到來自服務器的信息:" + message); } catch (Exception e) { } finally { try { if(clientRequest != null) { clientRequest.close(); } if(clientResponse != null) { clientResponse.close(); } } catch (IOException e) { } } } }
存在的問題:
- 雖然在服務器端,請求的處理交給了一個獨立線程進行,但是操作系統通知accept()的方式還是單個的。也就是,實際上是服務器接收到數據報文后的“業務處理過程”可以多線程,但是數據報文的接受還是需要一個一個的來(acceptor只有在接受完client1的請求后才能接受client2的請求),下文會驗證。
- 在linux系統中,可以創建的線程是有限的。我們可以通過cat /proc/sys/kernel/threads-max命令查看可以創建的最大線程數。當然這個值是可以更改的,但是線程越多,CPU切換所需的時間也就越長,用來處理真正業務的需求也就越少。
- 另外,如果您的應用程序大量使用長連接的話,線程是不會關閉的。這樣系統資源的消耗更容易失控。
偽異步I/O編程
為了改進這種一連接一線程的模型,我們可以使用線程池來管理這些線程,實現1個或多個線程處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽異步I/O模型“。
偽異步I/O模型圖:
代碼演示
只給出服務端,客戶端和上面相同
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import demo.com.test.io.nio.NioSocketServer; public class BioSocketServerThreadPool { //默認的端口號 private static int DEFAULT_PORT = 8083; //線程池 懶漢式的單例 private static ExecutorService executorService = Executors.newFixedThreadPool(60); public static void main(String[] args) { ServerSocket serverSocket = null; try { System.out.println("監聽來自於"+DEFAULT_PORT+"的端口信息"); serverSocket = new ServerSocket(DEFAULT_PORT); while(true) { Socket socket = serverSocket.accept(); //當然業務處理過程可以交給一個線程(這里可以使用線程池),並且線程的創建是很耗資源的。 //最終改變不了.accept()只能一個一個接受socket的情況,並且被阻塞的情況 SocketServerThreadPool socketServerThreadPool = new SocketServerThreadPool(socket); executorService.execute(socketServerThreadPool); } } catch(Exception e) { } finally { if(serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //這個wait不涉及到具體的實驗邏輯,只是為了保證守護線程在啟動所有線程后,進入等待狀態 synchronized (NioSocketServer.class) { try { BioSocketServerThreadPool.class.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class SocketServerThreadPool implements Runnable { private Socket socket; public SocketServerThreadPool (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面我們收取信息 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用線程,同樣無法解決read方法的阻塞問題, //也就是說read方法處同樣會被阻塞,直到操作系統有數據准備好 int realLen = in.read(contextBytes, 0, maxLen); //讀取信息 String message = new String(contextBytes , 0 , realLen); //下面打印信息 System.out.println("服務器收到來自於端口:" + sourcePort + "的信息:" + message); //下面開始發送信息 out.write("回發響應信息!".getBytes()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { //試圖關閉 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { System.out.println(e.getMessage()); } } } }
服務器端的執行效果
在 Socket socket = serverSocket.accept(); 處打了斷點,有20個客戶端同時發出請求,可服務端還是一個一個的處理,其它線程都處於阻塞狀態
阻塞的問題根源
那么重點的問題並不是“是否使用了多線程、或是線程池”,而是為什么accept()、read()方法會被阻塞。API文檔中對於 serverSocket.accept() 方法的使用描述:
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
服務器線程發起一個accept動作,詢問操作系統 是否有新的socket套接字信息從端口xx發送過來。
注意,是詢問操作系統。也就是說socket套接字的IO模式支持是基於操作系統的,那么自然同步IO/異步IO的支持就是需要操作系統級別的了。如下圖:
如果操作系統沒有發現有套接字從指定的端口xx來,那么操作系統就會等待。這樣serverSocket.accept()方法就會一直等待。這就是為什么accept()方法為什么會阻塞:它內部的實現是使用的操作系統級別的同步IO。
- 阻塞IO 和 非阻塞IO
這兩個概念是程序級別的。主要描述的是程序請求操作系統IO操作后,如果IO資源沒有准備好,那么程序該如何處理的問題:前者等待;后者繼續執行(並且使用線程一直輪詢,直到有IO資源准備好了) - 同步IO 和非同步IO
這兩個概念是操作系統級別的。主要描述的是操作系統在收到程序請求IO操作后,如果IO資源沒有准備好,該如何處理相應程序的問題:前者不響應,直到IO資源准備好以后;后者返回一個標記(好讓程序和自己知道以后的數據往哪里通知),當IO資源准備好以后,再用事件機制返回給程序。