// Source: www.pudn.com > IOCPSocketServer.rar > IOCPSocket.pas


{**************************************************** 
 
           IOCP Socket Server 
 
          主要参考自CodeProject下载源码, 
          特别感谢张无忌的帮助 
 
          这个版本还很粗糙,很多地方没有处理好,最近 
          比较忙,没有精力和时间来完善它,大家如果感 
          兴趣,可以接着完善,或者干脆重写。 
 
          如果是从这份代码改写,写完了别忘了发给我一份 
          wjjxm@etang.com 
 
 
          write by: softdog 
          2003.2 
 
****************************************************} 
unit IOCPSocket; 
 
interface 
 
uses 
  SysUtils, Windows, WinSock2, SyncObjs, Classes; 
 
const 
  BUFFER_SIZE = 2048; 
 
type 
  // Kind of operation a per-I/O record (TPerHandleData) represents.
  //   ssAccept - declared but not referenced anywhere else in this unit
  //   ssSend   - set by TIOCPServer.SendData on the record it posts
  //   ssRecv   - set by TIOCPServer.PostRead on the record it posts
  //   ssClose  - makes the processing thread close the record's socket
  //   ssStop   - posted by TIOCPServer.Stop; makes a processing thread terminate
  TSocketStatus = (ssAccept, ssSend, ssRecv, ssClose, ssStop);
 
  // Per-operation context that travels through the completion port.
  // Overlapped is the first field: the processing thread receives a
  // POverlapped from GetQueuedCompletionStatus and casts it back to
  // PPerHandleData, which only works while the two addresses coincide.
  PPerHandleData = ^TPerHandleData;
  TPerHandleData = record
    Overlapped: OVERLAPPED;   // passed to WSARecv / WSASend / PostQueuedCompletionStatus
    wsaBuffer: WSABUF;        // buffer descriptor handed to WSARecv / WSASend
    Statu: TSocketStatus;     // which kind of operation this record belongs to
    Socket: TSocket;          // socket the operation was issued on
    Buffer: array[0..BUFFER_SIZE - 1] of char;   // storage PostRead points wsaBuffer at
  end;
 
  TIOCPProcThread = class; 
  TIOCPServer = class; 
 
  // Log sink used for both OnLog and OnErrorLog; receives the message and the
  // open-array argument list exactly as passed to TIOCPServer.Log / ErrorLog.
  TErrorLogProc = procedure(Msg: string; param: array of const) of object;
  // Called from an accept thread after a new client socket has been registered.
  TOnClientConnect = procedure(NewSocket: TSocket) of object;
  // Called by TIOCPServer.DoClientClose with the socket being closed.
  TOnClientDisConnect = procedure(ASocket: TSocket) of object;
  // Called from a processing thread when a receive completes. If the handler
  // leaves OperPosted False, the thread posts a new read on the same socket.
  // The record is disposed by the thread after the handler returns.
  TOnDataRecive = procedure(IOData: PPerHandleData; var OperPosted: boolean) of object;
  // Called from a processing thread when a send completes; same OperPosted
  // and record-lifetime rules as TOnDataRecive.
  TOnDataSend = procedure(IOData: PPerHandleData; var OperPosted: boolean) of object;
 
  // Server-side socket component built on an I/O completion port.
  // Start creates the completion port, the processing threads, the listening
  // socket and the accept threads; Stop tears them down again.
  TIOCPServer = class(TComponent)
  private
    FActive: boolean;                 // True after a successful Start, False after Stop
    FLocalAddress: string;            // stored by the setter; Start binds to INADDR_ANY and does not read it
    FLocalPort: Word;                 // TCP port bound by Start
    FListenPort: THandle;             // I/O completion port handle (despite the name)
    FListenSocket: TSocket;           // listening socket created by Start
    FAcceptThreads: word;             // number of accept threads Start creates (constructor default: 5)
    FAcceptThreadPool: TList;         // TIOCPAcceptThread instances created by Start
    FProcPool: TList;                 // TIOCPProcThread instances created by Start
    FClientList: TStringList;         // connected client sockets, stored as decimal strings
    FThreadCacheSize: Word;           // number of processing threads Start creates (constructor default: 5)
    FErrorLogProc: TErrorLogProc;
    FLogProc: TErrorLogProc;
    FOnClientConnect: TOnClientConnect;
    FOnClientDisConnect: TOnClientDisConnect;
    FOnDataRecive: TOnDataRecive;
    FOnDataSend: TOnDataSend;
    FCount: integer;                  // incremented per processing thread created, decremented when one handles ssStop
    procedure SetActive(const Value: boolean);
    procedure SetLocalAddress(const Value: string);
    procedure SetLocalPort(const Value: word);
    procedure SetThreadCacheSize(const Value: Word);
  protected
    // Forward a message to OnErrorLog / OnLog when a handler is assigned and
    // unit-wide logging is enabled.
    procedure ErrorLog(Msg: string; param: array of const);
    procedure Log(Msg: string; param: array of const);
    // Closes a client socket, drops it from FClientList and raises OnClientDisConnect.
    procedure DoClientClose(SocketInfo: TSocket);
  public
    constructor Create(Owner: TComponent); override;
    destructor Destroy; override;
    // Start / Stop return False when the server is already in the requested
    // state; Start also returns False when socket setup fails.
    function Start: boolean;
    function Stop: boolean;
    // Copies IOData into a new heap record and posts an overlapped WSASend on
    // IOData.Socket; returns -1 when the send could not be issued.
    function SendData(IOData: TPerHandleData): integer;
    // Allocates a per-I/O record and posts an overlapped WSARecv on SocketInfo.
    function PostRead(SocketInfo: TSocket): boolean;
  published
    property Active: boolean read FActive write SetActive;
    property LocalAddress: string read FLocalAddress write SetLocalAddress;
    property LocalPort: word read FLocalPort write SetLocalPort;
    property AcceptThreads: word read FAcceptThreads write FAcceptThreads;
    property ThreadCacheSize: Word read FThreadCacheSize write SetThreadCacheSize;
    property OnErrorLog: TErrorLogProc read FErrorLogProc write FErrorLogProc;
    property OnLog: TErrorLogProc read FLogProc write FLogProc;
    property OnClientConnect: TOnClientConnect read FOnClientConnect write FOnClientConnect;
    property OnClientDisConnect: TOnClientDisConnect read FOnClientDisConnect write FOnClientDisConnect;
    property OnDataRecive: TOnDataRecive read FOnDataRecive write FOnDataRecive;
    property OnDataSend: TOnDataSend read FOnDataSend write FOnDataSend;
  end;
 
  // Listener thread: loops on accept() against the server's listening socket,
  // associates each new socket with the completion port and posts the first read.
  TIOCPAcceptThread = class(TThread)
  private
    FServerComm: TIOCPServer;   // owning server, supplied to the constructor
    FLogMsg: string;            // message handed to Log / Errlog through Synchronize
  protected
    // Log / Errlog forward FLogMsg to the server's Log / ErrorLog.
    procedure Log;
    procedure Errlog;
    procedure Execute; override;
  public
    constructor Create(AServerComm: TIOCPServer);
  end;
 
  // Server-side processing thread: dequeues completion packets from the
  // server's completion port and dispatches them by TPerHandleData.Statu.
  TIOCPProcThread = class(TThread)
  private
    FServerComm: TIOCPServer;   // owning server, supplied to the constructor
    FLogMsg: string;            // message handed to Log / Errlog through Synchronize
  protected
    // Log / Errlog forward FLogMsg to the server's Log / ErrorLog.
    procedure Log;
    procedure Errlog;
    procedure Execute; override;
  public
    constructor Create(AServerComm: TIOCPServer);
    destructor Destroy; override;
    property ServerComm: TIOCPServer read FServerComm;
  end;
 
procedure InitSocket; 
procedure DoneSocket; 
procedure Register; 
 
// Unit-wide critical section, created in the initialization section and freed
// in finalization. The accept threads hold it while registering a new client
// and the processing threads hold it while handling a completion packet.
var
  GlobalLock: TCriticalSection;
 
implementation 
 
// Unit-wide logging switch checked by TIOCPServer.Log and ErrorLog.
// Set True in the initialization section and False by TIOCPServer.Destroy;
// note that it is shared by every TIOCPServer instance.
var
  bCanLog: boolean;
 
// Registers TIOCPServer on the 'IOCP' component palette page.
procedure Register;
begin
  RegisterComponents('IOCP', [TIOCPServer]);
end;
 
// Initialises WinSock for the process, requesting version 2.0.
// The WSAStartup result is not inspected.
procedure InitSocket;
var
  StartupInfo: TWSAData;
begin
  WSAStartup(MakeWord(2, 0), StartupInfo);
end;
 
// Releases WinSock; counterpart of InitSocket, called from the finalization section.
procedure DoneSocket;
begin
  WSACleanup;
end;
 
{ TIOCPServer } 
 
// Creates the component with 5 accept threads and 5 processing threads as
// defaults and allocates the (initially empty) thread and client lists.
constructor TIOCPServer.Create(Owner: TComponent);
begin
  inherited Create(Owner);

  // Destroy switches the unit-wide logging flag off; switch it back on so a
  // server created after another one was destroyed can still log.
  bCanLog := true;

  // A zero-initialised field would be 0, which is not INVALID_SOCKET.
  FListenSocket := INVALID_SOCKET;

  FAcceptThreads := 5;
  FThreadCacheSize := 5;
  FAcceptThreadPool := TList.Create;
  FClientList := TStringList.Create;
  FProcPool := TList.Create;
end;
 
// Tears the server down: silences logging, stops the service and frees the lists.
// NOTE(review): Stop only signals the worker threads (they are FreeOnTerminate
// and are not waited for), so a thread may still dereference this object after
// it has been freed - confirm whether callers need to guard against that.
destructor TIOCPServer.Destroy;
begin
  // Turn the unit-wide logging flag off first so that Stop (and any thread
  // still calling Log / ErrorLog) no longer invokes the log handlers.
  bCanLog := false;
  Stop;
  FAcceptThreadPool.Free;
  FClientList.Free;
  FProcPool.Free;

  inherited;
end;
 
// Closes a tracked client socket, removes it from the client list and raises
// OnClientDisConnect.
//
// Every client socket is added to FClientList before any I/O is posted on it,
// so a socket that is not in the list has already been closed. In that case
// nothing is done; previously the disconnect event fired again, which produced
// duplicate notifications (for example when Stop closed a socket and a
// zero-byte completion for the same socket arrived afterwards).
procedure TIOCPServer.DoClientClose(SocketInfo: TSocket);
var
  idx: integer;
begin
  idx := FClientList.IndexOf(inttostr(SocketInfo));
  if idx < 0 then
    exit;

  closesocket(SocketInfo);
  FClientList.Delete(idx);

  if Assigned(FOnClientDisConnect) then
    FOnClientDisConnect(SocketInfo);
end;
 
// Passes an error message to the OnErrorLog handler. Nothing happens when no
// handler is assigned or when unit-wide logging has been switched off.
procedure TIOCPServer.ErrorLog(Msg: string; param: array of const);
begin
  if not Assigned(FErrorLogProc) then
    exit;
  if not bCanLog then
    exit;

  FErrorLogProc(Msg, param);
end;
 
// Passes an informational message to the OnLog handler. Nothing happens when
// no handler is assigned or when unit-wide logging has been switched off.
procedure TIOCPServer.Log(Msg: string; param: array of const);
begin
  if not Assigned(FLogProc) then
    exit;
  if not bCanLog then
    exit;

  FLogProc(Msg, param);
end;
 
// Posts an overlapped receive on SocketInfo.
//
// A fresh per-I/O record is allocated for the request; the processing thread
// that dequeues the completion disposes it.
//
// Returns True when the receive was issued (completed immediately or is
// pending) and False when WSARecv failed outright. On outright failure no
// completion packet will ever arrive, so the record is released here.
function TIOCPServer.PostRead(SocketInfo: TSocket): boolean;
var
  byteRecv, Flags: DWORD;
  HandleData: PPerHandleData;
begin
  Flags := 0;
  byteRecv := 0;

  New(HandleData);
  FillChar(HandleData^.Overlapped, SizeOf(HandleData^.Overlapped), 0);
  FillChar(HandleData^.Buffer, BUFFER_SIZE, 0);
  HandleData^.wsaBuffer.buf := HandleData^.Buffer;
  HandleData^.wsaBuffer.len := BUFFER_SIZE;
  HandleData^.Statu := ssRecv;
  HandleData^.Socket := SocketInfo;

  if WSARecv(SocketInfo, @(HandleData^.wsaBuffer), 1,
    byteRecv,
    Flags,
    @HandleData^.Overlapped,
    nil) = SOCKET_ERROR then
    // ERROR_IO_PENDING is the normal outcome of an overlapped receive and
    // must not be reported as a failure.
    if WSAGetLastError <> ERROR_IO_PENDING then
    begin
      Dispose(HandleData);
      Result := false;
      exit;
    end;

  Result := true;
end;
 
// Posts an overlapped send of the data described by IOData.wsaBuffer on
// IOData.Socket.
//
// IOData is copied into a heap record that lives until a processing thread
// dequeues the send completion. When the payload fits into the record's own
// Buffer it is copied there and the send is issued from that private copy, so
// the operation no longer depends on the caller's memory (the processing
// thread disposes the record it hands to OnDataRecive as soon as the handler
// returns). Larger payloads are sent from the caller's buffer as before; the
// caller must then keep that buffer alive until the send completes.
//
// Returns -1 when WSASend failed outright; otherwise the byte count reported
// by WSASend, which is 0 when the operation is still pending.
function TIOCPServer.SendData(IOData: TPerHandleData): integer;
var
  byteSend, Flags: DWORD;
  NewData: PPerHandleData;
begin
  new(NewData);
  NewData^ := IOData;
  FillChar(NewData^.Overlapped, SizeOf(NewData^.Overlapped), 0);
  NewData^.Statu := ssSend;

  // The record copy above duplicates the pointer in wsaBuffer, not the data it
  // points to; take a private copy of the payload when it fits.
  if (IOData.wsaBuffer.buf <> nil) and (IOData.wsaBuffer.len <= BUFFER_SIZE) then
  begin
    Move(IOData.wsaBuffer.buf^, NewData^.Buffer, IOData.wsaBuffer.len);
    NewData^.wsaBuffer.buf := @NewData^.Buffer[0];
  end;

  Flags := 0;
  byteSend := 0;

  if WSASend(NewData^.Socket, @(NewData^.wsaBuffer), 1,
    byteSend,
    Flags,
    @(NewData^.Overlapped),
    nil) = SOCKET_ERROR then
    if WSAGetLastError <> ERROR_IO_PENDING then
    begin
      // No completion packet is queued for a send that failed outright, so
      // nobody else would release the record.
      Dispose(NewData);
      Result := -1;
      exit;
    end;

  Result := byteSend;
end;
 
// Property setter for Active: starts or stops the server.
//
// Start and Stop maintain FActive themselves, so the flag is not forced to
// Value here. Previously Active read True even after Start had failed, and a
// later Stop then operated on a server that was never running.
procedure TIOCPServer.SetActive(const Value: boolean);
begin
  if FActive = Value then
    exit;

  if Value then
    Start
  else
    Stop;
end;
 
// Property setter for LocalAddress. The value can only be changed while the
// server is stopped; otherwise an error is logged and the value is kept.
procedure TIOCPServer.SetLocalAddress(const Value: string);
begin
  if not FActive then
    FLocalAddress := Value
  else
    ErrorLog('改变参数前必须停止监听!', []);
end;
 
// Property setter for LocalPort. The value can only be changed while the
// server is stopped; otherwise an error is logged and the value is kept.
procedure TIOCPServer.SetLocalPort(const Value: word);
begin
  if not FActive then
    FLocalPort := Value
  else
    ErrorLog('改变参数前必须停止监听!', []);
end;
 
// Property setter for ThreadCacheSize: records how many processing threads
// Start should create and makes sure the pool list exists.
procedure TIOCPServer.SetThreadCacheSize(const Value: Word);
begin
  if not Assigned(FProcPool) then
    FProcPool := TList.Create;

  FThreadCacheSize := Value;
end;
 
// Starts the server: creates the completion port, the listening socket bound
// to LocalPort on all interfaces (INADDR_ANY; LocalAddress is not consulted),
// then the processing threads and finally the accept threads.
//
// Returns True on success. Returns False when the server is already active or
// when any setup step fails. On failure everything acquired so far is released
// again, and no worker thread is created before the socket is listening, so a
// failed start leaves no handles or threads behind.
function TIOCPServer.Start: boolean;
var
  addr: TSockAddr;
  i: integer;

  // Releases the listening socket and the completion port after a failed step.
  procedure Rollback;
  begin
    if FListenSocket <> INVALID_SOCKET then
    begin
      closesocket(FListenSocket);
      FListenSocket := INVALID_SOCKET;
    end;

    if FListenPort <> 0 then
    begin
      CloseHandle(FListenPort);
      FListenPort := 0;
    end;
  end;

begin
  Result := false;

  if FActive then
  begin
    ErrorLog('服务已经启动!', []);
    exit;
  end;

  // Create the completion port
  FListenPort := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
  if FListenPort = 0 then
  begin
    ErrorLog('创建完成端口错误:%d', [Integer(GetLastError)]);
    exit;
  end;

  // Create the overlapped listening socket
  FListenSocket := WSASocket(AF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);

  if FListenSocket = INVALID_SOCKET then
  begin
    // The error code is read (as an argument) before Rollback can disturb it.
    ErrorLog('创建套接字错误:%d', [WSAGetLastError]);
    Rollback;
    exit;
  end;

  // Zero the whole structure so the padding bytes are not stack garbage.
  FillChar(addr, SizeOf(addr), 0);
  addr.sin_family := AF_INET;
  addr.sin_port := htons(FLocalPort);
  addr.sin_addr.S_addr := INADDR_ANY;

  if Winsock2.bind(FListenSocket, @addr, sizeof(addr)) <> 0 then
  begin
    ErrorLog('绑定套接字错误:%d', [WSAGetLastError]);
    Rollback;
    exit;
  end;

  if Winsock2.listen(FListenSocket, 5) <> 0 then
  begin
    ErrorLog('监听套接字错误:%d', [WSAGetLastError]);
    Rollback;
    exit;
  end;

  // Create the IOCP processing threads. This happens only after the socket is
  // listening, so no thread has to be stopped when an earlier step fails.
  for i := 1 to FThreadCacheSize do
  begin
    FProcPool.Add(TIOCPProcThread.Create(self));
    inc(FCount);
  end;

  // Create the accept threads
  for i := 1 to AcceptThreads do
    FAcceptThreadPool.Add(TIOCPAcceptThread.Create(self));

  Result := true;
  FActive := true;

  Log('服务已经启动在端口: %d', [LocalPort]);
end;
 
// Stops the server: signals the accept threads, closes the listening socket,
// closes every client connection, posts one stop packet per processing thread
// and closes the completion port.
//
// Returns False when the server is not active, True otherwise.
//
// NOTE(review): the threads are only signalled, not waited for, and the port
// is closed right after the stop packets are posted; a thread that has not yet
// dequeued its packet exits on the invalid-handle error instead, and that
// packet is then never disposed. Confirm whether a bounded wait is wanted.
function TIOCPServer.Stop: boolean;
var
  i: integer;
  Closelapped: PPerHandleData;
begin
  if not FActive then
  begin
    result := false;
    exit;
  end;

  // Signal the accept threads (they free themselves on termination)
  for i := FAcceptThreadPool.Count downto 1 do
  begin
    TIOCPAcceptThread(FAcceptThreadPool.Items[i - 1]).Terminate;
    FAcceptThreadPool.Delete(i - 1);
  end;

  // Close the listening socket; this also unblocks pending accept calls
  if FListenSocket <> INVALID_SOCKET then
  begin
    closesocket(FListenSocket);
    FListenSocket := INVALID_SOCKET;
  end;

  // Disconnect every client
  for i := FClientList.Count - 1 downto 0 do
    DoClientClose(strtoint(FClientList.Strings[i]));

  // Post one stop packet per processing thread. The byte count of 1 keeps the
  // packet from being mistaken for a zero-byte (disconnect) completion.
  for i := 1 to FProcPool.Count do
  begin
    New(Closelapped);
    // Do not hand uninitialised memory to the consumer.
    FillChar(Closelapped^, SizeOf(Closelapped^), 0);
    Closelapped^.Statu := ssStop;
    Closelapped^.Socket := INVALID_SOCKET;

    // A packet that could not be queued will never be dequeued, so it must be
    // released here.
    if not PostQueuedCompletionStatus(FListenPort, 1, 0, @(Closelapped^.Overlapped)) then
      Dispose(Closelapped);
  end;
  FProcPool.Clear;

  // Close the completion port handle
  if FListenPort <> 0 then
    CloseHandle(FListenPort);

  Result := true;
  FActive := false;

  Log('服务已经停止.', []);
end;
 
{ TIOCPAcceptThread } 
 
// Creates the accept thread suspended, wires it to its server, marks it
// self-freeing and only then lets it run.
constructor TIOCPAcceptThread.Create(AServerComm: TIOCPServer);
begin
  inherited Create(True);

  FreeOnTerminate := True;
  FServerComm := AServerComm;

  Resume;
end;
 
// Forwards FLogMsg to the server's error log; intended to be run through Synchronize.
procedure TIOCPAcceptThread.Errlog;
begin
  FServerComm.ErrorLog(FLogMsg, []);
end;
 
// Accept loop. Each accepted socket is associated with the server's completion
// port, added to the client list, given its first overlapped read and
// announced through OnClientConnect, all under GlobalLock.
//
// The lock is released on every path (try/finally). Previously a failed
// completion-port association left the loop with `Continue` while still
// holding GlobalLock, which blocked every other worker thread, and the
// accepted socket was leaked.
//
// The loop ends when the thread is terminated or accept is interrupted
// (WSAEINTR).
procedure TIOCPAcceptThread.Execute;
var
  Addr: TSockAddr;
  AddrLen: integer;
  sNew: TSocket;
  NewPort: THandle;
  bindErr: integer;
begin
  while not Terminated do
  begin
    // accept overwrites AddrLen with the returned size, so reset it each time.
    AddrLen := SizeOf(Addr);
    sNew := Winsock2.accept(FServerComm.FListenSocket, Addr, AddrLen);
    if sNew <> INVALID_SOCKET then
    begin
      GlobalLock.Enter;
      try
        // Associate the new socket with the completion port
        NewPort := CreateIoCompletionPort(sNew, FServerComm.FListenPort, 0, 0);
        if NewPort = 0 then
        begin
          // Read the error before closesocket can overwrite it.
          bindErr := GetLastError;
          // Without the association no completion would ever be delivered for
          // this socket, so drop the connection instead of leaking it.
          closesocket(sNew);
          FLogMsg := Format('绑定完成端口错误:%d', [bindErr]);
          Synchronize(Errlog);
        end
        else
        begin
          FServerComm.FClientList.Add(inttostr(sNew));

          // Post the first read request
          FServerComm.PostRead(sNew);

          if Assigned(FServerComm.OnClientConnect) then
            FServerComm.OnClientConnect(sNew);

          FLogMsg := Format('客户端连接:%d', [sNew]);
          Synchronize(Log);
        end;
      finally
        GlobalLock.Leave;
      end;
    end else
      if WSAGetLastError = WSAEINTR then
      begin
        FLogMsg := '监听线程终止.';
        Synchronize(Log);
        Terminate;
      end;
  end;
end;
 
// Forwards FLogMsg to the server's log; run through Synchronize by Execute.
procedure TIOCPAcceptThread.Log;
begin
  FServerComm.Log(FLogMsg, []);
end;
 
{ TIOCPProcThread } 
 
// Creates a processing thread bound to AServerComm. The thread frees itself
// when it terminates.
//
// The thread is created suspended and resumed only after its fields are set,
// the same pattern TIOCPAcceptThread.Create uses. This replaces writing to the
// fields before the inherited constructor has run.
constructor TIOCPProcThread.Create(AServerComm: TIOCPServer);
begin
  inherited Create(true);

  FServerComm := AServerComm;
  FreeOnTerminate := true;

  Resume;
end;
 
// No cleanup of its own; only chains to the inherited destructor.
destructor TIOCPProcThread.Destroy;
begin
  inherited;
end;
 
// Forwards FLogMsg to the server's error log; intended to be run through Synchronize.
procedure TIOCPProcThread.Errlog;
begin
  FServerComm.ErrorLog(FLogMsg, []);
end;
 
// Worker loop: dequeues completion packets from the server's completion port
// and dispatches them by the Statu of the per-I/O record.
//
//   * dequeue failed, no packet    - terminate on ERROR_INVALID_HANDLE (the
//                                    port was closed), otherwise retry
//   * dequeue failed, with packet  - the I/O itself failed; handled like a
//                                    disconnect (previously the record leaked
//                                    and the client was never closed)
//   * zero bytes transferred       - the client disconnected
//   * ssRecv / ssSend              - call the matching event and post a new
//                                    read unless the handler reports that it
//                                    posted an operation itself
//   * ssClose                      - close the client
//   * ssStop                       - decrement the server's thread counter and
//                                    terminate
//
// Every dequeued record is disposed here, and GlobalLock is released in a
// finally block so an exception raised by a handler cannot leave it held.
procedure TIOCPProcThread.Execute;
var
  HandleData: PPerHandleData;
  byteRece, Key: DWORD;
  OperPosted: boolean;
  errCode: integer;
begin
  while not Terminated do
  begin
    // Reset the outputs so a failed dequeue can be told apart from a failed
    // I/O completion, which still delivers its overlapped pointer.
    HandleData := nil;
    byteRece := 0;

    if not GetQueuedCompletionStatus(FServerComm.FListenPort, byteRece, Key, POverlapped(HandleData), INFINITE) then
    begin
      errCode := GetLastError;

      if HandleData = nil then
      begin
        // Nothing was dequeued; the port handle is gone once Stop closed it.
        if errCode = ERROR_INVALID_HANDLE then
          Terminate;
        Continue;
      end;

      // A packet for a failed socket operation: treat it as a disconnect.
      byteRece := 0;
    end;

    GlobalLock.Enter;
    try
      if byteRece = 0 then
      begin
        // The client disconnected
        FLogMsg := Format('客户端断开连接:%d', [HandleData^.Socket]);
        Synchronize(Log);
        FServerComm.DoClientClose(HandleData^.Socket);
      end
      else
        case HandleData^.Statu of
          ssRecv:
            begin
              OperPosted := false;
              if Assigned(FServerComm.FOnDataRecive) then
                FServerComm.FOnDataRecive(HandleData, OperPosted);
              if not OperPosted then
                FServerComm.PostRead(HandleData^.Socket);
            end;
          ssSend:
            begin
              OperPosted := false;
              if Assigned(FServerComm.FOnDataSend) then
                FServerComm.FOnDataSend(HandleData, OperPosted);
              if not OperPosted then
                FServerComm.PostRead(HandleData^.Socket);
            end;
          ssClose:
            begin
              FServerComm.DoClientClose(HandleData^.Socket);
            end;
          ssStop:
            begin
              Dec(FServerComm.FCount);
              Terminate;
            end;
        end; // Case
    finally
      Dispose(HandleData);
      GlobalLock.Leave;
    end;
  end;
end;
 
// Forwards FLogMsg to the server's log; run through Synchronize by Execute.
procedure TIOCPProcThread.Log;
begin
  FServerComm.Log(FLogMsg, []);
end;
 
// Unit start-up: initialise WinSock, create the unit-wide lock and enable logging.
initialization
  InitSocket;
  GlobalLock := TCriticalSection.Create;
  bCanLog := true;
// Unit shutdown: release WinSock, then free the lock.
// NOTE(review): nothing here waits for worker threads that may still be
// running and using GlobalLock - confirm all servers are stopped before unload.
finalization
  DoneSocket;
  GlobalLock.Free;
end.