IOCP詳解


 

簡介: IOCP(I/O Completion Port,I/O完成端口)是性能最好的一種I/O模型。它是應用程序使用線程池處理異步I/O請求的一種機制。

IOCP詳解

IOCP(I/O Completion Port,I/O完成端口)是性能最好的一種I/O模型。它是應用程序使用線程池處理異步I/O請求的一種機制。在處理多個並發的異步I/O請求時,以往的模型都是在接收請求是創建一個線程來應答請求。這樣就有很多的線程並行地運行在系統中。而這些線程都是可運行的,Windows內核花費大量的時間在進行線程的上下文切換,並沒有多少時間花在線程運行上。再加上創建新線程的開銷比較大,所以造成了效率的低下。

Windows Sockets應用程序在調用WSARecv()函數后立即返回,線程繼續運行。當系統接收數據完成后,向完成端口發送通知包(這個過程對應用程序不可見)。

應用程序在發起接收數據操作后,在完成端口上等待操作結果。當接收到I/O操作完成的通知后,應用程序對數據進行處理。

image.png

完成端口其實就是上面兩項的聯合使用基礎上進行了一定的改進。

一個完成端口其實就是一個通知隊列,由操作系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操作一旦完成,某個可以對該操作結果進行處理的工作者線程就會收到一則通知。而套接字在被創建后,可以在任何時候與某個完成端口進行關聯。

眾所皆知,完成端口是在WINDOWS平台下效率最高,擴展性最好的IO模型,特別針對於WINSOCK的海量連接時,更能顯示出其威力。其實建立一個完成端口的服務器也很簡單,只要注意幾個函數,了解一下關鍵的步驟也就行了。

分為以下幾步來說明完成端口:

0)  同步IO與異步IO

1)  函數

2)  常見問題以及解答

3)  步驟

4)  例程

 

0、同步IO與異步IO

同步I/O首先我們來看下同步I/O操作,同步I/O操作就是對於同一個I/O對象句柄在同一時刻只允許一個I/O操作,原理圖如下:

image.png

        
由圖可知,內核開始處理I/O操作到結束的時間段是T2~T3,這個時間段中用戶線程一直處於等待狀態,如果這個時間段比較短,則不會有什么問題,但是如果時間比較長,那么這段時間線程會一直處於掛起狀態,這就會很嚴重影響效率,所以我們可以考慮在這段時間做些事情。

異步I/O操作則很好的解決了這個問題,它可以使得內核開始處理I/O操作到結束的這段時間,讓用戶線程可以去做其他事情,從而提高了使用效率。

 image.png

由圖可知,內核開始I/O操作到I/O結束這段時間,用戶層可以做其他的操作,然后,當內核I/O結束的時候,可以讓I/O對象或者時間對象通知用戶層,而用戶線程GetOverlappedResult來查看內核I/O的完成情況。

1、函數

我們在完成端口模型下會使用到的最重要的兩個函數是:

CreateIoCompletionPort、GetQueuedCompletionStatus

CreateIoCompletionPort  的作用是創建一個完成端口和把一個IO句柄和完成端口關聯起來:

// 創建完成端口 HANDLECompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

 

// 把一個IO句柄和完成端口關聯起來,這里的句柄是一個socket 句柄 CreateIoCompletionPort((HANDLE)sClient,CompletionPort, (DWORD)PerHandleData, 0);

 

其中第一個參數是句柄,可以是文件句柄、SOCKET句柄。

第二個就是我們上面創建出來的完成端口,這里就把兩個東西關聯在一起了。

第三個參數很關鍵,叫做PerHandleData,就是對應於每個句柄的數據塊。我們可以使用這個參數在后面取到與這個SOCKET對應的數據。

最后一個參數給0,意思就是根據CPU的個數,允許盡可能多的線程並發執行。

 

GetQueuedCompletionStatus的作用就是取得完成端口的結果:

// 從完成端口中取得結果 GetQueuedCompletionStatus(CompletionPort,&BytesTransferred, (LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIoData, INFINITE)

第一個參數是完成端口

第二個參數是表明這次的操作傳遞了多少個字節的數據

第三個參數是OUT類型的參數,就是前面CreateIoCompletionPort傳進去的單句柄數據,這里就是前面的SOCKET句柄以及與之相對應的數據,這里操作系統給我們返回,讓我們不用自己去做列表查詢等操作了。

第四個參數就是進行IO操作的結果,是我們在投遞WSARecv / WSASend 等操作時傳遞進去的,這里操作系統做好准備后,給我們返回了。非常省事!!

個人感覺完成端口就是操作系統為我們包裝了很多重疊IO的不爽的地方,讓我們可以更方便的去使用,下篇我將會嘗試去講述完成端口的原理。

2、常見問題和解答

1)什么是單句柄數據(PerHandle)和單IO數據(PerIO)

單句柄數據就是和句柄對應的數據,像socket句柄,文件句柄這種東西。

單IO數據,就是對應於每次的IO操作的數據。例如每次的WSARecv/WSASend等等

其實我覺得PER是每次的意思,翻譯成每個句柄數據和每次IO數據還比較清晰一點。

在完成端口中,單句柄數據直接通過GetQueuedCompletionStatus 返回,省去了我們自己做容器去管理。單IO數據也容許我們自己擴展OVERLAPPED結構,所以,在這里所有與應用邏輯有關的東西都可以在此擴展。

 

2)如何判斷客戶端的斷開

我們要處理幾種情況

a)如果客戶端調用了closesocket,我們就可以這樣判斷他的斷開:

if(0== GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, 。。。) { } if(BytesTransferred == 0) {     // 客戶端斷開,釋放資源 }

b)如果是客戶端直接退出,那就會出現64錯誤,指定的網絡名不可再用。這種情況我們也要處理的:

if(0== GetQueuedCompletionStatus(。。。)) {    if( (GetLastError() == WAIT_TIMEOUT) ||(GetLastError() == ERROR_NETNAME_DELETED) )    {         // 客戶端斷開,釋放資源    } }

3)什么是IOCP?

我們已經提到IOCP 只不過是一個專門實現用來進行線程間的通信的技術,和信號量(semaphore)相似,因此IOCP並不是一個復雜的概念。一個IOCP 對象是與多個I/O對象關聯的,這些對象支持掛起異步IO調用。直到一個掛起的異步IO調用結束為止,一個訪問IOCP的線程都有可能被掛起。

完成端口的目標是使CPU保持在滿負荷狀態下工作。

4)為什么使用IOCP?

使用IOCP,我們可以克服”一個客戶端一個線程”的問題。我們知道,這樣做的話,如果軟件不是運行在一個多核及其上性能就會急劇下降。線程是系統資源,他們既不是無限制的、也不是代價低廉的。

IOCP提供了一種只使用一些(I/O worker)線程去“相對公平地”完成多客戶端的”輸入輸出”。線程會一直被掛起,而不會使用CPU時間片,直到有事情做完為止。

5)IOCP是如何工作的?

當使用IOCP時,你必須處理三件事情:a)將一個Socket關聯到完成端口;b)創建一個異步I/O調用; c)與線程進行同步。為了獲得異步IO調用的結果,比如哪個客戶端執行了調用,你必須傳入兩個參數:pCompletionKey參數和OVERLAPPED結構。

3、步驟

編寫完成端口服務程序,無非就是以下幾個步驟:

  1、創建一個完成端口

  2、根據CPU個數創建工作者線程,把完成端口傳進去線程里

  3、創建偵聽SOCKET,把SOCKET和完成端口關聯起來

  4、創建PerIOData,向連接進來的SOCKET投遞WSARecv操作

  5、線程里所做的事情:

 a、GetQueuedCompletionStatus,在退出的時候就可以使用PostQueudCompletionStatus使線程退出;

 b、取得數據並處理;

4、例程

下面是服務端的例程,可以使用sunxin視頻中中的客戶端程序來測試服務端。稍微研究一下,也就會對完成端口模型有個大概的了解了。

實例結果服務器、客戶端如下:

image.png

image.png

/*

   完成端口服務器

   接收到客戶端的信息,直接顯示出來

*/

 

#include"winerror.h"
#include"Winsock2.h"
#pragmacomment(lib, "ws2_32")
#include"windows.h"
#include<iostream>
usingnamespace std;
 
/// 宏定義
#define PORT 5050
#define DATA_BUFSIZE 8192
 
#define OutErr(a) cout << (a) << endl \
      << "出錯代碼:"<< WSAGetLastError() << endl \
      << "出錯文件:"<< __FILE__ << endl  \
      << "出錯行數:"<< __LINE__ << endl \
 
#define OutMsg(a) cout << (a) << endl;
 
 
/// 全局函數定義
 
 
///////////////////////////////////////////////////////////////////////
//
// 函數名       : InitWinsock
// 功能描述     : 初始化WINSOCK
// 返回值       : void
//
///////////////////////////////////////////////////////////////////////
void InitWinsock()
{
       // 初始化WINSOCK
        WSADATA wsd;
        if( WSAStartup(MAKEWORD(2, 2), &wsd) != 0)
        {
               OutErr("WSAStartup()");
        }
}
 
///////////////////////////////////////////////////////////////////////
//
// 函數名       : BindServerOverlapped
// 功能描述     : 綁定端口,並返回一個 Overlapped 的ListenSocket
// 參數         : int nPort
// 返回值       : SOCKET
//
///////////////////////////////////////////////////////////////////////
SOCKET BindServerOverlapped(int nPort)
{
 // 創建socket
 SOCKET sServer = WSASocket(AF_INET,SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
 
 // 綁定端口
 struct sockaddr_in servAddr;
 servAddr.sin_family = AF_INET;
 servAddr.sin_port = htons(nPort);
 servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
 
 if(bind(sServer, (struct sockaddr*)&servAddr, sizeof(servAddr)) < 0)
 {
        OutErr("bind Failed!");
        return NULL;
 }
 
 // 設置監聽隊列為200
 if(listen(sServer, 200) != 0)
 {
        OutErr("listen Failed!");
        return NULL;
 }
 return sServer;
}
 
 
/// 結構體定義
typedef struct
{
   OVERLAPPED Overlapped;
   WSABUF DataBuf;
   CHAR Buffer[DATA_BUFSIZE];
}PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;
 
 
typedef struct
{
   SOCKET Socket;
}PER_HANDLE_DATA,* LPPER_HANDLE_DATA;
 
 
DWORD WINAPI ProcessIO(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred;
    LPPER_HANDLE_DATA PerHandleData;
    LPPER_IO_OPERATION_DATA PerIoData;
 
 while(true)
 {
 
       if(0 == GetQueuedCompletionStatus(CompletionPort,&BytesTransferred, (LPDWORD)&PerHandleData,(LPOVERLAPPED*)&PerIoData, INFINITE))
       {
              if( (GetLastError() ==WAIT_TIMEOUT) || (GetLastError() == ERROR_NETNAME_DELETED) )
              {
                     cout << "closingsocket" << PerHandleData->Socket << endl; 
                     closesocket(PerHandleData->Socket);
 
                     delete PerIoData;
                     delete PerHandleData;
                     continue;
              }
              else
              {
               OutErr("GetQueuedCompletionStatus failed!");
              }
              return 0;
       }
 
       // 說明客戶端已經退出
       if(BytesTransferred == 0)
       {
         cout << "closing socket" <<PerHandleData->Socket << endl;
         closesocket(PerHandleData->Socket);
         delete PerIoData;
         delete PerHandleData;
         continue;
       }
 
       // 取得數據並處理
       cout << PerHandleData->Socket<< "發送過來的消息:" << PerIoData->Buffer<< endl;
 
       // 繼續向 socket 投遞WSARecv操作
       DWORD Flags = 0;
       DWORD dwRecv = 0;
       ZeroMemory(PerIoData,sizeof(PER_IO_OPERATION_DATA));
       PerIoData->DataBuf.buf =PerIoData->Buffer;
       PerIoData->DataBuf.len = DATA_BUFSIZE;
       WSARecv(PerHandleData->Socket,&PerIoData->DataBuf, 1, &dwRecv, &Flags,&PerIoData->Overlapped, NULL);
 }
 
 return 0;
}
 
void main()
{
        InitWinsock();
        HANDLE CompletionPort =CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
 
        //根據系統的CPU來創建工作者線程
        SYSTEM_INFO SystemInfo;
        GetSystemInfo(&SystemInfo);
 
        //線程數目=系統進程數目的兩倍.
        for(int i = 0; i <SystemInfo.dwNumberOfProcessors * 2; i++)
        {
               HANDLE hProcessIO = CreateThread(NULL, 0,ProcessIO, CompletionPort, 0, NULL);
               if(hProcessIO)
               {
                      CloseHandle(hProcessIO);
               }
        }
 
        //創建偵聽SOCKET
        SOCKET sListen = BindServerOverlapped(PORT);
 
        SOCKET sClient;
        LPPER_HANDLE_DATA PerHandleData;
        LPPER_IO_OPERATION_DATA PerIoData;
        while(true)
        {
               // 等待客戶端接入
               //sClient = WSAAccept(sListen, NULL, NULL, NULL, 0);
               sClient = accept(sListen, 0, 0);
               cout << "Socket " << sClient << "連接進來"<< endl;
 
               PerHandleData = new PER_HANDLE_DATA();
               PerHandleData->Socket = sClient;
 
               // 將接入的客戶端和完成端口聯系起來
               CreateIoCompletionPort((HANDLE)sClient, CompletionPort,(DWORD)PerHandleData, 0);
 
               // 建立一個Overlapped,並使用這個Overlapped結構對socket投遞操作
               PerIoData = new PER_IO_OPERATION_DATA();
 
               ZeroMemory(PerIoData, sizeof(PER_IO_OPERATION_DATA));
               PerIoData->DataBuf.buf = PerIoData->Buffer;
               PerIoData->DataBuf.len = DATA_BUFSIZE;
 
               // 投遞一個WSARecv操作
               DWORD Flags = 0;
               DWORD dwRecv = 0;
               WSARecv(sClient, &PerIoData->DataBuf, 1, &dwRecv, &Flags,&PerIoData->Overlapped, NULL);
        }
 
       DWORD dwByteTrans;
       //將一個已經完成的IO通知添加到IO完成端口的隊列中.
        //提供了與線程池中的所有線程通信的方式.
        PostQueuedCompletionStatus(CompletionPort,dwByteTrans, 0, 0);  //IO操作完成時接收的字節數.
        
        closesocket(sListen);
}

 

/*--------------------------------------------

**---------客戶端例程序-----------------------

---------------------------------------------*/

#include<stdio.h>
#include<Winsock2.h>
#define MAXCNT 30000
void main()
{
       WORD wVersionRequested;
       WSADATA wsaData;
       int err;
      
       wVersionRequested = MAKEWORD( 2, 2);
      
       err = WSAStartup( wVersionRequested,&wsaData );//WSAStartup()加載套接字庫
       if ( err != 0 ) {
             
              return;
       }
      
       if ( LOBYTE( wsaData.wVersion ) != 2 ||
              HIBYTE( wsaData.wVersion ) != 2 ){
              WSACleanup( );
              return;
       }
 
       static int nCnt = 0;
       char sendBuf[2000];
//     char recvBuf[100];
       while(nCnt < MAXCNT)
       {
              SOCKETsockClient=socket(AF_INET,SOCK_STREAM,0);
              SOCKADDR_IN addrSrv;
              addrSrv.sin_addr.S_un.S_addr=inet_addr("127.0.0.1");//本地回路地址127,用於一台機器上測試的IP
              addrSrv.sin_family=AF_INET;
              addrSrv.sin_port=htons(5050);//和服務器端的端口號保持一致
              connect(sockClient,(SOCKADDR*)&addrSrv,sizeof(SOCKADDR));//連接服務器端(套接字,地址轉換,長度)
      
 
              sprintf(sendBuf,"This is TestNo : %d\n",++nCnt);
              send(sockClient,sendBuf,strlen(sendBuf)+1,0);//向服務器端發送數據,"+1"是為了給'\0'留空間
              printf("send:%s",sendBuf);
 
//           memset(recvBuf,0,100);
//           recv(sockClient,recvBuf,100,0);//接收數據
//           printf("%s\n",recvBuf);//打印
             
              closesocket(sockClient);//關閉套接字,釋放為這個套接字分配的資源
              Sleep(1);
       }
       WSACleanup();//終止對這個套接字庫的使用
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM