轉載請注明出處:http://www.cnblogs.com/Joanna-Yan/p/7723174.html
為了解決同步阻塞I/O面臨的一個鏈路需要一個線程處理的問題,后來有人對它的線程模型進行了優化,后端通過一個線程池來處理多個客戶端的請求接入,形成客戶端個數M:線程池最大線程數N的比例關系,其中M可以遠遠大於N,通過線程池可以靈活的調配線程資源,設置線程的最大值,防止由於海量並發接入導致線程耗盡。
下面,我們結合連接模型圖和源碼,對偽異步I/O進行分析,看它是否能夠解決同步阻塞I/O面臨的問題。
1. 偽異步I/O模型圖
采用線程池和任務隊列可以實現一種叫做偽異步的I/O通信框架,它的模型圖如下所示。
當有新的客戶端接入的時候,將客戶端的Socket封裝成一個Task(該任務實現java.lang.Runnable接口)投遞到后端的線程池中進行處理,JDK的線程池維護一個消息隊列和N個活躍線程對消息隊列中的任務進行處理。由於線程池可以設置消息隊列的大小和最大線程數。因此,它的資源占用是可控的,無論多少個客戶端並發訪問,都不會導致資源的耗盡和宕機。
圖1 偽異步I/O服務端通信模型(M:N)
下面我們依然采用時間服務器程序,將其改造成為偽異步I/O時間服務器,然后通過對代碼進行分析,找出其弊端。
2.偽異步式I/O創建的TimeServer源碼分析
package joanna.yan.poio; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * 偽異步式I/O * @author Joanna.Yan * @date 2017年10月24日上午10:16:10 */ public class TimeServer { public static void main(String[] args) { int port=9090; if(args!=null&&args.length>0){ try { port=Integer.valueOf(args[0]); } catch (Exception e) { // 采用默認值 } } ServerSocket server=null; try { server=new ServerSocket(port); System.out.println("The time server is start in port :"+ port); Socket socket=null; //創建一個時間服務器類的線程池 TimeServerHandlerExecutePool singleExecutor=new TimeServerHandlerExecutePool(50, 10000);//創建I/O任務 while(true){ socket=server.accept(); //當接收到新的客戶端連接時,將請求Socket封裝成一個Task,然后調用execute方法執行。從而避免了每個請求接入都創建一個新的線程。 singleExecutor.execute(new TimeServerHandler(socket)); } } catch (IOException e) { e.printStackTrace(); }finally{ if(server!=null){ try { System.out.println("The time server close"); server=null; server.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
package joanna.yan.poio; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 由於線程池和消息隊列都是有界的,因此,無論客戶端並發連接數多大,它都不會導致線程個數過於膨脹或者內存溢出, * 相比於傳統的一連接一線程模型,是一種改良。 * @author Joanna.Yan * @date 2017年10月24日下午2:39:49 */ public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){ executor=new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<java.lang.Runnable>(queueSize)); } public void execute(java.lang.Runnable task){ executor.execute(task);; } }
package joanna.yan.poio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.Date; public class TimeServerHandler implements Runnable{ private Socket socket; public TimeServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in=null; PrintWriter out=null; try { in=new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out=new PrintWriter(this.socket.getOutputStream(), true); String currentTime=null; String body=null; while(true){ body=in.readLine(); if(body==null){ break; } System.out.println("The time server receive order:"+body); //如果請求消息為查詢時間的指令"QUERY TIME ORDER"則獲取當前最新的系統時間。 currentTime="QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; out.println(currentTime); } } catch (IOException e) { e.printStackTrace(); }finally{ if(in!=null){ try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if(out!=null){ out.close(); out=null; } if(this.socket!=null){ try { this.socket.close(); this.socket=null; } catch (IOException e) { e.printStackTrace(); } } } } }
偽異步I/O通信框架采用了線程池實現,因此避免了為每個請求都創建一個獨立線程造成的線程資源耗盡問題。但是由於它底層的通信依然采用同步阻塞模型,因此無法從根本上解決問題。
3.偽異步I/O弊端分析
要對偽異步I/O的弊端進行深入分析,首先我們看兩個Java同步I/O的API說明。隨后我們結合代碼進行詳細分析。
請注意加粗斜體字部分的API說明,當對Socket的輸入流進行讀取操作的時候,它會一直阻塞下去,直到發生如下三種事件。
- 有數據可讀;
- 可用數據已經讀取完畢;
- 發生空指針或者I/O異常。
這意味着當對方發送請求或者應答消息比較緩慢、或者網絡傳輸較慢時,讀取輸入流一方的通信線程將被長時間阻塞,如果對方要60s才能夠將數據發送完成,讀取一方的I/O線程也將會被同步阻塞60s,在此期間,其他接入消息只能在消息隊列中排隊。
下面我們接着對輸出流進行分析,還是看JDK I/O類庫輸出流的API文檔,然后結合文檔說明進行故障分析。
當調用OutputStream的write方法寫輸出流的時候,它將會被阻塞,直到要發送的字節全部寫入完畢,或者發生異常。學習過TCP/IP相關知識的人都知道,當消息的接收方處理緩慢的時候,將不能及時地從TCP緩沖區讀取數據,這將會導致發送方的TCP window size不斷減小,直到為0,雙方處於Keep-Alive狀態,消息發送方將不能再向TCP緩沖區寫入消息,這是如果采用的是同步阻塞I/O,write操作將會被無限期阻塞,直到TCP window size大於0或者發生I/O異常。
通過對輸入和輸出流的API文檔進行分析,我們了解到讀和寫操作都是同步阻塞的,阻塞的時間取決於對方I/O線程的處理速度和網絡I/O傳輸速度。本質上來講,我們無法保證生產環境的網絡狀況和對端的應用程序能夠足夠快,如果我們的應用程序依賴對方的處理速度,它的可靠性就非常差。
偽異步I/O實際上僅僅只是對之前I/O線程模型的一個簡單優化,它無法從根本上解決同步I/O導致的通信線程阻塞問題。下面我們就簡單分析下如果通信對方返回應答時間過長,會引起的級聯故障。
- 服務端處理緩慢,返回應答消息耗費60s,平時只需要10ms。
- 采用偽異步I/O的線程正在讀取故障服務節點的響應,由於讀取輸入流是阻塞的,因此,它將會被同步阻塞60s。
- 假如所有的可用線程都被故障服務器阻塞,那后續所有的I/O消息都將在隊里中排隊。
- 由於線程池采用阻塞隊里實現,當隊列積滿之后,后續入隊的操作將被阻塞。
- 由於前端只有一個Accptor線程接收客戶端接入,它被阻塞在線程池的同步阻塞隊列之后,新的客戶端請求消息將被拒絕, 客戶端會發生大量的連接超時。
- 由於幾乎所有的連接都超時,調用者會認為系統已經崩潰,無法接收新的請求消息。
那么這個問題如何解決?后面的NIO將給出答案。
如果此文對您有幫助,微信打賞我一下吧~