編寫 Windows Socket TCP 客戶端其實並不困難,Windows 提供了6種 I/O 通信模型供大家選擇。但本座看過很多客戶端程序都把 Socket 通信和業務邏輯混在一起,剪不斷理還亂。每個程序都 Copy / Parse 類似的代碼再進行修改,實在有點情何以堪。因此本座利用一些閑暇時光寫了一個基於 IOCP 的通用異步 Windows Socket TCP 高性能服務端組件和一個通用異步 Windows Socket TCP 客戶端組件供各位看官參詳參詳,希望能激發下大家的靈感。本篇文章講述客戶端組件。閑話少說,我們現在步入正題。
- 最重要的第一個問題:如何才能達到通用?
答:很簡單。
1、限制組件的職能,說白了,通信組件的唯一職責就是接受和發送字節流,絕對不能參與上層協議解析等工作。不在其位不謀其政就是這個意思。
2、與上層使用者解耦、互不依賴,組件與使用者通過接口方法進行交互,組件實現 ISocketClient 接口為上層提供操作方法;使用者通過 IClientSocketListener 接口把自己注冊為組件的 Listener,接收組件通知。因此,任何使用者只要實現了 IClientSocketListener 接口都可以使用組件;另一方面,你甚至可以自己重新寫一個實現方式完全不同的組件實現給使用者調用,只要該組件遵從 ISocketClient 接口。這也是 DIP 設計原則的體現(若想了解更多關於設計原則的內容請猛擊這里 ^_^)。
- 最重要的第二個問題:可用性如何,也就是說使用起來是否是否方便?
答:這個問題問得很好,可用性對所有通用組件都是至關重要的,如果太難用還不如自己重頭寫一個來得方便。因此,ISocketClient 和 IClientSocketListener 接口設計得盡量簡單易用(通俗來說就是“傻瓜化”),這兩個接口的主要方法均不超過 5 個。
- 最重要的第三個問題:組件的性能如何?
作為底層的通用組件,性能問題是必須考慮的,絕對不能成為系統的瓶頸。而另一方面,從實際出發,畢竟只是一個客戶端組件,它的並發性要求遠沒有服務端那么高。因此,組件在設計上充分考慮了性能、現實使用情景、可用性和實現復雜性等因素,確保滿足性能要求的同時又不會寫得太復雜。做出以下兩點設計決策:
-
- 在單獨線程中實現 Socket 通信交互。這樣可以避免與主線程或其他線程相互干擾。
- I/O 模型選擇 WSAEventSelect。細說一下選擇這種 I/O 模型的原因:(各種 I/O 模型的性能比較可以參考:《Windows 網絡編程(中文第二版)》第 154 頁)
- 阻塞模型:(不解析,你懂的^_^)
- 非阻塞模型:(性能太低)
- WSAAsyncSelect: (兩個原因:a、性能太低;b、對於純 Console 程序還要背負 HWND 實在是傷不起呀!)
- 重疊 I/O:(有點復雜了)
- 完成端口:(何必呢?)
唉,理論的東西就先別吹那么多了,直接上代碼吧,求你了 !!
OK!先看看 ISocketClient 和 IClientSocketListener 的接口定義:
// 組件操作類型
enum EnSocketOperation
{
SO_UNKNOWN = 0,
SO_ACCEPT = 1,
SO_CONNECT = 2,
SO_SEND = 3,
SO_RECEIVE = 4,
};
// 組件監聽器基接口
class ISocketListener
{
public:
enum EnHandleResult
{
HR_OK = 0,
HR_IGNORE = 1,
HR_ERROR = 2,
};
public:
// 已發出數據通知
virtual EnHandleResult OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;
// 已接收數據通知
virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLength) = 0;
// 關閉連接通知
virtual EnHandleResult OnClose(DWORD dwConnectionID) = 0;
// 通信錯誤通知
virtual EnHandleResult OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode) = 0;
public:
virtual ~ISocketListener() {}
};
// 服務端組件監聽器接口(暫時無視之)
class IServerSocketListener : public ISocketListener
{
public:
// 接收連接通知
virtual EnHandleResult OnAccept(DWORD dwConnectionID) = 0;
// 服務關閉通知
virtual EnHandleResult OnServerShutdown() = 0;
};
// 客戶端組件監聽器接口
class IClientSocketListener : public ISocketListener
{
public:
// 連接完成通知
virtual EnHandleResult OnConnect(DWORD dwConnectionID) = 0;
};
// 服務端組件接口(暫時無視之)
class ISocketServer
{
public:
enum En_ISS_Error
{
ISS_OK = 0,
ISS_SOCKET_CREATE = 1,
ISS_SOCKET_BIND = 2,
ISS_SOCKET_LISTEN = 3,
ISS_CP_CREATE = 4,
ISS_WORKER_THREAD_CREATE = 5,
ISS_SOCKE_ATTACH_TO_CP = 6,
ISS_ACCEPT_THREAD_CREATE = 7,
};
public:
virtual BOOL Start (LPCTSTR pszBindAddress, USHORT usPort, long lThreadCount) = 0;
virtual BOOL Stop () = 0;
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen) = 0;
virtual BOOL HasStarted () = 0;
virtual En_ISS_Error GetLastError () = 0;
virtual LPCTSTR GetLastErrorDesc() = 0;
virtual BOOL GetConnectionAddress(DWORD dwConnID, CString& strAddress, USHORT& usPort) = 0;
public:
virtual ~ISocketServer() {}
};
// 服務端組件接口智能指針
typedef auto_ptr<ISocketServer> ISocketServerPtr;
// 客戶端組件接口
class ISocketClient
{
public:
// 操作結果碼
enum En_ISC_Error
{
ISC_OK = 0,
ISC_CLIENT_HAD_STARTED = 1,
ISC_CLIENT_NOT_STARTED = 2,
ISC_SOCKET_CREATE_FAIL = 3,
ISC_CONNECT_SERVER_FAIL = 4,
ISC_WORKER_CREATE_FAIL = 5,
ISC_NETWORK_ERROR = 6,
ISC_PROTOCOL_ERROR = 7,
};
public:
// 啟動通信
virtual BOOL Start (LPCTSTR pszRemoteAddress, USHORT usPort) = 0;
// 關閉通信
virtual BOOL Stop () = 0;
// 發送數據
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen) = 0;
// 是否已啟動
virtual BOOL HasStarted () = 0;
// 獲取錯誤碼
virtual En_ISC_Error GetLastError () = 0;
// 獲取錯誤描述
virtual LPCTSTR GetLastErrorDesc() = 0;
public:
virtual ~ISocketClient() {}
};
// 客戶端組件接口智能指針
typedef auto_ptr<ISocketClient> ISocketClientPtr;
ISocketClient 接口主要有以下三個方法:
- Start():啟動通信
- Send():發送數據
- Stop():停止通信
IClientSocketListener 接口有以下五個通知方法:
- OnConnect()
- OnSend()
- OnReceive()
- OnClose()
- OnError()
夠簡單了吧^_^,使用者只需通過三個方法操作組件,然后處理五個組件通知。下面我們再看看組件的具體實現,先看組件類定義:
/* 組件實現類 */
class CSocketClient : public ISocketClient
{
// ISocketClient 接口方法
public:
virtual BOOL Start (LPCTSTR pszRemoteAddress, USHORT usPortt);
virtual BOOL Stop ();
virtual BOOL Send (DWORD dwConnID, const BYTE* pBuffer, int iLen);
virtual BOOL HasStarted () {return m_bStarted;}
virtual En_ISC_Error GetLastError () {return sm_enLastError;}
virtual LPCTSTR GetLastErrorDesc();
private:
BOOL CreateClientSocket();
BOOL ConnectToServer(LPCTSTR pszRemoteAddress, USHORT usPort);
BOOL CreateWorkerThread();
// 網絡事件處理方法
BOOL ProcessNetworkEvent();
void WaitForWorkerThreadEnd();
BOOL ReadData();
BOOL SendData();
void SetLastError(En_ISC_Error code, LPCTSTR func, int ec);
// 通信線程函數
static
#ifndef _WIN32_WCE
UINT
#else
DWORD
#endif
WINAPI WorkerThreadProc(LPVOID pv);
private:
static const int RECEIVE_BUFFER_SIZE = 8 * 1024;
static const int WORKER_THREAD_END_TIME = 3 * 1000;
static const long DEFALUT_KEEPALIVE_TIMES = 3;
static const long DEFALUT_KEEPALIVE_INTERVAL = 10 * 1000;
// 構造函數
public:
CSocketClient(IClientSocketListener* pListener)
: m_pListener(pListener) // 設置監聽器對象
, m_soClient(INVALID_SOCKET)
, m_evSocket(NULL)
, m_dwConnID(0)
, m_hWorker(NULL)
, m_dwWorkerID(0)
, m_bStarted(FALSE)
#ifdef _WIN32_WCE
, sm_enLastError(ISC_OK)
#endif
{
ASSERT(m_pListener);
}
virtual ~CSocketClient() {if(HasStarted()) Stop();}
private:
// 這是神馬 ???
CInitSocket m_wsSocket;
SOCKET m_soClient;
HANDLE m_evSocket;
DWORD m_dwConnID;
CCriSec m_scStop;
CEvt m_evStop;
HANDLE m_hWorker;
#ifndef _WIN32_WCE
UINT
#else
DWORD
#endif
m_dwWorkerID;
CBufferPtr m_sndBuffer;
CCriSec m_scBuffer;
CEvt m_evBuffer;
volatile BOOL m_bStarted;
private:
// 監聽器對象指針
IClientSocketListener* m_pListener;
#ifndef _WIN32_WCE
__declspec(thread) static En_ISC_Error sm_enLastError;
#else
volatile En_ISC_Error sm_enLastError;
#endif
};
從上面的定義可以看出,組件實現類本身並沒有提供額外的公共方法,它完全是可以被替換的。組件在構造函數中接收監聽器對象,並且保存為其成員屬性,因此可以在需要的時候向監聽器發送事件通知。
另外,不知各位看官是否注意到一個奇怪的成員屬性:“CInitSocket m_wsSocket; ”,這個屬性在其它地方從來都不會用到,那么它是干嘛的呢?在回答這個問題之前,首先想問問大家:Windows Socket 操作的整個操作過程中,第一個以及最后一個被調用的方法是什么?是 socket()、connect()、bind()、還是 closesocket() 嗎?都錯!答案是 —— ::WSAStartup() 和 ::WSACleanup()。每個程序都要調用一下這兩個方法確實是很煩的,又不雅觀。 其實,m_wsSocket 的唯一目的就是為了避免手工調用者兩個方法,看看它的定義就明白了:
class CInitSocket
{
public:
CInitSocket(LPWSADATA lpWSAData = NULL, BYTE minorVersion = 2, BYTE majorVersion = 2)
{
LPWSADATA lpTemp = lpWSAData;
if(!lpTemp)
lpTemp = (LPWSADATA)_alloca(sizeof(WSADATA));
m_iResult = ::WSAStartup(MAKEWORD(minorVersion, majorVersion), lpTemp);
}
~CInitSocket()
{
if(IsValid())
::WSACleanup();
}
int GetResult() {return m_iResult;}
BOOL IsValid() {return m_iResult == 0;}
private:
int m_iResult;
};
現在我們看看組件類實現文件中幾個重要方法的定義:
// 組件事件觸發宏定義
#define FireConnect(id) m_pListener->OnConnect(id)
#define FireSend(id, data, len) (m_bStarted ? m_pListener->OnSend(id, data, len) : ISocketListener::HR_IGNORE)
#define FireReceive(id, data, len) (m_bStarted ? m_pListener->OnReceive(id, data, len) : ISocketListener::HR_IGNORE)
#define FireClose(id) (m_bStarted ? m_pListener->OnClose(id) : ISocketListener::HR_IGNORE)
#define FireError(id, op, code) (m_bStarted ? m_pListener->OnError(id, op, code) : ISocketListener::HR_IGNORE)
// 啟動組件
BOOL CSocketClient::Start(LPCTSTR pszRemoteAddress, USHORT usPort)
{
BOOL isOK = FALSE;
if(HasStarted())
{
SetLastError(ISC_CLIENT_HAD_STARTED, _T(__FUNCTION__), 0);
return isOK;
}
// 創建 socket
if(CreateClientSocket())
{
// 連接服務器(內部會調用 FireConnect() )
if(ConnectToServer(pszRemoteAddress, usPort))
{
// 創建工作線程
if(CreateWorkerThread())
isOK = TRUE;
else
SetLastError(ISC_WORKER_CREATE_FAIL, _T(__FUNCTION__), 0);
}
else
SetLastError(ISC_CONNECT_SERVER_FAIL, _T(__FUNCTION__), ::WSAGetLastError());
}
else
SetLastError(ISC_SOCKET_CREATE_FAIL, _T(__FUNCTION__), ::WSAGetLastError());
isOK ? m_bStarted = TRUE : Stop();
return isOK;
}
// 關閉組件
BOOL CSocketClient::Stop()
{
{
CCriSecLock locallock(m_scStop);
m_bStarted = FALSE;
if(m_hWorker != NULL)
{
// 停止工作線程
if(::GetCurrentThreadId() != m_dwWorkerID)
WaitForWorkerThreadEnd();
::CloseHandle(m_hWorker);
m_hWorker = NULL;
m_dwWorkerID = 0;
}
if(m_evSocket != NULL)
{
// 關閉 WSAEvent
::WSACloseEvent(m_evSocket);
m_evSocket = NULL;
}
if(m_soClient != INVALID_SOCKET)
{
// 關閉socket
shutdown(m_soClient, SD_SEND);
closesocket(m_soClient);
m_soClient = INVALID_SOCKET;
}
m_dwConnID = 0;
}
// 釋放其它資源
m_sndBuffer.Free();
m_evBuffer.Reset();
m_evStop.Reset();
return TRUE;
}
// 發送數據
BOOL CSocketClient::Send(DWORD dwConnID, const BYTE* pBuffer, int iLen)
{
ASSERT(iLen > 0);
if(!HasStarted())
{
SetLastError(ISC_CLIENT_NOT_STARTED, _T(__FUNCTION__), 0);
return FALSE;
}
CCriSecLock locallock(m_scBuffer);
// 把數據存入緩沖器
m_sndBuffer.Cat(pBuffer, iLen);
// 喚醒工作現場,發送數據
m_evBuffer.Set();
return TRUE;
}
// 工作線程函數
#ifndef _WIN32_WCE
UINT
#else
DWORD
#endif
WINAPI CSocketClient::WorkerThreadProc(LPVOID pv)
{
CSocketClient* pClient = (CSocketClient*)pv;
TRACE0("---------------> 啟動工作線程 <---------------\n");
HANDLE hEvents[] = {pClient->m_evSocket, pClient->m_evBuffer, pClient->m_evStop};
while(pClient->HasStarted())
{
// 等待 socket 事件、發送數據事件和停止通信事件
DWORD retval = ::MsgWaitForMultipleObjectsEx(3, hEvents, WSA_INFINITE, QS_ALLINPUT, MWMO_INPUTAVAILABLE);
if(retval == WSA_WAIT_EVENT_0)
{
// 處理網絡消息
if(!pClient->ProcessNetworkEvent())
{
if(pClient->HasStarted())
pClient->Stop();
break;
}
}
else if(retval == WSA_WAIT_EVENT_0 + 1)
{
// 發送數據(內部調用 FireSend() )
if(!pClient->SendData())
{
if(pClient->HasStarted())
pClient->Stop();
break;
}
}
else if(retval == WSA_WAIT_EVENT_0 + 2)
break;
else if(retval == WSA_WAIT_EVENT_0 + 3)
// 消息循環
::PeekMessageLoop();
else
ASSERT(FALSE);
}
TRACE0("---------------> 退出工作線程 <---------------\n");
return 0;
}
// 處理網絡消息
BOOL CSocketClient::ProcessNetworkEvent()
{
::WSAResetEvent(m_evSocket);
WSANETWORKEVENTS events;
int rc = ::WSAEnumNetworkEvents(m_soClient, m_evSocket, &events);
if(rc == SOCKET_ERROR)
{
int code = ::WSAGetLastError();
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), code);
FireError(m_dwConnID, SO_UNKNOWN, code);
return FALSE;
}
/* 可讀取 */
if(events.lNetworkEvents & FD_READ)
{
int iCode = events.iErrorCode[FD_READ_BIT];
if(iCode == 0)
// 讀取數據(內部調用 FireReceive() )
return ReadData();
else
{
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), iCode);
FireError(m_dwConnID, SO_RECEIVE, iCode);
return FALSE;
}
}
/* 可發送 */
if(events.lNetworkEvents & FD_WRITE)
{
int iCode = events.iErrorCode[FD_WRITE_BIT];
if(iCode == 0)
// 發送數據(內部調用 FireSend() )
return SendData();
else
{
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), iCode);
FireError(m_dwConnID, SO_SEND, iCode);
return FALSE;
}
}
/* socket 已關閉 */
if(events.lNetworkEvents & FD_CLOSE)
{
int iCode = events.iErrorCode[FD_CLOSE_BIT];
if(iCode == 0)
FireClose(m_dwConnID);
else
{
SetLastError(ISC_NETWORK_ERROR, _T(__FUNCTION__), iCode);
FireError(m_dwConnID, SO_UNKNOWN, iCode);
}
return FALSE;
}
return TRUE;
}
從上面的代碼可以看出:通信過程中,組件的使用者不需要對通信過程進行任何干預,整個底層通信過程對使用者來說是透明的,使用只需集中精力處理好幾個組件通知。下面來看看組件的一個使用示例:
/* 組件使用者:實現 IClientSocketListener */
class CMainClient : public IClientSocketListener
{
// 這些方法會操作組件
public:
bool Login(LPCTSTR pszAddress, USHORT usPort, const T_101_Data* pData);
bool Logout(const T_201_Data* pData);
BOOL SendData(EnCommandType enCmdType, const TCommandData* pCmdData, WORD wCmdDataLen);
long GetLastError();
LPCTSTR GetLastErrorDesc();
// 實現 IClientSocketListener
public:
virtual EnHandleResult OnConnect(DWORD dwConnectionID);
virtual EnHandleResult OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength);
virtual EnHandleResult OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLen);
virtual EnHandleResult OnClose(DWORD dwConnectionID);
virtual EnHandleResult OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode);
private:
BOOL ParseReceiveBuffer();
// 其它方法 。。。
// 構造函數
public:
CMainClient()
// 創建組件,並把自己設置為組件的監聽器
: m_pscClient(new CSocketClient(this))
, m_dwConnID(0)
{
}
virtual ~CMainClient() {}
private:
// 組件屬性
ISocketClientPtr m_pscClient;
DWORD m_dwConnID;
// 其它屬性 。。。
};
BOOL CMainClient::Login(LPCTSTR pszAddress, USHORT usPort, const T_101_Data* pData)
{
// 啟動通信
return m_pscClient->Start(pszAddress, usPort) &&
SendData(CS_C_LOGIN_REQ, pData, sizeof(T_101_Data));
}
BOOL CMainClient::Logout(const T_201_Data* pData)
{
if(pData)
{
SendData(CS_C_SET_STATUS, pData, sizeof(T_201_Data));
::WaitWithMessageLoop(LOGOUT_WAIT_TIME);
}
// 停止通信
return m_pscClient->Stop();
}
BOOL CMainClient::SendData(EnCommandType enCmdType, const TCommandData* pCmdData, WORD wCmdDataLen)
{
const WORD wBufferLen = CMD_ADDITIVE_SIZE + wCmdDataLen;
CPrivateHeapByteBuffer buffer(m_hpPrivate, wBufferLen);
BYTE* pBuffer = buffer;
memcpy(pBuffer, &wBufferLen, CMD_LEN_SIZE);
pBuffer += CMD_LEN_SIZE;
memcpy(pBuffer, &enCmdType, CMD_TYPE_SIZE);
pBuffer += CMD_TYPE_SIZE;
memcpy(pBuffer, pCmdData, wCmdDataLen);
pBuffer += wCmdDataLen;
memcpy(pBuffer, &CMD_FLAG, CMD_FLAG_SIZE);
// 發送數據
return m_pscClient->Send(m_dwConnID, buffer, wBufferLen);
}
long CMainClient::GetLastError()
{
// 獲取通信錯誤碼
return m_pscClient->GetLastError();
}
LPCTSTR CMainClient::GetLastErrorDesc()
{
// 獲取通信錯誤描述
return m_pscClient->GetLastErrorDesc();
}
/* 處理連接成功事件 */
ISocketListener::EnHandleResult CMainClient::OnConnect(DWORD dwConnectionID)
{
TRACE1("<CNNID: %d> 已連接\n", dwConnectionID);
m_dwConnID = dwConnectionID;
return HR_OK;
}
/* 處理數據已發出事件 */
ISocketListener::EnHandleResult CMainClient::OnSend(DWORD dwConnectionID, const BYTE* pData, int iLength)
{
TRACE2("<CNNID: %d> 發出數據包 (%d bytes)\n", dwConnectionID, iLength);
return HR_OK;
}
/* 處理接收到數據事件*/
ISocketListener::EnHandleResult CMainClient::OnReceive(DWORD dwConnectionID, const BYTE* pData, int iLen)
{
TRACE2("<CNNID: %d> 接收數據包 (%d bytes)\n", dwConnectionID, iLen);
ASSERT(pData != NULL && iLen > 0);
// 保存數據
m_rcBuffer.Cat(pData, iLen);
// 解析數據
return ParseReceiveBuffer() ? HR_OK : HR_ERROR;;
}
/* 處理通信關閉事件*/
ISocketListener::EnHandleResult CMainClient::OnClose(DWORD dwConnectionID)
{
TRACE1("CNNID: %d> 關閉連接\n", dwConnectionID);
// 清理緩沖區
m_rcBuffer.Realloc(0);
return HR_OK;
}
/* 處理通信錯誤事件 */
ISocketListener::EnHandleResult CMainClient::OnError(DWORD dwConnectionID, EnSocketOperation enOperation, int iErrorCode)
{
TRACE3("<CNNID: %d> 網絡錯誤 (OP: %d, CO: %d)\n", dwConnectionID, enOperation, iErrorCode);
// 清理緩沖區
m_rcBuffer.Realloc(0);
return HR_OK;
}
好了,碼了一個晚上的字,累啊!到此為止吧,感謝收看~
敬請繼續收看《基於 IOCP 的通用異步 Windows Socket TCP 高性能服務端組件的設計與實現》,晚安 ^_^