Server Develop (八) IOCP模型


IOCP模型

  IOCP全稱I/O Completion Port,中文譯為I/O完成端口。IOCP是一個異步I/O的Windows API,它可以高效地將I/O事件通知給應用程序,類似於Linux中的Epoll。

簡介

  IOCP模型屬於一種通訊模型,適用於Windows平台下高負載服務器的一個技術。在處理大量用戶並發請求時,如果采用一個用戶一個線程的方式那將造成CPU在這成千上萬的線程間進行切換,后果是不可想象的。而IOCP完成端口模型則完全不會如此處理,它的理論是並行的線程數量必須有一個上限-也就是說同時發出500個客戶請求,不應該允許出現500個可運行的線程。目前來說,IOCP完成端口是Windows下性能最好的I/O模型,同時它也是最復雜的內核對象。它避免了大量用戶並發時原有模型采用的方式,極大的提高了程序的並行處理能力。

原理圖

  從圖中可以看到,一共包括三部分:完成端口(存放重疊的I/O請求),客戶端請求的處理,等待者線程隊列(一定數量的工作者線程,一般采用CPU*2個)。

  完成端口中所謂的[端口]並不是我們在TCP/IP中所提到的端口,可以說是完全沒有關系。它其實就是一個通知隊列,由操作系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操作一旦完成,某個可以對該操作結果進行處理的工作者線程就會收到一則通知。

  通常情況下,我們會在創建一定數量的工作者線程來處理這些通知,也就是線程池的方法。線程數量取決於應用程序的特定需要。理想的情況是,線程數量等於處理器的數量,不過這也要求任何線程都不應該執行諸如同步讀寫、等待事件通知等阻塞型的操作,以免線程阻塞。每個線程都將分到一定的CPU時間,在此期間該線程可以運行,然后另一個線程將分到一個時間片並開始執行。如果某個線程執行了阻塞型的操作,操作系統將剝奪其未使用的剩余時間片並讓其它線程開始執行。也就是說,前一個線程沒有充分使用其時間片,當發生這樣的情況時,應用程序應該准備其它線程來充分利用這些時間片。

IOCP的優點

  基於IOCP的開發是異步IO的,決定了IOCP所實現的服務器的高吞吐量。

  完成端口的線程並發量可以在創建該完成端口時指定,從而限制了與該完成端口相關聯的可運行線程的數目。

     通過引入IOCP,會大大減少Thread切換帶來的額外開銷,最小化的線程上下文切換,減少線程切換帶來的巨大開銷,讓CPU把大量的事件用於線程的運行。當與該完成端口相關聯的可運行線程的總數目達到了該並發量,系統就會阻塞任何與該完成端口相關聯的后續線程的執行,直到與該完成端口相關聯的可運行線程數目下降到小於該並發量為止。

  Select是先查詢再發起IO請求,IOCP是先發起IO請求再接收通知。但是Select方式在處理大量非活動連接時是比較低效的,因為每次Select需要對所有的Socket狀態進行查詢,而對非活動的Socket查詢是沒有意義的浪費,另外由於Socket句柄不能設置用戶私有數據,當查詢返回Socket句柄時還需要一個額外的查詢來找到關聯的用戶對象,這兩點是Select低效的關鍵。

IOCP的具體實現步驟

  IOCP中用到單個函數,分為用於創建關聯完成端口、獲取完成狀態和投遞完成狀態,函數原型:

//功能:創建完成端口和關聯完成端口
 HANDLE WINAPI CreateIoCompletionPort(
     *    __in   HANDLE FileHandle,              // 已經打開的文件句柄或者空句柄,一般是客戶端的句柄
     *    __in   HANDLE ExistingCompletionPort,  // 已經存在的IOCP句柄
     *    __in   ULONG_PTR CompletionKey,        // 完成鍵,包含了指定I/O完成包的指定文件
     *    __in   DWORD NumberOfConcurrentThreads // 真正並發同時執行最大線程數,一般推介是CPU核心數*2
     * );

//例子
//創建完成端口句柄
HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);


typedef struct{
    SOCKET socket;//客戶端socket
    SOCKADDR_STORAGE ClientAddr;//客戶端地址
}PER_HANDLE_DATA, *LPPER_HANDLE_DATA;

//與socket進行關聯
CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);
//功能:獲取隊列完成狀態
BOOL   GetQueuedCompletionStatus(
    HANDLE   CompletionPort,          //完成端口句柄
    LPDWORD   lpNumberOfBytes,    //一次I/O操作所傳送的字節數
    PULONG_PTR   lpCompletionKey, //當文件I/O操作完成后,用於存放與之關聯的CK
    LPOVERLAPPED   *lpOverlapped, //IOCP特定的結構體
    DWORD   dwMilliseconds);           //調用者的等待時間
/*
返回值:
調用成功,則返回非零數值,相關數據存於lpNumberOfBytes、lpCompletionKey、lpoverlapped變量中。失敗則返回零值。
*/

//用於IOCP的特定函數
typedef struct _OVERLAPPEDPLUS{
    OVERLAPPED ol;      //一個固定的用於處理網絡消息事件返回值的結構體變量
    SOCKET s, sclient;  int OpCode;      //用來區分本次消息的操作類型(在完成端口的操作里面,是以消息通知系統,讀數據/寫數據,都是要發這樣的消息結構體過去的)
    WSABUF wbuf;     //讀寫緩沖區結構體變量 
    DWORD dwBytes, dwFlags; //一些在讀寫時用到的標志性變量 
}OVERLAPPEDPLUS;
//功能:投遞一個隊列完成狀態
BOOL PostQueuedCompletionStatus(   HANDLE CompletlonPort,
//指定想向其發送一個完成數據包的完成端口對象   DW0RD dwNumberOfBytesTrlansferred, //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數   DWORD dwCompletlonKey, //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數   LPOVERLAPPED lpoverlapped, ); //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數

  TCP IOCP實現具體步驟:

  1. 創建好 IOCP
  2. 創建 Socket ( socket 可以是由 Accept 得到)
  3.  將 Socket 關聯到 IOCP
  4. socket 向 IOCP 提交各種所需請求
  5. IOCP 操作完成之后將結果返回給 socket
  6. 重復步驟 3 和 4 ,直到 socket 關閉

    例子:

  1 #include <winsock2.h>
  2 #include <windows.h>
  3 #include <string>
  4 #include <iostream>
  5 using namespace std;
  6 
  7 #pragma comment(lib,"ws2_32.lib")
  8 #pragma comment(lib,"kernel32.lib")
  9 
 10 HANDLE g_hIOCP;
 11 
 12 enum IO_OPERATION{IO_READ,IO_WRITE};
 13 
 14 struct IO_DATA{
 15     OVERLAPPED                  Overlapped;
 16     WSABUF                      wsabuf;
 17     int                         nBytes;
 18     IO_OPERATION                opCode;
 19     SOCKET                      client;
 20 };
 21 
 22 char buffer[1024];
 23 
 24 DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {
 25     IO_DATA *lpIOContext = NULL; 
 26     DWORD nBytes = 0;
 27     DWORD dwFlags = 0; 
 28     int nRet = 0;
 29 
 30     DWORD dwIoSize = 0; 
 31     void * lpCompletionKey = NULL;
 32     LPOVERLAPPED lpOverlapped = NULL;
 33 
 34     while(1){
 35         GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE);
 36         
 37         lpIOContext = (IO_DATA *)lpOverlapped;
 38         if(dwIoSize == 0)
 39         {
 40             cout << "Client disconnect" << endl;
 41             closesocket(lpIOContext->client);
 42             delete lpIOContext;
 43             continue;
 44         }
 45 
 46         if(lpIOContext->opCode == IO_READ) // a read operation complete
 47         {
 48             ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
 49             lpIOContext->wsabuf.buf = buffer;
 50             lpIOContext->wsabuf.len = strlen(buffer)+1;
 51             lpIOContext->opCode = IO_WRITE;
 52             lpIOContext->nBytes = strlen(buffer)+1;
 53             dwFlags = 0;
 54             nBytes = strlen(buffer)+1;
 55             nRet = WSASend(
 56                 lpIOContext->client,
 57                 &lpIOContext->wsabuf, 1, &nBytes,
 58                 dwFlags,
 59                 &(lpIOContext->Overlapped), NULL);
 60             if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
 61                 cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl;
 62                 closesocket(lpIOContext->client);
 63                 delete lpIOContext;
 64                 continue;
 65             }
 66             memset(buffer, NULL, sizeof(buffer));
 67         }
 68         else if(lpIOContext->opCode == IO_WRITE) //a write operation complete
 69         {
 70             // Write operation completed, so post Read operation.
 71             lpIOContext->opCode = IO_READ; 
 72             nBytes = 1024;
 73             dwFlags = 0;
 74             lpIOContext->wsabuf.buf = buffer;
 75             lpIOContext->wsabuf.len = nBytes;
 76             lpIOContext->nBytes = nBytes;
 77             ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped));
 78 
 79             nRet = WSARecv(
 80                 lpIOContext->client,
 81                 &lpIOContext->wsabuf, 1, &nBytes,
 82                 &dwFlags,
 83                 &lpIOContext->Overlapped, NULL);
 84             if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
 85                 cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl;
 86                 closesocket(lpIOContext->client);
 87                 delete lpIOContext;
 88                 continue;
 89             } 
 90             cout<<lpIOContext->wsabuf.buf<<endl;
 91         }
 92     }
 93     return 0;
 94 }
 95 void main ()
 96 {
 97     WSADATA wsaData;
 98     WSAStartup(MAKEWORD(2,2), &wsaData);
 99 
100     SOCKET    m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED);
101     
102     sockaddr_in server;
103     server.sin_family = AF_INET;
104     server.sin_port = htons(6000);
105     server.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
106     
107     bind(m_socket ,(sockaddr*)&server,sizeof(server));
108 
109     listen(m_socket, 8);
110 
111     SYSTEM_INFO sysInfo;
112     GetSystemInfo(&sysInfo);
113     int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2;
114 
115     g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount);
116     
117     //CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0);
118 
119     for( int i=0;i < g_ThreadCount; ++i){
120         HANDLE  hThread;
121         DWORD   dwThreadId;
122         hThread = CreateThread(NULL, 0, WorkerThread, 0, 0, &dwThreadId);
123         CloseHandle(hThread);
124     }
125 
126     while(1)
127     {
128         SOCKET client = accept( m_socket, NULL, NULL );
129         cout << "Client connected." << endl;
130         
131 
132         if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){
133             cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl;
134             closesocket(client);
135         }
136         else { //post a recv request
137             IO_DATA * data = new IO_DATA;
138             memset(buffer, NULL ,1024);
139             memset(&data->Overlapped, 0 , sizeof(data->Overlapped));
140             data->opCode = IO_READ;
141             data->nBytes = 0;
142             data->wsabuf.buf  = buffer;
143             data->wsabuf.len  = sizeof(buffer);
144             data->client = client;
145             DWORD nBytes= 1024 ,dwFlags=0;
146             int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes,
147                 &dwFlags,
148                 &data->Overlapped, NULL);
149             if(nRet == SOCKET_ERROR  && (ERROR_IO_PENDING != WSAGetLastError())){
150                 cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl;
151                 closesocket(client);
152                 delete data;
153             }
154             cout<<data->wsabuf.buf<<endl;
155         }
156     }
157     closesocket(m_socket);
158     WSACleanup();
159 }
View Code
 1 #include <iostream>
 2 #include <WinSock2.h>
 3 using namespace std;
 4 
 5 #pragma comment(lib,"ws2_32.lib")
 6 
 7 void main()
 8 {
 9     WSADATA wsaData;  
10     WSAStartup(MAKEWORD(2,2), &wsaData);
11 
12     sockaddr_in server;
13     server.sin_family = AF_INET;
14     server.sin_port   = htons(6000);
15     server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1");
16 
17     SOCKET client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
18 
19     int flag;
20     flag = connect(client, (sockaddr*)&server, sizeof(server));
21     if(flag < 0){
22         cout<<"error!"<<endl;
23         return;
24     }
25     while(1){
26         cout<<"sent hello!!!!"<<endl;
27         char buffer[1024];
28         strcpy(buffer,"hello");
29         send(client, buffer, 1024, 0);
30 
31         memset(buffer, NULL, sizeof(buffer));
32         
33         cout<<"recv: "<<endl;
34         int rev = recv(client, buffer, 1024, 0);
35         if(rev == 0)
36             cout<<"recv nothing!"<<endl;
37         cout<<buffer<<endl;
38         Sleep(10000);
39     }
40 
41     closesocket(client);
42     WSACleanup();
43 }
View Code

參考

http://www.cnblogs.com/lidabo/archive/2012/12/10/2812230.html

http://www.codeproject.com/KB/IP/iocp-multicast-udp.aspx

http://blog.csdn.net/zhongguoren666/article/details/7386592

http://www.baike.com/wiki/%E5%AE%8C%E6%88%90%E7%AB%AF%E5%8F%A3%E6%A8%A1%E5%9E%8B

http://blog.csdn.net/neicole/article/details/7549497

http://ycool.com/post/zgu6hbp

 

 

知識共享許可協議
IOCP模型cococo點點 創作,采用 知識共享 署名-相同方式共享 3.0 未本地化版本 許可協議進行許可。歡迎轉載,請注明出處:
轉載自: cococo點點 http://www.cnblogs.com/coder2012


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM