基於 IOCP 的通用異步 Windows Socket TCP 高性能服務端組件的設計與實現


設計概述

  服務端通信組件的設計是一項非常嚴謹的工作,其中性能、伸縮性和穩定性是必須考慮的硬性質量指標,若要把組件設計為通用組件提供給多種已知或未知的上層應用使用,則設計的難度更會大大增加,通用性、可用性和靈活性必須考慮在內。

  現以一個基於 IOCP 的通用異步 Windows Socket TCP 服務端組件為例子,講述其設計與實現相關的問題,希望能引發大家的思考,對大家日后開展相關類似工作時有所幫助。關於通用性、可用性、Socket 模型選型以及接口模型的設計等問題已經在本座前段時間發表的《通用異步 Windows Socket TCP 客戶端組件的設計與實現》中進行過闡述,此處就不再重復了。現在主要針對服務端通信組件的特點闡述設計其設計和實現相關的問題。

  一、線程結構

  與組件相關的線程有 3 種:使用者線程、Accept  線程和工作線程,其中后 2 種由組件實現。

    1. 使用者線程:通過調用 Start/Stop/Send 等組件方法操作組件的一個或多個線程,通常是程序的主線程或其它業務邏輯線程。
    2. Accept 線程:使用 AcceptEx() 接收客戶端連接請求並創建 Client Socket 的線程,將其獨立出來,實現為單獨的線程將使組件的模塊划分更清晰,更重要的是避免與業務邏輯和通信處理的相互影響。
    3. 工作線程:使用 GetQueuedCompletionStatus() 監聽網絡事件並處理網絡交互的多個線程,工作線程處理完網絡事件后會向上層應用發送 OnAccept/OnSend/OnReceive 等組件通知。工作線程的數量可以根據實際情況之行設置(通常建議為:CPU Core Number * 2 + 2)。

  注意:如果上層應用在接收到 OnAccept/OnSend/OnReceive 這些組件通知時直接進行業務邏輯處理並在其中操作組件,則工作線程也成為了使用者線程。另外,如果要處理的業務邏輯比較耗時,上層應用應該在接收到組件通知后交由其他線程處理。

 

  二、性能

  組件采用 Windows 平台效率最高的 IOCP Socket 通信模型,因此在通信接口的性能方面是有保證的,這里就不多說了。現在從組件的設計與實現的角度來來闡述性能的優化。組件在代碼級別做了很多優化,一些看似多余或繁瑣的代碼其實都是為了性能服務;組件在設計方面主要采用了 2 中優化策略:緩存池和私有堆。

    1. 緩存池:在通信的過程中,通常需要頻繁的申請和釋放內存緩沖區(TBufferObj)和 Socket 相關的結構體(TSocketObj),這會大大影響組件的性能,因此,組件為 TBufferObj 和 TSocketObj 建立了動態緩存池, 只有當緩存池中沒有可用對象時才創建新對象,而當緩存對象過多時則會壓縮緩存池。
    2. 私有堆(Private Heap):在操作系統中,new / malloc 等操作是串行化的,雖然一般的應用程序不用太在乎這個問題,但是在一個高並發的服務器中則是個不可忽略的問題,另外 TBufferObj 和 TSocketObj 均為大小固定的結構體,因此非常適合在私有堆中分配內存,避免與 new / malloc 競爭同時又減少內存空洞。(關於私有堆的使用方法請參考這里 ^_^

   

  三、通用性與可用性

  與《通用異步 Windows Socket TCP 客戶端組件的設計與實現》描述的客戶端接口一樣,服務端組件也提供了兩組接口:ISocketServer 接口提供組件操作方法,由上層應用直接調用;IServerSocketListener 接口提供組件通知方法,由上層應用實現,這兩個接口設計得非常簡單,主要方法均不超過 5 個。由於組件自身功能完備(不需要附帶其它庫或代碼)並且職責單一(只管通信,不參與業務邏輯),因此可以十分方便第整合到任何類型的應用程序中。

 

  四、伸縮性

  可以根據實際的使用環境要求設置工作線程的數量、 TBufferObj 和 TSocketObj 緩存池的大小、TBufferObj 緩沖區的大小、Socket 監聽隊列的大小、AccepEx 派發的數目以及心跳檢查的間隔等。

 

  五、連接標識

  組件完全封裝了所有的底層 Socket 通信,上層應用看不到任何通信細節,不必也不能干預任何通信操作。另外,組件在 IServerSocketListener 通知接口的所有方法中都有一個 Connection ID 參數,該參數作為連接標識提供給上層應用識別不同的連接。

 

  下面我們來看看組件的主要實現邏輯。


 

組件實現

  • ISocketServer 和  IServerSocketListener 接口
// 操作類型
enum EnSocketOperation
{
SO_UNKNOWN = 0,
SO_ACCEPT = 1,
SO_CONNECT = 2,
SO_SEND = 3,
SO_RECEIVE = 4,
};

// 監聽器基接口
class ISocketListener
{
public:
// 返回值類型
enum EnHandleResult
{
HR_OK = 0,
HR_IGNORE = 1,
HR_ERROR = 2,
};

public:
  // 已發出數據通知
  virtual EnHandleResult OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;

  // 已接收數據通知
  virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;

  // 關閉連接通知
  virtual EnHandleResult OnClose(DWORD dwConnectionID) = 0;

  // 通信錯誤通知
  virtual EnHandleResult OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode) = 0;


public:
virtual ~ISocketListener() {}
};

// 監聽器接口
class IServerSocketListener : public ISocketListener

{
public:
  // 接收連接通知
  virtual EnHandleResult OnAccept(DWORD dwConnectionID) = 0;

  // 服務關閉通知
  virtual EnHandleResult OnServerShutdown() = 0;

};

// 操作接口
class ISocketServer
{
public:
// 錯誤碼
enum En_ISS_Error
{
ISS_OK = 0,
ISS_SOCKET_CREATE = 1,
ISS_SOCKET_BIND = 2,
ISS_SOCKET_LISTEN = 3,
ISS_CP_CREATE = 4,
ISS_WORKER_THREAD_CREATE = 5,
ISS_SOCKE_ATTACH_TO_CP = 6,
ISS_ACCEPT_THREAD_CREATE = 7,
};

public:
  // 啟動通行
  virtual BOOL Start (LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount) = 0;

  // 關閉通信
  virtual BOOL Stop () = 0;

  // 發送數據
  virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen) = 0;

  // 是否已啟動
  virtual BOOL HasStarted () = 0;

  // 獲取錯誤碼
  virtual En_ISS_Error GetLastError () = 0;

  // 獲取錯誤描述
  virtual LPCTSTR GetLastErrorDesc() = 0;

  // 獲取客戶端的地址信息
  virtual BOOL GetConnectionAddress(DWORD dwConnID, CString& strAddress, USHORT& usPort) = 0;


public:
virtual ~ISocketServer() {}
};

// ISocketServer 接口只能指針
typedef auto_ptr<ISocketServer> ISocketServerPtr;

   從上面的接口定義可以看出,ISocketServer 和  IServerSocketListener 接口非常簡單,上層應用只需調用 ISocketServer 接口的 Start()、Stop() 和 Send() 方法操作組件,並實現 IServerSocketListener 接口的幾個 On***() 方法接收組件通知,底層通信過程並不需要上層應用參與。

 

  • TBufferObjTSocketObj 結構體
struct TBufferObjBase
{
OVERLAPPED ov; // 異步 Overlapped
WSABUF buff; // 數據緩沖 buffer
EnSocketOperation operation; // 操作類型
};

struct TBufferObj : public TBufferObjBase
{
SOCKET client; // Client Socket
};

typedef list<TBufferObj*> TBufferObjPtrList;

struct TSocketObjBase
{
SOCKET socket; // Client Socket
};

struct TSocketObj : public TSocketObjBase
{
SOCKADDR_IN clientAddr; // Socket 地址
DWORD connID;   // Connection ID
CCriSec2 crisec;   // Critical Session
};

typedef list<TSocketObj*> TSocketObjPtrList;
typedef hash_map<DWORD, TSocketObj*> TSocketObjPtrMap;
typedef TSocketObjPtrMap::iterator TSocketObjPtrMapI;
typedef TSocketObjPtrMap::const_iterator TSocketObjPtrMapCI;

  TBufferObjTSocketObj 是負責通信數據交換的載體,並由對應的緩沖池負責管理它們的實例對象。

  • CIocpServer
// 組件實現類
class CIocpServer : public ISocketServer
{
public:
/* 如果需要,可以提供 getter & setter 方法設置下列工作參數 */
static const long DEFAULT_IOCP_THREAD_COUNT; // 默認工作線程數
static const long DEFAULT_ACCEPT_SOCKET_COUNT; // 默認並發 AcceptEx 調用次數
static const long DEFAULT_IOCP_BUFFER_SIZE; // 默認 TBufferObj 數據緩沖區大小
static const long DEFAULT_SOCKET_LISTEN_QUEUE; // 默認 Socket 等候隊列數目
static const long DEFAULT_FREE_SOCKETOBJ_POOL; // TSocketObj 緩沖池大小
static const long DEFAULT_FREE_BUFFEROBJ_POOL; // TBufferObj 緩沖池大小
static const long DEFAULT_FREE_SOCKETOBJ_HOLD; // TSocketObj 緩沖池壓縮閥值
static const long DEFAULT_FREE_BUFFEROBJ_HOLD; // TBufferObj 緩沖池壓縮閥值
static const long DEFALUT_KEEPALIVE_TIMES; // 心跳檢測次數
static const long DEFALUT_KEEPALIVE_INTERVAL; // 心跳檢測間隔

public:
CIocpServer(IServerSocketListener* pListener) // IServerSocketListener 監聽器
: m_psoListener(pListener)
, m_hAcceptThread(NULL)
, m_hCompletePort(NULL)
, m_soListen(INVALID_SOCKET)
, m_pfnAcceptEx(NULL)
, m_pfnGetAcceptExSockaddrs(NULL)
, m_enLastError(ISS_OK)
, m_bStarted(FALSE)
, m_semAccept(DEFAULT_IOCP_THREAD_COUNT, DEFAULT_IOCP_THREAD_COUNT)
{
ASSERT(m_wsSocket.IsValid());
ASSERT(m_psoListener);

Reset();
}

virtual ~CIocpServer()
{
if(HasStarted())
Stop();
}

public:
/* ISocketServer 接口方法實現 */
  virtual BOOL Start (LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount = DEFAULT_IOCP_THREAD_COUNT);
  virtual BOOL Stop ();
  virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen);
  virtual BOOL HasStarted () {return m_bStarted;}
  virtual En_ISS_Error GetLastError () {return m_enLastError;}
  virtual LPCTSTR GetLastErrorDesc();
  virtual BOOL GetConnectionAddress(DWORD dwConnID, CString& strAddress, USHORT& usPort);

private:
void SetLastError(En_ISS_Error code, LPCTSTR func, int ec);
void Reset();

private:
BOOL CreateListenSocket(LPCTSTR pszBindAddress, USHORT usPort);
BOOL CreateCompletePort();
BOOL CreateWorkerThreads(long lThreadCount);
BOOL StartAcceptThread();

void CloseListenSocket();
void WaitForAcceptThreadEnd();
void CloseClientSocket();
void ReleaseFreeSocket();
void CompressFreeSocket(size_t size);
void ReleaseFreeBuffer();
void CompressFreeBuffer(size_t size);
void WaitForWorkerThreadEnd();
void TerminateWorkerThread();
void CloseCompletePort();

/* TBufferObj 和 TSocketObj 緩沖池系列方法 */
TBufferObj* GetFreeBufferObj(int iLen = DEFAULT_IOCP_BUFFER_SIZE);
TSocketObj* GetFreeSocketObj();
void AddFreeBufferObj(TBufferObj* pBufferObj);
void AddFreeSocketObj(DWORD dwConnID, BOOL bClose = TRUE, BOOL bGraceful = TRUE, BOOL bReuseAddress = FALSE);
TBufferObj* CreateBufferObj();
TSocketObj* CreateSocketObj();
void DeleteBufferObj(TBufferObj* pBufferObj);
void DeleteSocketObj(TSocketObj* pSocketObj);

void AddClientSocketObj(DWORD dwConnID, TSocketObj* pSocketObj);
TSocketObj* FindSocketObj(DWORD dwConnID);

private:
static UINT WINAPI AcceptThreadProc(LPVOID pv); // Accept 線程函數
static UINT WINAPI WorkerThreadProc(LPVOID pv); // 工作線程函數

void HandleIo (TSocketObj* pSocketObj, TBufferObj* pBufferObj, DWORD dwBytes, DWORD dwErrorCode);
void HandleAccept (SOCKET soListen, TBufferObj* pBufferObj);
void HandleSend (TSocketObj* pSocketObj, TBufferObj* pBufferObj);
void HandleReceive (TSocketObj* pSocketObj, TBufferObj* pBufferObj);

int DoSend (TSocketObj* pSocketObj, TBufferObj* pBufferObj);
int DoReceive (TSocketObj* pSocketObj, TBufferObj* pBufferObj);

private:
SOCKET GetAcceptSocket();
BOOL DeleteAcceptSocket(SOCKET socket, BOOL bCloseSocket = FALSE);
void ReleaseAcceptSockets();

private:
// 這個屬性是否似曾相識 ^_^ (參考講述客戶端組件的那篇文章)
  CInitSocket m_wsSocket;
// AcceptEx() 函數指針
LPFN_ACCEPTEX m_pfnAcceptEx;
// GetAcceptExSockAddrs() 函數指針
LPFN_GETACCEPTEXSOCKADDRS m_pfnGetAcceptExSockaddrs;
private:
IServerSocketListener* m_psoListener; // 監聽器指針

volatile BOOL m_bStarted; // 啟動標識
volatile DWORD m_dwConnID; // Connection ID 當前值

En_ISS_Error m_enLastError;

SOCKET m_soListen; // 監聽 Socket
HANDLE m_hCompletePort; // 完成端口
HANDLE m_hAcceptThread; // Accept 線程句柄
vector<HANDLE> m_vtWorkerThreads; // 工作線程句柄集合

TBufferObjPtrList m_lsFreeBuffer; // TBufferObj 緩沖池隊列
TSocketObjPtrList m_lsFreeSocket; // TSocketObj 緩沖池隊列
TSocketObjPtrMap m_mpClientSocket; // Connection ID 映射

CCriSec m_csFreeBuffer;
CCriSec m_csFreeSocket;
CCriSec m_csClientSocket;

CEvt m_evtAccept;
CSEM m_semAccept;
CCriSec m_csAccept;
ulong_set m_setAccept;

  CPrivateHeap m_hpPrivate; // 緩沖池私有堆
};

  這個類定義文件看上去有點復雜,但我們只需關注被注釋的那些方法和屬性就可以了。從上面的類定義可以看出,CIocpServer 實現了 ISocketServer 接口,而它本身並沒有增加任何 public 方法,因此它的使用方式十分簡單。另外,CIocpServer 的構造函數接收一個 ISocketServerListener 指針,CIocpServer 就是通過該指針把組件通知發送給上層應用的。

  • CIocpServer 的主要方法
// 事件觸發宏
#define FireAccept(id) (m_bStarted ? m_psoListener->OnAccept(id) : ISocketListener::HR_IGNORE)
#define FireSend(id, buff, len) (m_bStarted ? m_psoListener->OnSend(id, buff, len) : ISocketListener::HR_IGNORE)
#define FireReceive(id, buff, len) (m_bStarted ? m_psoListener->OnReceive(id, buff, len) : ISocketListener::HR_IGNORE)
#define FireClose(id) (m_bStarted ? m_psoListener->OnClose(id) : ISocketListener::HR_IGNORE)
#define FireError(id, op, code) (m_bStarted ? m_psoListener->OnError(id, op, code) : ISocketListener::HR_IGNORE)
#define FireServerShutdown() m_psoListener->OnServerShutdown()

// 成員常量定義
const long CIocpServer::DEFAULT_IOCP_THREAD_COUNT = ::GetCpuCount() * 2 + 2;
const long CIocpServer::DEFAULT_ACCEPT_SOCKET_COUNT = DEFAULT_IOCP_THREAD_COUNT;
const long CIocpServer::DEFAULT_IOCP_BUFFER_SIZE = 4 * 1024;
const long CIocpServer::DEFAULT_SOCKET_LISTEN_QUEUE = 30;
const long CIocpServer::DEFAULT_FREE_SOCKETOBJ_POOL = 50;
const long CIocpServer::DEFAULT_FREE_BUFFEROBJ_POOL = 150;
const long CIocpServer::DEFAULT_FREE_SOCKETOBJ_HOLD = 150;
const long CIocpServer::DEFAULT_FREE_BUFFEROBJ_HOLD = 450;
const long CIocpServer::DEFALUT_KEEPALIVE_TIMES = 3;
const long CIocpServer::DEFALUT_KEEPALIVE_INTERVAL = 10 * 1000;

BOOL CIocpServer::Start(LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount)
{
if(CreateListenSocket(pszBindAddress, usPort)) // 創建監聽 Socket
if(CreateCompletePort()) // 創建完成端口
if(CreateWorkerThreads(lThreadCount)) // 啟動工作線程
if(StartAcceptThread()) // 啟動 Accept 線程
return (m_bStarted = TRUE);

Stop();
return (m_bStarted = FALSE);
}

BOOL CIocpServer::Stop()
{
BOOL bStarted = m_bStarted;
m_bStarted = FALSE;

CloseListenSocket();

if(bStarted)
{
WaitForAcceptThreadEnd(); // 停止 Accept 線程

FireServerShutdown(); // 發送關閉通知

CloseClientSocket(); // 關閉所有連接

WaitForWorkerThreadEnd(); // 停止工作線程

ReleaseFreeSocket(); // 釋放 TSocketObj 緩沖池
ReleaseFreeBuffer(); // 釋放 TBufferObj 緩沖池
}
else
TerminateWorkerThread(); // 終止工作線程

CloseCompletePort(); // 關閉完成端口

Reset(); // 重設組件屬性

return TRUE;
}

BOOL CIocpServer::Send(DWORD dwConnID, const BYTE* pBuffer, int iLen)
{
ASSERT(iLen > 0);

TSocketObj* pSocketObj = NULL;

{
CCriSecLock locallock1(m_csClientSocket);

// 根據 Connection ID 查找對應 TSocketObj 對象
TSocketObjPtrMapCI it = m_mpClientSocket.find(dwConnID);
if(it != m_mpClientSocket.end())
pSocketObj = it->second;
}

if(pSocketObj == NULL)
return FALSE;

CCriSecLock2 locallock2(pSocketObj->crisec);

int iRemain = iLen;

while(iRemain > 0)
{
// 填充 TBufferObj 緩沖區
int iBufferSize = min(iRemain, DEFAULT_IOCP_BUFFER_SIZE);
TBufferObj* pBufferObj = GetFreeBufferObj(iBufferSize);
memcpy(pBufferObj->buff.buf, pBuffer, iBufferSize);

// 發送數據
if(DoSend(pSocketObj, pBufferObj) != NO_ERROR)
return FALSE;

iRemain -= iBufferSize;
pBuffer += iBufferSize;
}

return TRUE;
}

// Accept 線程函數
UINT WINAPI CIocpServer::AcceptThreadProc(LPVOID pv)
{
CIocpServer* pServer = (CIocpServer*)pv;

ASSERT(pServer->m_soListen != INVALID_SOCKET);

TRACE("-----> 啟動監聽線程 <-----\n");

while(TRUE)
{
HANDLE handles[] = {pServer->m_semAccept, pServer->m_evtAccept};
DWORD dwResult = ::WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE);

if(dwResult == WAIT_OBJECT_0)
{
TBufferObj* pBufferObj = pServer->GetFreeBufferObj();
SOCKET soClient = pServer->GetAcceptSocket();

// 調用 AcceptEx() 異步接收連接請求
if(::PostAccept(pServer->m_pfnAcceptEx, pServer->m_soListen, soClient, pBufferObj) != NO_ERROR)
{
pServer->DeleteBufferObj(pBufferObj);
pServer->DeleteAcceptSocket(soClient);
::ManualCloseSocket(soClient);

TRACE1("-----> 監聽線程異常終止 (EC: %d) <-----\n", ::WSAGetLastError());
break;
}
}
else if(dwResult == WAIT_OBJECT_0 + 1)
{
pServer->ReleaseAcceptSockets();

TRACE("-----> 停止監聽線程 <-----\n");
break;
}
else
VERIFY(FALSE);
}

return 0;
}

// 工作線程函數
UINT WINAPI CIocpServer::WorkerThreadProc(LPVOID pv)
{
CIocpServer* pServer = (CIocpServer*)pv;

while (TRUE)
{
DWORD dwErrorCode = NO_ERROR;

DWORD dwBytes;
OVERLAPPED* pOverlapped;
TSocketObj* pSocketObj;
TBufferObj* pBufferObj;

// 等待完成事件
BOOL result = ::GetQueuedCompletionStatus
(
pServer->m_hCompletePort,
&dwBytes,
(PULONG_PTR)&pSocketObj,
&pOverlapped,
INFINITE
);

if(dwBytes == 0 && pSocketObj == NULL && pOverlapped == NULL)
return 0;

pBufferObj = CONTAINING_RECORD(pOverlapped, TBufferObj, ov);

if (!result)
{
DWORD dwFlag = 0;
DWORD dwSysCode = ::GetLastError();

if(pServer->m_bStarted)
{
SOCKET sock = pBufferObj->operation != SO_ACCEPT ? pSocketObj->socket : (SOCKET)pSocketObj;
result = ::WSAGetOverlappedResult(pSocketObj->socket, &pBufferObj->ov, &dwBytes, FALSE, &dwFlag);

if (!result)
{
dwErrorCode = ::WSAGetLastError();
TRACE3("GetQueuedCompletionStatus failed (SYS: %d, SOCK: %d, FLAG: %d)\n", dwSysCode, dwErrorCode, dwFlag);
}
}
else
dwErrorCode = dwSysCode;
}

// 處理 IO 事件
pServer->HandleIo(pSocketObj, pBufferObj, dwBytes, dwErrorCode);
}

return 0;
}

// 處理 IO 事件
void CIocpServer::HandleIo(TSocketObj* pSocketObj, TBufferObj* pBufferObj, DWORD dwBytes, DWORD dwErrorCode)
{
ASSERT(pBufferObj != NULL);
ASSERT(pSocketObj != NULL);

if(dwErrorCode != NO_ERROR)
{
if(pBufferObj->operation != SO_ACCEPT)
{
FireError(pSocketObj->connID, pBufferObj->operation, dwErrorCode);
AddFreeSocketObj(pSocketObj->connID);
}
else
{
DeleteAcceptSocket(pBufferObj->client);
::ManualCloseSocket(pBufferObj->client);
}

AddFreeBufferObj(pBufferObj);
return;
}

if(dwBytes == 0 && pBufferObj->operation != SO_ACCEPT)
{
FireClose(pSocketObj->connID);
AddFreeSocketObj(pSocketObj->connID);
AddFreeBufferObj(pBufferObj);
return;
}

pBufferObj->buff.len = dwBytes;

switch(pBufferObj->operation)
{
case SO_ACCEPT: // 處理 Accept 事件(內部調用 FireAccept())
HandleAccept((SOCKET)pSocketObj, pBufferObj);
break;
case SO_SEND: // 處理 Send 事件(內部調用 FireSend())
HandleSend(pSocketObj, pBufferObj);
break;
case SO_RECEIVE: // 處理 Receive 事件(內部調用 FireReceive())
HandleReceive(pSocketObj, pBufferObj);
break;
default:
ASSERT(FALSE);
}
}

   上面的代碼就不多作解析了,有興趣的朋友可以下載完整的代碼煙酒煙酒 ^_^ 下面一起來看一個組件的使用示例。


 

使用示例

ServiceEntry.h
 1 // 監聽器
2 class CServiceEntry : public IServerSocketListener
3 {
4 public:
5 // 業務方法
6 BOOL Start();
7 BOOL Stop();
8 BOOL Dispatch(DWORD dwConnID, EnCommandType enCmdType, WORD wCmdDataLen, TCommandData* pCmdData);
9
10 protected:
11 // IServerSocketListener 通知方法
12 virtual EnHandleResult OnAccept (DWORD dwConnectionID);
13 virtual EnHandleResult OnSend (DWORD dwConnectionID, DWORD dwNumberOfBytes);
14 virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pBuffer, int iLen);
15 virtual EnHandleResult OnClose (DWORD dwConnectionID);
16 virtual EnHandleResult OnError (DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode);
17 virtual EnHandleResult OnServerShutdown();
18
19
20 private:
21 CServiceEntry()
22 : m_psoServer(new CIocpServer(this)) // 創建通信組件,並把自己設置為組件的監聽器
23 {
24
25 }
26
27 ~CServiceEntry() {if(m_bStarted) Stop();}
28
29 private:
30 ISocketServerPtr m_psoServer; // 通信組件
31 };
ServiceEntry.cpp
  1 BOOL CServiceEntry::Start()
2 {
3 if(m_bStarted)
4 return FALSE;
5
6 if(Init())
7 {
8 // 啟動通信
9 if(m_psoServer->Start(Config.GetIocpAddress(), Config.GetIocpPort(), Config.GetIocpThreadCount()))
10 {
11 Logger.InfoEx(_T("IOCP Server start --> OK!"));
12
13 VERIFY(StartMonitorThread());
14
15 m_bStarted = TRUE;
16 }
17 else
18 {
19 Logger.FatalEx(_T("IOCP Server start --> Fail! (%s)"), m_psoServer->GetLastErrorDesc());
20 UnInit();
21 }
22 }
23
24 return m_bStarted;
25 }
26
27 BOOL CServiceEntry::Stop()
28 {
29 if(!m_bStarted)
30 return FALSE;
31
32 if(m_psoServer->HasStarted())
33 {
34 // 停止通信
35 m_psoServer->Stop();
36 Logger.InfoEx(_T("IOCP Server stop --> OK!"));
37 }
38
39 WaitForMonitorThreadEnd();
40
41 m_bStarted = !UnInit();
42
43 return !m_bStarted;
44 }
45
46 BOOL CServiceEntry::Dispatch(DWORD dwConnID, EnCommandType enCmdType, WORD wCmdDataLen, TCommandData* pCmdData)
47 {
48 const WORD wBufferLen = CMD_ADDITIVE_SIZE + wCmdDataLen;
49 CLocalByteMemoryBuffer buffer(m_hpPrivate, wBufferLen);
50 BYTE* pBuffer = buffer;
51
52 memcpy(pBuffer, &wBufferLen, CMD_LEN_SIZE);
53 pBuffer += CMD_LEN_SIZE;
54 memcpy(pBuffer, &enCmdType, CMD_TYPE_SIZE);
55 pBuffer += CMD_TYPE_SIZE;
56 memcpy(pBuffer, pCmdData, wCmdDataLen);
57 pBuffer += wCmdDataLen;
58 memcpy(pBuffer, &CMD_FLAG, CMD_FLAG_SIZE);
59
60 // 發送數據
61 return m_psoServer->Send(dwConnID, buffer, wBufferLen);
62 }
63
64 // 處理 OnAccept 事件
65 ISocketListener::EnHandleResult CServiceEntry::OnAccept(DWORD dwConnectionID)
66 {
67 Logger_TryDebug1(_T("<CNNID: %d> 接收連接請求"), dwConnectionID);
68
69 return HR_OK;
70 }
71
72 // 處理 OnSend 事件
73 ISocketListener::EnHandleResult CServiceEntry::OnSend(DWORD dwConnectionID, DWORD dwNumberOfBytes)
74 {
75 Logger_TryDebug2(_T("<CNNID: %d> 發出數據包 (%d bytes)"), dwConnectionID, dwNumberOfBytes);
76
77 return HR_OK;
78 }
79
80 // 處理 OnReceive 事件
81 ISocketListener::EnHandleResult CServiceEntry::OnReceive(DWORD dwConnectionID, const BYTE* pBuffer, int iLen)
82 {
83 ASSERT(pBuffer != NULL && iLen > 0);
84
85 Logger_TryDebug2(_T("<CNNID: %d> 接收數據包 (%d bytes)"), dwConnectionID, iLen);
86
87 CBufferPtr* pRecvBuffer = GetConnectionBuffer(dwConnectionID);
88 ASSERT(pRecvBuffer);
89
90 pRecvBuffer->Cat(pBuffer, iLen);
91 return ParseReceiveBuffer(dwConnectionID, pRecvBuffer) ? HR_OK : HR_ERROR;
92 }
93
94 // 處理 OnClose 事件
95 ISocketListener::EnHandleResult CServiceEntry::OnClose(DWORD dwConnectionID)
96 {
97 Logger_TryDebug1(_T("<CNNID: %d> 關閉連接"), dwConnectionID);
98
99 CommandFactory.ExecuteCommand(this, dwConnectionID, CS_C_SESSION_END, NULL);
100
101 DeleteConnectionBuffer(dwConnectionID);
102
103 return HR_OK;
104 }
105
106 // 處理 OnError 事件
107 ISocketListener::EnHandleResult CServiceEntry::OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode)
108 {
109 Logger_TryDebug3(_T("<CNNID: %d> 網絡錯誤(OP: %d, EC: %d)"), dwConnectionID, enOperation, iErrorCode);
110
111 if(enOperation == SO_RECEIVE)
112 {
113 CommandFactory.ExecuteCommand(this, dwConnectionID, CS_C_SESSION_ABEND, NULL);
114
115 DeleteConnectionBuffer(dwConnectionID);
116
117 return HR_OK;
118 }
119 else
120 return HR_IGNORE;
121 }
122
123 // 處理 OnServerShutdown 事件
124 ISocketListener::EnHandleResult CServiceEntry::OnServerShutdown()
125 {
126 CommandFactory.ExecuteCommand(this, 0, CS_C_SERVER_SHUTDOWN, NULL);
127
128 ClearConnectionBuffer();
129
130 return HR_OK;
131 }

 

  好了,碼了一個晚上的字,累啊!到此為止吧,感謝收看~ 晚安 ^_^


  (想看源代碼的朋友請輕踩這里

 

CodeProject


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM