Thanks to rulary for the correction! My understanding of IOCP in this post is mistaken; see rulary's reply in the comments for the correct approach!
Because of the project's actual design needs, the I/O event handling in the end did not use IOCP but WSAPoll, introduced in NT 6.0. Its programming model is essentially the same as poll under Linux, so I won't go into it at length here.
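For reference, a minimal sketch of that model (a single readability wait on one socket; wait_readable is an invented name and error handling is omitted):

#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>

/* wait up to timeout_ms for s to become readable, poll()-style */
static int wait_readable(SOCKET s, INT timeout_ms)
{
    WSAPOLLFD pfd;

    pfd.fd = s;
    pfd.events = POLLRDNORM;
    pfd.revents = 0;

    /* WSAPoll() mirrors poll(): >0 ready, 0 timeout, SOCKET_ERROR on failure */
    return WSAPoll(&pfd, 1, timeout_ms) > 0 && (pfd.revents & POLLRDNORM);
}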
==================================================
IOCP is the most efficient way to handle I/O events on Windows; combined with OVERLAPPED I/O it gives you genuinely, fully asynchronous I/O. In this model Windows provides one-stop service: you submit an I/O request, Windows does all the remaining work for you, and all you have to do is wait for Windows' completion notification.
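Reduced to its bones, that one-stop flow looks roughly like this (a minimal sketch assuming sock is a connected socket created with WSA_FLAG_OVERLAPPED; error handling omitted):

#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <windows.h>
#include <string.h>

/* one overlapped receive, start to finish */
static void recv_one(SOCKET sock)
{
    HANDLE iocp;
    char data[4096];
    WSABUF buf;
    DWORD received = 0, flags = 0;
    WSAOVERLAPPED ovlp;
    ULONG_PTR key;
    LPOVERLAPPED povlp;

    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    CreateIoCompletionPort((HANDLE)sock, iocp, 0, 0);   /* bind the socket */

    buf.len = sizeof(data);
    buf.buf = data;
    memset(&ovlp, 0, sizeof(ovlp));

    /* submit the request; WSA_IO_PENDING just means windows took over */
    WSARecv(sock, &buf, 1, &received, &flags, &ovlp, NULL);

    /* ...and wait for windows to deliver the completion notification */
    GetQueuedCompletionStatus(iocp, &received, &key, &povlp, INFINITE);

    CloseHandle(iocp);
}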
響馬大叔 (Uncle Xiangma) posted 再談select, iocp, epoll,kqueue及各種I/O復用機制 in his 孢子社區 (Spore community), which already gives a fairly thorough comparison of these mechanisms, so this post will not rehash that ground. Instead, I want to talk about the parts of IOCP I found wanting during my own development.
Where IOCP falls short is this: a file/socket handle cannot be bound to different IOCPs by calling CreateIoCompletionPort() repeatedly. Only the first call succeeds; from the second call on it fails with an invalid-parameter error. So once a handle is bound to an IOCP, there is no way to migrate it to another one. I reached this conclusion both by testing actual code and by reading the ReactOS implementation. The test code is as follows:

#include <stdio.h>
#include <assert.h>

#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <windows.h>

/* minimal helper: any overlapped-mode socket is enough for this test */
static SOCKET create_client_socket(void)
{
    return WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
}

int main(int argc, char *argv[])
{
    HANDLE iocp;
    HANDLE iocp1;
    SOCKET s;
    HANDLE ret;

    WSADATA wsa_data;
    WSAStartup(MAKEWORD(2, 2), &wsa_data);

    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4);
    iocp1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4);
    s = create_client_socket();

    assert(NULL != iocp);
    assert(NULL != iocp1);

    /* first binding: succeeds */
    ret = CreateIoCompletionPort((HANDLE)s, iocp, 0, 0);
    printf("first bind, ret: %lu, error: %u\n", (unsigned long)ret, GetLastError());

    /* second binding, to a different IOCP: fails with ERROR_INVALID_PARAMETER (87) */
    ret = CreateIoCompletionPort((HANDLE)s, iocp1, 0, 0);
    printf("second bind, ret: %lu, error: %u\n", (unsigned long)ret, GetLastError());

    CloseHandle(iocp);
    CloseHandle(iocp1);
    closesocket(s);

    WSACleanup();

    return 0;
}
The result of running it:
Administrator@attention /e/tinylib/windows/net_iocp
$ iocp.exe
first bind, ret: 60, error: 0
second bind, ret: 0, error: 87
In the ReactOS-0.3.12-REL source, the behavior shows up in the following fragment of NtSetInformationFile():

/* FIXME: Later, we can implement a lot of stuff here and avoid a driver call */
/* Handle IO Completion Port quickly */
if (FileInformationClass == FileCompletionInformation)
{
    /* Check if the file object already has a completion port */
    if ((FileObject->Flags & FO_SYNCHRONOUS_IO) ||
        (FileObject->CompletionContext))
    {
        /* Fail */
        Status = STATUS_INVALID_PARAMETER;
    }
    else
    {
        /* Reference the Port */
        CompletionInfo = Irp->AssociatedIrp.SystemBuffer;
        Status = ObReferenceObjectByHandle(CompletionInfo->Port,
                                           IO_COMPLETION_MODIFY_STATE,
                                           IoCompletionType,
                                           PreviousMode,
                                           (PVOID*)&Queue,
                                           NULL);
        if (NT_SUCCESS(Status))
        {
            /* Allocate the Context */
            Context = ExAllocatePoolWithTag(PagedPool,
                                            sizeof(IO_COMPLETION_CONTEXT),
                                            IOC_TAG);
            if (Context)
            {
                /* Set the Data */
                Context->Key = CompletionInfo->Key;
                Context->Port = Queue;
                if (InterlockedCompareExchangePointer((PVOID*)&FileObject->CompletionContext,
                                                      Context,
                                                      NULL))
                {
                    /*
                     * Someone else set the completion port in the
                     * meanwhile, so dereference the port and fail.
                     */
                    ExFreePool(Context);
                    ObDereferenceObject(Queue);
                    Status = STATUS_INVALID_PARAMETER;
                }
            }
            else
            {
                /* Dereference the Port now */
                ObDereferenceObject(Queue);
                Status = STATUS_INSUFFICIENT_RESOURCES;
            }
        }
    }

    /* Set the IRP Status */
    Irp->IoStatus.Status = Status;
    Irp->IoStatus.Information = 0;
}
The logic is plain: the request fails with STATUS_INVALID_PARAMETER whenever FileObject->CompletionContext is already set, and the InterlockedCompareExchangePointer() guards that same invariant against races, so only the first binding can ever succeed. MSDN, meanwhile, explicitly encourages developers to start multiple threads that all call GetQueuedCompletionStatus() against one IOCP to process I/O events; at least that is how I read it. The original text:
NumberOfConcurrentThreads
    [in] Maximum number of threads that the operating system allows to concurrently process I/O completion packets for the I/O completion port. If this parameter is zero, the system allows as many concurrently running threads as there are processors in the system.
Although any number of threads can call the GetQueuedCompletionStatus function to wait for an I/O completion port, each thread is associated with only one completion port at a time. That port is the port that was last checked by the thread.
But this brings another problem with it: completion events for the same I/O handle get scattered across different threads, so handling one handle's I/O events now involves extra concurrency contention. I wrote test code to confirm this as well, shown below:

/*
    Build:
        gcc iocp.c -o iocp -lws2_32 -g

    Test:
        nc -u 192.168.100.101 1993
        and send data rapidly and repeatedly

    Actual output:
        Administrator@attention /e/code
        $ gdb -q iocp.exe
        Reading symbols from e:\code\iocp.exe...done.
        (gdb) r
        Starting program: e:\code\iocp.exe
        [New Thread 1984.0x1788]
        [New Thread 1984.0x914]
        thread: 6024, 3 bytes received fro 168 notified by IOCP
        thread: 6024, 3 bytes received fro 168 notified by IOCP
        thread: 6024, 3 bytes received fro 168 notified by IOCP
        thread: 6024, 4 bytes received fro 168 notified by IOCP
        thread: 6024, 3 bytes received fro 168 notified by IOCP
        thread: 2324, 4 bytes received fro 168 notified by IOCP
        thread: 2324, 2 bytes received fro 168 notified by IOCP
        thread: 2324, 4 bytes received fro 168 notified by IOCP
        thread: 2324, 3 bytes received fro 168 notified by IOCP
        thread: 6024, 5 bytes received fro 168 notified by IOCP
        thread: 2324, 4 bytes received fro 168 notified by IOCP
        thread: 2324, 4 bytes received fro 168 notified by IOCP
        thread: 2324, 3 bytes received fro 168 notified by IOCP
        thread: 6024, 4 bytes received fro 168 notified by IOCP
        thread: 6024, 4 bytes received fro 168 notified by IOCP
        thread: 6024, 2 bytes received fro 168 notified by IOCP
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <windows.h>
#include <process.h>

HANDLE iocp;
SOCKET s_udp;

void routine(void)
{
    unsigned threadId;

    ULONG_PTR key;
    LPOVERLAPPED povlp;
    BOOL result;

    char buffer[65535];
    WSABUF wsabuf;
    DWORD received;
    DWORD flag;
    struct sockaddr_in peer_addr;
    int addr_len;
    WSAOVERLAPPED ovlp;
    int error;

    do
    {
        wsabuf.len = sizeof(buffer);
        wsabuf.buf = buffer;
        received = 0;
        flag = 0;
        addr_len = sizeof(peer_addr);
        memset(&peer_addr, 0, addr_len);
        memset(&ovlp, 0, sizeof(ovlp));

        threadId = GetCurrentThreadId();

        if (WSARecvFrom(s_udp, &wsabuf, 1, &received, &flag,
                        (struct sockaddr*)&peer_addr, &addr_len, &ovlp, NULL) == 0)
        {
            printf("thread: %u, %u bytes received for %lu immediately\n",
                   threadId, received, (unsigned long)s_udp);
            continue;
        }

        while (1)
        {
            result = GetQueuedCompletionStatus(iocp, &received, &key, &povlp, 10);
            if (FALSE == result)
            {
                error = WSAGetLastError();
                if (WAIT_TIMEOUT != error)
                {
                    printf("GetQueuedCompletionStatus() failed, error: %d\n", error);
                }
                continue;
            }

            printf("thread: %u, %u bytes received from %lu notified by IOCP\n",
                   threadId, received, (unsigned long)s_udp);
            break;
        }
    } while (1);

    return;
}

unsigned __stdcall thread(void *arg)
{
    routine();

    return 0;
}

SOCKET create_udp_socket(unsigned short port, const char *ip)
{
    SOCKET fd;
    struct sockaddr_in addr;

    fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET == fd)
    {
        printf("create_udp_socket: socket() failed, errno: %d", WSAGetLastError());
        return INVALID_SOCKET;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = (NULL != ip ? inet_addr(ip) : INADDR_ANY);
    addr.sin_port = htons(port);
    if (bind(fd, (struct sockaddr*)&addr, sizeof(addr)) != 0)
    {
        printf("create_udp_socket: bind() failed, errno: %d", WSAGetLastError());
        closesocket(fd);
        return INVALID_SOCKET;
    }

    return fd;
}

int main(int argc, char *argv[])
{
    unsigned threadId;
    HANDLE t;
    WSADATA wsadata;

    WSAStartup(MAKEWORD(2,2), &wsadata);

    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 4);
    s_udp = create_udp_socket(1993, "0.0.0.0");
    CreateIoCompletionPort((HANDLE)s_udp, iocp, 0, 0);

    /* a second thread hangs on the same IOCP alongside the main thread */
    t = (HANDLE)_beginthreadex(NULL, 0, thread, NULL, 0, &threadId);

    routine();

    WaitForSingleObject(t, INFINITE);
    CloseHandle(t);
    closesocket(s_udp);
    CloseHandle(iocp);

    WSACleanup();

    return 0;
}
Given that, the concurrency contention in practice pretty much cancels out the benefit of processing with multiple threads; you would be better off handling all of a handle's I/O events in a single thread and saving the considerable locking overhead. But modern programs run almost exclusively on multi-core CPUs, and if IOCP forces you to put all the related work in one thread, you cannot fully exploit multi-core parallelism either. In practice, when designing a concurrency model we often spawn multiple workers to achieve load balancing, and the IOCP limitation above conflicts directly with that.
epoll on Linux, by contrast, provides a del operation, so an fd can be detached from its current epoll at any moment and immediately added to another. You can then run multiple worker threads, each driving its own epoll, spread fds across the workers for load balancing, and still migrate any fd from one thread to another at will. Such rebalancing is quite common in real systems: business logic will sometimes require handing an fd over to a different thread, and with IOCP that is just not practical.
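A minimal sketch of such a migration (migrate_fd is an invented name; the two epoll instances would typically live in different worker threads, and the hand-off itself still has to be synchronized between them):

#include <sys/epoll.h>

/* Detach fd from one epoll instance and re-attach it to another.
   With epoll this is two cheap syscalls; IOCP offers no equivalent. */
static int migrate_fd(int from_epfd, int to_epfd, int fd)
{
    struct epoll_event ev;

    if (epoll_ctl(from_epfd, EPOLL_CTL_DEL, fd, NULL) != 0)
        return -1;

    ev.events = EPOLLIN;
    ev.data.fd = fd;
    return epoll_ctl(to_epfd, EPOLL_CTL_ADD, fd, &ev);
}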
So much for my gripes about IOCP.
~~end~~
===== Divider =====
It seems I still haven't made myself clear, so a supplement:
The intent was exactly that: multiple threads running multiple processing loops, each loop owning its own IOCP and handling its own set of sockets. But the business logic needs to migrate a socket from one thread's loop to another thread's loop, and because of the IOCP limitation described above, the socket cannot be bound to the new thread's IOCP, so the migration simply cannot happen!
Yet having multiple threads hang on the same IOCP instead brings back the concurrency contention described above!
===== Divider =====
Some more background is in order.
The actual business situation is this: we have two different services, but for reasons outside our control, requests for both can only arrive through a single port. After a connection comes in, a small chunk of data must be received first to tell which service is being requested, yet the two services are implemented in different thread loops. So there is an extra entry server thread that accepts connections and reads that small chunk. With IOCP, an accepted connection would first have to be bound to the entry server's IOCP, and once bound it could never be migrated out; but each of the two services needs to keep sending and receiving on that connection in its own loop afterwards. A sketch of such a hand-off follows.
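To make the shape of the problem concrete, here is a rough sketch under the WSAPoll model the project eventually adopted (service_worker, hand_over, and worker_loop are invented names for illustration; bounds checks, error handling, and a proper wake-up mechanism are all omitted):

#define WIN32_LEAN_AND_MEAN
#include <winsock2.h>
#include <windows.h>

typedef struct service_worker {
    CRITICAL_SECTION lock;
    SOCKET pending[64];     /* sockets handed over, waiting to be adopted */
    int pending_count;
} service_worker;

/* Called by the entry server thread once it has read the small tag
   and knows which service the connection belongs to. */
static void hand_over(service_worker *w, SOCKET s)
{
    EnterCriticalSection(&w->lock);
    w->pending[w->pending_count++] = s;
    LeaveCriticalSection(&w->lock);
}

/* Each service runs one of these loops in its own thread. */
static unsigned __stdcall worker_loop(void *arg)
{
    service_worker *w = (service_worker *)arg;
    WSAPOLLFD fds[64];
    ULONG nfds = 0;

    for (;;)
    {
        /* adopt migrated sockets; with IOCP this adoption is impossible,
           since the socket is already bound to the entry thread's port */
        EnterCriticalSection(&w->lock);
        while (w->pending_count > 0 && nfds < 64)
        {
            fds[nfds].fd = w->pending[--w->pending_count];
            fds[nfds].events = POLLRDNORM;
            fds[nfds].revents = 0;
            nfds++;
        }
        LeaveCriticalSection(&w->lock);

        if (nfds == 0)
        {
            Sleep(10);
            continue;
        }

        /* short timeout so newly handed-over sockets are picked up soon */
        if (WSAPoll(fds, nfds, 10) > 0)
        {
            /* ... service-specific recv/send on the readable sockets ... */
        }
    }

    return 0;
}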
When we first designed this, each service was of course meant to own its own IOCP and handle its own connections, with no migration at all; it was these constraints that created the migration requirement. This is what you'd call a proper pain in the ass!