IOCP模型
IOCP全稱I/O Completion Port,中文譯為I/O完成端口。IOCP是一個異步I/O的Windows API,它可以高效地將I/O事件通知給應用程序,類似於Linux中的Epoll。
簡介
IOCP模型屬於一種通訊模型,適用於Windows平台下高負載服務器的一個技術。在處理大量用戶並發請求時,如果采用一個用戶一個線程的方式那將造成CPU在這成千上萬的線程間進行切換,后果是不可想象的。而IOCP完成端口模型則完全不會如此處理,它的理論是並行的線程數量必須有一個上限-也就是說同時發出500個客戶請求,不應該允許出現500個可運行的線程。目前來說,IOCP完成端口是Windows下性能最好的I/O模型,同時它也是最復雜的內核對象。它避免了大量用戶並發時原有模型采用的方式,極大的提高了程序的並行處理能力。
原理圖

從圖中可以看到,一共包括三部分:完成端口(存放重疊的I/O請求),客戶端請求的處理,等待者線程隊列(一定數量的工作者線程,一般采用CPU*2個)。
完成端口中所謂的[端口]並不是我們在TCP/IP中所提到的端口,可以說是完全沒有關系。它其實就是一個通知隊列,由操作系統把已經完成的重疊I/O請求的通知放入其中。當某項I/O操作一旦完成,某個可以對該操作結果進行處理的工作者線程就會收到一則通知。
通常情況下,我們會在創建一定數量的工作者線程來處理這些通知,也就是線程池的方法。線程數量取決於應用程序的特定需要。理想的情況是,線程數量等於處理器的數量,不過這也要求任何線程都不應該執行諸如同步讀寫、等待事件通知等阻塞型的操作,以免線程阻塞。每個線程都將分到一定的CPU時間,在此期間該線程可以運行,然后另一個線程將分到一個時間片並開始執行。如果某個線程執行了阻塞型的操作,操作系統將剝奪其未使用的剩余時間片並讓其它線程開始執行。也就是說,前一個線程沒有充分使用其時間片,當發生這樣的情況時,應用程序應該准備其它線程來充分利用這些時間片。
IOCP的優點
基於IOCP的開發是異步IO的,決定了IOCP所實現的服務器的高吞吐量。
完成端口的線程並發量可以在創建該完成端口時指定,從而限制了與該完成端口相關聯的可運行線程的數目。
通過引入IOCP,會大大減少Thread切換帶來的額外開銷,最小化的線程上下文切換,減少線程切換帶來的巨大開銷,讓CPU把大量的事件用於線程的運行。當與該完成端口相關聯的可運行線程的總數目達到了該並發量,系統就會阻塞任何與該完成端口相關聯的后續線程的執行,直到與該完成端口相關聯的可運行線程數目下降到小於該並發量為止。
Select是先查詢再發起IO請求,IOCP是先發起IO請求再接收通知。但是Select方式在處理大量非活動連接時是比較低效的,因為每次Select需要對所有的Socket狀態進行查詢,而對非活動的Socket查詢是沒有意義的浪費,另外由於Socket句柄不能設置用戶私有數據,當查詢返回Socket句柄時還需要一個額外的查詢來找到關聯的用戶對象,這兩點是Select低效的關鍵。
IOCP的具體實現步驟
IOCP中用到單個函數,分為用於創建關聯完成端口、獲取完成狀態和投遞完成狀態,函數原型:
//功能:創建完成端口和關聯完成端口 HANDLE WINAPI CreateIoCompletionPort( * __in HANDLE FileHandle, // 已經打開的文件句柄或者空句柄,一般是客戶端的句柄 * __in HANDLE ExistingCompletionPort, // 已經存在的IOCP句柄 * __in ULONG_PTR CompletionKey, // 完成鍵,包含了指定I/O完成包的指定文件 * __in DWORD NumberOfConcurrentThreads // 真正並發同時執行最大線程數,一般推介是CPU核心數*2 * ); //例子 //創建完成端口句柄 HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); typedef struct{ SOCKET socket;//客戶端socket SOCKADDR_STORAGE ClientAddr;//客戶端地址 }PER_HANDLE_DATA, *LPPER_HANDLE_DATA; //與socket進行關聯 CreateIoCompletionPort((HANDLE)(PerHandleData -> socket), completionPort, (DWORD)PerHandleData, 0);
//功能:獲取隊列完成狀態 BOOL GetQueuedCompletionStatus( HANDLE CompletionPort, //完成端口句柄 LPDWORD lpNumberOfBytes, //一次I/O操作所傳送的字節數 PULONG_PTR lpCompletionKey, //當文件I/O操作完成后,用於存放與之關聯的CK LPOVERLAPPED *lpOverlapped, //IOCP特定的結構體 DWORD dwMilliseconds); //調用者的等待時間 /* 返回值: 調用成功,則返回非零數值,相關數據存於lpNumberOfBytes、lpCompletionKey、lpoverlapped變量中。失敗則返回零值。 */ //用於IOCP的特定函數 typedef struct _OVERLAPPEDPLUS{ OVERLAPPED ol; //一個固定的用於處理網絡消息事件返回值的結構體變量 SOCKET s, sclient; int OpCode; //用來區分本次消息的操作類型(在完成端口的操作里面,是以消息通知系統,讀數據/寫數據,都是要發這樣的消息結構體過去的) WSABUF wbuf; //讀寫緩沖區結構體變量 DWORD dwBytes, dwFlags; //一些在讀寫時用到的標志性變量 }OVERLAPPEDPLUS;
//功能:投遞一個隊列完成狀態
BOOL PostQueuedCompletionStatus( HANDLE CompletlonPort, //指定想向其發送一個完成數據包的完成端口對象 DW0RD dwNumberOfBytesTrlansferred, //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數 DWORD dwCompletlonKey, //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數 LPOVERLAPPED lpoverlapped, ); //指定—個值,直接傳遞給GetQueuedCompletionStatus函數中對應的參數
TCP IOCP實現具體步驟:
- 創建好 IOCP
- 創建 Socket ( socket 可以是由 Accept 得到)
- 將 Socket 關聯到 IOCP
- socket 向 IOCP 提交各種所需請求
- IOCP 操作完成之后將結果返回給 socket
- 重復步驟 3 和 4 ,直到 socket 關閉
例子:
1 #include <winsock2.h> 2 #include <windows.h> 3 #include <string> 4 #include <iostream> 5 using namespace std; 6 7 #pragma comment(lib,"ws2_32.lib") 8 #pragma comment(lib,"kernel32.lib") 9 10 HANDLE g_hIOCP; 11 12 enum IO_OPERATION{IO_READ,IO_WRITE}; 13 14 struct IO_DATA{ 15 OVERLAPPED Overlapped; 16 WSABUF wsabuf; 17 int nBytes; 18 IO_OPERATION opCode; 19 SOCKET client; 20 }; 21 22 char buffer[1024]; 23 24 DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) { 25 IO_DATA *lpIOContext = NULL; 26 DWORD nBytes = 0; 27 DWORD dwFlags = 0; 28 int nRet = 0; 29 30 DWORD dwIoSize = 0; 31 void * lpCompletionKey = NULL; 32 LPOVERLAPPED lpOverlapped = NULL; 33 34 while(1){ 35 GetQueuedCompletionStatus(g_hIOCP, &dwIoSize,(LPDWORD)&lpCompletionKey,(LPOVERLAPPED *)&lpOverlapped, INFINITE); 36 37 lpIOContext = (IO_DATA *)lpOverlapped; 38 if(dwIoSize == 0) 39 { 40 cout << "Client disconnect" << endl; 41 closesocket(lpIOContext->client); 42 delete lpIOContext; 43 continue; 44 } 45 46 if(lpIOContext->opCode == IO_READ) // a read operation complete 47 { 48 ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped)); 49 lpIOContext->wsabuf.buf = buffer; 50 lpIOContext->wsabuf.len = strlen(buffer)+1; 51 lpIOContext->opCode = IO_WRITE; 52 lpIOContext->nBytes = strlen(buffer)+1; 53 dwFlags = 0; 54 nBytes = strlen(buffer)+1; 55 nRet = WSASend( 56 lpIOContext->client, 57 &lpIOContext->wsabuf, 1, &nBytes, 58 dwFlags, 59 &(lpIOContext->Overlapped), NULL); 60 if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 61 cout << "WASSend Failed::Reason Code::"<< WSAGetLastError() << endl; 62 closesocket(lpIOContext->client); 63 delete lpIOContext; 64 continue; 65 } 66 memset(buffer, NULL, sizeof(buffer)); 67 } 68 else if(lpIOContext->opCode == IO_WRITE) //a write operation complete 69 { 70 // Write operation completed, so post Read operation. 71 lpIOContext->opCode = IO_READ; 72 nBytes = 1024; 73 dwFlags = 0; 74 lpIOContext->wsabuf.buf = buffer; 75 lpIOContext->wsabuf.len = nBytes; 76 lpIOContext->nBytes = nBytes; 77 ZeroMemory(&lpIOContext->Overlapped, sizeof(lpIOContext->Overlapped)); 78 79 nRet = WSARecv( 80 lpIOContext->client, 81 &lpIOContext->wsabuf, 1, &nBytes, 82 &dwFlags, 83 &lpIOContext->Overlapped, NULL); 84 if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) { 85 cout << "WASRecv Failed::Reason Code1::"<< WSAGetLastError() << endl; 86 closesocket(lpIOContext->client); 87 delete lpIOContext; 88 continue; 89 } 90 cout<<lpIOContext->wsabuf.buf<<endl; 91 } 92 } 93 return 0; 94 } 95 void main () 96 { 97 WSADATA wsaData; 98 WSAStartup(MAKEWORD(2,2), &wsaData); 99 100 SOCKET m_socket = WSASocket(AF_INET,SOCK_STREAM, IPPROTO_TCP, NULL,0,WSA_FLAG_OVERLAPPED); 101 102 sockaddr_in server; 103 server.sin_family = AF_INET; 104 server.sin_port = htons(6000); 105 server.sin_addr.S_un.S_addr = htonl(INADDR_ANY); 106 107 bind(m_socket ,(sockaddr*)&server,sizeof(server)); 108 109 listen(m_socket, 8); 110 111 SYSTEM_INFO sysInfo; 112 GetSystemInfo(&sysInfo); 113 int g_ThreadCount = sysInfo.dwNumberOfProcessors * 2; 114 115 g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,g_ThreadCount); 116 117 //CreateIoCompletionPort((HANDLE)m_socket,g_hIOCP,0,0); 118 119 for( int i=0;i < g_ThreadCount; ++i){ 120 HANDLE hThread; 121 DWORD dwThreadId; 122 hThread = CreateThread(NULL, 0, WorkerThread, 0, 0, &dwThreadId); 123 CloseHandle(hThread); 124 } 125 126 while(1) 127 { 128 SOCKET client = accept( m_socket, NULL, NULL ); 129 cout << "Client connected." << endl; 130 131 132 if (CreateIoCompletionPort((HANDLE)client, g_hIOCP, 0, 0) == NULL){ 133 cout << "Binding Client Socket to IO Completion Port Failed::Reason Code::"<< GetLastError() << endl; 134 closesocket(client); 135 } 136 else { //post a recv request 137 IO_DATA * data = new IO_DATA; 138 memset(buffer, NULL ,1024); 139 memset(&data->Overlapped, 0 , sizeof(data->Overlapped)); 140 data->opCode = IO_READ; 141 data->nBytes = 0; 142 data->wsabuf.buf = buffer; 143 data->wsabuf.len = sizeof(buffer); 144 data->client = client; 145 DWORD nBytes= 1024 ,dwFlags=0; 146 int nRet = WSARecv(client,&data->wsabuf, 1, &nBytes, 147 &dwFlags, 148 &data->Overlapped, NULL); 149 if(nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError())){ 150 cout << "WASRecv Failed::Reason Code::"<< WSAGetLastError() << endl; 151 closesocket(client); 152 delete data; 153 } 154 cout<<data->wsabuf.buf<<endl; 155 } 156 } 157 closesocket(m_socket); 158 WSACleanup(); 159 }
1 #include <iostream> 2 #include <WinSock2.h> 3 using namespace std; 4 5 #pragma comment(lib,"ws2_32.lib") 6 7 void main() 8 { 9 WSADATA wsaData; 10 WSAStartup(MAKEWORD(2,2), &wsaData); 11 12 sockaddr_in server; 13 server.sin_family = AF_INET; 14 server.sin_port = htons(6000); 15 server.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); 16 17 SOCKET client = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); 18 19 int flag; 20 flag = connect(client, (sockaddr*)&server, sizeof(server)); 21 if(flag < 0){ 22 cout<<"error!"<<endl; 23 return; 24 } 25 while(1){ 26 cout<<"sent hello!!!!"<<endl; 27 char buffer[1024]; 28 strcpy(buffer,"hello"); 29 send(client, buffer, 1024, 0); 30 31 memset(buffer, NULL, sizeof(buffer)); 32 33 cout<<"recv: "<<endl; 34 int rev = recv(client, buffer, 1024, 0); 35 if(rev == 0) 36 cout<<"recv nothing!"<<endl; 37 cout<<buffer<<endl; 38 Sleep(10000); 39 } 40 41 closesocket(client); 42 WSACleanup(); 43 }
參考
http://www.cnblogs.com/lidabo/archive/2012/12/10/2812230.html
http://www.codeproject.com/KB/IP/iocp-multicast-udp.aspx
http://blog.csdn.net/zhongguoren666/article/details/7386592
http://www.baike.com/wiki/%E5%AE%8C%E6%88%90%E7%AB%AF%E5%8F%A3%E6%A8%A1%E5%9E%8B
http://blog.csdn.net/neicole/article/details/7549497
![]()
IOCP模型 由 cococo點點 創作,采用 知識共享 署名-相同方式共享 3.0 未本地化版本 許可協議進行許可。歡迎轉載,請注明出處:
轉載自: cococo點點 http://www.cnblogs.com/coder2012

