基於完成端口的文件傳輸設計


完成端口

說到完成端口,我想很多人都不太陌生,下面是一段摘錄:
“完成端口”模型是迄今為止最為復雜的一種I/O模型。然而,假若一個應用程序同時需要管理為數眾多的套接字,那么采用這種模型,往往可以達到最佳的系統性能!但不幸的是,該模型只適用於Windows NT和Windows 2000操作系統。因其設計的復雜性,只有在你的應用程序需要同時管理數百乃至上千個套接字的時候,而且希望隨着系統內安裝的CPU數量的增多,應用程序的性能也可以線性提升,才應考慮采用“完成端口”模型。要記住的一個基本准則是,假如要為Windows NT或Windows 2000開發高性能的服務器應用,同時希望為大量套接字I/O請求提供服務(Web服務器便是這方面的典型例子),那么I/O完成端口模型便是最佳選擇!(節選自《Windows網絡編程》第八章)

在.net中,一旦說到完成端口,我們就不得不提到SocketAsyncEventArgs這個高性能的類,其內部是基於完成端口實現的,然后被微軟提供了諸多封裝,所以使用起來也比較簡單一些.由於這個類利用完成端口並且結合異步事件的方式進行設計的,所以我們可以大致的知道他的一些特點.

處理流程

關於此類,微軟的解釋很多,但是始終離不開高性能三個字.根據我的理解,如果利用此類設計一個服務端的消息傳送中心,那么其運作流程如下:

1.創建SocketAsyncEventArgs池,並且創建緩沖管理中心,負責對池中的對象分配緩沖大小.

2.創建服務器套接字,並處於監聽狀態.同時創建基於SocketAsyncEventArgs的客戶端接收對象,接收客戶端的連接.

3.如果有客戶端連接, 客戶端接收對象將會把控制權移交給數據接收對象,數據接收對象開始接收數據.

這只是一個簡單的流程,從這里我們可以看出,服務端套接字只是負責監聽,一旦有客戶端連接,就會把連接事件拋給接收對象;所以客戶端一個一個的來,服務端一個一個的拋,性能自然會好了不少.

說了這么多,我們來淺析一下SocketAsyncEventArgs對象中的一些基本的知識點.

緩沖

首先,說到緩沖,我們很容易理解為一個存儲池,里面可以放入東西,也可以拿走.在SocketAsyncEventArgs類中,我們可以利用其SetBuffer方法初始化緩沖區.

比如說: receiveArgs.SetBuffer(new byte[10],0,5);他的意思就是為當前接收數據對象設置的緩沖區大小為10字節,位置從0開始,並且允許接收的最大的數據長度為5字節.

當數據被接收,然后寫入到緩沖區的時候,數據會按照預先設定好的緩沖規則進行放置.比如這里我輸入aaaaa,那么在緩沖區會放入如下數據:

 

但是當有新的數據再進來的時候,比如這里我輸入了bbbb,那么緩沖區就變成了

看到了沒有,緩沖區中第五位依然是我們前一次插入的值,所以,在這里我提醒大家,緩沖區的東西取完之后,一定要重置一下,否則臟數據會導致數據錯誤.

講到這里,也許有人會說,假如我插入aaaaab會怎么樣,其實,這個長度已經超過了緩沖區的長度,緩沖區將會做截斷處理,然后當作兩段放入緩存中,首先會是

,然后會是

 

所以如果這個時候你的數據沒有被及時取走的話,將會得到最終結果 

 這不,已經發生粘包現象了.

 拋出方式

說完緩沖,這里我們需要說到的是服務端套接字是如何將接收的客戶端通過事件的方式拋給SocketAsyncEventArgs對象的.

在進行服務端,我們一般都會有一個用於監聽的套接字,套接字會利用serverSocket.AcceptAsync(SocketAsyncEventArgs)異步方法注冊SocketAsyncEventArgs對象,然后一旦有客戶端連接,就會激發SocketAsyncEventArgs對象的Completed事件,然后,在這個事件中,serverSocket會通過ReceiveAsync(SocketAsyncEventArgs)異步方法將數據處理的過程交給另外一個專門負責數據處理的對象去完成,這樣,客戶端連接,服務端只負責將連接事件拋給SocketAsyncEventArgs對象即可,比起傳統的編程方式: 服務端既負責監聽,又負責接收, 效率極大的提升.

同步方式

最后說到的是同步問題,因為在異步交互的系統中,同步問題確實很重要,尤其是當客戶端和服務器同步的時候.

客戶端里面的線程同步,我們可以利用AutoResetEvent來實現,客戶端和服務端同步的時候,我們需要AutoResetEvent並且結合一些標記信息來進行,也就是客戶端往服務器發送的一些小標記,比如1代表可以進行,0代表取消等等.

在設計的時候,我們還需要記住的是,代碼的核心只是負責數據的傳輸,不要加過多的邏輯判斷在里面,否則會影響性能,邏輯判斷等最好移到界面處理處來進行.

代碼精解

好了,下面上代碼來解釋解釋.

首先我們先從服務端開始:

View Code
  private int listenClientCount;
        private int poolSize;
        private int bufferSize;

        private EndPoint endPoint;
        private Socket serverSocket;
        private SocketAsyncEventArgsPool readWritePool;
        private BufferManager bufferManager;
        private SocketAsyncEventArgs acceptArgs;

        private SocketAsyncEventArgs receiveArgs;
        private SocketAsyncEventArgs sendArgs;

        public Action<Socket> OnStarted;
        public Action<Socket> OnConnected;
        public Action<SocketAsyncEventArgs> OnReceive;
        public Action<SocketAsyncEventArgs> OnSent;
        public Action<SocketAsyncEventArgs> OnDisconnect;
        public Action OnStopped;

在服務端,我們會定義服務端套接字,用於連接對象的acceptArgs,用於接收數據的receiveArgs以及用於拋出事件的一些Action等等,這些Action讓外部引用進行注冊並進行操縱.

然后這里是Start函數,

View Code
public void Start()
        {
            if (this.serverSocket != null)
                throw new Exception("Server is running, can't init another one.");

            this.serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            try
            {
                this.serverSocket.Bind(endPoint);
                this.serverSocket.Listen(listenClientCount);
            }
            catch (ArgumentNullException ex)
            {
                throw new Exception(ex.Message);
            }
            catch (SocketException ex)
            {
                throw new Exception(ex.Message);
            }

            if (OnStarted != null)
                OnStarted(serverSocket);

            StartAccept(null);
        }

這個函數的作用就是開啟監聽,並且將接收事件傳遞給acceptArgs對象.
然后,當客戶端連接過來的時候,服務端套接字開始將控制權傳遞給receiveArgs對象:

View Code
 private void ProcessAccept(SocketAsyncEventArgs e)
        {
            if (OnConnected != null)
            {
                OnConnected(e.AcceptSocket);
            }
            receiveArgs = this.readWritePool.Pop();
            this.bufferManager.SetBuffer(receiveArgs);
            receiveArgs.AcceptSocket = e.AcceptSocket;
            receiveArgs.Completed += IO_Completed;

            if (!e.AcceptSocket.ReceiveAsync(receiveArgs))
            {
                ProcessReceive(receiveArgs);
            }

            StartAccept(e);
        }

注意,這里有一個IO_Completed事件,表示一旦當前數據的數據接收完成,receiveArgs對象將會觸發該事件,用於進行下一步操作,一般會取lastOperation來判斷是否繼續接收.
最后需要說的就是真正的數據接收函數:

View Code
        private void ProcessReceive(SocketAsyncEventArgs e)
        {
            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
            {
                if (this.OnReceive != null)
                    this.OnReceive(e);

                if (!e.AcceptSocket.ReceiveAsync(e))
                {
                    this.ProcessReceive(e);
                }
            }
            else
            {
                CloseClient(e);
            }
        }

這個函數利用了循環的方式來接收數據,以便保證所有的數據都能夠進來,直至數據接收完成.在這個接收函數中,不應該有任何的邏輯判斷,否則會比較影響吞吐性能.

測試結果

好了,下面就看下界面截圖吧(請注意,這里我得客戶端是在上海,服務器端在香港,真實模擬數據傳輸情況).

(圖1,這是服務端開啟時候的情景)

(圖2,這是客戶端開啟時候的情景)

(圖3,客戶端准備發送文件給服務器端的情景,請注意這里的同步方式,客戶端會等待服務端准備完成,才會開始傳送數據)

(圖4,數據傳送中,請看,左邊是客戶端的緩沖區的緩沖數據,右邊是服務器端的接收緩沖區的緩沖數據)

(圖5,數據傳輸完畢的情景)

(圖6,請注意,即使我們服務端設置的接收緩沖區為327670,長度為32767,但是有時候數據還是不會完整的到達的,遇到這種情況,我們只需等待下一批數據到達即可.)

源碼下載

點擊這里下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM