規划下將要出爐的IOCP。
1.將接收IO數據改成內存池。
2.擴展lpCompletionKey: DWORD參數.擴展套接字對象。
3.借鑒java netty思路,使用decode –> handler的思路來處理客戶端數據。
//內存池
unit uMemPool; interface uses JwaWinsock2, Windows, SyncObjs; const MAX_BUFFER_SIZE = 1024; type LPPER_IO_OPERATION_DATA = ^PER_IO_OPERATION_DATA; PER_IO_OPERATION_DATA = packed record Overlapped: OVERLAPPED; IO_TYPE: Cardinal; DataBuf: TWSABUF; WorkBytes: Cardinal; //如果是接收,接收的字節數 WorkFlag: Cardinal; pre:LPPER_IO_OPERATION_DATA; next:LPPER_IO_OPERATION_DATA; end; TIODataMemPool = class(TObject) private FCs: TCriticalSection; //第一個可用的內存塊 FHead: LPPER_IO_OPERATION_DATA; //最后一個可用的內存卡 FTail: LPPER_IO_OPERATION_DATA; //可用的內存個數 FUseableCount:Integer; //正在使用的個數 FUsingCount:Integer; /// <summary> /// 將一個內存塊添加到尾部 /// </summary> /// <param name="pvIOData"> (LPPER_IO_OPERATION_DATA) </param> procedure AddData2Pool(pvIOData:LPPER_IO_OPERATION_DATA); /// <summary> /// 得到一塊可以使用的內存 /// </summary> /// <returns> LPPER_IO_OPERATION_DATA /// </returns> function getUsableData: LPPER_IO_OPERATION_DATA; /// <summary> /// 創建一塊內存空間 /// </summary> /// <returns> LPPER_IO_OPERATION_DATA /// </returns> function InnerCreateIOData: LPPER_IO_OPERATION_DATA; procedure clearMemBlock(pvIOData:LPPER_IO_OPERATION_DATA); public class function instance: TIODataMemPool; constructor Create; destructor Destroy; override; //借一塊內存 function borrowIOData: LPPER_IO_OPERATION_DATA; //換會一塊內存 procedure giveBackIOData(const pvIOData: LPPER_IO_OPERATION_DATA); end; implementation var __IODATA_instance:TIODataMemPool; constructor TIODataMemPool.Create; begin inherited Create; FCs := TCriticalSection.Create(); FUseableCount := 0; FUsingCount := 0; end; destructor TIODataMemPool.Destroy; begin FCs.Free; inherited Destroy; end; { TIODataMemPool } procedure TIODataMemPool.AddData2Pool(pvIOData:LPPER_IO_OPERATION_DATA); begin if FHead = nil then begin FHead := pvIOData; FHead.next := nil; FHead.pre := nil; FTail := pvIOData; end else begin FTail.next := pvIOData; pvIOData.pre := FTail; FTail := pvIOData; end; Inc(FUseableCount); end; function TIODataMemPool.InnerCreateIOData: LPPER_IO_OPERATION_DATA; begin Result := LPPER_IO_OPERATION_DATA(GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA))); GetMem(Result.DataBuf.buf, MAX_BUFFER_SIZE); Result.DataBuf.len := MAX_BUFFER_SIZE; //清理一塊內存 clearMemBlock(Result); end; function TIODataMemPool.borrowIOData: LPPER_IO_OPERATION_DATA; begin FCs.Enter; try Result := getUsableData; if Result = nil then begin //生產一個內存塊 Result := InnerCreateIOData; //直接借走<增加使用計數器> Inc(FUsingCount); end; finally FCs.Leave; end; end; procedure TIODataMemPool.clearMemBlock(pvIOData: LPPER_IO_OPERATION_DATA); begin //清理一塊內存 pvIOData.IO_TYPE := 0; pvIOData.WorkBytes := 0; pvIOData.WorkFlag := 0; ZeroMemory(@pvIOData.Overlapped, sizeof(OVERLAPPED)); ZeroMemory(pvIOData.DataBuf.buf, pvIOData.DataBuf.len); end; procedure TIODataMemPool.giveBackIOData(const pvIOData: LPPER_IO_OPERATION_DATA); begin FCs.Enter; try //清理內存塊 clearMemBlock(pvIOData); //加入到可以使用的內存空間 AddData2Pool(pvIOData); //減少使用計數器 Dec(FUsingCount); finally FCs.Leave; end; end; function TIODataMemPool.getUsableData: LPPER_IO_OPERATION_DATA; var lvPre:LPPER_IO_OPERATION_DATA; begin if FTail = nil then begin Result := nil; end else begin Result := FTail; lvPre := FTail.pre; if lvPre <> nil then begin lvPre.next := nil; FTail := lvPre; end else //FTail是第一個也是最后一個,只有一個 begin FHead := nil; FTail := nil; end; Result.next := nil; Result.pre := nil; Dec(FUseableCount); Inc(FUsingCount); end; end; class function TIODataMemPool.instance: TIODataMemPool; begin Result := __IODATA_instance; end; initialization __IODATA_instance := TIODataMemPool.Create; finalization if __IODATA_instance <> nil then begin __IODATA_instance.Free; __IODATA_instance := nil; end; end.
//擴展的套接字對象
unit uClientContext; interface uses Windows, JwaWinsock2, uBuffer, uBufferBuilder; type TClientContext = class(TObject) private FSocket: TSocket; FBuffers: TBufferLink; public procedure CloseClientSocket; constructor Create(ASocket: TSocket); procedure AppendBuffer(const buf:WSABUF); overload; function AppendBuffer(buf:PAnsiChar; len:Cardinal): Cardinal; overload; function readBuffer(buf:PAnsiChar; len:Cardinal): Cardinal; destructor Destroy; override; property Socket: TSocket read FSocket; end; implementation procedure TClientContext.AppendBuffer(const buf:WSABUF); begin FBuffers.AddBuffer(buf.buf, buf.len); end; procedure TClientContext.CloseClientSocket; begin if FSocket <> INVALID_SOCKET then begin closesocket(FSocket); end; end; constructor TClientContext.Create(ASocket: TSocket); begin inherited Create; FSocket := ASocket; FBuffers := TBufferLink.Create(); end; destructor TClientContext.Destroy; begin FBuffers.Free; FBuffers := nil; CloseClientSocket; inherited Destroy; end; function TClientContext.AppendBuffer(buf:PAnsiChar; len:Cardinal): Cardinal; begin FBuffers.AddBuffer(buf, len); end; function TClientContext.readBuffer(buf:PAnsiChar; len:Cardinal): Cardinal; begin Result := FBuffers.readBuffer(buf, len); end; end.
//修改后的代碼工作線程和listener
unit uD10_IOCP; interface uses JwaWinsock2, Windows, SysUtils, uMemPool; const DATA_BUFSIZE = 1024; IO_TYPE_Accept = 1; IO_TYPE_Recv = 2; type //(1):單IO數據結構 PWorkData = ^TWorkerData; TWorkerData = packed record IOCPHandle:THandle; WorkerID:Cardinal; end; function D10_IOCPRun(pvData:Pointer): Integer; stdcall; implementation uses logClientWrapper, uClientContext; function ServerWorkerThread(pData:Pointer): Integer; stdcall; var CompletionPort:THANDLE; lvWorkID:Cardinal; BytesTransferred:Cardinal; PerIoData:LPPER_IO_OPERATION_DATA; Flags:Cardinal; RecvBytes:Cardinal; lvResultStatus:BOOL; lvRet:Integer; lvClientContext:TClientContext; begin CompletionPort:=PWorkData(pData).IOCPHandle; lvWorkID := PWorkData(pData).WorkerID; //得到創建線程是傳遞過來的IOCP while(TRUE) do begin //工作者線程會停止到GetQueuedCompletionStatus函數處,直到接受到數據為止 lvResultStatus := GetQueuedCompletionStatus(CompletionPort, BytesTransferred, Cardinal(lvClientContext), POverlapped(PerIoData), INFINITE); if (lvResultStatus = False) then begin //當客戶端連接斷開或者客戶端調用closesocket函數的時候,函數GetQueuedCompletionStatus會返回錯誤。如果我們加入心跳后,在這里就可以來判斷套接字是否依然在連接。 if lvClientContext<>nil then begin lvClientContext.Free; lvClientContext := nil; end; if PerIoData<>nil then begin TIODataMemPool.instance.giveBackIOData(PerIoData); end; continue; end; if PerIoData = nil then begin lvClientContext.Free; lvClientContext := nil; Break; end else if (PerIoData<>nil) then begin if PerIoData.IO_TYPE = IO_TYPE_Accept then //連接請求 begin //發送日志顯示 lvRet := TLogClientWrapper.logINfo('工作線程[' + intToStr(lvWorkID) + ']:有新的連接接入'); TIODataMemPool.instance.giveBackIOData(PerIoData); end else if PerIoData.IO_TYPE = IO_TYPE_Recv then begin //加入到套接字對應的緩存中 lvClientContext.AppendBuffer(PerIoData.DataBuf.buf, PerIoData.Overlapped.InternalHigh); lvRet := TLogClientWrapper.logINfo('工作線程[' + intToStr(lvWorkID) + ']:接收到數據!'); TIODataMemPool.instance.giveBackIOData(PerIoData); end; /////分配內存<可以加入內存池> PerIoData := TIODataMemPool.instance.borrowIOData; PerIoData.IO_TYPE := IO_TYPE_Recv; /////異步收取數據 if (WSARecv(lvClientContext.Socket, @PerIoData.DataBuf, 1, PerIoData.WorkBytes, PerIOData.WorkFlag, @PerIoData^, nil) = SOCKET_ERROR) then begin lvRet := GetLastError(); //重疊IO,出現ERROR_IO_PENDING是正常的, //表示數據尚未接收完成,如果有數據接收,GetQueuedCompletionStatus會有返回值 if (lvRet <> ERROR_IO_PENDING) then begin lvClientContext.Free; if PerIoData <> nil then begin GlobalFree(DWORD(PerIoData)); end; Continue; end; end; end; end; end; function D10_IOCPRun(pvData:Pointer): Integer; var WSData: TWSAData; lvIOPort, lvPerIOPort:THandle; hThread, dwThreadId:DWORD; sSocket, cSocket:TSocket; lvAddr:TSockAddr; lvAddrSize:Integer; lvMsg:String; lvPort:Integer; lvSystemInfo: TSystemInfo; i:Integer; PerIoData:LPPER_IO_OPERATION_DATA; lvWorkerData:PWorkData; Flags:Cardinal; RecvBytes:Cardinal; lvCount:Integer; lvClientContext:TClientContext; begin lvPort := Integer(pvData); //加載SOCKET。使用的是2.2版為了后面方便加入心跳。 WSAStartup($0202, WSData); // 創建一個完成端口(內核對象) lvIOPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); // GetSystemInfo(lvSystemInfo); // lvCount := lvSystemInfo.dwNumberOfProcessors * 2 -1; lvCount := 1; //ServerWorkerThread 是工作線程 for I:=1 to lvCount do begin lvWorkerData := AllocMem(SizeOf(TWorkerData)); lvWorkerData.IOCPHandle := lvIOPort; lvWorkerData.WorkerID := i; hThread := CreateThread(nil, 0, @ServerWorkerThread, lvWorkerData,0, dwThreadId); if (hThread = 0) then begin Exit; end; CloseHandle(hThread); end; //創建一個套接字,將此套接字和一個端口綁定並監聽此端口。 sSocket:=WSASocket(AF_INET,SOCK_STREAM,0,Nil,0,WSA_FLAG_OVERLAPPED); if sSocket=SOCKET_ERROR then begin closesocket(sSocket); WSACleanup(); end; lvAddr.sin_family:=AF_INET; lvAddr.sin_port:=htons(lvPort); lvAddr.sin_addr.s_addr:=htonl(INADDR_ANY); if bind(sSocket,@lvAddr,sizeof(lvAddr))=SOCKET_ERROR then begin closesocket(sSocket); exit; end; listen(sSocket,20); //下面循環進行循環獲取客戶端的請求。 while (TRUE) do begin //當客戶端有連接請求的時候,WSAAccept函數會新創建一個套接字cSocket。這個套接字就是和客戶端通信的時候使用的套接字。 cSocket:= WSAAccept(sSocket, nil, nil, nil, 0); //判斷cSocket套接字創建是否成功,如果不成功則退出。 if (cSocket= SOCKET_ERROR) then begin closesocket(sSocket); exit; end; //start----將套接字、完成端口綁定在一起。 // 最開始的時候沒有明白為什么還要調用一次createIoCompletionPort // // 后來經過google,和測試 // // 是將新的套接字(socket)加入到iocp端口<綁定> // 這樣工作線程才能處理這個套接字(socket)的數據包 //如果把下面注釋掉,WSARecv這個套接字時,GetQueuedCompletionStatus無法處理到收到的數據包 // 注意第三個參數也需要進行綁定, 否則在工作線程中GetQueuedCompletionStatus時completionKey會取不到cSocket值 lvClientContext := TClientContext.Create(cSocket); //將套接字、完成端口客戶端對象綁定在一起。 //2013年4月20日 13:45:10 lvPerIOPort := CreateIoCompletionPort(cSocket, lvIOPort, Cardinal(lvClientContext), 0); if (lvPerIOPort = 0) then begin Exit; end; ////----end //初始化數據包 PerIoData := TIODataMemPool.instance.borrowIOData; //數據包中的IO類型:有連接請求 PerIoData.IO_TYPE := IO_TYPE_Accept; //通知工作線程,有新的套接字連接<第三個參數> PostQueuedCompletionStatus( lvIOPort, 0, Cardinal(lvClientContext), POverlapped(PerIOData)); end; end; initialization finalization WSACleanup; end.
//在ClientContext中使用
unit uBuffer; { 套接字對應的接收緩存,使用鏈條模式。 } interface uses Windows; type PBufRecord = ^_BufRecord; _BufRecord = packed record len: Cardinal; // the length of the buffer buf: PAnsiChar; // the pointer to the buffer preBuf:PBufRecord; //前一個buffer nextBuf:PBufRecord; //后一個buffer end; TBufferLink = class(TObject) private FHead:PBufRecord; FTail:PBufRecord; //當前讀到的Buffer FRead:PBufRecord; //當前讀到的Buffer位置 FReadPosition: Cardinal; FMark:PBufRecord; FMarkPosition: Cardinal; function InnerReadBuf(const pvBufRecord: PBufRecord; pvStartIndex: Cardinal; buf: PAnsiChar; len: Cardinal): Cardinal; public constructor Create; procedure markReadIndex; procedure AddBuffer(buf:PAnsiChar; len:Cardinal); function readBuffer(buf:PAnsiChar; len:Cardinal):Cardinal; end; implementation constructor TBufferLink.Create; begin inherited Create; FReadPosition := 0; end; { TBufferLink } procedure TBufferLink.AddBuffer(buf: PAnsiChar; len: Cardinal); var lvBuf:PBufRecord; begin New(lvBuf); lvBuf.preBuf := nil; lvBuf.nextBuf := nil; lvBuf.buf := GetMemory(len); lvBuf.len := len; CopyMemory(lvBuf.buf, Pointer(LongInt(buf)), len); if FHead = nil then begin FHead := lvBuf; end; if FTail = nil then begin FTail := lvBuf; end else begin FTail.nextBuf := lvBuf; lvBuf.preBuf := FTail; end; end; function TBufferLink.InnerReadBuf(const pvBufRecord: PBufRecord; pvStartIndex: Cardinal; buf: PAnsiChar; len: Cardinal): Cardinal; var lvValidCount:Cardinal; begin Result := 0; if pvBufRecord <> nil then begin lvValidCount := pvBufRecord.len-pvStartIndex; if lvValidCount <= 0 then begin Result := 0; end else begin if len <= lvValidCount then begin CopyMemory(buf, Pointer(Cardinal(pvBufRecord.buf) + pvStartIndex), len); Result := len; end else begin CopyMemory(buf, Pointer(Cardinal(pvBufRecord.buf) + pvStartIndex), lvValidCount); Result := lvValidCount; end; end; end; end; procedure TBufferLink.markReadIndex; begin FMark := FRead; FMarkPosition := FReadPosition; end; function TBufferLink.readBuffer(buf: PAnsiChar; len: Cardinal): Cardinal; var lvBuf:PBufRecord; lvPosition, l, lvReadCount, lvRemain:Cardinal; begin lvReadCount := 0; lvBuf := FRead; lvPosition := FReadPosition; if lvBuf = nil then begin lvBuf := FHead; lvPosition := 0; end; if lvBuf <> nil then begin lvRemain := len; while lvBuf <> nil do begin l := InnerReadBuf(lvBuf, lvPosition, Pointer(Cardinal(buf) + lvReadCount), lvRemain); if l = lvRemain then begin //讀完 inc(lvReadCount, l); Inc(lvPosition, l); FReadPosition := lvPosition; FRead := lvBuf; lvRemain := 0; Break; end else if l < lvRemain then //讀取的比需要讀的長度小 begin lvRemain := lvRemain - l; inc(lvReadCount, l); Inc(lvPosition, l); FReadPosition := lvPosition; FRead := lvBuf; lvBuf := lvBuf.nextBuf; if lvBuf <> nil then //讀下一個 begin FRead := lvBuf; FReadPosition := 0; lvPosition := 0; end; end; end; Result := lvReadCount; end else begin Result := 0; end; end; end.
>>>>后面研究Decoder