重疊I/O之事件通知


   在 Winsock 中,重疊 I/O(Overlapped I/O)模型能達到更佳的系統性能,高於select模型、異步選擇和事件選擇三種。重疊模型的基本設計原理便是讓應用程序使

用一個重疊的數據結構(WSAOVERLAPPED),一次投遞一個或多個 Winsock I/O 請求。針對這些提交的請求,在它們完成之后,我們的應用程序會收到通知,於是

我們就可以對數據進行處理了。

    要想在一個套接字上使用重疊 I/O 模型,首先必須使用 WSA_FLAG_OVERLAPPED 這個標志,創建一個套接字。例如:

SOCKET s = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    注:創建套接字的時候,假如使用的是 socket 函數,那么會默認設置 WSA_FLAG_OVERLAPPED 標志。

    成功建好一個套接字,同時將其與一個本地接口綁定到一起后,便可開始進行重疊 I/O 操作,為了要使用重疊結構,我們常用的 send、recv 等收發數據的函數也都要

被 WSASend、WSARecv 替換掉了,方法是調用下述的 Winsock 函數,同時指定一個 WSAOVERLAPPED 結構(可選):

    WSASend()

    WSASendTo()

    WSARecv()

    WSARecvFrom()

    WSAIoctl()

    AcceptEx()

    TrnasmitFile()

    WSA_IO_PENDING : 最常見的返回值,這是說明我們的重疊函數調用成功了,但是 I/O 操作還沒有完成。

    若隨一個 WSAOVERLAPPED 結構一起調用這些函數,函數會立即完成並返回,無論套接字是否設為阻塞模式。那么我們如何來得知我們的 I/O 請求是否成功了呢?

方法有兩個:

    1、事件對象;(有64個socket的限制)

    2、完成例程。

    這兩種方法用法差不多,區別即在於我們投遞的操作:比如WSARecv操作,完成之后,操作系統以什么樣的方式通知我們。這篇隨筆主要講得是事件通知,完成例程

在另一篇隨筆。

   這里只需要注意一點,重疊函數(如:WSARecv)的參數中都有一個 Overlapped 參數,我們可以假設是把我們的WSARecv這樣的操作“綁定”到這個重疊結構上,

提交一個請求,而不是將操作立即完成,其他的事情就交給重疊結構去做,而其中重疊結構又要與Windows的事件對象“綁定”在一起,這樣我們調用完 WSARecv 以后

就可以“坐享其成”,等到重疊操作完成以后,自然會有與之對應的事件來通知我們操作完成,然后我們就可以來根據重疊操作的結果取得我們想要的數據了。

    重疊 I/O 的事件通知方法要求將 Win32事件對象與 WSAOVERLAPPED 結構關聯在一起,當 I/O 操作完成后,事件的狀態會變成“已傳信”狀態,即激發態;下面來

看一下 WSAOVERLAPPED 結構的定義:

typedef struct _WSAOVERLAPPED {
    DWORD    Internal;
    DWORD    InternalHigh;
    DWORD    Offset;
    DWORD    OffsetHigh;
    WSAEVENT hEvent;
} WSAOVERLAPPED, FAR * LPWSAOVERLAPPED;

    其中,Internal、InternalHigh、Offset 和 OffsetHigh 字段均由系統在內部使用,不應由應用程序直接進行處理或使用。而另一方面,hEvent 字段有點兒特殊,它

允許應用程序將一個事件對象句柄同一個套接字關聯起來。大家可能會覺得奇怪,如何將一個事件對象句柄分配給該字段呢?正如我們早先在 WSAEventSelect 模型中

講述的那樣,可用 WSACreateEvent 函數來創建一個事件對象句柄。一旦創建好一個事件句柄,簡單地將重疊結構的 hEvent 字段分配給事件句柄,再使用重疊結構,

調用一個Winsock函數即可,比如 WSASend 或 WSARecv。

    一個重疊 I/O 請求最終完成后,我們的應用程序要負責取回重疊 I/O 操作的結果。一個重疊請求操作最終完成之后,在事件通知方法中,Winsock會更改與一個

WSAOVERLAPPED 結構對應的一個事件對象的事件傳信狀態,將其從“未傳信”變成“已傳信”。由於一個事件對象已分配給 WSAOVERLAPPED 結構,所以只需簡單地

調用 WSAWaitForMultipleEvents 函數,從而判斷出一個重疊 I/O 調用在什么時候完成。WSAWaitForMultipleEvents 函數已在事件選擇模型中說過,這里不再細

說。

    發現一次重疊請求完成之后,接着需要調用 WSAGetOverlappedResult(取得重疊結構)函數,判斷那個重疊調用到底是成功,還是失敗。該函數的定義如下:

BOOL WSAAPI WSAGetOverlappedResult(
  __in          SOCKET s,
  __in          LPWSAOVERLAPPED lpOverlapped,
  __out         LPDWORD lpcbTransfer,
  __in          BOOL fWait,
  __out         LPDWORD lpdwFlags
);

    s 參數用於指定在重疊操作開始的時候,與之對應的那個套接字。

    lpOverlapped 參數是一個指針,對應於在重疊操作開始時,指定的那個 WSAOVERLAPPED 結構。

    lpcbTransfer 參數也是一個指針,對應一個DWORD(雙字)變量,負責接收一次重疊發送或接收操作實際傳輸的字節數。

    fWait 參數用於決定函數是否應該等待一次待決(未決)的重疊操作完成。若將 fWait設為 TRUE,那么除非操作完成,否則函數不會返回;若設為FALSE,而且操作

仍然處於“待決”狀態,那么WSAGetOverlappedResult 函數會返回 FALSE值,同時返回一個WSAIOINCOMPLETE(I/O操作未完成)錯誤。但就我們目前的情況來

說,由於需要等候重疊操作的一個已傳信事件完成,所以該參數無論采用什么設置,都沒有任何效果。

    lpdwFlags參數對應於一個指針,指向一個DWORD(雙字),負責接收結果標志(假如原先的重疊調用是用WSARecv或WSARecvFrom函數發出的)。

    返回值:若 WSAGetOverlappedResult 函數調用成功,返回值就是TRUE。這意味着我們的重疊 I/O 操作已成功完成,而且由 lpcbTransfer 參數指向的值已進行

了更新。若返回值是FALSE,那么可能是由下述任何一種原因造成的:

    1、重疊 I/O操 作仍處在“待決”狀態。

    2、重疊操作已經完成,但含有錯誤。

    3、重疊操作的完成狀態不可判決,因為在提供給 WSAGetOverlappedResult函數的一個或多個參數中,存在着錯誤。

    失敗后,由 lpcbTransfer 參數指向的值不會進行更新,而且我們的應用程序應調用 WSAGetLastError 函數,調查到底是何種原因造成了調用失敗。

    在 Windows NT 和 Windows 2000 中,重疊 I/O 模型也允許應用程序以一種重疊方式,實現對客戶端連接的接受。具體的做法是在監聽套接字上調用 AcceptEx

函數。AcceptEx 是一個特殊的 Winsock1.1 擴展函數,位於 Mswsock.h 頭文件以及 Mswsock.lib 庫文件內。AcceptEx 函數的定義如下:

BOOL AcceptEx(
    __in SOCKET sListenSocket,
    __in SOCKET sAcceptSocket,
    __in PVOID lpOutputBuffer,
    __in DWORD dwReceiveDataLength,
    __in DWORD dwLocalAddressLength,
    __in DWORD dwRemoteAddressLength,
    __out LPDWORD lpdwBytesReceived,
    __in LPOVERLAPPED lpOverlapped
);

    sListenSocket 參數指定的是一個監聽套接字。

    sAcceptSocket 參數指定的是另一個套接字,負責對進入連接請求的“接受”。AcceptEx 函數和 accept 函數的區別在於,我們必須提供接受的套接字,而不是讓函數

自動為我們創建。正是由於要提供套接字,所以要求我們事先調用 socket 或 WSASocket 函數,創建一個套接字,以便通過 sAcceptSocket 參數,將其傳遞給

AcceptEx。
    lpOutputBuffer 參數指定的是一個特殊的緩沖區,因為它要負責三種數據的接收:服務器的本地地址,客戶機的遠程地址,以及在新建連接上發送的第一個數據塊。

    dwReceiveDataLength參數以字節為單位,指定了在 lpOutputBuffer 緩沖區中,保留多大的空間,用於數據的接收。

如這個參數設為0,那么在連接的接受過程中,不會再一道接收任何數據。

    dwLocalAddressLength 和 dwRemoteAddressLength 參數也是以字節為單位,指定在 lpOutputBuffer 緩沖區中,保留多大的空間,在一個套接字被接受的時

候,用於本地和遠程地址信息的保存。要注意的是,和當前采用的傳送協議允許的最大地址長度比較起來,這里指定的緩沖區大小至少應多出16字節。舉個例子來說:假

定正在使用的是 TCP/IP 協議,那么這里的大小應設為“SOCKADDRIN 結構的長度+16字節”。

    lpdwBytesReceived 參數用於返回接收到的實際數據量,以字節為單位。只有在操作以同步方式完成的前提下,才會設置這個參數。假如 AcceptEx 函數返回 ERROR_IO_PENDING,那么這個參數永遠都不會設置,我們必須利用完成事件通知機制,獲知實際讀取的字節量。

    lpOverlapped 參數對應的是一個 OVERLAPPED 結構,允許 AcceptEx 以一種異步方式工作。如我們早先所述,只有在一個重疊 I/O 應用中,該函數才需要使用事

件對象通知機制,這是由於此時沒有一個完成例程參數可供使用。也就是說 AcceptEx 函數只能由本節課給大家講的“事件通知”方式獲取異步 I/O 請求的結果,而“完成例

程”方法無法被使用。

    重疊 I/O 模型的編程步驟總結如下:

    1、創建一個套接字,開始在指定的端口上監聽連接請求;

    2、 接受一個客戶端進入的連接請求;

    3、為接受的套接字新建一個 WSAOVERLAPPED 結構,並為該結構分配一個事件對象句柄,同時將該事件對象句柄分配給一個事件數組,以便稍后由

WSAWaitForMultipleEvents 函數使用。

    4、在套接字上投遞一個異步 WSARecv 請求,指定參數為 WSAOVERLAPPED 結構。注意函數通常會以失敗告終,返回 SOCKET_ERROR 錯誤狀態

WSA_IO_PENDING(I/O操作尚未完成);

    5、 使用步驟3)的事件數組,調用 WSAWaitForMultipleEvents 函數,並等待與重疊調用關聯在一起的事件進入“已傳信”狀態(換言之,等待那個事件的“觸發”);

    6、 WSAWaitForMultipleEvents 函數返回后,針對“已傳信”狀態的事件,調用 WSAResetEvent(重設事件)函數,從而重設事件對象,並對完成的重疊請求進行

處理;

    7、 使用 WSAGetOverlappedResult 函數,判斷重疊調用的返回狀態是什么;

    8、 在套接字上投遞另一個重疊 WSARecv 請求;

    9、 重復步驟5)~8)。

    下面我們來看看界面的演示:

    服務器端:

    

    客戶端界面:

    

    客戶端與服務器通信的信息:

    

    下面,我們來看看代碼如何實現的,初始化socket等步驟我們就不再細說,我們直接來看看重疊IO之事件通知的核心部分。

    1、我們定義幾個結構,做好准備工作

#define BUF_SIZE 4096   //緩沖區大小4k

enum IO_TYPE            //操作類型
{
    IO_ACCEPT,
    IO_READ,
    IO_WRITE,
    IO_UNKNOWN
};

struct szOverlapped               //自定義結構,第一個成員必定是WSAOVERLAPPED
{ 
    WSAOVERLAPPED m_overlapped;   //不多說
    SOCKET m_socket;              //接受客戶端連接的socket或者是與客戶端通信的socket  
    SOCKET m_accSocket;           //客戶端連接的socket
    IO_TYPE m_iotype;             //投遞操作的類型
    char m_buf[BUF_SIZE];         //接受客戶端發送的信息
    szOverlapped()
    {
        ZeroMemory(&m_overlapped, sizeof(WSAOVERLAPPED));
        m_socket = INVALID_SOCKET;
        m_accSocket = INVALID_SOCKET;
        m_overlapped.hEvent = WSACreateEvent();
        m_iotype = IO_UNKNOWN;
        ZeroMemory(m_buf, BUF_SIZE);
    }
};

    2、啟動一個工作線程,幾乎所有工作都在這個線程里操作,防止主界面卡死,線程代碼如下

UINT ThreadProc(LPVOID lpParameter)
{
    COverlapped1Dlg *pDlg = (COverlapped1Dlg*)lpParameter;
    ASSERT(pDlg != NULL);

    SOCKET listenSocket = INVALID_SOCKET;
    listenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (INVALID_SOCKET == listenSocket)
    {
        return FALSE;
    }
    char ipbuf[1024] = {0};
    wcstombs(ipbuf, pDlg->GetIpAddress(), pDlg->GetIpAddress().GetLength());
    const char *p = ipbuf;
    sockaddr_in serverAddress;
    serverAddress.sin_addr.S_un.S_addr = inet_addr(p);
    serverAddress.sin_family = AF_INET;
    serverAddress.sin_port = htons(pDlg->m_iPort);

    if (SOCKET_ERROR == bind(listenSocket, (sockaddr*)&serverAddress, sizeof(sockaddr_in)))
    {
        return FALSE;
    }

    listen(listenSocket, SOMAXCONN);
    pDlg->ShowText(_T("系統消息:服務器開始監聽。。。"));
    pDlg->PostAccept(listenSocket);
    CString cstrText;
    DWORD cbTransfer;
    DWORD dwFlag;

    while (TRUE)
    {
        DWORD Index = WSAWaitForMultipleEvents(pDlg->m_dwTotal, pDlg->m_eventArray, FALSE, 100, FALSE);
        if (WSA_WAIT_TIMEOUT  == Index)
        {
            continue;
        }
        Index = Index - WSA_WAIT_EVENT_0;  //事件通知都是等待事件的發生,然后根據事件去獲取究竟是哪個重疊結構和socket,然后根據獲取的重疊結構和socket進行相應的操作
        szOverlapped *pOverlapped = pDlg->FindOverlappedByEvent(pDlg->m_eventArray[Index]);
        WSAResetEvent(pDlg->m_eventArray[Index]);
        ASSERT(pOverlapped != NULL);
        if (!WSAGetOverlappedResult(pDlg->FindSocketByEvent(pDlg->m_eventArray[Index]), &(pOverlapped->m_overlapped), &cbTransfer, TRUE, &dwFlag))
        {
            continue;
        }
        switch(pOverlapped->m_iotype)     
        {
        case IO_ACCEPT:                              //客戶端接入的socket就是szOverlapped結構的成員m_accSocket,這里必定先投遞
            pDlg->PostRecv(pOverlapped->m_accSocket);//這個socket接受數據,然后才在監聽socket投遞下一accept命令
            pDlg->PostAccept(pOverlapped);
            pDlg->ShowText(_T("系統消息:客戶端接入成功。。。"));
            break;
        case IO_READ:
            if (cbTransfer > 0)
            {
                cstrText.Format(_T("%s"), pOverlapped->m_buf);
                pDlg->ShowText(_T("Client: >") + cstrText);
                pDlg->PostRecv(pOverlapped);
            }
            else
            {
                pDlg->CleanUp(pOverlapped);
            }
            break;
        default:
            break;
        }        
    }

    if (listenSocket != INVALID_SOCKET) {
        closesocket(listenSocket);
    }

    pDlg->CleanAll();   //清理操作
    WSACleanup();
    return TRUE;
}

    同步調用和異步調用的區別:同步調用等待客戶端接入時會一直卡在那里,直到有客戶端接入,而異步調用等待客戶端接入就好比發送一個等待客戶端接入的命令給操

作系統,意思就是說你(操作系統)幫我接入一個客戶端,客戶端接入成功之后你再告訴我,這就是為什么上面的代碼中進入循環之前我需要調用PostAccept函數的原因。

同樣的,接受客戶端的信息也一樣,當收到客戶端的信息之后,也要在這個客戶端投遞下一個recv命令。

    PostAccept()函數與PostRecv()函數,這里我都進行了重載,不過代碼都大致一樣

BOOL COverlapped1Dlg::PostAccept(SOCKET sock)
{
    if (INVALID_SOCKET == sock)
    {
        return FALSE;
    }
    DWORD dwBytesReceived = 0;
    szOverlapped *pItem = new szOverlapped();
    pItem->m_socket = sock;
    pItem->m_iotype = IO_ACCEPT;
    pItem->m_accSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (!AddSzOverlapped(pItem))
    {
        return FALSE;
    }
    m_eventArray[m_dwTotal] = pItem->m_overlapped.hEvent;
    m_dwTotal++;
    if (!AcceptEx(pItem->m_socket, pItem->m_accSocket, pItem->m_buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytesReceived, &(pItem->m_overlapped)))
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            return FALSE;
        }
    }
    return TRUE;
}

BOOL COverlapped1Dlg::PostAccept(szOverlapped * pOverlapped)
{
    ASSERT(pOverlapped != NULL);
    DWORD dwBytesReceived;
    pOverlapped->m_iotype = IO_ACCEPT;
    pOverlapped->m_accSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

    if (AcceptEx(pOverlapped->m_socket, pOverlapped->m_accSocket, pOverlapped->m_buf, 0, sizeof(sockaddr_in) + 16, sizeof(sockaddr_in) + 16, &dwBytesReceived, &(pOverlapped->m_overlapped)))
    {
        if (WSAGetLastError() != WSA_IO_PENDING)
        {
            return FALSE;
        }
    }
    return TRUE;
}

BOOL COverlapped1Dlg::PostRecv(SOCKET sock)
{
    WSABUF wsabuf = {0};
    szOverlapped *pOverlapped = new szOverlapped();
    pOverlapped->m_socket = sock;
    pOverlapped->m_iotype = IO_READ;
    wsabuf.buf = pOverlapped->m_buf;
    wsabuf.len = BUF_SIZE;
    DWORD numberOfBytesRecvd = 0, flags = 0;

    if (!AddSzOverlapped(pOverlapped))
    {
        return FALSE;
    }
    m_eventArray[m_dwTotal] = pOverlapped->m_overlapped.hEvent;
    m_dwTotal++;
    int iRet = WSARecv(pOverlapped->m_socket, &wsabuf, 1, &numberOfBytesRecvd, &flags, &(pOverlapped->m_overlapped), NULL);
    if (NO_ERROR != iRet)
    {
        if (WSA_IO_PENDING != WSAGetLastError())
        {
            return FALSE;
        }
    }
    return TRUE;
}

BOOL COverlapped1Dlg::PostRecv(szOverlapped * pOverlapped)
{
    WSABUF wsabuf = {0};
    pOverlapped->m_iotype = IO_READ;
    ZeroMemory(pOverlapped->m_buf, BUF_SIZE);
    wsabuf.buf = pOverlapped->m_buf;
    wsabuf.len = BUF_SIZE;
    DWORD numberOfBytesRecvd = 0, flags = 0;

    int iRet = WSARecv(pOverlapped->m_socket, &wsabuf, 1, &numberOfBytesRecvd, &flags, &(pOverlapped->m_overlapped), NULL);
    
    if (NO_ERROR != iRet)
    {
        if (WSA_IO_PENDING != WSAGetLastError())
        {
            return FALSE;
        }
    }
    return TRUE;
}

    另外一些輔助函數都很簡單,一看就明白了,代碼如下:

BOOL COverlapped1Dlg::AddSzOverlapped(szOverlapped * pOverlapped)  //添加szOverlapped結構函數,保存每一個重疊結構和socket
{
    if (m_AllOverlapped.GetCount() > WSA_MAXIMUM_WAIT_EVENTS)
    {
        closesocket(pOverlapped->m_accSocket);
        delete pOverlapped;
        return FALSE;
    }
    m_AllOverlapped.Add(pOverlapped);
    return TRUE;
}

szOverlapped* COverlapped1Dlg::FindOverlappedByEvent(WSAEVENT wsaevent)  //通過事件查找重疊結構
{
    szOverlapped* pItem = NULL;
    for (int i = 0; i < m_AllOverlapped.GetCount(); i++)
    {
        pItem = m_AllOverlapped.GetAt(i);
        if (pItem->m_overlapped.hEvent == wsaevent)
        {
            break;
        }
    }
    return pItem;
}

SOCKET COverlapped1Dlg::FindSocketByEvent(WSAEVENT wsaevent)     //通過事件查找socket
{
    for (int i = 0; i < m_AllOverlapped.GetCount(); i++)
    {
        if (m_AllOverlapped.GetAt(i)->m_overlapped.hEvent == wsaevent)
        {
            return m_AllOverlapped.GetAt(i)->m_socket;
        }
    }
    return INVALID_SOCKET;
}

BOOL COverlapped1Dlg::RemoveEvent(WSAEVENT wsaevent)   //移除事件
{
    DWORD idx;
    for (int i = 0; m_eventArray[i] != 0; i++)
    {
        if (wsaevent == m_eventArray[i])
        {
            idx = i;
        }
    }
    for (int i = idx; m_eventArray[i] != 0; i++)
    {
        m_eventArray[idx] = m_eventArray[idx + 1];
    }
    m_dwTotal--;
    return TRUE;
}

     事件通知最多只能接受64個socket的接入,而完成例程就沒有這樣的限制,我將會在后面給出完成例程的演示。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM