阻塞通信之Socket編程


Socket通信,主要是基於TCP協議的通信。本文從Socket通信(代碼實現)、多線程並發、以及TCP協議相關原理方面 介紹 阻塞Socket通信一些知識。

 本文從服務器端的視角,以“Echo Server”程序為示例,描述服務器如何處理客戶端的連接請求。Echo Server的功能就是把客戶端發給服務器的數據原封不動地返回給客戶端。

第一種方式是單線程處理方式:服務器的處理方法如下:

 1     public void service(){
 2         while (true) {
 3             Socket socket = null;
 4             try {
 5                 socket = serverSocket.accept();
 6                 System.out.println("new connection accepted " + socket.getInetAddress() + ":" + socket.getPort());
 7                 BufferedReader br = getBufferReader(socket);//獲得socket輸入流,並將之包裝成BufferedReader
 8                 PrintWriter pw = getWriter(socket);//獲得socket輸出流,並將之包裝成PrintWriter
 9                 String msg = null;
10                 while ((msg = br.readLine()) != null) {
11                     
12                     pw.println(echo(msg));//服務端的處理邏輯,將client發來的數據原封不動再發給client
13                     pw.flush();
14                     if(msg.equals("bye"))//若client發送的是 "bye" 則關閉socket
15                         break;
16                 }
17             } catch (IOException e) {
18                 e.printStackTrace();
19             } finally {
20                 try{
21                     if(socket != null)
22                         socket.close();
23                 }catch(IOException e){e.printStackTrace();}
24             }
25         }
26     }

上面用的是while(true)循環,這樣,Server不是只接受一次Client的連接就退出,而是不斷地接收Client的連接。

1)第5行,服務器線程執行到accept()方法阻塞,直至有client的連接請求到來。

2)當有client的請求到來時,就會建立socket連接。從而在第8、9行,就可以獲得這條socket連接的輸入流和輸出流。輸入流(BufferedReader)負責讀取client發過來的數據,輸出流(PrintWriter)負責將處理后的數據返回給Client。

 

下面來詳細分析一下建立連接的過程:

Client要想成功建立一條到Server的socket連接,其實是受很多因素影響的。其中一個就是:Server端的“客戶連接請求隊列長度”。它可以在創建ServerSocket對象由構造方法中的 backlog 參數指定:JDK中 backlog參數的解釋是: requested maximum length of the queue of incoming connections.

    public ServerSocket(int port, int backlog) throws IOException {
        this(port, backlog, null);
    }

看到了這個:incoming commections 有點奇怪,因為它講的是“正在到來的連接”,那什么又是incoming commections 呢?這個就也TCP建立連接的過程有關了。

TCP建立連接的過程可簡述為三次握手。第一次:Client發送一個SYN包,Server收到SYN包之后回復一個SYN/ACK包,此時Server進入一個“中間狀態”--SYN RECEIVED 狀態。

這可以理解成:Client的連接請求已經過來了,只不過還沒有完成“三次握手”。因此,Server端需要把當前的請求保存到一個隊列里面,直至當Server再次收到了Client的ACK之后,Server進入ESTABLISHED狀態,此時:serverSocket 從accpet() 阻塞狀態中返回。也就是說:當第三次握手的ACK包到達Server端后,Server從該請求隊列中取出該連接請求,同時Server端的程序從accept()方法中返回。

那么這個請求隊列長度,就是由 backlog 參數指定。那這個隊列是如何實現的呢?這個就和操作系統有關了,感興趣的可參考:How TCP backlog works in Linux

此外,也可以看出:服務器端能夠接收的最大連接數 也與 這個請求隊列有關。對於那種高並發場景下的服務器而言,首先就是請求隊列要足夠大;其次就是當連接到來時,要能夠快速地從隊列中取出連接請求並建立連接,因此,執行建立連接任務的線程最好不要阻塞。

 

現在來分析一下上面那個:單線程處理程序可能會出現的問題:

服務器始終只有一個線程執行accept()方法接受Client的連接。建立連接之后,又是該線程處理相應的連接請求業務邏輯,這里的業務邏輯是:把客戶端發給服務器的數據原封不動地返回給客戶端。

顯然,這里一個線程干了兩件事:接受連接請求 和 處理連接(業務邏輯)。好在這里的處理連接的業務邏輯不算復雜,如果對於復雜的業務邏輯 而且 有可能在執行業務邏輯過程中還會發生阻塞的情況時,那此時服務器就再也無法接受新的連接請求了。

 

第二種方式是:一請求一線程的處理模式:

 1     public void service() {
 2         while (true) {
 3             Socket socket = null;
 4             try {
 5                 socket = serverSocket.accept();//接受client的連接請求
 6                 new Thread(new Handler(socket)).start();//每接受一個請求 就創建一個新的線程 負責處理該請求
 7             } catch (IOException e) {
 8                 e.printStackTrace();
 9             } 
10             finally {
11                 try{
12                     if(socket != null)
13                         socket.close();
14                 }catch(IOException e){e.printStackTrace();}
15             }
16         }
17     }

 

再來看Handler的部分實現:Handler是一個implements Runnable接口的線程,在它的run()里面處理連接(執行業務邏輯)

 1 class Handler implements Runnable{
 2     Socket socket;
 3     public Handler(Socket socket) {
 4         this.socket = socket;
 5     }
 6     
 7     @Override
 8     public void run() {
 9         try{
10             BufferedReader br = null;
11             PrintWriter pw = null;
12             System.out.println("new connection accepted " + socket.getInetAddress() + ":" + socket.getPort());
13             
14             br = getBufferReader(socket);
15             pw = getWriter(socket);
16             
17             String msg = null;
18             while((msg = br.readLine()) != null){
19                 pw.println(echo(msg));
20                 pw.flush();
21                 if(msg.equals("bye"))
22                     break;
23             }
24         }catch(IOException e){
25             e.printStackTrace();
26         }
27     }

 

從上面的單線程處理模型中看到:如果線程在執行業務邏輯中阻塞了,服務器就不能接受用戶的連接請求了。

而對於一請求一線程模型而言,每接受一個請求,就創建一個線程來負責該請求的業務邏輯。盡管,這個請求的業務邏輯執行時阻塞了,只要服務器還能繼續創建線程,那它就還可以繼續接受新的連接請求。此外,負責建立連接請求的線程 和 負責處理業務邏輯的線程分開了。業務邏輯執行過程中阻塞了,“不會影響”新的請求建立連接。

顯然,如果Client發送的請求數量很多,那么服務器將會創建大量的線程,而這是不現實的。有以下原因:

1)創建線程是需要系統開銷的,線程的運行系統資源(內存)。因此,有限的硬件資源 就限制了系統中線程的數目。

2)當系統中線程很多時,線程的上下文開銷會很大。比如,請求的業務邏輯的執行是IO密集型任務,經常需要阻塞,這會造成頻繁的上下文切換。  

3)當業務邏輯處理完成之后,就需要銷毀線程,如果請求量大,業務邏輯又很簡單,就會導致頻繁地創建銷毀線程。

那能不能重用已創建的線程? ---這就是第三種方式:線程池處理。

 

第三種方式是線程池的處理方式:

 1 public class EchoServerThreadPool {
 2     private int port = 8000;
 3     private ServerSocket serverSocket;
 4     private ExecutorService executorService;
 5     private static int POOL_SIZE = 4;//每個CPU中線程擁有的線程數
 6     
 7     public EchoServerThreadPool()throws IOException {
 8         serverSocket = new ServerSocket(port);
 9         executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE);
10         System.out.println("server start");
11     }
12     
13     public void service(){
14         while(true){
15             Socket socket = null;
16             try{
17                 socket = serverSocket.accept();//等待接受Client連接
18                 executorService.execute(new Handler(socket));//將已經建立連接的請求交給線程池處理
19             }catch(IOException e){
20                 e.printStackTrace();
21             }
22         }
23     }
24     public static void main(String[] args)throws IOException{
25         new EchoServerThreadPool().service();
26     }
27 }

 

采用線程池最大的優勢在於“重用線程”,有請求任務來了,從線程池中取出一個線程負責該請求任務,任務執行完成后,線程自動歸還到線程池中,而且java.util.concurrent包中又給出了現成的線程池實現。因此,這種方式看起來很完美,但還是有一些問題是要注意的:

1)線程池有多大?即線程池里面有多少個線程才算比較合適?這個要根據具體的業務邏輯來分析,而且還得考慮面對的使用場景。一個合理的要求就是:盡量不要讓CPU空閑下來,即CPU的復用率要高。如果業務邏輯是經常會導致阻塞的IO操作,一般需要設置 N*(1+WT/ST)個線程,其中N為可用的CPU核數,WT為等待時間,ST為實際占用CPU運算時間。如果業務邏輯是CPU密集型作業,那么線程池中的線程數目一般為N個或N+1個即可,因為太多了會導致CPU切換開銷,太少了(小於N),有些CPU核就空閑了。

2)線程池帶來的死鎖問題

線程池為什么會帶來死鎖呢?在JAVA 1.5 之后,引入了java.util.concurrent包。線程池則可以通過如下方式實現:

ExecutorService executor = Executors.newSingleThreadExecutor();
//ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(task);// task implements Runnable

executor.shutdown();

Executors可以創建各種類型的線程池。如果創建一個緩存的線程池:

ExecutorService executor = Executors.newCachedThreadPool();

對於高負載的服務器而言,在緩存線程池中,被提交的任務沒有排成隊列,而是直接交給線程執行。也就是說:只要來一個請求,如果線程池中沒有線程可用,服務器就會創建一個新的線程。如果線程已經把CPU用完了,此時還再創建線程就沒有太大的意義了。因此,對於高負載的服務器而言,一般使用的是固定數目的線程池(來自Effective Java)

 

主要有兩種類型的死鎖:①線程A占有了鎖X,等待鎖Y,而線程B占用了鎖Y,等待鎖X。因此,向線程池提交任務時,要注意判斷:提交了的任務(Runnable對象)會不會導致這種情況發生?

②線程池中的所有線程在執行各自的業務邏輯時都阻塞了,它們都需要等待某個任務的執行結果,而這個任務還在“請求隊列”里面未提交!

3)來自Client的請求實在是太多了,線程池中的線程都用完了(已無法再創建新線程)。此時,服務器只好拒絕新的連接請求,導致Client拋出:ConnectException。

4)線程泄露

導致線程泄露的原因也很多,而且還很難發覺,網上也有很多善於線程池線程泄露的問題。比如說:線程池中的線程在執行業務邏輯時拋異常了,怎么辦?是不是這個工作線程就異常終止了?那這樣,線程池中可用的線程數就少了一個了?看一下JDK ThreadPoolExecutor 線程池中的線程執行任務的過程如下:

       try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }

從上面源碼看出:線程執行出異常后是由 afterExecute(task, thrown) 來處理的。至於對線程有何影響,我也沒找到很好的解釋。

 

另外一種引起線程泄露的情況就是:線程池中的工作線程在執行業務邏輯時,一直阻塞下去了。那這也意味着這個線程基本上不干活了,這就影響了線程池中實際可用的線程數目。如何所有的線程都是這種情況,那也無法向線程池提交任務了。此外,關於線程池帶來的問題還可參考:Java編程中線程池的風險規避  另外, 關於JAVA線程池使用可參考下:Java的Executor框架和線程池實現原理

 

到這里,阻塞通信的三種模式都已經介紹完畢了。在網上發現了一篇很好的博文,剛好可以配合我這篇文章的代碼演示一起來看:架構設計:系統間通信(1)——概述從“聊天”開始上篇

 

TCP連接 對 應用層協議(比如 HTTP協議)會產生哪些影響?

主要從以下幾個方面描述TCP協議對應用層協議的影響:(結合JAVA網絡編程中的 具體SOcket類的 相關參數分析)

1)最大段長度MSS

TCP協議是提供可靠連接的,在建立連接的過程中,會協商一些參數,比如MSS。TCP傳輸的數據是流,把流截成一段段的報文進行傳輸,MSS是 每次傳輸TCP報文的 最大數據分段。

為什么需要MSS呢?如果傳輸的報文太大,則需要在IP層進行分片,分成了若干片的報文在傳輸過程中任何一片丟失了,整個報文都得重傳。重傳直接影響了網絡效率。因此,在建立連接時就協商(SYN包)底層的數據鏈路層最大能傳遞多大的報文(比如以太網的MTU=1500),然后在傳輸層(TCP)就對數據進行分段,盡量避免TCP傳輸的數據在IP層分片。

另外,關於MSS可參考:【網絡協議】TCP分段與IP分片 和 IP分片詳解

而對於上層應用而言(比如HTTP協議),它只管將數據寫入緩沖區,但其實它寫入的數據在TCP層其實是被分段發送的。當目的主機收到所有的分段之后,需要重組分段。因此,就會出現所謂的HTTP粘包問題。

 

2)TCP連接建立過程的“三次握手”

“三次握手”的大概流程如下:

Client發送一個SYN包,Server返回一個SYN/ACK包,然后Client再對 SYN/ACK 包進行一次確認ACK。在對 SYN/ACK 進行確認時,Client就可以向Server端 發送實際的數據了。這種利用ACK確認時順帶發送數據的方式 可以 減少 Client與Server 之間的報文交換。

 

3)TCP“慢啟動”的擁塞控制

 什么是“慢啟動”呢?因為TCP連接是可靠連接,具有擁塞控制的功能。如果不進行擁塞控制,網絡擁堵了導致容易丟包,丟包又得重傳,就很難保證可靠性了。

 而“慢啟動”就是實現 擁塞控制 的一種機制。也就是說:對於建立的TCP連接而言,它不能立馬就發送很多報文,而是:先發送 1個報文,等待對方確認;收到確認后,就可以一次發送2個報文了,再等待對方確認;收到確認后,就一次可以發送4個報文了.....每次可發送的報文數依次增加(指數級增加,當然不會一直增加下去),這個過程就是“打開擁塞窗口”。

那這個慢啟動特性有何影響呢?

一般而言,就是“老的”TCP連接 比 新建立的 TCP連接有着更快的發送速度。因為,新的TCP連接有“慢啟動”啊。而“老的”TCP連接可能一次允許發送多個報文。

因此,對於HTTP連接而言,選擇重用現有連接既可以減少新建HTTP連接的開銷,又可以重用老的TCP連接,立即發送數據。

HTTP重用現有的連接,在HTTP1.0的 Connection頭部設置"Keep-Alive"屬性。在HTTP1.1版本中,默認是打開持久連接的,可參考HTTP1.1中的 persistent 參數。

 

4)發送數據時,先收集待發送的數據,讓發送緩沖區滿了之后再發送的Nagle算法

對於一條Socket連接而言,發送方有自己的發送緩沖區。在JAVA中,由java.net.SocketOptions類的 SO_SNFBUF 屬性指定。可以調用setSendBufferSize方法來設置發送緩沖區(同理接收緩沖區)

public synchronized void setSendBufferSize(int size)
    throws SocketException{
        if (!(size > 0)) {
            throw new IllegalArgumentException("negative send size");
        }
        if (isClosed())
            throw new SocketException("Socket is closed");
        getImpl().setOption(SocketOptions.SO_SNDBUF, new Integer(size));
    }

 

那什么是Negle算法呢?

假設每次發送的TCP分段只包含少量的有效數據(比如1B),而TCP首部加上IP首部至少有40B,每次為了發送1B的數據都要帶上一個40B的首部,顯然網絡利用率是很低的。

因為,Negle算法就是:發送方的數據不是立即就發送,而是先放在緩沖區內,等到緩沖區滿了再發送(或者所發送的所有分組都已經返回了確認了)。說白了,就是先把數據“聚集起來”,分批發送。

Negale算法對上層應用會有什么影響呢?

對小批量數據傳輸的時延影響很大。比如 網絡游戲 中的實時捕獲 玩家的位置。玩家位置變了,也許只有一小部分數據發送給 服務器,若采用Negale算法,發送的數據被緩沖起來了,服務器會遲遲接收不到玩家的實時位置信息。因此,Negale算法適合於那種大批量數據傳輸的場景。

因此,SocketOptions類的 TCP_NODELAY 屬性用來設置 在TCP連接中是否啟用 Negale算法。

    public void setTcpNoDelay(boolean on) throws SocketException {
        if (isClosed())
            throw new SocketException("Socket is closed");
        getImpl().setOption(SocketOptions.TCP_NODELAY, Boolean.valueOf(on));
    }

 

5)在發送數據時捎帶確認的延遲確認算法

 比如,Server在接收到了Client發送的一些數據,但是Server並沒有立即對這些數據進行確認。而是:當Server有數據需要發送到Client時,在發送數據的同時 捎帶上 對前面已經接收到的數據的確認。(這其實也是盡量減少Server與Client之間的報文量,畢竟:每發一個報文,是有首部開銷的。)

這種方式會影響到上層應用的響應性。可能會對HTTP的請求-響應模式產生很大的時延。

 

6)TCP的 KEEP_ALIVE

這個在JDK源碼中解釋的非常好了。故直接貼上來:

    /**
     * When the keepalive option is set for a TCP socket and no data
     * has been exchanged across the socket in either direction for
     * 2 hours (NOTE: the actual value is implementation dependent),
     * TCP automatically sends a keepalive probe to the peer. This probe is a
     * TCP segment to which the peer must respond.
     * One of three responses is expected:
     * 1. The peer responds with the expected ACK. The application is not
     *    notified (since everything is OK). TCP will send another probe
     *    following another 2 hours of inactivity.
     * 2. The peer responds with an RST, which tells the local TCP that
     *    the peer host has crashed and rebooted. The socket is closed.
     * 3. There is no response from the peer. The socket is closed.
     *
     * The purpose of this option is to detect if the peer host crashes.
     *
     * Valid only for TCP socket: SocketImpl

當TCP連接設置了KEEP-ALIVE時,如果這條socket連接在2小時(視情況而定)內沒有數據交換,然后就會發一個“探測包”,以判斷對方的狀態。

然后,等待對方發送這個探測包的響應。一共會出現以上的三種情況,並根據出現的情況作出相應的處理。

①對方(peer)收到了正常的 ACK,說明一切正常,上層應用並不會注意到這個過程(發送探測包的過程)。再等下一個2個小時時繼續探測連接是否存活。

②對方返回一個RST包,表明對方已經crashed 或者 rebooted,socket連接關閉。

③未收到對方的響應,socket連接關閉。

這里需要注意的是:在HTTP協議中也有一個KEEP-ALIVE,可參考:HTTP長連接

 

7)TCP連接關閉時的影響

TCP關閉連接有“四次揮手”,主動關閉連接的一方會有一個 TIME_WAIT 狀態。也就是說,在Socket的close()方法執行后,close()方法立即返回了,但是底層的Socket連接並不會立即關閉,而是會等待一段時間,將剩余的數據都發送完畢再關閉連接。可以用SocketOptions的 SO_LINGER 屬性來控制sockect的關閉行為。

看JDK中 SO_LINGER的解釋如下:

    /**
     * Specify a linger-on-close timeout.  This option disables/enables * immediate return from a <B>close()</B> of a TCP Socket.  Enabling
     * this option with a non-zero Integer <I>timeout</I> means that a
     * <B>close()</B> will block pending the transmission and acknowledgement * of all data written to the peer, at which point the socket is closed * <I>gracefully</I>.  Upon reaching the linger timeout, the socket is
     * closed <I>forcefully</I>, with a TCP RST. Enabling the option with a
     * timeout of zero does a forceful close immediately. If the specified
     * timeout value exceeds 65,535 it will be reduced to 65,535.
     * <P>
     * Valid only for TCP: SocketImpl
     *
     * @see Socket#setSoLinger
     * @see Socket#getSoLinger
     */
    public final static int SO_LINGER = 0x0080;

 

因此,當調用Socket類的 public void setSoLinger(boolean on, int linger)設置了 linger 時間后,執行 close()方法不會立即返回,而是進入阻塞狀態。

然后,Socket會 等到所有的數據都已經確認發送了 peer 端。(will block pending the transmission and acknowledgement of all data written to the peer)【第四次揮手時client 發送的ACK到達了Server端】

或者:經過了 linger 秒之后,強制關閉連接。( Upon reaching the linger timeout, the socket is closed forcefully)

 

那為什么需要一個TIME_WAIT時延呢?即:執行 close()方法 時需要等待一段時間再 真正關閉Socket?這也是“四次揮手”時,主動關閉連接的一方會 持續 TIME_WAIT一段時間(一般是2MSL大小)

①確保“主動關閉端”(Client端)最后發送的ACK能夠成功到達“被動關閉端”(Server端)

因為,如何不能確保ACK是否成功到達Server端的話,會影響Server端的關閉。假設最后第四次揮手時 Client 發送給 Server的ACK丟失了,若沒有TIME_WAIT,Server會認為是自己FIN包沒有成功發送給Client(因為Server未收到ACK啊),就會導致Server重傳FIN,而不能進入 closed 狀態。

②舊的TCP連接包會干擾新的TCP連接包,導致新的TCP連接收到的包亂序。

若沒有TIME_WAIT,本次TCP連接(為了更好的闡述問題,記本次TCP連接為TCP_連接1)斷開之后,又立即建立新的一條TCP連接(TCP_連接2)。

TCP_連接1 發送的包 有可能在網絡中 滯留了。而現在又新建了一條 TCP_連接2 ,如果滯留的包(滯留的包是無效的包了,因為TCP_連接1已經關閉了) 又 重新到達了 TCP_連接2,由於 滯留的包的(源地址,源端口,目的地址,目的端口)與 TCP_連接2 中發送的包 是一樣的,因此會干擾 TCP_連接2 中的包(序號)。

如果有TIME_WAIT,由於TIME_WAIT的長度是 2MSL。因此,TCP_連接1中的滯留的包,經過了2MSL時間之后,已經失效了。就不會干擾新的TCP_連接2了。

 

此外,這也是為什么在Linux中,你Kill了某個連接進程之后,又立即重啟連接進程,會報 端口占用錯誤,因為在底層,其實它的端口還未釋放。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM