Socket通信,主要是基於TCP協議的通信。本文從Socket通信(代碼實現)、多線程並發、以及TCP協議相關原理方面 介紹 阻塞Socket通信一些知識。
本文從服務器端的視角,以“Echo Server”程序為示例,描述服務器如何處理客戶端的連接請求。Echo Server的功能就是把客戶端發給服務器的數據原封不動地返回給客戶端。
第一種方式是單線程處理方式:服務器的處理方法如下:
1 public void service(){ 2 while (true) { 3 Socket socket = null; 4 try { 5 socket = serverSocket.accept(); 6 System.out.println("new connection accepted " + socket.getInetAddress() + ":" + socket.getPort()); 7 BufferedReader br = getBufferReader(socket);//獲得socket輸入流,並將之包裝成BufferedReader 8 PrintWriter pw = getWriter(socket);//獲得socket輸出流,並將之包裝成PrintWriter 9 String msg = null; 10 while ((msg = br.readLine()) != null) { 11 12 pw.println(echo(msg));//服務端的處理邏輯,將client發來的數據原封不動再發給client 13 pw.flush(); 14 if(msg.equals("bye"))//若client發送的是 "bye" 則關閉socket 15 break; 16 } 17 } catch (IOException e) { 18 e.printStackTrace(); 19 } finally { 20 try{ 21 if(socket != null) 22 socket.close(); 23 }catch(IOException e){e.printStackTrace();} 24 } 25 } 26 }
上面用的是while(true)循環,這樣,Server不是只接受一次Client的連接就退出,而是不斷地接收Client的連接。
1)第5行,服務器線程執行到accept()方法阻塞,直至有client的連接請求到來。
2)當有client的請求到來時,就會建立socket連接。從而在第8、9行,就可以獲得這條socket連接的輸入流和輸出流。輸入流(BufferedReader)負責讀取client發過來的數據,輸出流(PrintWriter)負責將處理后的數據返回給Client。
下面來詳細分析一下建立連接的過程:
Client要想成功建立一條到Server的socket連接,其實是受很多因素影響的。其中一個就是:Server端的“客戶連接請求隊列長度”。它可以在創建ServerSocket對象由構造方法中的 backlog 參數指定:JDK中 backlog參數的解釋是: requested maximum length of the queue of incoming connections.
public ServerSocket(int port, int backlog) throws IOException { this(port, backlog, null); }
看到了這個:incoming commections 有點奇怪,因為它講的是“正在到來的連接”,那什么又是incoming commections 呢?這個就也TCP建立連接的過程有關了。
TCP建立連接的過程可簡述為三次握手。第一次:Client發送一個SYN包,Server收到SYN包之后回復一個SYN/ACK包,此時Server進入一個“中間狀態”--SYN RECEIVED 狀態。
這可以理解成:Client的連接請求已經過來了,只不過還沒有完成“三次握手”。因此,Server端需要把當前的請求保存到一個隊列里面,直至當Server再次收到了Client的ACK之后,Server進入ESTABLISHED狀態,此時:serverSocket 從accpet() 阻塞狀態中返回。也就是說:當第三次握手的ACK包到達Server端后,Server從該請求隊列中取出該連接請求,同時Server端的程序從accept()方法中返回。
那么這個請求隊列長度,就是由 backlog 參數指定。那這個隊列是如何實現的呢?這個就和操作系統有關了,感興趣的可參考:How TCP backlog works in Linux
此外,也可以看出:服務器端能夠接收的最大連接數 也與 這個請求隊列有關。對於那種高並發場景下的服務器而言,首先就是請求隊列要足夠大;其次就是當連接到來時,要能夠快速地從隊列中取出連接請求並建立連接,因此,執行建立連接任務的線程最好不要阻塞。
現在來分析一下上面那個:單線程處理程序可能會出現的問題:
服務器始終只有一個線程執行accept()方法接受Client的連接。建立連接之后,又是該線程處理相應的連接請求業務邏輯,這里的業務邏輯是:把客戶端發給服務器的數據原封不動地返回給客戶端。
顯然,這里一個線程干了兩件事:接受連接請求 和 處理連接(業務邏輯)。好在這里的處理連接的業務邏輯不算復雜,如果對於復雜的業務邏輯 而且 有可能在執行業務邏輯過程中還會發生阻塞的情況時,那此時服務器就再也無法接受新的連接請求了。
第二種方式是:一請求一線程的處理模式:
1 public void service() { 2 while (true) { 3 Socket socket = null; 4 try { 5 socket = serverSocket.accept();//接受client的連接請求 6 new Thread(new Handler(socket)).start();//每接受一個請求 就創建一個新的線程 負責處理該請求 7 } catch (IOException e) { 8 e.printStackTrace(); 9 } 10 finally { 11 try{ 12 if(socket != null) 13 socket.close(); 14 }catch(IOException e){e.printStackTrace();} 15 } 16 } 17 }
再來看Handler的部分實現:Handler是一個implements Runnable接口的線程,在它的run()里面處理連接(執行業務邏輯)
1 class Handler implements Runnable{ 2 Socket socket; 3 public Handler(Socket socket) { 4 this.socket = socket; 5 } 6 7 @Override 8 public void run() { 9 try{ 10 BufferedReader br = null; 11 PrintWriter pw = null; 12 System.out.println("new connection accepted " + socket.getInetAddress() + ":" + socket.getPort()); 13 14 br = getBufferReader(socket); 15 pw = getWriter(socket); 16 17 String msg = null; 18 while((msg = br.readLine()) != null){ 19 pw.println(echo(msg)); 20 pw.flush(); 21 if(msg.equals("bye")) 22 break; 23 } 24 }catch(IOException e){ 25 e.printStackTrace(); 26 } 27 }
從上面的單線程處理模型中看到:如果線程在執行業務邏輯中阻塞了,服務器就不能接受用戶的連接請求了。
而對於一請求一線程模型而言,每接受一個請求,就創建一個線程來負責該請求的業務邏輯。盡管,這個請求的業務邏輯執行時阻塞了,只要服務器還能繼續創建線程,那它就還可以繼續接受新的連接請求。此外,負責建立連接請求的線程 和 負責處理業務邏輯的線程分開了。業務邏輯執行過程中阻塞了,“不會影響”新的請求建立連接。
顯然,如果Client發送的請求數量很多,那么服務器將會創建大量的線程,而這是不現實的。有以下原因:
1)創建線程是需要系統開銷的,線程的運行系統資源(內存)。因此,有限的硬件資源 就限制了系統中線程的數目。
2)當系統中線程很多時,線程的上下文開銷會很大。比如,請求的業務邏輯的執行是IO密集型任務,經常需要阻塞,這會造成頻繁的上下文切換。
3)當業務邏輯處理完成之后,就需要銷毀線程,如果請求量大,業務邏輯又很簡單,就會導致頻繁地創建銷毀線程。
那能不能重用已創建的線程? ---這就是第三種方式:線程池處理。
第三種方式是線程池的處理方式:
1 public class EchoServerThreadPool { 2 private int port = 8000; 3 private ServerSocket serverSocket; 4 private ExecutorService executorService; 5 private static int POOL_SIZE = 4;//每個CPU中線程擁有的線程數 6 7 public EchoServerThreadPool()throws IOException { 8 serverSocket = new ServerSocket(port); 9 executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE); 10 System.out.println("server start"); 11 } 12 13 public void service(){ 14 while(true){ 15 Socket socket = null; 16 try{ 17 socket = serverSocket.accept();//等待接受Client連接 18 executorService.execute(new Handler(socket));//將已經建立連接的請求交給線程池處理 19 }catch(IOException e){ 20 e.printStackTrace(); 21 } 22 } 23 } 24 public static void main(String[] args)throws IOException{ 25 new EchoServerThreadPool().service(); 26 } 27 }
采用線程池最大的優勢在於“重用線程”,有請求任務來了,從線程池中取出一個線程負責該請求任務,任務執行完成后,線程自動歸還到線程池中,而且java.util.concurrent包中又給出了現成的線程池實現。因此,這種方式看起來很完美,但還是有一些問題是要注意的:
1)線程池有多大?即線程池里面有多少個線程才算比較合適?這個要根據具體的業務邏輯來分析,而且還得考慮面對的使用場景。一個合理的要求就是:盡量不要讓CPU空閑下來,即CPU的復用率要高。如果業務邏輯是經常會導致阻塞的IO操作,一般需要設置 N*(1+WT/ST)個線程,其中N為可用的CPU核數,WT為等待時間,ST為實際占用CPU運算時間。如果業務邏輯是CPU密集型作業,那么線程池中的線程數目一般為N個或N+1個即可,因為太多了會導致CPU切換開銷,太少了(小於N),有些CPU核就空閑了。
2)線程池帶來的死鎖問題
線程池為什么會帶來死鎖呢?在JAVA 1.5 之后,引入了java.util.concurrent包。線程池則可以通過如下方式實現:
ExecutorService executor = Executors.newSingleThreadExecutor(); //ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(task);// task implements Runnable executor.shutdown();
Executors可以創建各種類型的線程池。如果創建一個緩存的線程池:
ExecutorService executor = Executors.newCachedThreadPool();
對於高負載的服務器而言,在緩存線程池中,被提交的任務沒有排成隊列,而是直接交給線程執行。也就是說:只要來一個請求,如果線程池中沒有線程可用,服務器就會創建一個新的線程。如果線程已經把CPU用完了,此時還再創建線程就沒有太大的意義了。因此,對於高負載的服務器而言,一般使用的是固定數目的線程池(來自Effective Java)
主要有兩種類型的死鎖:①線程A占有了鎖X,等待鎖Y,而線程B占用了鎖Y,等待鎖X。因此,向線程池提交任務時,要注意判斷:提交了的任務(Runnable對象)會不會導致這種情況發生?
②線程池中的所有線程在執行各自的業務邏輯時都阻塞了,它們都需要等待某個任務的執行結果,而這個任務還在“請求隊列”里面未提交!
3)來自Client的請求實在是太多了,線程池中的線程都用完了(已無法再創建新線程)。此時,服務器只好拒絕新的連接請求,導致Client拋出:ConnectException。
4)線程泄露
導致線程泄露的原因也很多,而且還很難發覺,網上也有很多善於線程池線程泄露的問題。比如說:線程池中的線程在執行業務邏輯時拋異常了,怎么辦?是不是這個工作線程就異常終止了?那這樣,線程池中可用的線程數就少了一個了?看一下JDK ThreadPoolExecutor 線程池中的線程執行任務的過程如下:
try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
從上面源碼看出:線程執行出異常后是由 afterExecute(task, thrown) 來處理的。至於對線程有何影響,我也沒找到很好的解釋。
另外一種引起線程泄露的情況就是:線程池中的工作線程在執行業務邏輯時,一直阻塞下去了。那這也意味着這個線程基本上不干活了,這就影響了線程池中實際可用的線程數目。如何所有的線程都是這種情況,那也無法向線程池提交任務了。此外,關於線程池帶來的問題還可參考:Java編程中線程池的風險規避 另外, 關於JAVA線程池使用可參考下:Java的Executor框架和線程池實現原理
到這里,阻塞通信的三種模式都已經介紹完畢了。在網上發現了一篇很好的博文,剛好可以配合我這篇文章的代碼演示一起來看:架構設計:系統間通信(1)——概述從“聊天”開始上篇
TCP連接 對 應用層協議(比如 HTTP協議)會產生哪些影響?
主要從以下幾個方面描述TCP協議對應用層協議的影響:(結合JAVA網絡編程中的 具體SOcket類的 相關參數分析)
1)最大段長度MSS
TCP協議是提供可靠連接的,在建立連接的過程中,會協商一些參數,比如MSS。TCP傳輸的數據是流,把流截成一段段的報文進行傳輸,MSS是 每次傳輸TCP報文的 最大數據分段。
為什么需要MSS呢?如果傳輸的報文太大,則需要在IP層進行分片,分成了若干片的報文在傳輸過程中任何一片丟失了,整個報文都得重傳。重傳直接影響了網絡效率。因此,在建立連接時就協商(SYN包)底層的數據鏈路層最大能傳遞多大的報文(比如以太網的MTU=1500),然后在傳輸層(TCP)就對數據進行分段,盡量避免TCP傳輸的數據在IP層分片。
另外,關於MSS可參考:【網絡協議】TCP分段與IP分片 和 IP分片詳解
而對於上層應用而言(比如HTTP協議),它只管將數據寫入緩沖區,但其實它寫入的數據在TCP層其實是被分段發送的。當目的主機收到所有的分段之后,需要重組分段。因此,就會出現所謂的HTTP粘包問題。
2)TCP連接建立過程的“三次握手”
“三次握手”的大概流程如下:
Client發送一個SYN包,Server返回一個SYN/ACK包,然后Client再對 SYN/ACK 包進行一次確認ACK。在對 SYN/ACK 進行確認時,Client就可以向Server端 發送實際的數據了。這種利用ACK確認時順帶發送數據的方式 可以 減少 Client與Server 之間的報文交換。
3)TCP“慢啟動”的擁塞控制
什么是“慢啟動”呢?因為TCP連接是可靠連接,具有擁塞控制的功能。如果不進行擁塞控制,網絡擁堵了導致容易丟包,丟包又得重傳,就很難保證可靠性了。
而“慢啟動”就是實現 擁塞控制 的一種機制。也就是說:對於新建立的TCP連接而言,它不能立馬就發送很多報文,而是:先發送 1個報文,等待對方確認;收到確認后,就可以一次發送2個報文了,再等待對方確認;收到確認后,就一次可以發送4個報文了.....每次可發送的報文數依次增加(指數級增加,當然不會一直增加下去),這個過程就是“打開擁塞窗口”。
那這個慢啟動特性有何影響呢?
一般而言,就是“老的”TCP連接 比 新建立的 TCP連接有着更快的發送速度。因為,新的TCP連接有“慢啟動”啊。而“老的”TCP連接可能一次允許發送多個報文。
因此,對於HTTP連接而言,選擇重用現有連接既可以減少新建HTTP連接的開銷,又可以重用老的TCP連接,立即發送數據。
HTTP重用現有的連接,在HTTP1.0的 Connection頭部設置"Keep-Alive"屬性。在HTTP1.1版本中,默認是打開持久連接的,可參考HTTP1.1中的 persistent 參數。
4)發送數據時,先收集待發送的數據,讓發送緩沖區滿了之后再發送的Nagle算法
對於一條Socket連接而言,發送方有自己的發送緩沖區。在JAVA中,由java.net.SocketOptions類的 SO_SNFBUF 屬性指定。可以調用setSendBufferSize方法來設置發送緩沖區(同理接收緩沖區)
public synchronized void setSendBufferSize(int size) throws SocketException{ if (!(size > 0)) { throw new IllegalArgumentException("negative send size"); } if (isClosed()) throw new SocketException("Socket is closed"); getImpl().setOption(SocketOptions.SO_SNDBUF, new Integer(size)); }
那什么是Negle算法呢?
假設每次發送的TCP分段只包含少量的有效數據(比如1B),而TCP首部加上IP首部至少有40B,每次為了發送1B的數據都要帶上一個40B的首部,顯然網絡利用率是很低的。
因為,Negle算法就是:發送方的數據不是立即就發送,而是先放在緩沖區內,等到緩沖區滿了再發送(或者所發送的所有分組都已經返回了確認了)。說白了,就是先把數據“聚集起來”,分批發送。
Negale算法對上層應用會有什么影響呢?
對小批量數據傳輸的時延影響很大。比如 網絡游戲 中的實時捕獲 玩家的位置。玩家位置變了,也許只有一小部分數據發送給 服務器,若采用Negale算法,發送的數據被緩沖起來了,服務器會遲遲接收不到玩家的實時位置信息。因此,Negale算法適合於那種大批量數據傳輸的場景。
因此,SocketOptions類的 TCP_NODELAY 屬性用來設置 在TCP連接中是否啟用 Negale算法。
public void setTcpNoDelay(boolean on) throws SocketException { if (isClosed()) throw new SocketException("Socket is closed"); getImpl().setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on)); }
5)在發送數據時捎帶確認的延遲確認算法
比如,Server在接收到了Client發送的一些數據,但是Server並沒有立即對這些數據進行確認。而是:當Server有數據需要發送到Client時,在發送數據的同時 捎帶上 對前面已經接收到的數據的確認。(這其實也是盡量減少Server與Client之間的報文量,畢竟:每發一個報文,是有首部開銷的。)
這種方式會影響到上層應用的響應性。可能會對HTTP的請求-響應模式產生很大的時延。
6)TCP的 KEEP_ALIVE
這個在JDK源碼中解釋的非常好了。故直接貼上來:
/**
* When the keepalive option is set for a TCP socket and no data
* has been exchanged across the socket in either direction for
* 2 hours (NOTE: the actual value is implementation dependent),
* TCP automatically sends a keepalive probe to the peer. This probe is a
* TCP segment to which the peer must respond.
* One of three responses is expected:
* 1. The peer responds with the expected ACK. The application is not
* notified (since everything is OK). TCP will send another probe
* following another 2 hours of inactivity.
* 2. The peer responds with an RST, which tells the local TCP that
* the peer host has crashed and rebooted. The socket is closed.
* 3. There is no response from the peer. The socket is closed.
*
* The purpose of this option is to detect if the peer host crashes.
*
* Valid only for TCP socket: SocketImpl
當TCP連接設置了KEEP-ALIVE時,如果這條socket連接在2小時(視情況而定)內沒有數據交換,然后就會發一個“探測包”,以判斷對方的狀態。
然后,等待對方發送這個探測包的響應。一共會出現以上的三種情況,並根據出現的情況作出相應的處理。
①對方(peer)收到了正常的 ACK,說明一切正常,上層應用並不會注意到這個過程(發送探測包的過程)。再等下一個2個小時時繼續探測連接是否存活。
②對方返回一個RST包,表明對方已經crashed 或者 rebooted,socket連接關閉。
③未收到對方的響應,socket連接關閉。
這里需要注意的是:在HTTP協議中也有一個KEEP-ALIVE,可參考:HTTP長連接
7)TCP連接關閉時的影響
TCP關閉連接有“四次揮手”,主動關閉連接的一方會有一個 TIME_WAIT 狀態。也就是說,在Socket的close()方法執行后,close()方法立即返回了,但是底層的Socket連接並不會立即關閉,而是會等待一段時間,將剩余的數據都發送完畢再關閉連接。可以用SocketOptions的 SO_LINGER 屬性來控制sockect的關閉行為。
看JDK中 SO_LINGER的解釋如下:
/** * Specify a linger-on-close timeout. This option disables/enables * immediate return from a <B>close()</B> of a TCP Socket. Enabling * this option with a non-zero Integer <I>timeout</I> means that a * <B>close()</B> will block pending the transmission and acknowledgement * of all data written to the peer, at which point the socket is closed * <I>gracefully</I>. Upon reaching the linger timeout, the socket is * closed <I>forcefully</I>, with a TCP RST. Enabling the option with a * timeout of zero does a forceful close immediately. If the specified * timeout value exceeds 65,535 it will be reduced to 65,535. * <P> * Valid only for TCP: SocketImpl * * @see Socket#setSoLinger * @see Socket#getSoLinger */ public final static int SO_LINGER = 0x0080;
因此,當調用Socket類的 public void setSoLinger(boolean on, int linger)設置了 linger 時間后,執行 close()方法不會立即返回,而是進入阻塞狀態。
然后,Socket會 等到所有的數據都已經確認發送了 peer 端。(will block pending the transmission and acknowledgement of all data written to the peer)【第四次揮手時client 發送的ACK到達了Server端】
或者:經過了 linger 秒之后,強制關閉連接。( Upon reaching the linger timeout, the socket is closed forcefully)
那為什么需要一個TIME_WAIT時延呢?即:執行 close()方法 時需要等待一段時間再 真正關閉Socket?這也是“四次揮手”時,主動關閉連接的一方會 持續 TIME_WAIT一段時間(一般是2MSL大小)
①確保“主動關閉端”(Client端)最后發送的ACK能夠成功到達“被動關閉端”(Server端)
因為,如何不能確保ACK是否成功到達Server端的話,會影響Server端的關閉。假設最后第四次揮手時 Client 發送給 Server的ACK丟失了,若沒有TIME_WAIT,Server會認為是自己FIN包沒有成功發送給Client(因為Server未收到ACK啊),就會導致Server重傳FIN,而不能進入 closed 狀態。
②舊的TCP連接包會干擾新的TCP連接包,導致新的TCP連接收到的包亂序。
若沒有TIME_WAIT,本次TCP連接(為了更好的闡述問題,記本次TCP連接為TCP_連接1)斷開之后,又立即建立新的一條TCP連接(TCP_連接2)。
TCP_連接1 發送的包 有可能在網絡中 滯留了。而現在又新建了一條 TCP_連接2 ,如果滯留的包(滯留的包是無效的包了,因為TCP_連接1已經關閉了) 又 重新到達了 TCP_連接2,由於 滯留的包的(源地址,源端口,目的地址,目的端口)與 TCP_連接2 中發送的包 是一樣的,因此會干擾 TCP_連接2 中的包(序號)。
如果有TIME_WAIT,由於TIME_WAIT的長度是 2MSL。因此,TCP_連接1中的滯留的包,經過了2MSL時間之后,已經失效了。就不會干擾新的TCP_連接2了。
此外,這也是為什么在Linux中,你Kill了某個連接進程之后,又立即重啟連接進程,會報 端口占用錯誤,因為在底層,其實它的端口還未釋放。
