Delphi-IOCP 學習筆記<六>=====IO內存池和擴展套接字(ClientContext)


規划下將要出爐的IOCP。

1.將接收IO數據改成內存池。

2.擴展lpCompletionKey: DWORD參數.擴展套接字對象。

3.借鑒java netty思路,使用decode –> handler的思路來處理客戶端數據。

 

 

//內存池

unit uMemPool;

interface

uses
  JwaWinsock2, Windows, SyncObjs;

const
  MAX_BUFFER_SIZE = 1024;

type
  LPPER_IO_OPERATION_DATA = ^PER_IO_OPERATION_DATA;

  PER_IO_OPERATION_DATA = packed record
    Overlapped: OVERLAPPED;
    IO_TYPE: Cardinal;
    DataBuf: TWSABUF;
    WorkBytes: Cardinal;    //如果是接收,接收的字節數
    WorkFlag: Cardinal;
    pre:LPPER_IO_OPERATION_DATA;
    next:LPPER_IO_OPERATION_DATA;
  end;
  
  TIODataMemPool = class(TObject)
  private
    FCs: TCriticalSection;

    //第一個可用的內存塊
    FHead: LPPER_IO_OPERATION_DATA;

    //最后一個可用的內存卡
    FTail: LPPER_IO_OPERATION_DATA;

    //可用的內存個數
    FUseableCount:Integer;

    //正在使用的個數
    FUsingCount:Integer;

    /// <summary>
    ///   將一個內存塊添加到尾部
    /// </summary>
    /// <param name="pvIOData"> (LPPER_IO_OPERATION_DATA) </param>
    procedure AddData2Pool(pvIOData:LPPER_IO_OPERATION_DATA);

    /// <summary>
    ///   得到一塊可以使用的內存
    /// </summary>
    /// <returns> LPPER_IO_OPERATION_DATA
    /// </returns>
    function getUsableData: LPPER_IO_OPERATION_DATA;

    /// <summary>
    ///   創建一塊內存空間
    /// </summary>
    /// <returns> LPPER_IO_OPERATION_DATA
    /// </returns>
    function InnerCreateIOData: LPPER_IO_OPERATION_DATA;

    procedure clearMemBlock(pvIOData:LPPER_IO_OPERATION_DATA);
  public
    class function instance: TIODataMemPool;
    constructor Create;
    destructor Destroy; override;

    //借一塊內存
    function borrowIOData: LPPER_IO_OPERATION_DATA;

    //換會一塊內存
    procedure giveBackIOData(const pvIOData: LPPER_IO_OPERATION_DATA);
  end;

implementation

var
  __IODATA_instance:TIODataMemPool;

constructor TIODataMemPool.Create;
begin
  inherited Create;
  FCs := TCriticalSection.Create();
  FUseableCount := 0;
  FUsingCount := 0;
end;

destructor TIODataMemPool.Destroy;
begin
  FCs.Free;
  inherited Destroy;
end;

{ TIODataMemPool }

procedure TIODataMemPool.AddData2Pool(pvIOData:LPPER_IO_OPERATION_DATA);
begin
  if FHead = nil then
  begin
    FHead := pvIOData;
    FHead.next := nil;
    FHead.pre := nil;
    FTail := pvIOData;
  end else
  begin
    FTail.next := pvIOData;
    pvIOData.pre := FTail;
    FTail := pvIOData;
  end;
  Inc(FUseableCount);
end;

function TIODataMemPool.InnerCreateIOData: LPPER_IO_OPERATION_DATA;
begin
  Result := LPPER_IO_OPERATION_DATA(GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA)));
  GetMem(Result.DataBuf.buf, MAX_BUFFER_SIZE);
  Result.DataBuf.len := MAX_BUFFER_SIZE;

  //清理一塊內存
  clearMemBlock(Result);
end;

function TIODataMemPool.borrowIOData: LPPER_IO_OPERATION_DATA;
begin
  FCs.Enter;
  try
    Result := getUsableData;
    if Result = nil then
    begin
      //生產一個內存塊
      Result := InnerCreateIOData;

      //直接借走<增加使用計數器>
      Inc(FUsingCount);
    end;
  finally
    FCs.Leave;
  end;
end;

procedure TIODataMemPool.clearMemBlock(pvIOData: LPPER_IO_OPERATION_DATA);
begin
  //清理一塊內存
  pvIOData.IO_TYPE := 0;
  
  pvIOData.WorkBytes := 0;
  pvIOData.WorkFlag := 0;

  ZeroMemory(@pvIOData.Overlapped, sizeof(OVERLAPPED));
  ZeroMemory(pvIOData.DataBuf.buf, pvIOData.DataBuf.len);
end;

procedure TIODataMemPool.giveBackIOData(const pvIOData:
    LPPER_IO_OPERATION_DATA);
begin
  FCs.Enter;
  try
    //清理內存塊
    clearMemBlock(pvIOData);

    //加入到可以使用的內存空間
    AddData2Pool(pvIOData);

    //減少使用計數器
    Dec(FUsingCount);
  finally
    FCs.Leave;
  end;
end;

function TIODataMemPool.getUsableData: LPPER_IO_OPERATION_DATA;
var
  lvPre:LPPER_IO_OPERATION_DATA;
begin
  if FTail = nil then
  begin
    Result := nil;
  end else  
  begin   
    Result := FTail;

    lvPre := FTail.pre;
    if lvPre <> nil then
    begin
      lvPre.next := nil;
      FTail := lvPre;
    end else  //FTail是第一個也是最后一個,只有一個
    begin
      FHead := nil;
      FTail := nil;
    end;  

    Result.next := nil;
    Result.pre := nil;

    Dec(FUseableCount);
    Inc(FUsingCount);
  end;
end;

class function TIODataMemPool.instance: TIODataMemPool;
begin
  Result := __IODATA_instance;
end;


initialization
  __IODATA_instance := TIODataMemPool.Create;

finalization
  if __IODATA_instance <> nil then
  begin
    __IODATA_instance.Free;
    __IODATA_instance := nil;
  end;

end.

 

 

//擴展的套接字對象

unit uClientContext;

interface

uses
  Windows, JwaWinsock2, uBuffer, uBufferBuilder;

type
  TClientContext = class(TObject)
  private
    FSocket: TSocket;

    FBuffers: TBufferLink;
  public

    procedure CloseClientSocket;

    constructor Create(ASocket: TSocket);

    procedure AppendBuffer(const buf:WSABUF); overload; 

    function AppendBuffer(buf:PAnsiChar; len:Cardinal): Cardinal; overload;

    function readBuffer(buf:PAnsiChar; len:Cardinal): Cardinal;

    destructor Destroy; override;

    property Socket: TSocket read FSocket;

  end;

implementation

procedure TClientContext.AppendBuffer(const buf:WSABUF);
begin
  FBuffers.AddBuffer(buf.buf, buf.len);
end;

procedure TClientContext.CloseClientSocket;
begin
  if FSocket <> INVALID_SOCKET then
  begin
    closesocket(FSocket);
  end;
end;

constructor TClientContext.Create(ASocket: TSocket);
begin
  inherited Create;
  FSocket := ASocket;
  FBuffers := TBufferLink.Create();
end;

destructor TClientContext.Destroy;
begin
  FBuffers.Free;
  FBuffers := nil;  
  CloseClientSocket;
  inherited Destroy;
end;

function TClientContext.AppendBuffer(buf:PAnsiChar; len:Cardinal): Cardinal;
begin
  FBuffers.AddBuffer(buf, len);
end;

function TClientContext.readBuffer(buf:PAnsiChar; len:Cardinal): Cardinal;
begin
  Result := FBuffers.readBuffer(buf, len);
end;

end.

 

 

//修改后的代碼工作線程和listener

unit uD10_IOCP;

interface

uses
  JwaWinsock2, Windows, SysUtils, uMemPool;

const
  DATA_BUFSIZE = 1024;

  IO_TYPE_Accept = 1;
  IO_TYPE_Recv = 2;



type
  //(1):單IO數據結構
  PWorkData = ^TWorkerData;
  TWorkerData = packed record
    IOCPHandle:THandle;
    WorkerID:Cardinal;
  end;


function D10_IOCPRun(pvData:Pointer): Integer; stdcall;


implementation

uses
  logClientWrapper, uClientContext;


function ServerWorkerThread(pData:Pointer): Integer; stdcall;
var
  CompletionPort:THANDLE;
  lvWorkID:Cardinal;
  BytesTransferred:Cardinal;
  PerIoData:LPPER_IO_OPERATION_DATA;
  Flags:Cardinal;
  RecvBytes:Cardinal;
  lvResultStatus:BOOL;
  lvRet:Integer;
  
  lvClientContext:TClientContext;
begin
  CompletionPort:=PWorkData(pData).IOCPHandle;
  lvWorkID := PWorkData(pData).WorkerID;
   //得到創建線程是傳遞過來的IOCP
   while(TRUE) do
   begin
        //工作者線程會停止到GetQueuedCompletionStatus函數處,直到接受到數據為止
        lvResultStatus := GetQueuedCompletionStatus(CompletionPort,
          BytesTransferred,
          Cardinal(lvClientContext),
          POverlapped(PerIoData), INFINITE);

        if (lvResultStatus = False) then
        begin
          //當客戶端連接斷開或者客戶端調用closesocket函數的時候,函數GetQueuedCompletionStatus會返回錯誤。如果我們加入心跳后,在這里就可以來判斷套接字是否依然在連接。
          if lvClientContext<>nil then
          begin
            lvClientContext.Free;
            lvClientContext := nil;
          end;
          if PerIoData<>nil then
          begin
            TIODataMemPool.instance.giveBackIOData(PerIoData);
          end;
          continue;
        end;

        if PerIoData = nil then
        begin
          lvClientContext.Free;
          lvClientContext := nil;
          Break;
        end else  if (PerIoData<>nil) then
        begin
          if PerIoData.IO_TYPE = IO_TYPE_Accept then  //連接請求
          begin
            //發送日志顯示
            lvRet := TLogClientWrapper.logINfo('工作線程[' + intToStr(lvWorkID) + ']:有新的連接接入');
            TIODataMemPool.instance.giveBackIOData(PerIoData);
          end else if PerIoData.IO_TYPE = IO_TYPE_Recv then
          begin
            //加入到套接字對應的緩存中
            lvClientContext.AppendBuffer(PerIoData.DataBuf.buf, PerIoData.Overlapped.InternalHigh);
            lvRet := TLogClientWrapper.logINfo('工作線程[' + intToStr(lvWorkID) + ']:接收到數據!');
            TIODataMemPool.instance.giveBackIOData(PerIoData);
          end;

          /////分配內存<可以加入內存池>
          PerIoData := TIODataMemPool.instance.borrowIOData;
          PerIoData.IO_TYPE := IO_TYPE_Recv;



          /////異步收取數據
          if (WSARecv(lvClientContext.Socket,
             @PerIoData.DataBuf,
             1,
             PerIoData.WorkBytes,
             PerIOData.WorkFlag,
             @PerIoData^, nil) = SOCKET_ERROR) then
          begin
            lvRet := GetLastError();
            //重疊IO,出現ERROR_IO_PENDING是正常的,
            //表示數據尚未接收完成,如果有數據接收,GetQueuedCompletionStatus會有返回值
            if (lvRet <> ERROR_IO_PENDING) then
            begin
              lvClientContext.Free;
              if PerIoData <> nil then
              begin
                GlobalFree(DWORD(PerIoData));
              end;
              Continue;
            end;
          end;
        end;
   end;
end;

function D10_IOCPRun(pvData:Pointer): Integer;
var
  WSData: TWSAData;
  lvIOPort, lvPerIOPort:THandle;
  hThread, dwThreadId:DWORD;

  sSocket, cSocket:TSocket;
  lvAddr:TSockAddr;
  lvAddrSize:Integer;
  lvMsg:String;
  lvPort:Integer;

  lvSystemInfo: TSystemInfo;
  i:Integer;

  PerIoData:LPPER_IO_OPERATION_DATA;

  lvWorkerData:PWorkData;

  Flags:Cardinal;
  RecvBytes:Cardinal;
  lvCount:Integer;
  lvClientContext:TClientContext;
begin

  lvPort := Integer(pvData);

  //加載SOCKET。使用的是2.2版為了后面方便加入心跳。
  WSAStartup($0202, WSData);

  // 創建一個完成端口(內核對象)
  lvIOPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);


//  GetSystemInfo(lvSystemInfo);
//  lvCount := lvSystemInfo.dwNumberOfProcessors * 2 -1;

  lvCount := 1;

  //ServerWorkerThread 是工作線程
  for I:=1 to lvCount do
  begin
     lvWorkerData := AllocMem(SizeOf(TWorkerData));
     lvWorkerData.IOCPHandle := lvIOPort;
     lvWorkerData.WorkerID := i;
     hThread := CreateThread(nil, 0, @ServerWorkerThread,
       lvWorkerData,0, dwThreadId);
     if (hThread = 0) then
     begin
         Exit;
     end;
     CloseHandle(hThread);
  end;

  //創建一個套接字,將此套接字和一個端口綁定並監聽此端口。
  sSocket:=WSASocket(AF_INET,SOCK_STREAM,0,Nil,0,WSA_FLAG_OVERLAPPED);
  if sSocket=SOCKET_ERROR then
  begin
      closesocket(sSocket);
      WSACleanup();
  end;
  lvAddr.sin_family:=AF_INET;
  lvAddr.sin_port:=htons(lvPort);
  lvAddr.sin_addr.s_addr:=htonl(INADDR_ANY);
  if bind(sSocket,@lvAddr,sizeof(lvAddr))=SOCKET_ERROR then
  begin
     closesocket(sSocket);
     exit;
  end;

  listen(sSocket,20);

  //下面循環進行循環獲取客戶端的請求。
  while (TRUE) do
  begin
     //當客戶端有連接請求的時候,WSAAccept函數會新創建一個套接字cSocket。這個套接字就是和客戶端通信的時候使用的套接字。
     cSocket:= WSAAccept(sSocket, nil, nil, nil, 0);

     //判斷cSocket套接字創建是否成功,如果不成功則退出。
     if (cSocket= SOCKET_ERROR) then
     begin
        closesocket(sSocket);
        exit;
     end;

     //start----將套接字、完成端口綁定在一起。

     //     最開始的時候沒有明白為什么還要調用一次createIoCompletionPort
     //
     //     后來經過google,和測試
     //
     //     是將新的套接字(socket)加入到iocp端口<綁定>
     //     這樣工作線程才能處理這個套接字(socket)的數據包
     //如果把下面注釋掉,WSARecv這個套接字時,GetQueuedCompletionStatus無法處理到收到的數據包
     //    注意第三個參數也需要進行綁定, 否則在工作線程中GetQueuedCompletionStatus時completionKey會取不到cSocket值

     lvClientContext := TClientContext.Create(cSocket);

     //將套接字、完成端口客戶端對象綁定在一起。
     //2013年4月20日 13:45:10
     lvPerIOPort := CreateIoCompletionPort(cSocket, lvIOPort, Cardinal(lvClientContext), 0);
     if (lvPerIOPort = 0) then
     begin
        Exit;
     end;
     ////----end

     //初始化數據包
     PerIoData := TIODataMemPool.instance.borrowIOData;

     //數據包中的IO類型:有連接請求
     PerIoData.IO_TYPE := IO_TYPE_Accept;

     //通知工作線程,有新的套接字連接<第三個參數>
     PostQueuedCompletionStatus(
        lvIOPort, 0,
        Cardinal(lvClientContext),
        POverlapped(PerIOData));
  end;


end;

initialization

finalization
   WSACleanup;

end.

 

//在ClientContext中使用

unit uBuffer;
{
   套接字對應的接收緩存,使用鏈條模式。
}

interface

uses
  Windows;

type
  PBufRecord = ^_BufRecord;
  _BufRecord = packed record
    len: Cardinal; // the length of the buffer
    buf: PAnsiChar; // the pointer to the buffer

    preBuf:PBufRecord;   //前一個buffer
    nextBuf:PBufRecord;  //后一個buffer
  end;

  TBufferLink = class(TObject)
  private
    FHead:PBufRecord;
    FTail:PBufRecord;

    //當前讀到的Buffer
    FRead:PBufRecord;
    //當前讀到的Buffer位置
    FReadPosition: Cardinal;

    FMark:PBufRecord;
    FMarkPosition: Cardinal;

    function InnerReadBuf(const pvBufRecord: PBufRecord; pvStartIndex: Cardinal;
        buf: PAnsiChar; len: Cardinal): Cardinal;
  public
    constructor Create;

    procedure markReadIndex;

    procedure AddBuffer(buf:PAnsiChar; len:Cardinal);

    function readBuffer(buf:PAnsiChar; len:Cardinal):Cardinal;
  end;


implementation

constructor TBufferLink.Create;
begin
  inherited Create;
  FReadPosition := 0;
end;

{ TBufferLink }

procedure TBufferLink.AddBuffer(buf: PAnsiChar; len: Cardinal);
var
  lvBuf:PBufRecord;
begin
  New(lvBuf);
  lvBuf.preBuf := nil;
  lvBuf.nextBuf := nil;
  lvBuf.buf := GetMemory(len);
  lvBuf.len := len;
  CopyMemory(lvBuf.buf, Pointer(LongInt(buf)), len);
  if FHead = nil then
  begin
    FHead := lvBuf;
  end;
  
  if FTail = nil then
  begin
    FTail := lvBuf;
  end else
  begin
    FTail.nextBuf := lvBuf;
    lvBuf.preBuf := FTail;
  end;
end;

function TBufferLink.InnerReadBuf(const pvBufRecord: PBufRecord; pvStartIndex:
    Cardinal; buf: PAnsiChar; len: Cardinal): Cardinal;
var
  lvValidCount:Cardinal;
begin
  Result := 0;
  if pvBufRecord <> nil then
  begin
    lvValidCount := pvBufRecord.len-pvStartIndex;
    if lvValidCount <= 0 then
    begin
      Result := 0;
    end else
    begin
      if len <= lvValidCount then
      begin
        CopyMemory(buf, Pointer(Cardinal(pvBufRecord.buf) + pvStartIndex), len);
        Result := len;
      end else
      begin
        CopyMemory(buf, Pointer(Cardinal(pvBufRecord.buf) + pvStartIndex), lvValidCount);
        Result := lvValidCount;
      end;
    end;

  end;
end;

procedure TBufferLink.markReadIndex;
begin
  FMark := FRead;
  FMarkPosition := FReadPosition;
end;

function TBufferLink.readBuffer(buf: PAnsiChar; len: Cardinal): Cardinal;
var
  lvBuf:PBufRecord;
  lvPosition, l, lvReadCount, lvRemain:Cardinal;
begin
  lvReadCount := 0;
  lvBuf := FRead;
  lvPosition := FReadPosition;
  if lvBuf = nil then
  begin
    lvBuf := FHead;
    lvPosition := 0;
  end;

  if lvBuf <> nil then
  begin
    lvRemain := len;
    while lvBuf <> nil do
    begin
      l := InnerReadBuf(lvBuf, lvPosition, Pointer(Cardinal(buf) + lvReadCount), lvRemain);
      if l = lvRemain then
      begin
        //讀完
        inc(lvReadCount, l);
        Inc(lvPosition, l);
        FReadPosition := lvPosition;
        FRead := lvBuf;
        lvRemain := 0;
        Break;
      end else if l < lvRemain then  //讀取的比需要讀的長度小
      begin
        lvRemain := lvRemain - l;
        inc(lvReadCount, l);
        Inc(lvPosition, l);
        FReadPosition := lvPosition;
        FRead := lvBuf;
        lvBuf := lvBuf.nextBuf;
        if lvBuf <> nil then   //讀下一個
        begin
          FRead := lvBuf;
          FReadPosition := 0;
          lvPosition := 0;
        end;
      end;
    end;
    Result := lvReadCount;
  end else
  begin
    Result := 0;
  end;

end;

end.

>>>>后面研究Decoder


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM