WinSock IOCP 模型總結(附一個帶緩存池的IOCP類)


前言

  本文配套代碼:https://github.com/TTGuoying/IOCPServer

  由於篇幅原因,本文假設你已經熟悉了利用Socket進行TCP/IP編程的基本原理,並且也熟練的掌握了多線程編程技術,太基本的概念我這里就略過不提了,網上的資料應該遍地都是。

  IOCP全稱IOCP全稱I/O Completion Port,中文譯為I/O完成端口。IOCP是一個異步I/O的Windows I/O模型,它可以自動處理I/O操作,並在I/O操作完成后將完成通知發送給用戶。本文主要介紹基於IOCP的網絡I/O操作(即socket的Accept、Send、Recv和Close等)。Windows提供了6種網絡通信模型,分別是:

  1. 阻塞模型:accept、recv和send操作會阻塞線程,直到操作完成,極其低效。
  2. 選擇(select)模型:輪詢方式探測socket上是否有收發的操作,再調用accept、recv和send操作,核心是select()函數,比阻塞模型高效一點,缺點是一次只能探測64個socket,需要手動調用recv和send進行收發數據。
  3. 異步選擇(WSAAsyncSelect)模型:利用Windows窗口消息機制響應socket操作,即當socket上有Accept、Send、Recv和Close操作發生時發送一條自定義消息給指定窗口,在窗口中響應socket操作,需要手動調用recv和send進行收發數據。與select模型相比,不需要輪詢方式探測socket,socket上有操作發生即發送通知給窗口窗口,缺點是需要一個窗口對象處理socket的消息,需要手動調用recv和send進行收發數據。
  4. 事件選擇(WSAEventSelect)模型:原理基本同WSAAsyncSelect模型,但是不需要窗口,利用事件(Event)機制來獲取socket上發生的I/O操作。缺點是一次只能等待64個事件,需要手動調用recv和send進行收發數據。
  5. 重疊 I/O(Overlapped I/O)模型:利用重疊數據結構(WSAOVERLAPPED),一次投遞一個或多個Winsock I/O請求,等這些請求完成后,應用程序會收到通知,用戶可以直接使用 I/O操作返回的數據。簡單的說:投遞一個WSASend請求和接受數據的緩沖區,系統在接收完成后在通知用戶,用戶可以直接使用收到的數據,WSASend操作同理。有兩種方式來管理重疊IO請求的完成情況(就是說接到重疊操作完成的通知):

    1). 事件對象通知(event object notification)

    2). 完成例程(completion routines) ,注意,這里並不是完成端口

    優點是不用管收發過程,直接提供(發送時)/使用(接收時)數據。缺點是實現略復雜。
  6. IOCP(I/O Completion Port)模型:本文要介紹的模型,見下文。

  以上I/O模型由1-6理解難度依次提高,性能也相應地依次提高,我個人覺得重疊 I/O(Overlapped I/O)模型和IOCP(I/O Completion Port)模型並不是實現難度大,而是理解其運行機制的難度,5和6的使用比前面幾種所需代碼更少,更簡單。下面開始正式介紹IOCP(I/O Completion Port)模型。

相關概念

  1、異步通信

  我們知道外部設備I/O(比如磁盤讀寫,網絡通信等)速度和CPU速度比起來是很慢的,如果我們進行外部I/O操作時在線程中等待I/O操作完成的話,此線程就會被阻塞住,相當於強迫CPU適應I/O設備的速度,這樣會造成極大的CPU資源浪費。我們沒必要在線程中等待I/O操作完成再執行后續的代碼,而是將I/O操作請求交給設備驅動去處理,我們線程可以繼續做其他事情,然后等待I/O操作完成的通知,大體的流程如下圖所示:

圖一:異步網絡操作流程   

  我們可以從圖中看到一個很明顯的並行操作的過程,這就是異步調用,而“同步”的通信方式是再進行網絡操作的時候主線程就掛起等待直到網絡操作完成之后才可以執行后續的代碼。同步方式流程如下圖:

  圖二:同步網絡操作流程 

   “異步”方式無疑比“阻塞+多線程”的方式效率要高得多。在Windows中實現異步的機制有好幾種,主要區別是圖一中的最后一步“通知主線程”的方式。實現操作系統調用驅動程序去收發數據的操作都是一樣的,關鍵是“如何通知主線程取數據”。有興趣的朋友可以搜索關鍵字“設備內核對象”、“事件內核對象”、APC(synchronous Procedure Call,異步過程調用)和IOCP(完成端口)。

  2、重疊結構(OVERLAPPED)

  在Windows中要實現異步通信,必須要用到重疊結構(OVERLAPPED),Windows中所有的異步通信都是基於它的。至於為什么叫Overlapped?Jeffrey Richter的解釋是因為“執行I/O請求的時間與線程執行其他任務的時間是重疊(overlapped)的”,從這個名字我們也可能看得出來重疊結構發明的初衷了,對於重疊結構的內部細節我這里就不過多的解釋了,就把它當成和其他內核對象一樣,不需要深究其實現機制,只要會使用就可以了,想要了解更多重疊結構內部的朋友,請去翻閱Jeffrey Richter的《Windows via C/C++》 5th 的292頁。

  3、完成端口

  “完成端口”這個名詞中的“端口”和我們網絡通信中的“端口”(0-65535)是不同的,個人感覺應該叫“完成隊列”更直觀一點。之所以叫“完成”端口,是因為系統在IO操作“完成”后再通知我們,也就是說當系統通知我們時,IO操作已經完成,比如說進行網絡操作,系統通知我們時,並非時有數據從網絡到來,而是數據已經接受完畢了,或者是socket接入已經完成等,我們只需處理后面的事情即可。

  所謂的完成端口,其實就是一個“內核對象”,我們不需要深究其實現原理,只需使用相關的API把完成端口框架搭建起來,投遞IO請求,然后就等待IO完成的通知。

使用完成端口的基本流程 

  總的來說,使用完成端口只要遵循如下幾個步驟:

  1.  調用CreateIoCompletionPort() 函數創建一個完成端口。
  2.  建立和處理器的核數相等的工作線程(WorkerThread),這些線程不斷地通過GetQueuedCompletionStatus() 函數掃描完成端口中是否有IO操作完成,如果有的話,將已經完成了的IO操作取出處理,處理完成后,再投遞一個IO請求即可(下文有WorkerThread的流程圖)。
  3.  初始化監聽socket,調用bind(),listen()進行綁定監聽。
  4.  調用CreateIoCompletionPort() 綁定listen socket 到 完成端口,並投遞一個或多個AcceptEx請求。此處的AcceptEx是WinSock2 的擴展函數,作用是投遞一個accept請求,當有socket接入是可以再2中的線程中處理。

  以上即為完成端口的初始化和監聽socket的初始化。下面介紹WorkerThread的工作流程:

  1.  不斷地通過GetQueuedCompletionStatus() 函數掃描完成端口中是否有IO操作完成,如果有的話,將已經完成了的IO操作取出處理。
  2.  判斷IO操作的類型:

1、如果為accept操作,調用CreateIoCompletionPort() 綁定新接入的socket 到 完成端口,向新接入的socket 投遞一個WSARecv請求。

2、如果為WSARecv操作,處理接收到的數據,向這個socket 再投遞一個WSARecv請求。

    流程圖如下:

圖三:IOCP流程圖

 完成端口的實現(配合代碼閱讀更佳)

  1、創建一個完成端口

1 HANDLE completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

  CreateIoCompletionPort的參數如下:

//功能:創建完成端口和關聯完成端口
 HANDLE WINAPI CreateIoCompletionPort(
     *    __in   HANDLE FileHandle,              // 已經打開的文件句柄或者空句柄,一般是客戶端的句柄
     *    __in   HANDLE ExistingCompletionPort,  // 已經存在的IOCP句柄
     *    __in   ULONG_PTR CompletionKey,        // 完成鍵,包含了指定I/O完成包的指定文件
     *    __in   DWORD NumberOfConcurrentThreads // 真正並發同時執行最大線程數,一般推介是CPU核心數*2
     * );

  CreateIoCompletionPort函數有兩個功能:

  1.  創建一個完成端口
  2.  將一個句柄關聯到完成端口

  我們創建時給的參數是(INVALID_HANDLE_VALUE, NULL, 0, 0)就是創建完成端口,下面會介紹關聯完成端口。

  2、建立Worker線程

SYSTEM_INFO si;
GetSystemInfo(&si);
workerThreadNum = si.dwNumberOfProcessors * 2;
HANDLE *workerThreads = new HANDLE[workerThreadNum];

for (int i = 0; i < workerThreadNum; i++)
{
    workerThreads[i] = CreateThread(0, 0, WorkerThreadProc, (void *)this, 0, 0);
}

  我們最好是建立CPU核心數量*2那么多的線程,這樣更可以充分利用CPU資源,因為完成端口的調度是非常智能的,比如我們的Worker線程有的時候可能會有Sleep()或者WaitForSingleObject()之類的情況,這樣同一個CPU核心上的另一個線程就可以代替這個Sleep的線程執行了;因為完成端口的目標是要使得CPU滿負荷的工作。

  WorkerThreadProc是Worker線程的線程函數,線程函數的具體內容我們后面再講。

  3、創建監聽socket

 1 BOOL IOCPBase::InitializeListenSocket()
 2 {
 3     // 生成用於監聽的socket的Context
 4     listenSockContext = new SocketContext;
 5     listenSockContext->connSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
 6     if (INVALID_SOCKET == listenSockContext->connSocket)
 7         return false;
 8 
 9     // 將socket綁定到完成端口中
10     if (NULL == CreateIoCompletionPort((HANDLE)listenSockContext->connSocket, completionPort, (DWORD)listenSockContext, 0))
11     {
12         RELEASE_SOCKET(listenSockContext->connSocket);
13         return false;
14     }
15 
16     //服務器地址信息,用於綁定socket
17     sockaddr_in serverAddr;
18 
19     // 填充地址信息
20     ZeroMemory((char *)&serverAddr, sizeof(serverAddr));
21     serverAddr.sin_family = AF_INET;
22     serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
23     serverAddr.sin_port = htons(port);
24 
25     // 綁定地址和端口
26     if (SOCKET_ERROR == bind(listenSockContext->connSocket, (sockaddr *)&serverAddr, sizeof(serverAddr)))
27     {
28         return false;
29     }
30 
31     // 開始監聽
32     if (SOCKET_ERROR == listen(listenSockContext->connSocket, SOMAXCONN))
33     {
34         return false;
35     }
36 
37     GUID guidAcceptEx = WSAID_ACCEPTEX;
38     GUID guidGetAcceptSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
39     // 提取擴展函數指針
40     DWORD dwBytes = 0;
41     if (SOCKET_ERROR == WSAIoctl(
42         listenSockContext->connSocket,
43         SIO_GET_EXTENSION_FUNCTION_POINTER,
44         &guidAcceptEx,
45         sizeof(guidAcceptEx),
46         &fnAcceptEx,
47         sizeof(fnAcceptEx),
48         &dwBytes,
49         NULL,
50         NULL))
51     {
52         DeInitialize();
53         return false;
54     }
55 
56     if (SOCKET_ERROR == WSAIoctl(
57         listenSockContext->connSocket,
58         SIO_GET_EXTENSION_FUNCTION_POINTER,
59         &guidGetAcceptSockAddrs,
60         sizeof(guidGetAcceptSockAddrs),
61         &fnGetAcceptExSockAddrs,
62         sizeof(fnGetAcceptExSockAddrs),
63         &dwBytes,
64         NULL,
65         NULL))
66     {
67         DeInitialize();
68         return false;
69     }
70 
71     for (size_t i = 0; i < MAX_POST_ACCEPT; i++)
72     {
73         IOContext *ioContext = listenSockContext->GetNewIOContext();
74         if (false == PostAccept(listenSockContext, ioContext))
75         {
76             listenSockContext->RemoveContext(ioContext);
77             return false;
78         }
79     }
80     return true;
81 }
InitializeListenSocket

 

  用 CreateIoCompletionPort()函數把這個監聽Socket和完成端口綁定,bind(),listen(),然后提取擴展函數AcceptEx和GetAcceptSockAddrs的指針,因為AcceptEx 實際上是存在於Winsock2結構體系之外的(因為是微軟另外提供的),所以如果我們直接調用AcceptEx的話,首先我們的代碼就只能在微軟的平台上用了,沒有辦法在其他平台上調用到該平台提供的AcceptEx的版本(如果有的話), 而且我們每次調用AcceptEx時,Service Provider都得要通過WSAIoctl()獲取一次該函數指針,效率太低了,所以我們自己獲取函數指針。然后投遞AcceptEx請求。

  投遞AcceptEx請求的代碼:

BOOL IOCPBase::PostAccept(SocketContext * sockContext, IOContext * ioContext)
{
    DWORD dwBytes = 0;
    ioContext->ioType = ACCEPT_POSTED;
    ioContext->ioSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET == ioContext->ioSocket)
    {
        return false;
    }
    
    // 將接收緩沖置為0,令AcceptEx直接返回,防止拒絕服務攻擊
    if (false == fnAcceptEx(listenSockContext->connSocket, ioContext->ioSocket, ioContext->wsaBuf.buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytes, &ioContext->overLapped))
    {
        if (WSA_IO_PENDING != WSAGetLastError())
        {
            return false;
        }
    }

    InterlockedIncrement(&acceptPostCnt);
    return true;
}

 

  AcceptEx 函數說明:

 

  • 參數1--sListenSocket, 這個就是那個唯一的用來監聽的Socket了,沒什么說的;
  • 參數2--sAcceptSocket, 用於接受連接的socket,這個就是那個需要我們事先建好的,等有客戶端連接進來直接把這個Socket拿給它用的那個,是AcceptEx高性能的關鍵所在。
  • 參數3--lpOutputBuffer,接收緩沖區,這也是AcceptEx比較有特色的地方,既然AcceptEx不是普通的accpet函數,那么這個緩沖區也不是普通的緩沖區,這個緩沖區包含了三個信息:一是客戶端發來的第一組數據,二是server的地址,三是client地址。
  • 參數4--dwReceiveDataLength,前面那個參數lpOutputBuffer中用於存放數據的空間大小。如果此參數=0,則Accept時將不會待數據到來,而直接返回,如果此參數不為0,那么一定得等接收到數據了才會返回,這里設為0直接返回,防止拒絕服務攻擊
  • 參數5--dwLocalAddressLength,存放本地址地址信息的空間大小;
  • 參數6--dwRemoteAddressLength,存放本遠端地址信息的空間大小;
  • 參數7--lpdwBytesReceived,out參數,對我們來說沒用,不用管;
  • 參數8--lpOverlapped,本次重疊I/O所要用到的重疊結構。

    因為每投遞一次網絡IO請求都要求提供一個WSABuf和WSAOVERLAPPED的參數,所以我們自定義一個IOContext類,每次投遞附帶這個類的變量,但要注意這個變量的生命周期,防止內存泄漏。

 

class IOContext
{
public:
    WSAOVERLAPPED        overLapped;        // 每個socket的每一個IO操作都需要一個重疊結構
    SOCKET                ioSocket;        // 此IO操作對應的socket
    WSABUF                wsaBuf;            // 數據緩沖
    IO_OPERATION_TYPE    ioType;            // IO操作類型
    UINT                connectID;        // 連接ID

    IOContext()
    {
        ZeroMemory(&overLapped, sizeof(overLapped));
        ioSocket = INVALID_SOCKET;
        wsaBuf.buf = (char *)::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, BUFF_SIZE);
        wsaBuf.len = BUFF_SIZE;
        ioType = NULL_POSTED;
        connectID = 0;
    }

    ~IOContext()
    {
        RELEASE_SOCKET(ioSocket);

        if (wsaBuf.buf != NULL)
            ::HeapFree(::GetProcessHeap(), 0, wsaBuf.buf);
    }

    void Reset()
    {
        if (wsaBuf.buf != NULL)
            ZeroMemory(wsaBuf.buf, BUFF_SIZE);
        else
            wsaBuf.buf = (char *)::HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, BUFF_SIZE);
        ZeroMemory(&overLapped, sizeof(overLapped));
        ioType = NULL_POSTED;
        connectID = 0;
    }
};

  對於每一個socket也定義了一個SocketContext的類和一個IOContextPool的緩沖池類的具體請查看代碼。

 1 class SocketContext
 2 {
 3 public:
 4     SOCKET connSocket;                                // 連接的socket
 5     SOCKADDR_IN clientAddr;                            // 連接的遠程地址
 6 
 7 private:
 8     vector<IOContext*> arrIoContext;                // 同一個socket上的多個IO請求
 9     static IOContextPool ioContextPool;                // 空閑的IOContext池
10     CRITICAL_SECTION csLock;
11 
12 public:
13     SocketContext()
14     {
15         InitializeCriticalSection(&csLock);
16         arrIoContext.clear();
17         connSocket = INVALID_SOCKET;
18         ZeroMemory(&clientAddr, sizeof(clientAddr));
19     }
20 
21     ~SocketContext()
22     {
23         RELEASE_SOCKET(connSocket);
24 
25         // 回收所有的IOContext
26         for (vector<IOContext*>::iterator it = arrIoContext.begin(); it != arrIoContext.end(); it++)
27         {
28             ioContextPool.ReleaseIOContext(*it);
29         }
30 
31         EnterCriticalSection(&csLock);
32         arrIoContext.clear();
33         LeaveCriticalSection(&csLock);
34 
35         DeleteCriticalSection(&csLock);
36     }
37 
38     // 獲取一個新的IoContext
39     IOContext *GetNewIOContext()
40     {
41         IOContext *context = ioContextPool.AllocateIoContext();
42         if (context != NULL)
43         {
44             EnterCriticalSection(&csLock);
45             arrIoContext.push_back(context);
46             LeaveCriticalSection(&csLock);
47         }
48         return context;
49     }
50 
51     // 從數組中移除一個指定的IoContext
52     void RemoveContext(IOContext* pContext)
53     {
54         for (vector<IOContext*>::iterator it = arrIoContext.begin(); it != arrIoContext.end(); it++)
55         {
56             if (pContext == *it)
57             {
58                 ioContextPool.ReleaseIOContext(*it);
59 
60                 EnterCriticalSection(&csLock);
61                 arrIoContext.erase(it);
62                 LeaveCriticalSection(&csLock);
63 
64                 break;
65             }
66         }
67     }
68 
69     // 
70 };
class SocketContext

 

 1 // 空閑的IOContext管理類(IOContext池)
 2 class IOContextPool
 3 {
 4 private:
 5     list<IOContext *> contextList;
 6     CRITICAL_SECTION csLock;
 7 
 8 public:
 9     IOContextPool()
10     {
11         InitializeCriticalSection(&csLock);
12         contextList.clear();
13 
14         EnterCriticalSection(&csLock);
15         for (size_t i = 0; i < INIT_IOCONTEXT_NUM; i++)
16         {
17             IOContext *context = new IOContext;
18             contextList.push_back(context);
19         }
20         LeaveCriticalSection(&csLock);
21         
22     }
23 
24     ~IOContextPool()
25     {
26         EnterCriticalSection(&csLock);
27         for (list<IOContext *>::iterator it = contextList.begin(); it != contextList.end(); it++)
28         {
29             delete (*it);
30         }
31         contextList.clear();
32         LeaveCriticalSection(&csLock);
33 
34         DeleteCriticalSection(&csLock);
35     }
36 
37     // 分配一個IOContxt
38     IOContext *AllocateIoContext()
39     {
40         IOContext *context = NULL;
41 
42         EnterCriticalSection(&csLock);
43         if (contextList.size() > 0) //list不為空,從list中取一個
44         {
45             context = contextList.back();
46             contextList.pop_back();
47         }
48         else    //list為空,新建一個
49         {
50             context = new IOContext;
51         }
52         LeaveCriticalSection(&csLock);
53 
54         return context;
55     }
56 
57     // 回收一個IOContxt
58     void ReleaseIOContext(IOContext *pContext)
59     {
60         pContext->Reset();
61         EnterCriticalSection(&csLock);
62         contextList.push_front(pContext);
63         LeaveCriticalSection(&csLock);
64     }
65 };
class IOContextPool

 

 

  4、Worker線程

   這個工作線程所要做的工作就是幾個Worker線程哥幾個一起排好隊隊來監視完成端口的隊列中是否有完成的網絡操作就好了,代碼大體如下:

DWORD IOCPBase::WorkerThreadProc(LPVOID lpParam)
{
    IOCPBase *iocp = (IOCPBase*)lpParam;
    OVERLAPPED *ol = NULL;
    SocketContext *sockContext;
    DWORD dwBytes = 0;
    IOContext *ioContext = NULL;

    while (WAIT_OBJECT_0 != WaitForSingleObject(iocp->stopEvent, 0))
    {
        BOOL bRet = GetQueuedCompletionStatus(iocp->completionPort, &dwBytes, (PULONG_PTR)&sockContext, &ol, INFINITE);

        // 讀取傳入的參數
        ioContext = CONTAINING_RECORD(ol, IOContext, overLapped);

        // 收到退出標志
        if (EXIT_CODE == (DWORD)sockContext)
        {
            break;
        }

        if (!bRet)
        {
            DWORD dwErr = GetLastError();

            // 如果是超時了,就再繼續等吧  
            if (WAIT_TIMEOUT == dwErr)
            {
                // 確認客戶端是否還活着...
                if (!iocp->IsSocketAlive(sockContext->connSocket))
                {
                    iocp->OnConnectionClosed(sockContext);

                    // 回收socket
                    iocp->DoClose(sockContext);
                    continue;
                }
                else
                {
                    continue;
                }
            }
            // 可能是客戶端異常退出了(64)
            else if (ERROR_NETNAME_DELETED == dwErr)
            {
                iocp->OnConnectionError(sockContext, dwErr);

                // 回收socket
                iocp->DoClose(sockContext);
                continue;
            }
            else
            {
                iocp->OnConnectionError(sockContext, dwErr);

                // 回收socket
                iocp->DoClose(sockContext);
                continue;
            }
        }
        else
        {
            // 判斷是否有客戶端斷開
            if ((0 == dwBytes) && (RECV_POSTED == ioContext->ioType || SEND_POSTED == ioContext->ioType))
            {
                iocp->OnConnectionClosed(sockContext);

                // 回收socket
                iocp->DoClose(sockContext);
                continue;
            }
            else
            {
                switch (ioContext->ioType)
                {
                case ACCEPT_POSTED:
                    iocp->DoAccpet(sockContext, ioContext);
                    break;
                case RECV_POSTED:
                    iocp->DoRecv(sockContext, ioContext);
                    break;
                case SEND_POSTED:
                    iocp->DoSend(sockContext, ioContext);
                    break;
                default:
                    break;
                }
            }
        }
    }

    // 釋放線程參數
    RELEASE(lpParam);
    return 0;
}
WorkerThreadProc

  其中的GetQueuedCompletionStatus()就是Worker線程里第一件也是最重要的一件事了,會讓Worker線程進入不占用CPU的睡眠狀態,直到完成端口上出現了需要處理的網絡操作或者超出了等待的時間限制為止。

        一旦完成端口上出現了已完成的I/O請求,那么等待的線程會被立刻喚醒,然后繼續執行后續的代碼。

       至於這個神奇的函數,原型是這樣的:

BOOL WINAPI GetQueuedCompletionStatus(  
    __in   HANDLE          CompletionPort,    // 這個就是我們建立的那個唯一的完成端口  
    __out  LPDWORD         lpNumberOfBytes,   //這個是操作完成后返回的字節數  
    __out  PULONG_PTR      lpCompletionKey,   // 這個是我們建立完成端口的時候綁定的那個自定義結構體參數  
    __out  LPOVERLAPPED    *lpOverlapped,     // 這個是我們在連入Socket的時候一起建立的那個重疊結構  
    __in   DWORD           dwMilliseconds     // 等待完成端口的超時時間,如果線程不需要做其他的事情,那就INFINITE就行了  
    );  

  如果這個函數突然返回了,那就說明有需要處理的網絡操作了 --- 當然,在沒有出現錯誤的情況下。 然后switch()一下,根據需要處理的操作類型,那我們來進行相應的處理。

  那我們如何直到需要處理的操作類型呢?這個就要用到我們定義的IOContext類,里面有一個WSAOVERLAPPED的變量和操作類型(參見第3步)。那有如何吧IOContext變量傳進來呢?同樣參見第三步我們投遞AcceptEx請求時傳入了一個&ioContext->overLapped參數。我們可以使用PER_IO_CONTEXT這個宏來通過ioContext->overLapped取得ioContext的地址,如此我們便取得操作類型和ioContext中的WSAbuf。數據就存放在WSABuf中。

  另外,我們注意到關聯socket到完成端口時,我們給CreateIoCompletionPort()函數的第三個參數ULONG_PTR CompletionKey參數傳遞了listenSockContext變量,我們可以在GetQueuedCompletionStatus的第三個參數取得這個傳進來的變量。如此我們就通過完成端口穿進去了兩個變量,理解這兩個變量的傳遞時理解完成端口模式的關鍵,我之前就時卡着這里。

  WorkerThreadProc線程中還有一些錯誤處理函數,自行查看。

  5、收到accept通知時調用DoAccept()

    在用戶收到AcceptEx的完成通知時,需要后續代碼並不多,我們把代碼放到DoAccept()中:需要做三件事情:

  1. 為新接入的socket分配資源。
  2. 向新接入的socket投遞一個WSARecv請求
  3. 向監聽socket投遞繼續Accept請求
 1 BOOL IOCPBase::DoAccpet(SocketContext * sockContext, IOContext * ioContext)
 2 {
 3     
 4     InterlockedIncrement(&connectCnt);
 5     InterlockedDecrement(&acceptPostCnt);
 6     SOCKADDR_IN *clientAddr = NULL;
 7     SOCKADDR_IN *localAddr = NULL;
 8     int clientAddrLen, localAddrLen;
 9     clientAddrLen = localAddrLen = sizeof(SOCKADDR_IN);
10 
11     // 1. 獲取地址信息 (GetAcceptExSockAddrs函數不僅可以獲取地址信息,還可以順便取出第一組數據)
12     fnGetAcceptExSockAddrs(ioContext->wsaBuf.buf, 0, localAddrLen, clientAddrLen, (LPSOCKADDR *)&localAddr, &localAddrLen, (LPSOCKADDR *)&clientAddr, &clientAddrLen);
13 
14     // 2. 為新連接建立一個SocketContext 
15     SocketContext *newSockContext = new SocketContext;
16     newSockContext->connSocket = ioContext->ioSocket;
17     memcpy_s(&(newSockContext->clientAddr), sizeof(SOCKADDR_IN), clientAddr, sizeof(SOCKADDR_IN));
18 
19     // 3. 將listenSocketContext的IOContext 重置后繼續投遞AcceptEx
20     ioContext->Reset();
21     if (false == PostAccept(listenSockContext, ioContext))
22     {
23         listenSockContext->RemoveContext(ioContext);
24     }
25 
26     // 4. 將新socket和完成端口綁定
27     if (NULL == CreateIoCompletionPort((HANDLE)newSockContext->connSocket, completionPort, (DWORD)newSockContext, 0))
28     {
29         DWORD dwErr = WSAGetLastError();
30         if (dwErr != ERROR_INVALID_PARAMETER)
31         {
32             DoClose(newSockContext);
33             return false;
34         }
35     }
36 
37     // 並設置tcp_keepalive
38     tcp_keepalive alive_in;
39     tcp_keepalive alive_out;
40     alive_in.onoff = TRUE;
41     alive_in.keepalivetime = 1000 * 60;  // 60s  多長時間( ms )沒有數據就開始 send 心跳包
42     alive_in.keepaliveinterval = 1000 * 10; //10s  每隔多長時間( ms ) send 一個心跳包
43     unsigned long ulBytesReturn = 0;
44     if (SOCKET_ERROR == WSAIoctl(newSockContext->connSocket, SIO_KEEPALIVE_VALS, &alive_in, sizeof(alive_in), &alive_out, sizeof(alive_out), &ulBytesReturn, NULL, NULL))
45     {
46         TRACE(L"WSAIoctl failed: %d/n", WSAGetLastError());
47     }
48 
49 
50     OnConnectionEstablished(newSockContext);
51 
52     // 5. 建立recv操作所需的ioContext,在新連接的socket上投遞recv請求
53     IOContext *newIoContext = newSockContext->GetNewIOContext();
54     newIoContext->ioType = RECV_POSTED;
55     newIoContext->ioSocket = newSockContext->connSocket;
56     // 投遞recv請求
57     if (false == PostRecv(newSockContext, newIoContext))
58     {
59         DoClose(sockContext);
60         return false;
61     }
62 
63     return true;
64 }
DoAccpet
 1 BOOL IOCPBase::PostAccept(SocketContext * sockContext, IOContext * ioContext)
 2 {
 3     DWORD dwBytes = 0;
 4     ioContext->ioType = ACCEPT_POSTED;
 5     ioContext->ioSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
 6     if (INVALID_SOCKET == ioContext->ioSocket)
 7     {
 8         return false;
 9     }
10     
11     // 將接收緩沖置為0,令AcceptEx直接返回,防止拒絕服務攻擊
12     if (false == fnAcceptEx(listenSockContext->connSocket, ioContext->ioSocket, ioContext->wsaBuf.buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytes, &ioContext->overLapped))
13     {
14         if (WSA_IO_PENDING != WSAGetLastError())
15         {
16             return false;
17         }
18     }
19 
20     InterlockedIncrement(&acceptPostCnt);
21     return true;
22 }
PostAccept
 1 BOOL IOCPBase::PostRecv(SocketContext * sockContext, IOContext *ioContext)
 2 {
 3     DWORD dwFlags = 0, dwBytes = 0;
 4     ioContext->Reset();
 5     ioContext->ioType = RECV_POSTED;
 6 
 7     int nBytesRecv = WSARecv(ioContext->ioSocket, &ioContext->wsaBuf, 1, &dwBytes, &dwFlags, &ioContext->overLapped, NULL);
 8     // 如果返回值錯誤,並且錯誤的代碼並非是Pending的話,那就說明這個重疊請求失敗了
 9     if ((SOCKET_ERROR == nBytesRecv) && (WSA_IO_PENDING != WSAGetLastError()))
10     {
11         DoClose(sockContext);
12         return false;
13     }
14     return true;
15 }
PostRecv

  此處要注意理清第4步中說的兩個變量的傳入。

  DoAccept中還調用了OnConnectionEstablished()函數,這是一個虛函數,派生類重載這個函數即可處理連接接入的通知。具體看代碼里的例程。

  6、收到recv通知時調用DoRecv()

   在用戶收到recv的完成通知時,需要后續代碼並不多,我們把代碼放到DoRecv()中:需要做兩件事情:

  1. 處理WSABuf中的數據
  2. 向此socket重新投遞一個WSARecv請求
BOOL IOCPBase::DoRecv(SocketContext * sockContext, IOContext * ioContext)
{
    OnRecvCompleted(sockContext, ioContext);
    ioContext->Reset();
    if (false == PostRecv(sockContext, ioContext))
    {
        DoClose(sockContext);
        return false;
    }
    return true;
}
DoRecv

  此處要注意理清第4步中說的兩個變量的傳入。

  7、關閉完成端口

   Worker線程一旦進入了GetQueuedCompletionStatus()的階段,就會進入睡眠狀態,INFINITE的等待完成端口中,如果完成端口上一直都沒有已經完成的I/O請求,那么這些線程將無法被喚醒,這也意味着線程沒法正常退出。

  熟悉或者不熟悉多線程編程的朋友,都應該知道,如果在線程睡眠的時候,簡單粗暴的就把線程關閉掉的話,那是會一個很可怕的事情,因為很多線程體內很多資源都來不及釋放掉,無論是這些資源最后是否會被操作系統回收,我們作為一個C++程序員來講,都不應該允許這樣的事情出現。

  所以我們必須得有一個很優雅的,讓線程自己退出的辦法。

       這時會用到我們這次見到的與完成端口有關的最后一個API,叫 PostQueuedCompletionStatus(),從名字上也能看得出來,這個是和 GetQueuedCompletionStatus() 函數相對的,這個函數的用途就是可以讓我們手動的添加一個完成端口I/O操作,這樣處於睡眠等待的狀態的線程就會有一個被喚醒,如果為我們每一個Worker線程都調用一次PostQueuedCompletionStatus()的話,那么所有的線程也就會因此而被喚醒了。

  PostQueuedCompletionStatus()函數的原型是這樣定義的:

BOOL WINAPI PostQueuedCompletionStatus(  
                   __in      HANDLE CompletionPort,  
                   __in      DWORD dwNumberOfBytesTransferred,  
                   __in      ULONG_PTR dwCompletionKey,  
                   __in_opt  LPOVERLAPPED lpOverlapped  
);  

  我們可以看到,這個函數的參數幾乎和GetQueuedCompletionStatus()的一模一樣,都是需要把我們建立的完成端口傳進去,然后后面的三個參數是 傳輸字節數、結構體參數、重疊結構的指針.

       注意,這里也有一個很神奇的事情,正常情況下,GetQueuedCompletionStatus()獲取回來的參數本來是應該是系統幫我們填充的,或者是在綁定完成端口時就有的,但是我們這里卻可以直接使用PostQueuedCompletionStatus()直接將后面三個參數傳遞給GetQueuedCompletionStatus(),這樣就非常方便了。

       例如,我們為了能夠實現通知線程退出的效果,可以自己定義一些約定,比如把這后面三個參數設置一個特殊的值,然后Worker線程接收到完成通知之后,通過判斷這3個參數中是否出現了特殊的值,來決定是否是應該退出線程了。

       例如我們在調用的時候,就可以這樣:

for (int i = 0; i < workerThreadNum; i++)
{
	// 通知所有完成端口退出
	PostQueuedCompletionStatus(completionPort, 0, (DWORD)EXIT_CODE, NULL);
}

  謝謝大家看到這里!!!(完)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM