www.pudn.com > LiServer.rar > iocpserver.cpp


// 
// Sample: I/O Completion Port IPv4/IPv6 Server 
// 
// Files: 
//      iocpserver.cpp    - this file 
//      resolve.cpp       - Common name resolution routines 
//      resolve.h         - Header file for name resolution routines 
// 
// Description: 
//      This sample illustrates overlapped IO with a completion port for 
//      TCP and UDP over both IPv4 and IPv6. This sample uses the  
//      getaddrinfo/getnameinfo APIs which allows this application to be  
//      IP version independent. That is the desired address family  
//      (AF_INET or AF_INET6) can be determined simply from the string  
//      address passed via the -l command. 
// 
//      For TCP, a listening socket is created for each IP address family 
//      available. Each socket is associated with a completion port and 
//      worker threads are spawned (one for each CPU available). For each 
//      listening thread, a number of AcceptEx are posted. The worker threads 
//      then wait for one of these to complete. Upon completion, the client 
//      socket is associated with the completion port and several receives 
//      are posted. The AcceptEx is reposted as well. Once data is received 
//      on a client socket, it is echoed back.  
// 
//      For UDP, an echo socket is creatd for each IP address family available. 
//      For each socket, several receives are posted. Once these receives  
//      complete, the data is sent back to the receiver. 
// 
//      The important thing to remember with IOCP is that the completion events 
//      may occur out of order; however, the buffers are guaranteed to be filled 
//      in the order posted. For our echo server this can cause problems as  
//      receive N+1 may complete before receive N. We can't echo back N+1 before 
//      echoing N. There are two approaches possible. First, we could surmise 
//      that since receive N+1 has completed then we can safely echo back receive 
//      N and N+1 at that time (to maintain the data ordering). To do this properly 
//      you'll have to call WSAGetOverlappedResult on receive N in order to find 
//      out how many bytes were received to echo it back. The second approach 
//      (which is implemented in this sample) is to keep a list of receive 
//      buffers that completed out of order. This list is maintained in the 
//      per-socket data structure. When receive N+1 completes, it will notice that 
//      receive N has not completed. The buffer is then queued in the out of 
//      order send list. Once receive N completes, its buffer is queued -- the 
//      queue is ordered in the same order that the receive operations are. 
//      Another routine (DoSends) goes through this list and sends those buffers 
//      that are available and in order. If any gaps are detected no further buffers 
//      are sent (as we will wait for that receive to complete and insert its 
//      buffer into the list so that the next call to DoSends will correctly 
//      send the buffers in the right order). 
// 
//      For example: 
//          If this sample is called with the following command lines: 
//              iocpserver.exe -l fe80::2efe:1234 -e 5150 
//              iocpserver.exe -l :: 
//          Then the server creates an IPv6 socket as an IPv6 address was 
//          provided. 
// 
//          On the other hand, with the following command line: 
//              iocpserver.exe -l 7.7.7.1 -e 5150 
//              iocpserver.exe -l 0.0.0.0 
//          Then the server creates an IPv4 socket. 
// 
//          Calling the server with no parameters will create a server that 
//          listens both IPv4 and IPv6 (if installed). 
// 
// Compile: 
//      cl -o iocpserver.exe iocpserver.cpp resolve.cpp ws2_32.lib 
// 
// Usage: 
//      iocpserver.exe [options] 
//          -a 4|6     Address family, 4 = IPv4, 6 = IPv6 [default = IPv4] 
//          -b size    Buffer size for send/recv 
//          -e port    Port number 
//          -l addr    Local address to bind to [default INADDR_ANY for IPv4 or INADDR6_ANY for IPv6] 
//          -p proto   Which protocol to use [default = TCP] 
//              tcp         Use TCP 
//              udp         Use UDP 
// 
#ifndef _MT 
#define _MT 
#endif 
 
#include "my.h" 
#include  
#include  
#include  
//#include  
#include  
#include  
 
 
#include "resolve.h" 
 
#include "CMsgAnalyse.h" 
 
#pragma comment(lib,"Ws2_32.lib") 
#define DEFAULT_BUFFER_SIZE         4096   // default buffer size 
#define DEFAULT_OVERLAPPED_COUNT    5      // Number of overlapped recv per socket 
#define MAX_COMPLETION_THREAD_COUNT 32     // Maximum number of completion threads allowed 
 
int gAddressFamily = AF_UNSPEC,         // default to unspecified 
    gSocketType    = SOCK_STREAM,       // default to TCP socket type 
    gProtocol      = IPPROTO_TCP,       // default to TCP protocol 
    gBufferSize    = DEFAULT_BUFFER_SIZE, 
    gOverlappedCount = DEFAULT_OVERLAPPED_COUNT; 
 
char *gBindAddr    = NULL,              // local interface to bind to 
     *gBindPort    = "5150";            // local port to bind to 
 
// 
// This is our per I/O buffer. It contains a WSAOVERLAPPED structure as well 
//    as other necessary information for handling an IO operation on a socket. 
// 
typedef struct _BUFFER_OBJ 
{ 
    WSAOVERLAPPED        ol; 
 
    SOCKET               sclient;       // Used for AcceptEx client socket 
 
    char                *buf;           // Buffer for recv/send/AcceptEx 
    int                  buflen;        // Length of the buffer 
 
    int                  operation;     // Type of operation issued 
#define OP_ACCEPT       0                   // AcceptEx 
#define OP_READ         1                   // WSARecv/WSARecvFrom 
#define OP_WRITE        2                   // WSASend/WSASendTo 
 
    SOCKADDR_STORAGE     addr; 
    int                  addrlen; 
 
    ULONG                IoOrder;       // Order in which this I/O was posted 
 
    struct _BUFFER_OBJ  *next; 
 
} BUFFER_OBJ; 
 
// 
// This is our per socket buffer. It contains information about the socket handle 
//    which is returned from each GetQueuedCompletionStatus call. 
// 
typedef struct _SOCKET_OBJ 
{ 
    SOCKET               s;              // Socket handle 
 
    int                  af,             // Address family of socket (AF_INET, AF_INET6) 
                         bClosing;       // Is the socket closing? 
 
    volatile LONG        OutstandingOps; // Number of outstanding overlapped ops on  
                                         //    socket 
 
    BUFFER_OBJ         **PendingAccepts; // Pending AcceptEx buffers  
                                         //   (used for listening sockets only) 
 
    ULONG                LastSendIssued, // Last sequence number sent 
                         IoCountIssued;  // Next sequence number assigned to receives 
    BUFFER_OBJ          *OutOfOrderSends;// List of send buffers that completed out of order 
 
    // Pointers to Microsoft specific extensions. These are used by listening 
    //   sockets only 
    LPFN_ACCEPTEX        lpfnAcceptEx; 
    LPFN_GETACCEPTEXSOCKADDRS lpfnGetAcceptExSockaddrs; 
 
    CRITICAL_SECTION     SockCritSec;    // Protect access to this structure 
 
    struct _SOCKET_OBJ  *next; 
} SOCKET_OBJ; 
 
 
 
// 
// Statistics counters 
// 
volatile LONG gBytesRead=0, 
              gBytesSent=0, 
              gStartTime=0, 
              gBytesReadLast=0, 
              gBytesSentLast=0, 
              gStartTimeLast=0; 
 
// 
// Function: usage 
// 
// Description: 
//      Prints usage information and exits the process. 
// 
void usage(char *progname) 
{ 
    fprintf(stderr, "usage: %s [-a 4|6] [-e port] [-l local-addr] [-p udp|tcp]\n", 
            progname); 
    fprintf(stderr, "  -a 4|6     Address family, 4 = IPv4, 6 = IPv6 [default = IPv4]\n" 
                    "  -b size    Buffer size for send/recv [default = %d]\n" 
                    "  -e port    Port number [default = %s]\n" 
                    "  -l addr    Local address to bind to [default INADDR_ANY for IPv4 or INADDR6_ANY for IPv6]\n" 
                    "  -p tcp|udp Which protocol to use [default = TCP]\n", 
                    gBufferSize, 
                    gBindPort 
                    ); 
    ExitProcess(-1); 
} 
 
void dbgprint(char *format,...) 
{ 
#ifdef DEBUG 
    va_list vl; 
    char    dbgbuf[2048]; 
 
    if (pid == 0) 
    { 
        pid = GetCurrentProcessId(); 
    } 
 
    va_start(vl, format); 
    wvsprintf(dbgbuf, format, vl); 
    va_end(vl); 
 
    OutputDebugString(dbgbuf); 
#endif 
} 
 
// 
// Function: GetBufferObj 
//  
// Description: 
//    Allocate a BUFFER_OBJ. Each receive posted allocates one of these.  
//    After the recv is successful, the BUFFER_OBJ is queued for 
//    sending by the send thread. To increase performance, a lookaside lists  
//    should be used to cache free BUFFER_OBJ. 
// 
BUFFER_OBJ *GetBufferObj(SOCKET_OBJ *sock, int buflen) 
{ 
    BUFFER_OBJ *newobj=NULL; 
 
    // Allocate the object 
    newobj = (BUFFER_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(BUFFER_OBJ)); 
    if (newobj == NULL) 
    { 
        fprintf(stderr, "GetBufferObj: HeapAlloc failed: %d\n", GetLastError()); 
        ExitProcess(-1); 
    } 
    // Allocate the buffer 
    newobj->buf = (char *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(BYTE) *buflen); 
    if (newobj->buf == NULL) 
    { 
        fprintf(stderr, "GetBufferObj: HeapAlloc failed: %d\n", GetLastError()); 
        ExitProcess(-1); 
    } 
    newobj->buflen = buflen; 
 
    newobj->addrlen = sizeof(newobj->addr); 
 
    return newobj; 
} 
 
Msg *GetMsgObject(SOCKET theS, int lenth) 
{ 
	Msg *newobj = 0; 
 
	newobj = (Msg *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(Msg)); 
    if (newobj == NULL) 
    { 
        fprintf(stderr, "GetMsgObj: HeapAlloc failed: %d\n", GetLastError()); 
        ExitProcess(-1); 
    } 
 
	newobj->buf = (char *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(BYTE) *lenth); 
    if (newobj->buf == NULL) 
    { 
        fprintf(stderr, "GetBufferObj: HeapAlloc failed: %d\n", GetLastError()); 
        ExitProcess(-1); 
    } 
    newobj->buflen = lenth; 
	newobj->s = theS; 
 
	return newobj; 
} 
 
// 
// Function: FreeBufferObj 
//  
// Description: 
//    Free the buffer object. To increase performance, a lookaside list should be 
//    implemented to cache BUFFER_OBJ when freed. 
// 
void FreeBufferObj(BUFFER_OBJ *obj) 
{ 
    HeapFree(GetProcessHeap(), 0, obj->buf); 
    HeapFree(GetProcessHeap(), 0, obj); 
} 
 
// 
// Function: GetSocketObj 
// 
// Description: 
//    Allocate a socket object and initialize its members. A socket object is 
//    allocated for each socket created (either by socket or accept). 
//    Again, a lookaside list can be implemented to cache freed SOCKET_OBJ to 
//    improve performance. 
// 
SOCKET_OBJ *GetSocketObj(SOCKET s, int af) 
{ 
    SOCKET_OBJ  *sockobj=NULL; 
 
    sockobj = (SOCKET_OBJ *)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(SOCKET_OBJ)); 
    if (sockobj == NULL) 
    { 
        fprintf(stderr, "GetSocketObj: HeapAlloc failed: %d\n", GetLastError()); 
        ExitProcess(-1); 
    } 
 
    // Initialize the members 
    sockobj->s = s; 
    sockobj->af = af; 
 
    // For TCP we initialize the IO count to one since the AcceptEx is posted 
    //    to receive data 
    sockobj->IoCountIssued = ((gProtocol == IPPROTO_TCP) ? 1 : 0); 
 
    InitializeCriticalSection(&sockobj->SockCritSec); 
 
    return sockobj; 
} 
 
// 
// Function: FreeSocketObj 
// 
// Description: 
//    Frees a socket object. If there are outstanding operations, the object 
//    is not freed.  
// 
void FreeSocketObj(SOCKET_OBJ *obj) 
{ 
    BUFFER_OBJ  *ptr=NULL, 
                *tmp=NULL; 
 
    if (obj->OutstandingOps != 0) 
    { 
        // Still outstanding operations so just return 
        return; 
    } 
    // Close the socket if it hasn't already been closed 
    if (obj->s != INVALID_SOCKET) 
    { 
        closesocket(obj->s); 
        obj->s = INVALID_SOCKET; 
    } 
 
    DeleteCriticalSection(&obj->SockCritSec); 
 
    HeapFree(GetProcessHeap(), 0, obj); 
} 
 
// 
// Function: ValidateArgs 
// 
// Description: 
//      Parses the command line arguments and sets up some global  
//      variables. 
// 
void ValidateArgs(int argc, char **argv) 
{ 
    int     i; 
 
    for(i=1; i < argc ;i++) 
    { 
        if (((argv[i][0] != '/') && (argv[i][0] != '-')) || (strlen(argv[i]) < 2)) 
            usage(argv[0]); 
        else 
        { 
            switch (tolower(argv[i][1])) 
            { 
                case 'a':               // address family - IPv4 or IPv6 
                    if (i+1 >= argc) 
                        usage(argv[0]); 
                    if (argv[i+1][0] == '4') 
                        gAddressFamily = AF_INET; 
                    else if (argv[i+1][0] == '6') 
                        gAddressFamily = AF_INET6; 
                    else 
                        usage(argv[0]); 
                    i++; 
                    break; 
                case 'b':               // buffer size for send/recv 
                    if (i+1 >= argc) 
                        usage(argv[0]); 
                    gBufferSize = atol(argv[++i]); 
                    break; 
                case 'e':               // endpoint - port number 
                    if (i+1 >= argc) 
                        usage(argv[0]); 
                    gBindPort = argv[++i]; 
                    break; 
                case 'l':               // local address for binding 
                    if (i+1 >= argc) 
                        usage(argv[0]); 
                    gBindAddr = argv[++i]; 
                    break; 
                case 'o':               // overlapped count 
                    if (i+1 >= argc) 
                        usage(argv[0]); 
                    gOverlappedCount = atol(argv[++i]); 
                    break; 
                case 'p':               // protocol - TCP or UDP 
                    if (i+1 >= argc) 
                        usage(argv[0]); 
                    if (_strnicmp(argv[i+1], "tcp", 3) == 0) 
                    { 
                        gProtocol = IPPROTO_TCP; 
                        gSocketType = SOCK_STREAM; 
                    } 
                    else if (_strnicmp(argv[i+1], "udp", 3) == 0) 
                    { 
                        gProtocol = IPPROTO_UDP; 
                        gSocketType = SOCK_DGRAM; 
                    } 
                    else 
                        usage(argv[0]); 
                    i++; 
                    break; 
                default: 
                    usage(argv[0]); 
                    break; 
            } 
        } 
    } 
} 
 
// 
// Function: PrintStatistics 
// 
// Description: 
//    Print the send/recv statistics for the server 
// 
void PrintStatistics() 
{ 
    ULONG       bps, tick, elapsed; 
 
    tick = GetTickCount(); 
 
    elapsed = (tick - gStartTime) / 1000; 
 
    if (elapsed == 0) 
        return; 
 
    printf("\n"); 
 
    // Calculate average bytes per second 
    bps = gBytesSent / elapsed; 
    printf("Average BPS sent: %lu [%lu]\n", bps, gBytesSent); 
 
    bps = gBytesRead / elapsed; 
    printf("Average BPS read: %lu [%lu]\n", bps, gBytesRead); 
 
    elapsed = (tick - gStartTimeLast) / 1000; 
 
    if (elapsed == 0) 
        return; 
 
    // Calculate bytes per second over the last X seconds 
    bps = gBytesSentLast / elapsed; 
    printf("Current BPS sent: %lu\n", bps); 
 
    bps = gBytesReadLast / elapsed; 
    printf("Current BPS read: %lu\n", bps); 
 
    InterlockedExchange(&gBytesSentLast, 0); 
    InterlockedExchange(&gBytesReadLast, 0); 
 
    gStartTimeLast = tick; 
} 
 
// 
// Function: PostRecv 
//  
// Description:  
//    Post an overlapped receive operation on the socket. 
// 
int PostRecv(SOCKET_OBJ *sock, BUFFER_OBJ *recvobj) 
{ 
    WSABUF  wbuf; 
    DWORD   bytes, 
            flags; 
    int     rc; 
 
 
    recvobj->operation = OP_READ; 
 
    wbuf.buf = recvobj->buf; 
    wbuf.len = recvobj->buflen; 
 
    flags = 0; 
 
    EnterCriticalSection(&sock->SockCritSec); 
 
    // Assign the IO order to this receive. This must be performned within 
    //    the critical section. The operation of assigning the IO count and posting 
    //    the receive cannot be interupted. 
    recvobj->IoOrder = sock->IoCountIssued; 
    sock->IoCountIssued++; 
 
    if (gProtocol == IPPROTO_TCP) 
    { 
        rc = WSARecv( 
                sock->s, 
               &wbuf, 
                1, 
               &bytes, 
               &flags, 
               &recvobj->ol, 
                NULL 
                ); 
    } 
    else 
    { 
        rc = WSARecvFrom( 
                sock->s, 
               &wbuf, 
                1, 
               &bytes, 
               &flags, 
                (SOCKADDR *)&recvobj->addr, 
               &recvobj->addrlen, 
               &recvobj->ol, 
                NULL 
                ); 
    } 
 
    LeaveCriticalSection(&sock->SockCritSec); 
 
    if (rc == SOCKET_ERROR) 
    { 
        if (WSAGetLastError() != WSA_IO_PENDING) 
        { 
            dbgprint("PostRecv: WSARecv* failed: %d\n", WSAGetLastError()); 
            return SOCKET_ERROR; 
        } 
    } 
 
    // Increment outstanding overlapped operations 
    InterlockedIncrement(&sock->OutstandingOps); 
 
    return NO_ERROR; 
} 
 
// 
// Function: PostSend 
//  
// Description: 
//    Post an overlapped send operation on the socket. 
// 
int PostSend(SOCKET_OBJ *sock, BUFFER_OBJ *sendobj) 
{ 
    WSABUF  wbuf; 
    DWORD   bytes; 
    int     rc; 
 
    sendobj->operation = OP_WRITE; 
 
    wbuf.buf = sendobj->buf; 
    wbuf.len = sendobj->buflen; 
 
    EnterCriticalSection(&sock->SockCritSec); 
 
    // Incrmenting the last send issued and issuing the send should not be 
    //    interuptable. 
    sock->LastSendIssued++; 
 
    if (gProtocol == IPPROTO_TCP) 
    { 
        rc = WSASend( 
                sock->s, 
               &wbuf, 
                1, 
               &bytes, 
                0, 
               &sendobj->ol, 
                NULL 
                ); 
    } 
    else 
    { 
        rc = WSASendTo( 
                sock->s, 
               &wbuf, 
                1, 
               &bytes, 
                0, 
                (SOCKADDR *)&sendobj->addr, 
                sendobj->addrlen, 
               &sendobj->ol, 
                NULL 
                ); 
    } 
 
    LeaveCriticalSection(&sock->SockCritSec); 
 
    if (rc == SOCKET_ERROR) 
    { 
        if (WSAGetLastError() != WSA_IO_PENDING) 
        { 
            dbgprint("PostSend: WSASend* failed: %d\n", WSAGetLastError()); 
            return SOCKET_ERROR; 
        } 
    } 
 
    // Increment the outstanding operation count 
    InterlockedIncrement(&sock->OutstandingOps); 
 
    return NO_ERROR; 
} 
 
// 
// Function: PostAccept 
//  
// Description: 
//    Post an overlapped accept on a listening socket. 
// 
int PostAccept(SOCKET_OBJ *sock, BUFFER_OBJ *acceptobj) 
{ 
    DWORD   bytes; 
    int     rc; 
 
    acceptobj->operation = OP_ACCEPT; 
 
    // Create the client socket for an incoming connection 
    acceptobj->sclient = socket(sock->af, SOCK_STREAM, IPPROTO_TCP); 
    if (acceptobj->sclient == INVALID_SOCKET) 
    { 
        fprintf(stderr, "PostAccept: socket failed: %d\n", WSAGetLastError()); 
        return -1; 
    } 
 
    rc = sock->lpfnAcceptEx( 
            sock->s, 
            acceptobj->sclient, 
            acceptobj->buf, 
            acceptobj->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2), 
            sizeof(SOCKADDR_STORAGE) + 16, 
            sizeof(SOCKADDR_STORAGE) + 16, 
           &bytes, 
           &acceptobj->ol 
            ); 
    if (rc == FALSE) 
    { 
        if (WSAGetLastError() != WSA_IO_PENDING) 
        { 
            dbgprint("PostAccept: AcceptEx failed: %d\n", 
                    WSAGetLastError()); 
            return SOCKET_ERROR; 
        } 
    } 
 
    // Increment the outstanding overlapped count for this socket 
    InterlockedIncrement(&sock->OutstandingOps); 
 
    return NO_ERROR; 
} 
 
// 
// Function: InsertPendingSend 
//  
// Description: 
//    This routine inserts a send buffer object into the socket's list 
//    of out of order sends. The routine DoSends will go through this 
//    list to issue those sends that are in the correct order. 
// 
void InsertPendingSend(SOCKET_OBJ *sock, BUFFER_OBJ *send) 
{ 
    BUFFER_OBJ *ptr=NULL, 
               *prev=NULL; 
 
 
    EnterCriticalSection(&sock->SockCritSec); 
 
    send->next = NULL; 
 
    // This loop finds the place to put the send within the list. 
    //    The send list is in the same order as the receives were 
    //    posted. 
    ptr = sock->OutOfOrderSends; 
    while (ptr) 
    { 
        if (send->IoOrder < ptr->IoOrder) 
        { 
            break; 
        } 
 
        prev = ptr; 
        ptr = ptr->next; 
    } 
    if (prev == NULL) 
    { 
        // Inserting at head 
        sock->OutOfOrderSends = send; 
        send->next = ptr; 
    } 
    else 
    { 
        // Insertion somewhere in the middle 
        prev->next = send; 
        send->next = ptr; 
    } 
 
    LeaveCriticalSection(&sock->SockCritSec); 
} 
 
// 
// Function: DoSends 
// 
// Description: 
//    This routine goes through a socket object's list of out of order send 
//    buffers and sends as many of them up to the current send count. For each 
//    send posted, the LastSendIssued is incremented. This means that the next 
//    buffer sent must have an IO sequence nubmer equal to the LastSendIssued. 
//    This is to preserve the order of data echoed back. 
// 
int DoSends(SOCKET_OBJ *sock) 
{ 
    BUFFER_OBJ *sendobj=NULL; 
    int         ret; 
 
    ret = NO_ERROR; 
 
    EnterCriticalSection(&sock->SockCritSec); 
 
    sendobj = sock->OutOfOrderSends; 
    while ((sendobj) && (sendobj->IoOrder == sock->LastSendIssued)) 
    { 
        if (PostSend(sock, sendobj) != NO_ERROR) 
        { 
            FreeBufferObj(sendobj); 
             
            ret = SOCKET_ERROR; 
            break; 
        } 
        sock->OutOfOrderSends = sendobj = sendobj->next; 
    } 
 
    LeaveCriticalSection(&sock->SockCritSec); 
 
    return ret; 
} 
 
// 
// Function: HandleIo 
// 
// Description: 
//    This function handles the IO on a socket. In the event of a receive, the  
//    completed receive is posted again. For completed accepts, another AcceptEx 
//    is posted. For completed sends, the buffer is freed. 
// 
void HandleIo(SOCKET_OBJ *sock, BUFFER_OBJ *buf, HANDLE CompPort, DWORD BytesTransfered, DWORD error) 
{ 
    SOCKET_OBJ *clientobj=NULL;     // New client object for accepted connections 
    BUFFER_OBJ *recvobj=NULL,       // Used to post new receives on accepted connections 
               *sendobj=NULL;       // Used to post new sends for data received 
    BOOL        bCleanupSocket; 
    char       *tmp; 
    int         i; 
	Msg *RecvMsg = 0; 
 
    if (error != 0) 
        dbgprint("OP = %d; Error = %d\n", buf->operation, error); 
 
    bCleanupSocket = FALSE; 
 
    if ((error != NO_ERROR) && (gProtocol == IPPROTO_TCP)) 
    { 
        // An error occured on a TCP socket, free the associated per I/O buffer 
        // and see if there are any more outstanding operations. If so we must 
        // wait until they are complete as well. 
        // 
        FreeBufferObj(buf); 
 
        if (InterlockedDecrement(&sock->OutstandingOps) == 0) 
        { 
            dbgprint("Freeing socket obj in GetOverlappedResult\n"); 
            FreeSocketObj(sock); 
        } 
        return; 
    } 
 
    EnterCriticalSection(&sock->SockCritSec); 
    if (buf->operation == OP_ACCEPT) 
    { 
        HANDLE            hrc; 
        SOCKADDR_STORAGE *LocalSockaddr=NULL, 
                         *RemoteSockaddr=NULL; 
        int               LocalSockaddrLen, 
                          RemoteSockaddrLen; 
 
        // Update counters 
        InterlockedExchangeAdd(&gBytesRead, BytesTransfered); 
        InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered); 
 
        // Print the client's addresss 
        sock->lpfnGetAcceptExSockaddrs( 
                buf->buf, 
                buf->buflen - ((sizeof(SOCKADDR_STORAGE) + 16) * 2), 
                sizeof(SOCKADDR_STORAGE) + 16, 
                sizeof(SOCKADDR_STORAGE) + 16, 
                (SOCKADDR **)&LocalSockaddr, 
               &LocalSockaddrLen, 
                (SOCKADDR **)&RemoteSockaddr, 
               &RemoteSockaddrLen 
                ); 
 
        /* 
        printf("Received connection from: "); 
        PrintAddress((SOCKADDR *)RemoteSockaddr, RemoteSockaddrLen); 
        printf("\n"); 
        */ 
 
        // Get a new SOCKET_OBJ for the client connection 
        clientobj = GetSocketObj(buf->sclient, sock->af); 
 
        // Associate the new connection to our completion port 
        hrc = CreateIoCompletionPort( 
                (HANDLE)buf->sclient, 
                CompPort, 
                (ULONG_PTR)clientobj, 
                0 
                ); 
        if (hrc == NULL) 
        { 
            fprintf(stderr, "CompletionThread: CreateIoCompletionPort failed: %d\n", 
                    GetLastError()); 
            return; 
        } 
 
        // Get a BUFFER_OBJ to echo the data received with the accept back to the client 
        sendobj = GetBufferObj(clientobj, BytesTransfered); 
 
        // Copy the buffer to the sending object 
        memcpy(sendobj->buf, buf->buf, BytesTransfered); 
 
		RecvMsg = GetMsgObject(clientobj->s, BytesTransfered); 
		memcpy(RecvMsg->buf, buf->buf, BytesTransfered); 
EnterCriticalSection(&gRecvCS); 
		gRecvMsg.push_back(RecvMsg); 
LeaveCriticalSection(&gRecvCS); 
        // Post the send 
        if (PostSend(clientobj, sendobj) == NO_ERROR) 
        { 
            // Now post some receives on this new connection 
            for(i=0; i < gOverlappedCount ;i++) 
            { 
                recvobj = GetBufferObj(clientobj, gBufferSize); 
 
                if (PostRecv(clientobj, recvobj) != NO_ERROR) 
                { 
                    // If for some reason the send call fails, clean up the connection 
                    FreeBufferObj(recvobj); 
                    error = SOCKET_ERROR; 
                    break; 
                } 
            } 
        } 
        else 
        { 
            // If for some reason the send call fails, clean up the connection 
            FreeBufferObj(sendobj); 
            error = SOCKET_ERROR; 
        } 
         
        // Re-post the AcceptEx 
        PostAccept(sock, buf); 
 
		if (error != NO_ERROR) 
		{ 
            if (clientobj->OutstandingOps == 0) 
            { 
                closesocket(clientobj->s); 
                clientobj->s = INVALID_SOCKET; 
                FreeSocketObj(clientobj); 
            } 
            else 
            { 
                clientobj->bClosing = TRUE; 
            } 
            error = NO_ERROR; 
		} 
    } 
    else if ((buf->operation == OP_READ) && (error == NO_ERROR)) 
    { 
        // 
        // Receive completed successfully 
        // 
        if ((BytesTransfered > 0) || (gProtocol == IPPROTO_UDP)) 
        { 
            InterlockedExchangeAdd(&gBytesRead, BytesTransfered); 
            InterlockedExchangeAdd(&gBytesReadLast, BytesTransfered); 
 
            // Create a buffer to send 
            sendobj = GetBufferObj(sock, gBufferSize); 
 
            if (gProtocol == IPPROTO_UDP) 
            { 
                memcpy(&sendobj->addr, &buf->addr, buf->addrlen); 
            } 
		RecvMsg = GetMsgObject(sock->s, BytesTransfered); 
		memcpy(RecvMsg->buf, buf->buf, BytesTransfered); 
EnterCriticalSection(&gRecvCS); 
		gRecvMsg.push_back(RecvMsg); 
LeaveCriticalSection(&gRecvCS); 
            // Swap the buffers (i.e. buffer we just received becomes the send buffer) 
            tmp              = sendobj->buf; 
            sendobj->buflen  = BytesTransfered; 
            sendobj->buf     = buf->buf; 
            sendobj->IoOrder = buf->IoOrder; 
 
            buf->buf    = tmp; 
            buf->buflen = gBufferSize; 
 
            InsertPendingSend(sock, sendobj); 
 
            if (DoSends(sock) != NO_ERROR) 
            { 
                error = SOCKET_ERROR; 
            } 
            else 
            { 
                // Post another receive 
                if (PostRecv(sock, buf) != NO_ERROR) 
                { 
                    // In the event the recv fails, clean up the connection 
                    FreeBufferObj(buf); 
                    error = SOCKET_ERROR; 
                } 
            } 
        } 
        else 
        { 
            dbgprint("Got 0 byte receive\n"); 
 
            // Graceful close - the receive returned 0 bytes read 
            sock->bClosing = TRUE; 
 
            // Free the receive buffer 
            FreeBufferObj(buf); 
 
            if (DoSends(sock) != NO_ERROR) 
            { 
                dbgprint("0: cleaning up in zero byte handler\n"); 
                error = SOCKET_ERROR; 
            } 
 
            // If this was the last outstanding operation on socket, clean it up 
            if ((sock->OutstandingOps == 0) && (sock->OutOfOrderSends == NULL)) 
            { 
                dbgprint("1: cleaning up in zero byte handler\n"); 
                bCleanupSocket = TRUE; 
            } 
        } 
    } 
    else if ((buf->operation == OP_READ) && (error != NO_ERROR) && (gProtocol == IPPROTO_UDP)) 
    { 
        // If for UDP, a receive completes with an error, we ignore it and re-post the recv 
        if (PostRecv(sock, buf) != NO_ERROR) 
        { 
            error = SOCKET_ERROR; 
        } 
    } 
    else if (buf->operation == OP_WRITE) 
    { 
        // Update the counters 
        InterlockedExchangeAdd(&gBytesSent, BytesTransfered); 
        InterlockedExchangeAdd(&gBytesSentLast, BytesTransfered); 
 
        FreeBufferObj(buf); 
 
        if (DoSends(sock) != NO_ERROR) 
        { 
            dbgprint("Cleaning up inside OP_WRITE handler\n"); 
            error = SOCKET_ERROR; 
        } 
    } 
 
    if (error != NO_ERROR) 
    { 
        sock->bClosing = TRUE; 
    } 
 
    // 
    // Check to see if socket is closing 
    // 
    if ( (InterlockedDecrement(&sock->OutstandingOps) == 0) && 
         (sock->bClosing) && 
         (sock->OutOfOrderSends == NULL) ) 
    { 
        bCleanupSocket = TRUE; 
    } 
    else 
    { 
        if (DoSends(sock) != NO_ERROR) 
        { 
            bCleanupSocket = TRUE; 
        } 
    } 
 
    LeaveCriticalSection(&sock->SockCritSec); 
 
    if (bCleanupSocket) 
    { 
        closesocket(sock->s); 
        sock->s = INVALID_SOCKET; 
 
        FreeSocketObj(sock); 
    } 
 
    return; 
} 
 
// 
// Function: CompletionThread 
//  
// Description: 
//    This is the completion thread which services our completion port. One of 
//    these threads is created per processor on the system. The thread sits in  
//    an infinite loop calling GetQueuedCompletionStatus and handling socket 
//    IO that completed. 
// 
DWORD WINAPI CompletionThread(LPVOID lpParam) 
{ 
    SOCKET_OBJ  *sockobj=NULL;          // Per socket object for completed I/O 
    BUFFER_OBJ  *bufobj=NULL;           // Per I/O object for completed I/O 
    OVERLAPPED  *lpOverlapped=NULL;     // Pointer to overlapped structure for completed I/O 
    HANDLE       CompletionPort;        // Completion port handle 
    DWORD        BytesTransfered,       // Number of bytes transfered 
                 Flags;                 // Flags for completed I/O 
    int          rc,  
                 error; 
 
    CompletionPort = (HANDLE)lpParam; 
    while (1) 
    { 
        error = NO_ERROR; 
        rc = GetQueuedCompletionStatus( 
                CompletionPort, 
               &BytesTransfered, 
                (PULONG_PTR)&sockobj, 
               &lpOverlapped, 
                INFINITE 
                ); 
 
        bufobj = CONTAINING_RECORD(lpOverlapped, BUFFER_OBJ, ol); 
 
        if (rc == FALSE) 
        { 
            // If the call fails, call WSAGetOverlappedResult to translate the 
            //    error code into a Winsock error code. 
            dbgprint("CompletionThread: GetQueuedCompletionStatus failed: %d\n", 
                    GetLastError()); 
            rc = WSAGetOverlappedResult( 
                    sockobj->s, 
                   &bufobj->ol, 
                   &BytesTransfered, 
                    FALSE, 
                   &Flags 
                    ); 
            if (rc == FALSE) 
            { 
                error = WSAGetLastError(); 
            } 
        } 
        // Handle the IO operation 
        HandleIo(sockobj, bufobj, CompletionPort, BytesTransfered, error); 
    } 
 
    ExitThread(0); 
    return 0; 
} 
 
// 
// Function: main 
// 
// Description: 
//      This is the main program. It parses the command line and creates 
//      the main socket. For UDP this socket is used to receive datagrams. 
//      For TCP the socket is used to accept incoming client connections. 
//      Each client TCP connection is handed off to a worker thread which 
//      will receive any data on that connection until the connection is 
//      closed. 
// 
int __cdecl main(int argc, char **argv) 
{ 
    WSADATA          wsd; 
    SYSTEM_INFO      sysinfo; 
    SOCKET_OBJ      *sockobj=NULL, 
                    *ListenSockets=NULL; 
    HANDLE           CompletionPort, 
                     CompThreads[MAX_COMPLETION_THREAD_COUNT], 
                     hrc; 
    int              endpointcount=0, 
                     interval, 
                     rc, 
                     i; 
    struct addrinfo *res=NULL, 
                    *ptr=NULL; 
    CMsgAnalyse theMsg; 
 
	mpc.CreateInstance(__uuidof(Connection)); 
	try                  
	{	 
		mpc->Open("driver={SQL Server};Server=192.168.0.1;DATABASE=data;UID=sa;PWD=qjqj","","",adModeUnknown); 
	} 
	catch(_com_error e) 
	{ 
		printf("数据库连接失败,请联系网络或数据库管理人员查看相应连接与设置!\n"); 
		return FALSE; 
	}  
	mpr.CreateInstance(__uuidof(Recordset)); 
 
 
	InitializeCriticalSection(&gRecvCS); 
    // Validate the command line 
    ValidateArgs(argc, argv); 
 
    // Load Winsock 
    if (WSAStartup(MAKEWORD(2,2), &wsd) != 0) 
    { 
        fprintf(stderr, "unable to load Winsock!\n"); 
        return -1; 
    } 
 
    // Create the completion port used by this server 
    CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)NULL, 0); 
    if (CompletionPort == NULL) 
    { 
        fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError()); 
        return -1; 
    } 
 
    // Find out how many processors are on this system 
    GetSystemInfo(&sysinfo); 
 
    if (sysinfo.dwNumberOfProcessors > MAX_COMPLETION_THREAD_COUNT) 
    { 
        sysinfo.dwNumberOfProcessors = MAX_COMPLETION_THREAD_COUNT; 
    } 
     
    // Create the worker threads to service the completion notifications 
    for(i=0; i < (int)sysinfo.dwNumberOfProcessors ;i++) 
    { 
        CompThreads[i] = CreateThread(NULL, 0, CompletionThread, (LPVOID)CompletionPort, 0, NULL); 
        if (CompThreads[i] == NULL) 
        { 
            fprintf(stderr, "CreatThread failed: %d\n", GetLastError()); 
            return -1; 
        } 
    } 
	theMsg.StartThread(); 
 
    printf("Local address: %s; Port: %s; Family: %d\n", 
            gBindAddr, gBindPort, gAddressFamily); 
 
    res = ResolveAddress(gBindAddr, gBindPort, gAddressFamily, gSocketType, gProtocol); 
    if (res == NULL) 
    { 
        fprintf(stderr, "ResolveAddress failed to return any addresses!\n"); 
        return -1; 
    } 
 
    // For each local address returned, create a listening/receiving socket 
    ptr = res; 
    while (ptr) 
    { 
        PrintAddress(ptr->ai_addr, ptr->ai_addrlen); printf("\n"); 
 
        sockobj = GetSocketObj(INVALID_SOCKET, ptr->ai_family); 
 
        // create the socket 
        sockobj->s = socket(ptr->ai_family, ptr->ai_socktype, ptr->ai_protocol); 
        if (sockobj->s == INVALID_SOCKET) 
        { 
            fprintf(stderr,"socket failed: %d\n", WSAGetLastError()); 
            return -1; 
        } 
 
        // Associate the socket and its SOCKET_OBJ to the completion port 
        hrc = CreateIoCompletionPort((HANDLE)sockobj->s, CompletionPort, (ULONG_PTR)sockobj, 0); 
        if (hrc == NULL) 
        { 
            fprintf(stderr, "CreateIoCompletionPort failed: %d\n", GetLastError()); 
            return -1; 
        } 
 
        // bind the socket to a local address and port 
        rc = bind(sockobj->s, ptr->ai_addr, ptr->ai_addrlen); 
        if (rc == SOCKET_ERROR) 
        { 
            fprintf(stderr, "bind failed: %d\n", WSAGetLastError()); 
            return -1; 
        } 
 
        if (gProtocol == IPPROTO_TCP) 
        { 
            BUFFER_OBJ *acceptobj=NULL; 
            GUID        guidAcceptEx = WSAID_ACCEPTEX, 
                        guidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS; 
            DWORD       bytes; 
 
            // Need to load the Winsock extension functions from each provider 
            //    -- e.g. AF_INET and AF_INET6.  
            rc = WSAIoctl( 
                    sockobj->s, 
                    SIO_GET_EXTENSION_FUNCTION_POINTER, 
                   &guidAcceptEx, 
                    sizeof(guidAcceptEx), 
                   &sockobj->lpfnAcceptEx, 
                    sizeof(sockobj->lpfnAcceptEx), 
                   &bytes, 
                    NULL, 
                    NULL 
                    ); 
            if (rc == SOCKET_ERROR) 
            { 
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER failed: %d\n", 
                        WSAGetLastError()); 
                return -1; 
            } 
            rc = WSAIoctl( 
                    sockobj->s, 
                    SIO_GET_EXTENSION_FUNCTION_POINTER, 
                   &guidGetAcceptExSockaddrs, 
                    sizeof(guidGetAcceptExSockaddrs), 
                   &sockobj->lpfnGetAcceptExSockaddrs, 
                    sizeof(sockobj->lpfnGetAcceptExSockaddrs), 
                   &bytes, 
                    NULL, 
                    NULL 
                    ); 
            if (rc == SOCKET_ERROR) 
            { 
                fprintf(stderr, "WSAIoctl: SIO_GET_EXTENSION_FUNCTION_POINTER faled: %d\n", 
                        WSAGetLastError()); 
                return -1; 
            } 
 
            // For TCP sockets, we need to "listen" on them 
            rc = listen(sockobj->s, 100); 
            if (rc == SOCKET_ERROR) 
            { 
                fprintf(stderr, "listen failed: %d\n", WSAGetLastError()); 
                return -1; 
            } 
 
            // Keep track of the pending AcceptEx operations 
            sockobj->PendingAccepts = (BUFFER_OBJ **)HeapAlloc( 
                    GetProcessHeap(),  
                    HEAP_ZERO_MEMORY,  
                    (sizeof(BUFFER_OBJ *) * gOverlappedCount)); 
            if (sockobj->PendingAccepts == NULL) 
            { 
                fprintf(stderr, "HeapAlloc failed: %d\n", GetLastError()); 
                ExitProcess(-1); 
            } 
 
            // Post the AcceptEx(s) 
            for(i=0; i < gOverlappedCount ;i++) 
            { 
                sockobj->PendingAccepts[i] = acceptobj = GetBufferObj(sockobj, gBufferSize); 
 
                PostAccept(sockobj, acceptobj); 
            } 
            // 
            // Maintain a list of the listening socket structures 
            // 
            if (ListenSockets == NULL) 
            { 
                ListenSockets = sockobj; 
            } 
            else 
            { 
                sockobj->next = ListenSockets; 
                ListenSockets = sockobj; 
            } 
        } 
        else 
        { 
            BUFFER_OBJ *recvobj=NULL; 
            DWORD       bytes; 
            int         optval; 
 
            // Turn off UDP errors resulting from ICMP messages (port/host unreachable, etc) 
            optval = 0; 
            rc = WSAIoctl( 
                    sockobj->s, 
                    SIO_UDP_CONNRESET, 
                   &optval, 
                    sizeof(optval), 
                    NULL, 
                    0, 
                   &bytes, 
                    NULL, 
                    NULL 
                    ); 
            if (rc == SOCKET_ERROR) 
            { 
                fprintf(stderr, "WSAIoctl: SIO_UDP_CONNRESET failed: %d\n",  
                        WSAGetLastError()); 
            } 
 
            // For UDP, simply post some receives 
            for(i=0; i < gOverlappedCount ;i++) 
            { 
                recvobj = GetBufferObj(sockobj, gBufferSize); 
 
                PostRecv(sockobj, recvobj); 
            } 
        } 
 
        endpointcount++; 
        ptr = ptr->ai_next; 
    } 
    // free the addrinfo structure for the 'bind' address 
    freeaddrinfo(res); 
 
    gStartTime = gStartTimeLast = GetTickCount(); 
 
    interval = 0; 
    while (1) 
    { 
        rc = WSAWaitForMultipleEvents( 
                sysinfo.dwNumberOfProcessors, 
                CompThreads, 
                TRUE, 
                5000, 
                FALSE 
                ); 
        if (rc == WAIT_FAILED) 
        { 
            fprintf(stderr, "WSAWaitForMultipleEvents failed: %d\n", WSAGetLastError()); 
            break; 
        } 
        else if (rc == WAIT_TIMEOUT) 
        { 
            interval++; 
 
            PrintStatistics(); 
 
            if (interval == 12) 
            { 
                SOCKET_OBJ  *listenptr=NULL; 
                int          optval, 
                optlen; 
 
                // For TCP, cycle through all the outstanding AcceptEx operations 
                //   to see if any of the client sockets have been connected but 
                //   haven't received any data. If so, close them as they could be 
                //   a denial of service attack. 
                listenptr = ListenSockets; 
                while (listenptr) 
                { 
					int kkook = 0; 
                    for(kkook=0; kkook < gOverlappedCount ;kkook++) 
                    { 
						if( listenptr->PendingAccepts[kkook]->sclient != NULL) 
						{ 
							optlen = sizeof(optval); 
							rc = getsockopt( 
									listenptr->PendingAccepts[kkook]->sclient, 
									SOL_SOCKET, 
									SO_CONNECT_TIME, 
									(char *)&optval, 
									&optlen 
										   ); 
							if (rc == SOCKET_ERROR) 
							{ 
								fprintf(stderr, "getsockopt: SO_CONNECT_TIME failed: %d\n", 
										WSAGetLastError()); 
								return -1; 
							} 
							// If the socket has been connected for more than 5 minutes, 
							//    close it. If closed, the AcceptEx call will fail in the 
							//    completion thread. 
							if ((optval != 0xFFFFFFFF) && (optval > 300)) 
							{ 
								closesocket(listenptr->PendingAccepts[i]->sclient); 
							} 
						} 
                    } 
                    listenptr = listenptr->next; 
                } 
                interval = 0; 
            } 
        } 
    } 
 
    WSACleanup(); 
	DeleteCriticalSection(&gRecvCS); 
    return 0; 
}