自己開發了一個股票智能分析軟件,功能很強大,需要的點擊下面的鏈接獲取:
https://www.cnblogs.com/bclshuai/p/11380657.html
Socket網絡通訊
一. socket網絡通訊之select
socket服務器端連接多個客戶端,通過select函數去遍歷每個連接,獲取客戶端發過來的數據信息。
1.select的作用
將多個套接字放入數組中,檢查數組中的套接字是否有信號,連接請求,讀寫請求,一旦有立即返回有信號套接字的數量,超時返回0,錯誤返回負值。
詳細介紹本函數用於確定一個或多個套接口的狀態。對每一個套接口,調用者可查詢它的可讀性、可寫性及錯誤狀態信息。用fd_set結構來表示一組等待檢查的套接口。在調用返回時,這個結構存有套接口數組集合,並且select()返回有信號的套接口的數目。
2.Select函數解析
2.1select函數參數
int select ( int nfds,fd_set *readfds, *writefds, *exceptfds,timeval *timeout )
ndfs:select監視的文件句柄數,視進程中打開的文件數而定,一般設為呢要監視各文件 中的最大文件號加一。Window下這個參數無用,liniux下這個參數有用。
readfds:select監視的可讀文件句柄集合。
writefds: select監視的可寫文件句柄集合。
exceptfds:select監視的異常文件句柄集合。
readfds參數標識等待可讀性檢查的套接口。如果該套接口正處於監聽listen()狀態,則若有連接請求到達,該套接口便被標識為可讀,這樣一個accept()調用保證可以無阻塞完成。對其他套接口而言,可讀性意味着有排隊數據供讀取。或者對於SOCK_STREAM類型套接口來說,相對於該套接口的虛套接口已關閉,於是recv()或recvfrom()操作均能無阻塞完成,返回值為負值,可以通過這個來判斷sokect已經斷開連接。writefds參數標識等待可寫性檢查的套接口。
typedef struct fd_set {
u_int fd_count; /* how many are SET? */
SOCKET fd_array[FD_SETSIZE]; /* an array of SOCKETs */
} fd_set;
這三個參數傳入的是一個socket的結構體指針,結構體是一個存socket的數組加上一個存socket數量的變量;SETSIZE等於64,是個宏定義,可修改。
timeout:本次select()的超時結束時間。(見/usr/sys/select.h,可精確至百萬分之一秒!)傳入NULL,阻塞模式,傳入時間結構體對象,則超時select返回0,timeout兩個值都為0時,則非阻塞模式,立即返回。
2.2 select函數返回值
負值:select錯誤
正值:某些文件可讀寫或出錯
0:等待超時,沒有可讀寫或錯誤的文件
2.3宏定義操作
FD_ZERO(fd_set *fdset):FD_ZERO 是將fd_set結構體中的count置為0
FD_SET(int fd, fd_set *fdset):是將套接字插入數組,並將count 加1
FD_CLR(int fd, fd_set *fdset):將套接字從數組中刪除,將后續套接字前移,將count減1.
FD_ISSET(int fd, fdset *fdset):檢查fdset聯系的文件句柄fd是否在set數組中,可讀寫,>0表示可讀寫。
2.4調用流程
(1)定義fd_set變量,調用FD_ZERO宏將fd_set清空,實際上只是將count置0.
(2)調用FD_SET將關注的套接字插入到結構體數組中
(3)使用 select函數查詢結構體的數組中的套接字是否有信號,
(4)調用FD_ISSET判斷套接字是否在有信號的列表中,底層維護的列表。
(5)在列表中則調用讀寫函數進行操作。
3.例程
3.1select服務器
// MulitSocket.cpp : 定義控制台應用程序的入口點。
//
#include "stdafx.h"
#include "winsock2.h"
#pragma comment(lib,"ws2_32.lib")
#include<iostream>
using namespace std;
#define PORT 5050
#define BUFFSIZE 1024
SOCKET g_socketclientArray[FD_SETSIZE];
int g_num=0;
bool g_bRun=true;
char sendbuff[1024]="what's up ? dute !";
SOCKET g_socketserver;
DWORD WINAPI AcceptWork(LPVOID pPama)
{
if (g_socketserver==INVALID_SOCKET)
{
cout<<"socket server is invalid, thread is stoped"<<endl;
return-1;
}
fd_set fdset;
timeval outtime;
outtime.tv_sec=5;
outtime.tv_usec=0;
while (g_bRun)
{
FD_ZERO(&fdset);//清空socket數組
FD_SET(g_socketserver,&fdset);//將g_socketserver保存到socket數組中
int iRet=-1;
//(最大socket值linux有效,socket數組讀,socket數組寫,socket數組錯誤,超時時間)
iRet=::select(g_socketserver+1,&fdset,NULL,NULL,&outtime);//查詢數組中套接字是否有信號,返回有信號的socket數目
if (iRet==0)//超時
{
cout<<"accept work wait outtime "<<endl;
continue;
}
else if (iRet<0)//錯誤
{
int iRetErr = WSAGetLastError();
cout<<"select error code"<<iRetErr<<endl;
continue;
}
if (FD_ISSET(g_socketserver,&fdset)<=0)///判斷是否准備好
{
cout<<"FD_ISSET return value is <=0"<<endl;
continue;
}
sockaddr addr;
int addresslen=sizeof(sockaddr);
SOCKET sok=accept(g_socketserver,&addr,&addresslen);//接受連接
if (sok>0)
{
if (g_num>=FD_SETSIZE)
{
closesocket(sok);
cout<<"client number exceed the range,can't accept client connect anymore ! "<<endl;
continue;
}
g_socketclientArray[g_num]=sok;
g_num++;
cout<<"one client connected !"<<endl;
}
else
{
cout<<"accept socket return value <=0"<<endl;
}
}
return 0;
}
DWORD WINAPI RecvWork(LPVOID pPama)
{
timeval timeout;
timeout.tv_sec=0;
timeout.tv_usec=5;
char recvbuff[BUFFSIZE];
while (g_bRun)
{
if (g_num==0)
{
Sleep(1000);
continue;
}
SOCKET maxsock=0;
for (int i =0;i<g_num;i++)
{
fd_set fdset;
FD_ZERO(&fdset);
FD_SET(g_socketclientArray[i],&fdset);
int iRet=-1;
if ((iRet=select(maxsock,&fdset,NULL,NULL,&timeout))==0)
{
//cout<<"select wait time out"<<endl;
continue;
}
else if (iRet<0)
{
cout<<"select return value is below zero"<<endl;
Sleep(2000);
//continue;
}
if (FD_ISSET(g_socketclientArray[i],&fdset)<=0)
{
continue;
}
memset(recvbuff,0,BUFFSIZE);
if (recv(g_socketclientArray[i],recvbuff,BUFFSIZE,0)>0)
{
cout<<"recvfrom client"<<i+1<<":"<<recvbuff<<endl;
send(g_socketclientArray[i],sendbuff,1024,0);
cout<<"send to client"<<i+1<<":"<<sendbuff<<endl;
}
else
{
closesocket(g_socketclientArray[i]);
int j=i;
for (;(j+1)<g_num;j++)
{
g_socketclientArray[j]=g_socketclientArray[j+1];
}
g_socketclientArray[j+1]=INVALID_SOCKET;
g_num--;
cout<<"one client has lost connect ,left client num is "<<g_num<<endl;
}
}
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
WSADATA wsadata;
if (WSAStartup(MAKEWORD(2,2),&wsadata)!=0)
{
cout<<"WSAStartup failed"<<endl;
Sleep(2000);
return -1;
}
g_socketserver=::socket(AF_INET,SOCK_STREAM,0);
if (g_socketserver==INVALID_SOCKET)
{
cout<<"create socket failed"<<endl;
Sleep(2000);
return -1;
}
SOCKADDR_IN serverAddressIn;
serverAddressIn.sin_addr.S_un.S_addr=htonl(INADDR_ANY);
serverAddressIn.sin_family=AF_INET;
serverAddressIn.sin_port=htons(PORT);
if (::bind(g_socketserver,(sockaddr*)&serverAddressIn,sizeof(SOCKADDR_IN))!=0)
{
cout<<"bind address to m_socketserver failed"<<endl;
Sleep(2000);
return -1;
}
if (::listen(g_socketserver,5)!=0)
{
cout<<"listen socket failed"<<endl;
Sleep(2000);
return -1;
}
HANDLE m_socketServerAccept=NULL;
m_socketServerAccept=CreateThread(NULL,0,AcceptWork,NULL,0,NULL);
if (m_socketServerAccept==NULL)
{
cout<<"create thread failed "<<endl;
Sleep(2000);
return -1;
}
HANDLE m_threadRev=NULL;
m_threadRev=CreateThread(NULL,0,RecvWork,NULL,0,NULL);
if(m_threadRev==NULL)
{
cout<<"create thread is failed "<<endl;
Sleep(3000);
return -1;
}
system("PAUSE");
cout<<"stop program now"<<endl;
Sleep(3000);
g_bRun=false;
return 0;
}
3.2客戶端
// SocketClient.cpp : 定義控制台應用程序的入口點。
//
#include "stdafx.h"
#include "winsock2.h"
#pragma comment(lib,"ws2_32.lib")
#include<iostream>
using namespace std;
#define PORT 5050
bool g_bRun=true;
SOCKET g_socketClient;
char sendbuff[1024]="hello beautiful girl !";
char recvBuff[1024];
DWORD WINAPI SendWork(LPVOID pPama)
{
timeval timeout;
timeout.tv_sec=5;
timeout.tv_usec=0;
while(g_bRun)
{
if (send(g_socketClient,sendbuff,1024,0)<=0)
{
cout<<"send data error"<<endl;
}
cout<<"send: "<<sendbuff<<endl;
fd_set fdset_r;
FD_ZERO(&fdset_r);
FD_SET(g_socketClient,&fdset_r);
int iRval=0;
if ((iRval=select(g_socketClient+1,&fdset_r,NULL,NULL,&timeout))==0)//查看g_socketClient是否有可讀信號
{
continue;
}
else if (iRval<0)//返回錯誤
{
cout<<"socket connect is broken,thread is stoped "<<endl;
g_bRun=false;
break;
}
else
{
if (FD_ISSET(g_socketClient,&fdset_r)<=0)//判斷是否准備好接受數據
{
continue;
}
else
{
memset(recvBuff,0,1024);
recv(g_socketClient,recvBuff,1024,0);
cout<<"recv: "<<recvBuff<<endl;
}
}
Sleep(5000);
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
WSADATA wsadata;
if (WSAStartup(MAKEWORD(2,2),&wsadata)!=0)//初始化函數庫
{
cout<<"WSAStartup failed"<<endl;
Sleep(3000);
return -1;
}
g_socketClient=::socket(AF_INET,SOCK_STREAM,0);//創建套接字
if (g_socketClient==INVALID_SOCKET)
{
cout<<"create socket failed"<<endl;
Sleep(3000);
return -1;
}
SOCKADDR_IN serverAddressIn;
serverAddressIn.sin_addr.S_un.S_addr=inet_addr("10.20.48.12");//輸入服務器地址和端口
serverAddressIn.sin_family=AF_INET;
serverAddressIn.sin_port=htons(PORT);//和端口
int reval=0;
reval=connect(g_socketClient,(sockaddr*)&serverAddressIn,sizeof(SOCKADDR_IN));//連接服務器
if (reval==SOCKET_ERROR)
{
cout<<"connect socket failed"<<endl;
Sleep(3000);
return -1;
}
HANDLE m_recvThread=NULL;
m_recvThread=CreateThread(NULL,0,SendWork,NULL,0,NULL);
system("PAUSE");
cout<<"stop program now"<<endl;
Sleep(3000);
g_bRun=false;
return 0;
}
二 socket網絡通訊之完成端口
- 1. 完成端口的作用
使用單線程處理服務器的收發消息,會發生阻塞,使界面卡頓。使用一個線程處理一個客戶端的通訊,客戶端數量多時,線程間的切換,會使cpu不堪重負。完成端口的作用就是解決一個線程對應一個客戶端的情況,采用幾個線程處理所有的客戶端通訊。
2.完成端口的原理
事先開好幾個線程,你有幾個CPU我就開幾個,首先是避免了線程的上下文切換,因為線程想要執行的時候,總有CPU資源可用,然后讓這幾個線程等着,等到有用戶請求來到的時候,就把這些請求都加入到一個公共消息隊列中去,然后這幾個開好的線程就排隊逐一去從消息隊列中取出消息並加以處理,這種方式就很優雅的實現了異步通信和負載均衡的問題,因為它提供了一種機制來使用幾個線程“公平的”處理來自於多個客戶端的輸入/輸出,並且線程如果沒事干的時候也會被系統掛起,不會占用CPU周期,這個關鍵的作為交換的消息隊列,就是完成端口。
3.完成端口的使用步驟
使用完成端口只用遵循如下幾個步驟:
(1) 調用 CreateIoCompletionPort() 函數創建一個完成端口,把它的句柄保存好。
(2) 根據系統中有多少個處理器,就建立多少個工作者(為了醒目起見,下面直接說Worker)線程,這幾個線程是專門用來和客戶端進行通信的。
(3) 下面就是接收連入的Socket連接了,這里有兩種實現方式:一是和別的編程模型一樣,還需要啟動一個獨立的線程,專門用來accept客戶端的連接請求;二是用性能更高更好的異步AcceptEx()請求。
(4) 每當有客戶端連入的時候,我們還是得調用CreateIoCompletionPort()函數,這里卻不是新建立完成端口了,而是把新連入的Socket(也就是前面所謂的設備句柄),與目前的完成端口綁定在一起。
(5) 例如,客戶端連入之后,我們可以在這個Socket上提交一個網絡請求,例如WSARecv(),然后系統就會幫咱們乖乖的去執行接收數據的操作,我們大可以放心的去干別的事情了;
(6) 而此時,我們預先准備的那幾個Worker線程就不能閑着了, 我們在前面建立的幾個Worker就要忙活起來了,都需要分別調用GetQueuedCompletionStatus() 函數在掃描完成端口的隊列里是否有網絡通信的請求存在(例如讀取數據,發送數據等),一旦有的話,就將這個請求從完成端口的隊列中取回來,繼續執行本線程中后面的處理代碼,處理完畢之后,我們再繼續投遞下一個網絡通信的請求就OK了,如此循環。
4. acceptEx相對accept的優點
(1) 這個好處是最關鍵的,是因為AcceptEx是在客戶端連入之前,就把客戶端的Socket建立好了,也就是說,AcceptEx是先建立的Socket,然后才發出的AcceptEx調用,也就是說,在進行客戶端的通信之前,無論是否有客戶端連入,Socket都是提前建立好了;而不需要像accept是在客戶端連入了之后,再現場去花費時間建立Socket。
(2) 相比accept只能阻塞方式建立一個連入的入口,對於大量的並發客戶端來講,入口實在是有點擠;而AcceptEx可以同時在完成端口上投遞多個請求。
(3) AcceptEx還有一個非常體貼的優點,就是在投遞AcceptEx的時候,我們還可以順便在AcceptEx的同時,收取客戶端發來的第一組數據,這個是同時進行的,如果客戶端只是連入但是不發送數據的話,我們就不會收到這個AcceptEx完成的通知。
5.使用完成端口的步驟
(1)創建完成端口
HANDLE m_hIOCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0 );
返回值不為空則認為創建成功
(2)根據系統CPU個數創建工作線程
我們最好是建立CPU核心數量*2那么多的線程,這樣更可以充分利用CPU資源,
SYSTEM_INFO si;
GetSystemInfo(&si);
int m_nProcessors = si.dwNumberOfProcessors;//CPU個數
m_nThreads = 2 * m_nProcessors;
HANDLE* m_phWorkerThreads = new HANDLE[m_nThreads];
for (int i = 0; i < m_nThreads; i++)
{
m_phWorkerThreads[i] = ::CreateThread(0, 0, _WorkerThread, …);
}
(3)創建一個監聽的socket綁定到完成端口,監聽
// 初始化Socket庫
WSADATA wsaData;
WSAStartup(MAKEWORD(2,2), &wsaData);
//初始化Socket
struct sockaddr_in ServerAddress;
// 這里需要特別注意,如果要使用重疊I/O的話,這里必須要使用WSASocket來初始化Socket
// 注意里面有個WSA_FLAG_OVERLAPPED參數
SOCKET m_sockListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); //表示異步
// 填充地址結構信息
ZeroMemory((char *)&ServerAddress, sizeof(ServerAddress));
ServerAddress.sin_family = AF_INET;
// 這里可以選擇綁定任何一個可用的地址,或者是自己指定的一個IP地址
//ServerAddress.sin_addr.s_addr = htonl(INADDR_ANY);
ServerAddress.sin_addr.s_addr = inet_addr(“你的IP”);
ServerAddress.sin_port = htons(11111);
// 綁定端口
if (SOCKET_ERROR == bind(m_sockListen, (struct sockaddr *) &ServerAddress, sizeof(ServerAddress)))
// 開始監聽
listen(m_sockListen,SOMAXCONN)) //在winsock32.h中的宏定義Maximum queue length specifiable by listen.,
//調用創建完成端口的函數將監聽套接字綁定到完成端口
HANDLE WINAPI CreateIoCompletionPort(
__in HANDLE FileHandle, // 這里當然是連入的這個套接字句柄了
__in_opt HANDLE ExistingCompletionPort, // 這個就是前面創建的那個完成端口
__in ULONG_PTR CompletionKey, // 這個參數就是類似於線程參數一樣,在
// 綁定的時候把自己定義的結構體指針傳遞
// 這樣到了Worker線程中,也可以使用這個
// 結構體的數據了,相當於參數的傳遞
__in DWORD NumberOfConcurrentThreads // 這里同樣置0
);
(4)啟動監聽線程,開始接收客戶端連接
方法1:開通線程接收客戶端連接
DWORD WINAPI AcceptThread(LPVOID pVoid)
{
while(g_bRun)
{
SOCKET client=INVALID_SOCKET;
sockaddr sockadd;
int len =sizeof(sockaddr);
client=accept(g_socketserver,&sockadd,&len);//阻塞連接客戶端
if (client==INVALID_SOCKET)
{
cout<<"accept socket failed !"<<endl;
continue;
}
else
{
cout<<"one client connected ip is "<<sockadd.sa_data;
}
IOPORTCINENT* pIoClient=new IOPORTCINENT;//創建客戶端變量保存信息
if (pIoClient==NULL)
{
cout<<"IOPORTCINENT malloc failed "<<endl;
continue;
}
pIoClient->m_sock=client;
IODATA* pSendData=NULL;
IODATA* pRecvData=NULL;
pRecvData=pIoClient->GetNewIOData();
pSendData=pIoClient->GetNewIOData();
if (pRecvData==NULL||pRecvData==NULL)
{
cout<<"GetNewIOData failed !"<<endl;
delete(pIoClient);
continue;
}
pSendData->OperationType=SOCK_SEND;
pSendData->m_sock=pIoClient->m_sock;
pRecvData->OperationType=SOCK_RECV;
pRecvData->m_sock=pIoClient->m_sock;
CreateIoCompletionPort((HANDLE)client, g_IOPort,(ULONG_PTR) pIoClient, 0);//綁定socket到完成端口用pIoClient作為鍵值,
if (WSARecv(client,&(pRecvData->buff),1,&(pRecvData->NumberofByteRevc),&pRecvData->Flags,&pRecvData->overlap,NULL)==SOCKET_ERROR)
{//投遞接收請求
cout<<"WSARecv post recv request failed "<<client<<endl;
delete(pIoClient);
continue;
}
strcpy_s(pSendData->szMessage,"server to client data");
pSendData->NumberofByteRevc=21;
if (WSASend(client,&(pSendData->buff),1,&(pSendData->NumberofByteRevc),pSendData->Flags,&pSendData->overlap,NULL)==SOCKET_ERROR)
{//投遞發送請求
cout<<"WSASend post send request failed "<<client<<endl;
delete(pIoClient);
continue;
}
AddToClinetMap(pIoClient);
Sleep(1000);
}
return 0;
}
方法2:投遞Accept請求
這是為什么呢?因為我們在未取得函數指針的情況下就調用AcceptEx的開銷是很大的,因為AcceptEx 實際上是存在於Winsock2結構體系之外的(因為是微軟另外提供的),所以如果我們直接調用AcceptEx的話,首先我們的代碼就只能在微軟的平台上用了,沒有辦法在其他平台上調用到該平台提供的AcceptEx的版本(如果有的話), 而且更糟糕的是,我們每次調用AcceptEx時,Service Provider都得要通過WSAIoctl()獲取一次該函數指針,效率太低了,所以還不如我們自己直接在代碼中直接去這么獲取一下指針好了。微軟的實現是通過mswsock.dll中提供的,所以我們可以通過靜態鏈接mswsock.lib來使用AcceptEx。但是這是一個不推薦的方式,我們應該用WSAIoctl 配合SIO_GET_EXTENSION_FUNCTION_POINTER參數來獲取函數的指針,然后再調用AcceptEx。
// 使用AcceptEx函數,因為這個是屬於WinSock2規范之外的微軟另外提供的擴展函數
// 所以需要額外獲取一下函數的指針,
// 獲取AcceptEx函數指針
BOOL AcceptEx (
SOCKET sListenSocket,
SOCKET sAcceptSocket,
PVOID lpOutputBuffer,
DWORD dwReceiveDataLength,
DWORD dwLocalAddressLength,
DWORD dwRemoteAddressLength,
LPDWORD lpdwBytesReceived,
LPOVERLAPPED lpOverlapped
);
•
參數1--sListenSocket, 這個就是那個唯一的用來監聽的Socket了,沒什么說的;
•
參數2--sAcceptSocket, 用於接受連接的socket,這個就是那個需要我們事先建好的,等有客戶端連接進來直接把這個Socket拿給它用的那個,是AcceptEx高性能的關鍵所在。
•
參數3--lpOutputBuffer,接收緩沖區,這也是AcceptEx比較有特色的地方,既然AcceptEx不是普通的accpet函數,那么這個緩沖區也不是普通的緩沖區,這個緩沖區包含了三個信息:一是客戶端發來的第一組數據,二是server的地址,三是client地址,都是精華啊…但是讀取起來就會很麻煩,不過后面有一個更好的解決方案。
•
參數4--dwReceiveDataLength,前面那個參數lpOutputBuffer中用於存放數據的空間大小。如果此參數=0,則Accept時將不會待數據到來,而直接返回,如果此參數不為0,那么一定得等接收到數據了才會返回…… 所以通常當需要Accept接收數據時,就需要將該參數設成為:sizeof(lpOutputBuffer) - 2*(sizeof sockaddr_in +16),也就是說總長度減去兩個地址空間的長度就是了,看起來復雜,其實想明白了也沒啥……
•
參數5--dwLocalAddressLength,存放本地址地址信息的空間大小;
•
參數6--dwRemoteAddressLength,存放本遠端地址信息的空間大小;
•
參數7--lpdwBytesReceived,out參數,對我們來說沒用,不用管;
•
參數8--lpOverlapped,本次重疊I/O所要用到的重疊結構。
包含頭文件
#include <MSWSock.h>
定義函數指針
LPFN_ACCEPTEX m_lpfnAcceptEx; // AcceptEx 和GetAcceptExSockaddrs 的函數指針,用於調用這兩個擴展函數
LPFN_GETACCEPTEXSOCKADDRS m_lpfnGetAcceptExSockAddrs;
定義GUID變量
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS;
根據GUID獲取函數指針
DWORD dwBytes = 0;
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx,
sizeof(GuidAcceptEx),
&m_lpfnAcceptEx,
sizeof(m_lpfnAcceptEx),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage("WSAIoctl 未能獲取AcceptEx函數指針。錯誤代碼: %d\n", WSAGetLastError());
this->_DeInitialize();
return false;
}
// 獲取GetAcceptExSockAddrs函數指針,也是同理
if(SOCKET_ERROR == WSAIoctl(
m_pListenContext->m_Socket,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockAddrs,
sizeof(GuidGetAcceptExSockAddrs),
&m_lpfnGetAcceptExSockAddrs,
sizeof(m_lpfnGetAcceptExSockAddrs),
&dwBytes,
NULL,
NULL))
{
this->_ShowMessage("WSAIoctl 未能獲取GuidGetAcceptExSockAddrs函數指針。錯誤代碼: %d\n", WSAGetLastError());
this->_DeInitialize();
return false;
}
// 為AcceptEx 准備參數,然后投遞AcceptEx I/O請求
for( int i=0;i<MAX_POST_ACCEPT;i++ )
{
// 新建一個IO_CONTEXT
PER_IO_CONTEXT* pAcceptIoContext = m_pListenContext->GetNewIoContext();
if( false==this->_PostAccept( pAcceptIoContext ) )
{
m_pListenContext->RemoveContext(pAcceptIoContext);
return false;
}
}
this->_ShowMessage( _T("投遞%d 個AcceptEx請求完畢"),MAX_POST_ACCEPT );
return true;
6. GetQueuedCompletionStatus函數介紹
BOOL WINAPI GetQueuedCompletionStatus(
__in HANDLE CompletionPort, // 這個就是我們建立的那個唯一的完成端口
__out LPDWORD lpNumberOfBytes, //這個是操作完成后返回的字節數
__out PULONG_PTR lpCompletionKey, // 這個是我們建立完成端口的時候綁定的那個自定義結構體參數
__out LPOVERLAPPED *lpOverlapped, // 這個是我們在連入Socket的時候一起建立的那個重疊結構
__in DWORD dwMilliseconds // 等待完成端口的超時時間,如果線程不需要做其他的事情,那就INFINITE就行了
);
7.完成端口服務例程
// IOPortSocketServer.cpp : 定義控制台應用程序的入口點。
//
#include "stdafx.h"
#include "winsock2.h"
#pragma comment(lib,"ws2_32.lib")
#include<iostream>
#include <map>
using namespace std;
#define PORT 5050
#define MSGLEN 1024
bool g_bRun=true;
HANDLE g_IOPort=NULL;
SOCKET g_socketserver=INVALID_SOCKET;
CRITICAL_SECTION m_csContextList;
enum SocketType
{
SOCK_RECV=1,
SOCK_ACCEPT=2,
SOCK_SEND=3
};
char sendbuff[1024]="what's up ? dute !";
struct IODATA//用來接收完成端口數據的結構體
{
WSAOVERLAPPED overlap;//重疊結構體,完成端口內部用於標志客戶端請求的標志
WSABUF buff;//一個指針和一個長度變量
char szMessage[MSGLEN];//實際存儲數據的緩沖區
DWORD NumberofByteRevc;//緩沖區的長度
DWORD Flags;//標志位
int OperationType;//發送,接收,連接標識符
SOCKET m_sock;//連接客戶端的套接字
IODATA()
{
memset(&overlap, 0,sizeof(WSAOVERLAPPED));
NumberofByteRevc=MSGLEN;
buff.buf=szMessage;
buff.len=NumberofByteRevc;
memset(szMessage,0,MSGLEN);
Flags=0;
OperationType=0;
m_sock=INVALID_SOCKET;
}
};
struct IOPORTCINENT//每個客戶端連接,都會建立一個變量用於保存客戶端信息
{
SOCKET m_sock;//連接客戶端的套接字
map<IODATA*,IODATA*> m_mapIOData;//保存發送請求的結構體
IOPORTCINENT()
{
m_sock=INVALID_SOCKET;
}
~IOPORTCINENT()//析構函數
{
if (m_sock!=INVALID_SOCKET)
{
closesocket(m_sock);
m_sock=INVALID_SOCKET;
}
if (m_mapIOData.size()>0)
{
map<IODATA*,IODATA*>::iterator it=m_mapIOData.begin();
for(;it!=m_mapIOData.end();it++)
{
delete it->second;
}
m_mapIOData.clear();
}
}
IODATA*GetNewIOData()//客戶端連接時,申請數據結構保存完成端口返回的數據
{
IODATA*p=new IODATA;
if (p!=NULL)
{
m_mapIOData.insert(pair<IODATA*,IODATA*>(p,p));
return p;
}
else
{
return NULL;
}
}
bool DeleteIOData(IODATA* p)//釋放內存
{
map<IODATA*,IODATA*>::iterator it=m_mapIOData.find(p);
if (it!=m_mapIOData.end())
{
delete p;
m_mapIOData.erase(it);
return true;
}
else
{
return false;
}
}
};
map<SOCKET,IOPORTCINENT*> g_mapIoPortClient;//用於保存客戶端連接map
bool AddToClinetMap(IOPORTCINENT*p)//添加到map,方便后續釋放內存
{
EnterCriticalSection(&m_csContextList);
map<SOCKET,IOPORTCINENT*>::iterator it=g_mapIoPortClient.find(p->m_sock);
if (it==g_mapIoPortClient.end())
{
g_mapIoPortClient.insert(pair<SOCKET,IOPORTCINENT*>(p->m_sock,p));
LeaveCriticalSection(&m_csContextList);
return true;
}
else
{
LeaveCriticalSection(&m_csContextList);
return false;
}
}
bool DeleteFromClientMap(SOCKET sock)//客戶端斷開連接后將map中對應的變量刪除
{
EnterCriticalSection(&m_csContextList);
map<SOCKET,IOPORTCINENT*>::iterator it=g_mapIoPortClient.find(sock);
if (it!=g_mapIoPortClient.end())
{
closesocket(it->first);
delete it->second;
g_mapIoPortClient.erase(it);
LeaveCriticalSection(&m_csContextList);
return true;
}
else
{
LeaveCriticalSection(&m_csContextList);
return false;
}
}
void ClearClientMap()//服務器退出時,釋放所有客戶端的連接數據,並清空map
{
EnterCriticalSection(&m_csContextList);
map<SOCKET,IOPORTCINENT*>::iterator it=g_mapIoPortClient.begin();
if (g_mapIoPortClient.size()>0)
{
for (;it!=g_mapIoPortClient.end();it++)
{
closesocket(it->first);
delete it->second;
}
g_mapIoPortClient.clear();
}
LeaveCriticalSection(&m_csContextList);
}
//接收完成端口發送和接收數據的工作線程
DWORD WINAPI WorkThread(LPVOID pVoid)
{
while(g_bRun)
{
IODATA* pRecvData=NULL;
OVERLAPPED *pOverlapped = NULL;
DWORD dwBytesTransferred=0;
IOPORTCINENT* pIoClient=NULL;
/*從隊列中取出完成端口完成的操作(發送、接收、連接操作)
pOverlapped參數返回的是WSARecv中傳入的那個&pRecvData->overlap,因為overlapp是結構體IODATA的第一個
變量,所以overlapp的地址就是pRecvData的地址
*/
if (GetQueuedCompletionStatus(g_IOPort,&dwBytesTransferred,(PULONG_PTR)&pIoClient,&pOverlapped,INFINITE)==FALSE)
{
DWORD dwErr = GetLastError();
if (dwErr==WAIT_TIMEOUT)//這里是INFINITE是一直等待,所以不會出現超時情況
{
continue;
}
else//連接出現問題要刪除
{
if (DeleteFromClientMap(pIoClient->m_sock)==false)
{
cout<<"DeleteFromClientMap failed "<<pIoClient->m_sock<<endl;
}
}
cout<<"GetQueuedCompletionStatus return false "<<dwErr<<endl;
Sleep(3000);
continue;
}
else
{
if (dwBytesTransferred==0)//客戶端關閉
{
if (DeleteFromClientMap(pIoClient->m_sock)==false)
{
cout<<"DeleteFromClientMap failed "<<pIoClient->m_sock<<endl;
}
}
else
{
pRecvData=(IODATA*)pOverlapped;
//pRecvData=CONTAINING_RECORD(pOverlapped, IODATA, overlap);
/***************
主線程向完成端口發送接收數據請求,然后繼續執行其他事情,不會阻塞
當完成端口完成某個操作后,會向隊列里放入消息,工作線程中GetQueuedCompletionStatus
不斷的讀取隊列中的消息,通過pOverlapped這個指針的地址判斷是哪個客戶端發來的數據
#define CONTAINING_RECORD(address, type, field) ((type *)( \
(PCHAR)(address) - \
(ULONG_PTR)(&((type *)0)->field)))
這個宏定義的作用是通過結構體中某個變量的地址,計算這個變量的首地址
address是結構體中某個變量的地址,(ULONG_PTR)(&((type *)0)->field))這部分計算的是成員變量在結構體中的
偏移量,成員變量地址減去偏移量,得到結構體變量的首地址。如果成員變量是結構體的第一個變量,則
成員變量的地址就是結構體變量的地址,直接強制轉換即可。這個是很多人難以理解的地方
**************/
if (pRecvData->OperationType==SOCK_RECV)//接收請求完成
{
cout<<"recv:"<<pRecvData->m_sock<<"~"<<pRecvData->szMessage<<endl;
memset(pRecvData->szMessage,0,MSGLEN);//清空接收區,再次用相同的pRecvData發送接收數據請求
WSARecv(pRecvData->m_sock,&(pRecvData->buff),1,&pRecvData->NumberofByteRevc,&pRecvData->Flags,&pRecvData->overlap,NULL);//投遞接收請求
send(pRecvData->m_sock,sendbuff,18,0);//向客戶端發送數據,告知接收到數據
cout<<"send:"<<pRecvData->m_sock<<"~"<<sendbuff<<endl;
}
else if (pRecvData->OperationType==SOCK_SEND)//發送請求完成
{
cout<<"WSASend success"<<endl;
cout<<"WSASend:"<<pRecvData->szMessage<<endl;
pRecvData->NumberofByteRevc=21;
//向完成端口投遞發送請求
WSASend(pRecvData->m_sock,&(pRecvData->buff),1,&pRecvData->NumberofByteRevc,pRecvData->Flags,&pRecvData->overlap,NULL);//投遞發送請求
Sleep(3000);
}
}
}
}
return 0;
}
DWORD WINAPI AcceptThread(LPVOID pVoid)
{
while(g_bRun)
{
SOCKET client=INVALID_SOCKET;
sockaddr sockadd;
int len =sizeof(sockaddr);
client=accept(g_socketserver,&sockadd,&len);//阻塞連接客戶端
if (client==INVALID_SOCKET)
{
cout<<"accept socket failed !"<<endl;
continue;
}
else
{
cout<<"one client connected ip is "<<sockadd.sa_data;
}
IOPORTCINENT* pIoClient=new IOPORTCINENT;//創建客戶端變量保存信息
if (pIoClient==NULL)
{
cout<<"IOPORTCINENT malloc failed "<<endl;
continue;
}
pIoClient->m_sock=client;
IODATA* pSendData=NULL;
IODATA* pRecvData=NULL;
pRecvData=pIoClient->GetNewIOData();
pSendData=pIoClient->GetNewIOData();
if (pRecvData==NULL||pRecvData==NULL)
{
cout<<"GetNewIOData failed !"<<endl;
delete(pIoClient);
continue;
}
pSendData->OperationType=SOCK_SEND;
pSendData->m_sock=pIoClient->m_sock;
pRecvData->OperationType=SOCK_RECV;
pRecvData->m_sock=pIoClient->m_sock;
CreateIoCompletionPort((HANDLE)client, g_IOPort,(ULONG_PTR) pIoClient, 0);//綁定socket到完成端口用pIoClient作為鍵值,
if (WSARecv(client,&(pRecvData->buff),1,&(pRecvData->NumberofByteRevc),&pRecvData->Flags,&pRecvData->overlap,NULL)==SOCKET_ERROR)
{//投遞接收請求
cout<<"WSARecv post recv request failed "<<client<<endl;
delete(pIoClient);
continue;
}
strcpy_s(pSendData->szMessage,"server to client data");
pSendData->NumberofByteRevc=21;
if (WSASend(client,&(pSendData->buff),1,&(pSendData->NumberofByteRevc),pSendData->Flags,&pSendData->overlap,NULL)==SOCKET_ERROR)
{//投遞發送請求
cout<<"WSASend post send request failed "<<client<<endl;
delete(pIoClient);
continue;
}
AddToClinetMap(pIoClient);
Sleep(1000);
}
return 0;
}
void finit()
{
g_bRun=false;
WSACleanup();
closesocket(g_socketserver);
ClearClientMap();
}
int _tmain(int argc, _TCHAR* argv[])
{
/*IODATA data;
cout<<"data's address is :"<<&data<<endl;
cout<<"data's member WSABUF's address: "<<&data.buff<<endl;
unsigned long pianyi=(unsigned long)(&(((IODATA*)0)->buff));
cout<<"offset is :"<<pianyi<<endl;
IODATA* p=(IODATA*)((char*)(&data.buff)-pianyi);
cout<<"data address is :"<<p<<endl;
system("PAUSE");*/
//初始化套接字庫
WSADATA wsadata;
if (WSAStartup(MAKEWORD(2,2),&wsadata)!=0)
{
cout<<"WSAStartup failed"<<endl;
Sleep(2000);
return -1;
}
InitializeCriticalSection(&m_csContextList);
//創建完成端口
g_IOPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
if (g_IOPort==NULL)
{
cout<<"CreateIoCompletionPort failed"<<endl;
Sleep(2000);
finit();
return -1;
}
//創建監聽套接字
g_socketserver=::WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
if (g_socketserver==INVALID_SOCKET)
{
cout<<"create socket failed"<<endl;
Sleep(2000);
finit();
return -1;
}
//綁定地址到套接字
SOCKADDR_IN serverAddressIn;
serverAddressIn.sin_addr.S_un.S_addr=htonl(INADDR_ANY);
serverAddressIn.sin_family=AF_INET;
serverAddressIn.sin_port=htons(PORT);
if (::bind(g_socketserver,(sockaddr*)&serverAddressIn,sizeof(SOCKADDR_IN))!=0)
{
cout<<"bind address to m_socketserver failed"<<endl;
Sleep(2000);
return -1;
}
//監聽套接字
if (::listen(g_socketserver,SOMAXCONN)!=0)
{
cout<<"listen socket failed"<<endl;
Sleep(2000);
return -1;
}
//獲取CPU個數
SYSTEM_INFO systeminfo;
GetSystemInfo(&systeminfo);
//創建工作線程
for (int i = 0; i < systeminfo.dwNumberOfProcessors*2; i++)
{
HANDLE hthread=CreateThread(NULL, 0, WorkThread, g_IOPort, 0, NULL);
CloseHandle(hthread);//關閉句柄,線程仍然運行
}
//創建接收客戶端連接的線程
HANDLE hthread=CreateThread(NULL, 0, AcceptThread, g_IOPort, 0, NULL);
CloseHandle(hthread);
cout<<"server is started, waiting for client to connect!"<<endl;
system("PAUSE");
g_bRun=false;//讓線程自動退出
ClearClientMap();//清除客戶端信息map
DeleteCriticalSection(&m_csContextList);
cout<<"stop program now"<<endl;
Sleep(3000);
return 0;
}
參考文獻
http://www.cnblogs.com/c1230v/archive/2012/11/25/2788280.html