// Source: www.pudn.com > UDP-based_Reliable_Data_Transfer_Library.zip > api.cpp


/*****************************************************************************
Copyright © 2001 - 2006, The Board of Trustees of the University of Illinois.
All Rights Reserved.

UDP-based Data Transfer Library (UDT) version 3

Laboratory for Advanced Computing (LAC)
National Center for Data Mining (NCDM)
University of Illinois at Chicago
http://www.lac.uic.edu/

This library is free software; you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or (at
your option) any later version.

This library is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this library; if not, write to the Free Software Foundation, Inc.,
59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
*****************************************************************************/

/*****************************************************************************
This file contains the implementation of UDT API.

reference: UDT programming manual and socket programming reference
*****************************************************************************/

/*****************************************************************************
written by
   Yunhong Gu [gu@lac.uic.edu], last updated 03/14/2006
*****************************************************************************/

#ifndef WIN32
   #include <unistd.h>
#else
   #include <winsock2.h>
   #include <ws2tcpip.h>
#endif
#include <cstring>
#include "api.h"
#include "core.h"


// Creates an empty socket record: no addresses, no UDT entity and no
// listener queues. The accept lock and condition are created here and
// released by the destructor.
CUDTSocket::CUDTSocket():
m_pSelfAddr(NULL),
m_pPeerAddr(NULL),
m_pUDT(NULL),
m_pQueuedSockets(NULL),
m_pAcceptSockets(NULL)
{
   // The destructor branches on m_iIPversion, so it must never be left
   // indeterminate; the basic state fields get defined values as well.
   // (assigned in the body because the member declaration order is not
   // visible from this file)
   m_iIPversion = AF_INET;
   m_Status = INIT;
   m_ListenSocket = 0;

   #ifndef WIN32
      pthread_mutex_init(&m_AcceptLock, NULL);
      pthread_cond_init(&m_AcceptCond, NULL);
   #else
      m_AcceptLock = CreateMutex(NULL, false, NULL);
      m_AcceptCond = CreateEvent(NULL, false, false, NULL);
   #endif
}

// Releases everything the socket record owns: both address buffers, the UDT
// entity, the listener queues, and the accept lock/condition.
CUDTSocket::~CUDTSocket()
{
   // the address buffers are released through the structure type that
   // matches the IP version; deleting a NULL pointer is a no-op
   if (AF_INET != m_iIPversion)
   {
      delete (sockaddr_in6*)m_pSelfAddr;
      delete (sockaddr_in6*)m_pPeerAddr;
   }
   else
   {
      delete (sockaddr_in*)m_pSelfAddr;
      delete (sockaddr_in*)m_pPeerAddr;
   }

   delete m_pUDT;

   delete m_pQueuedSockets;
   delete m_pAcceptSockets;

   #ifdef WIN32
      CloseHandle(m_AcceptLock);
      CloseHandle(m_AcceptCond);
   #else
      pthread_mutex_destroy(&m_AcceptLock);
      pthread_cond_destroy(&m_AcceptCond);
   #endif
}

////////////////////////////////////////////////////////////////////////////////

// Sets up the global UDT state: the socket ID counter, the locks that
// protect the socket table and the ID counter, and the thread-local slot
// that holds the last error of each thread.
// Throws CUDTException(1, 0, <winsock error>) if Winsock cannot be started.
CUDTUnited::CUDTUnited():
m_SocketID(1 << 30)
{
   // Global initialization code
   // Winsock is started first: if it fails, the exception leaves before any
   // lock or TLS slot exists, so nothing is leaked (the destructor does not
   // run for an object whose constructor throws).
   #ifdef WIN32
      WSADATA wsaData;
      const int wsaError = WSAStartup(MAKEWORD(2, 2), &wsaData);

      // WSAStartup reports its error code through the return value
      if (0 != wsaError)
         throw CUDTException(1, 0, wsaError);
   #endif

   #ifndef WIN32
      pthread_mutex_init(&m_ControlLock, NULL);
      pthread_mutex_init(&m_IDLock, NULL);
   #else
      m_ControlLock = CreateMutex(NULL, false, NULL);
      m_IDLock = CreateMutex(NULL, false, NULL);
   #endif

   #ifndef WIN32
      pthread_key_create(&m_TLSError, TLSDestroy);
   #else
      m_TLSError = TlsAlloc();
   #endif
}

// Tears down the global UDT state: the two locks, the thread-local error
// slot and, on Windows, the Winsock library reference.
CUDTUnited::~CUDTUnited()
{
   #ifdef WIN32
      CloseHandle(m_ControlLock);
      CloseHandle(m_IDLock);

      TlsFree(m_TLSError);

      // Global destruction code
      WSACleanup();
   #else
      pthread_mutex_destroy(&m_ControlLock);
      pthread_mutex_destroy(&m_IDLock);

      pthread_key_delete(m_TLSError);
   #endif
}

// Creates a new UDT socket and registers it in the socket table.
//   af:   address family; AF_INET selects sockaddr_in, anything else
//         selects sockaddr_in6 for the local address buffer
//   type: SOCK_STREAM or SOCK_DGRAM
// Returns the ID of the new socket.
// Throws CUDTException(5, 3, 0) for any other type, and
// CUDTException(3, 2, 0) when an allocation or the registration fails.
UDTSOCKET CUDTUnited::newSocket(const __int32& af, const __int32& type)
{
   // collect dead sockets before adding a new one
   checkBrokenSockets();

   const bool knowntype = (SOCK_STREAM == type) || (SOCK_DGRAM == type);
   if (!knowntype)
      throw CUDTException(5, 3, 0);

   CUDTSocket* sock = NULL;

   try
   {
      sock = new CUDTSocket;
      sock->m_pUDT = new CUDT;
      sock->m_pSelfAddr = (AF_INET == af) ? (sockaddr*)(new sockaddr_in) : (sockaddr*)(new sockaddr_in6);
   }
   catch (...)
   {
      delete sock;
      throw CUDTException(3, 2, 0);
   }

   // socket IDs are handed out by decrementing a shared counter
   {
      CGuard idguard(m_IDLock);
      sock->m_Socket = -- m_SocketID;
   }

   sock->m_Status = CUDTSocket::INIT;
   sock->m_ListenSocket = 0;
   sock->m_iIPversion = af;
   sock->m_pUDT->m_SocketID = sock->m_Socket;
   sock->m_pUDT->m_iSockType = type;
   sock->m_pUDT->m_iIPversion = af;

   // register the socket; m_Sockets is protected by the control lock
   {
      CGuard ctrlguard(m_ControlLock);

      try
      {
         m_Sockets[sock->m_Socket] = sock;
      }
      catch (...)
      {
         // registration failed: roll back the half-created socket
         delete sock;
         sock = NULL;
      }
   }

   if (NULL == sock)
      throw CUDTException(3, 2, 0);

   return sock->m_Socket;
}

int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs)
{
   // garbage collection before a new socket is created
   checkBrokenSockets();

   CUDTSocket* ns;
   CUDTSocket* ls = locate(listen);

   // if this connection has already been processed
   if (NULL != (ns = locate(listen, peer)))
   {
      if (ns->m_pUDT->m_bBroken)
      {
         // last connection from the "peer" address has been broken
         ns->m_Status = CUDTSocket::CLOSED;
         gettimeofday(&ns->m_TimeStamp, 0);

         #ifndef WIN32
            pthread_mutex_lock(&(ls->m_AcceptLock));
         #else
            WaitForSingleObject(ls->m_AcceptLock, INFINITE);
         #endif
         ls->m_pQueuedSockets->erase(ns->m_Socket);
         ls->m_pAcceptSockets->erase(ns->m_Socket);
         #ifndef WIN32
            pthread_mutex_unlock(&(ls->m_AcceptLock));
         #else
            ReleaseMutex(ls->m_AcceptLock);
         #endif
      }
      else if (hs->m_iISN == ns->m_pUDT->m_iPeerISN)
      {
         // connection already exist, this is a repeated connection request
         // pass the hand shake packet to the UDT entity
         CPacket cr;
         cr.pack(0, NULL, hs, sizeof(CHandShake));

         ns->m_pUDT->processCtrl(cr);

         return 0;

         //except for this situation a new connection should be started
      }
   }

   // exceeding backlog, refuse the connection request
   if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog)
      return -1;

   try
   {
      ns = new CUDTSocket;
      ns->m_pUDT = new CUDT(*(ls->m_pUDT));
      if (AF_INET == ls->m_iIPversion)
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
         ns->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
         memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in));
      }
      else
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
         ns->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
         memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in6));
      }
   }
   catch (...)
   {
      delete ns;
      return -1;
   }

   #ifndef WIN32
      pthread_mutex_lock(&m_IDLock);
   #else
      WaitForSingleObject(m_IDLock, INFINITE);
   #endif
   ns->m_Socket = -- m_SocketID;
   #ifndef WIN32
      pthread_mutex_unlock(&m_IDLock);
   #else
      ReleaseMutex(m_IDLock);
   #endif

   ns->m_Status = CUDTSocket::INIT;
   ns->m_ListenSocket = listen;
   ns->m_iIPversion = ls->m_iIPversion;
   ns->m_pUDT->m_SocketID = ns->m_Socket;

   int error = 0;

   try
   {
      ns->m_pUDT->open();
      ns->m_pUDT->connect(peer, hs);
   }
   catch (...)
   {
      error = 1;
      goto ERR_ROLLBACK;
   }

   // copy address information of local node
   ns->m_pUDT->m_pChannel->getSockAddr(ns->m_pSelfAddr);

   // protect the m_Sockets structure.
   #ifndef WIN32
      pthread_mutex_lock(&m_ControlLock);
   #else
      WaitForSingleObject(m_ControlLock, INFINITE);
   #endif
   try
   {
      m_Sockets[ns->m_Socket] = ns;
   }
   catch (...)
   {
      error = 2;
   }
   #ifndef WIN32
      pthread_mutex_unlock(&m_ControlLock);
   #else
      ReleaseMutex(m_ControlLock);
   #endif

   #ifndef WIN32
      pthread_mutex_lock(&(ls->m_AcceptLock));
   #else
      WaitForSingleObject(ls->m_AcceptLock, INFINITE);
   #endif
   try
   {
      ls->m_pQueuedSockets->insert(ns->m_Socket);
   }
   catch (...)
   {
      error = 3;
   }
   #ifndef WIN32
      pthread_mutex_unlock(&(ls->m_AcceptLock));
   #else
      ReleaseMutex(ls->m_AcceptLock);
   #endif

   ERR_ROLLBACK:
   if (error > 0)
   {
      ns->m_pUDT->close();
      if (error > 1)
          m_Sockets.erase(ns->m_Socket);
      delete ns;

      return -1;
   }

   //connection is ready to complete, send hs to peer via the new UDT socket
   CPacket initpkt;
   initpkt.pack(0, NULL, hs, sizeof(CHandShake));
   *(ns->m_pUDT->m_pChannel) << initpkt;

   // wake up a waiting accept() call
   #ifndef WIN32
      pthread_cond_signal(&(ls->m_AcceptCond));
   #else
      SetEvent(ls->m_AcceptCond);
   #endif

   return 1;
}

// Returns the UDT entity that belongs to socket "u".
// Throws CUDTException(5, 4, 0) if "u" is not in the socket table.
CUDT* CUDTUnited::lookup(const UDTSOCKET u)
{
   // protects the m_Sockets structure
   CGuard cg(m_ControlLock);

   map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);

   if (i == m_Sockets.end())
      throw CUDTException(5, 4, 0);

   return i->second->m_pUDT;
}

// Binds socket "u" to the local address "name" ("namelen" bytes long).
// Returns 0 on success.
// Throws CUDTException(5, 4, 0) if "u" is unknown, (5, 3, 0) if "namelen"
// does not match the address structure of the socket's IP version, and
// (5, 0, 0) if the socket is no longer in INIT status.
__int32 CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, const __int32& namelen)
{
   CUDTSocket* sock = locate(u);
   if (NULL == sock)
      throw CUDTException(5, 4, 0);

   // the SOCKADDR structure must have exactly the size used by the IP version
   const bool ipv4 = (AF_INET == sock->m_iIPversion);
   const bool sizeok = ipv4 ? (namelen == sizeof(sockaddr_in)) : (namelen == sizeof(sockaddr_in6));
   if (!sizeok)
      throw CUDTException(5, 3, 0);

   // a socket can be bound only once, straight out of INIT status
   if (CUDTSocket::INIT != sock->m_Status)
      throw CUDTException(5, 0, 0);

   sock->m_pUDT->open(name);
   sock->m_Status = CUDTSocket::OPENED;

   // remember the local address the channel actually got
   sock->m_pUDT->m_pChannel->getSockAddr(sock->m_pSelfAddr);

   return 0;
}

// Puts socket "u" into LISTENING status with room for "backlog" queued,
// not yet accepted connections.
// Returns 0 on success, also when the socket is already listening (only the
// backlog is updated then).
// Throws CUDTException(5, 4, 0) if "u" is unknown, (5, 7, 0) in rendezvous
// mode, (5, 3, 0) if backlog <= 0, (5, 5, 0) if the socket is not in OPENED
// status, and (3, 2, 0) if the queues cannot be allocated.
__int32 CUDTUnited::listen(const UDTSOCKET u, const __int32& backlog)
{
   CUDTSocket* s = locate(u);

   if (NULL == s)
      throw CUDTException(5, 4, 0);

   // listen is not supported in rendezvous connection setup
   if (s->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   if (backlog <= 0)
      throw CUDTException(5, 3, 0);

   s->m_uiBackLog = backlog;

   // do nothing if the socket is already listening
   if (CUDTSocket::LISTENING == s->m_Status)
      return 0;

   // a socket can listen only if it is in OPENED status
   if (CUDTSocket::OPENED != s->m_Status)
      throw CUDTException(5, 5, 0);

   // Create both queues before the UDT entity starts listening, so that a
   // connection request can never find a listener without queues.
   set<UDTSOCKET>* queued = NULL;
   set<UDTSOCKET>* accepted = NULL;

   try
   {
      queued = new set<UDTSOCKET>;
      accepted = new set<UDTSOCKET>;
   }
   catch (...)
   {
      // only "queued" can be allocated when the second "new" throws
      delete queued;
      throw CUDTException(3, 2, 0);
   }

   s->m_pQueuedSockets = queued;
   s->m_pAcceptSockets = accepted;

   try
   {
      s->m_pUDT->listen();
   }
   catch (...)
   {
      // leave the socket exactly as it was before the call; the pointers
      // are reset so the destructor cannot free the queues a second time
      s->m_pQueuedSockets = NULL;
      s->m_pAcceptSockets = NULL;
      delete queued;
      delete accepted;
      throw;
   }

   s->m_Status = CUDTSocket::LISTENING;

   return 0;
}

// Takes one pending connection off the queue of listening socket "listen".
// If the listener is in synchronous receiving mode (m_bSynRecving) the call
// waits until a connection is queued or the listener stops listening.
//   addr/addrlen: optional; when "addr" is given it receives the peer
//                 address and "*addrlen" its size ("addrlen" is then required)
// Returns the ID of the accepted socket.
// Throws CUDTException(5, 4, 0) if "listen" (or the accepted socket) is not
// in the socket table, (5, 6, 0) if the listener is not or no longer in
// LISTENING status, (5, 7, 0) in rendezvous mode, (5, 3, 0) if "addr" is
// given without "addrlen", and (6, 2, 0) if nothing is queued in
// non-blocking mode.
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, __int32* addrlen)
{
   CUDTSocket* ls = locate(listen);

   if (ls == NULL)
      throw CUDTException(5, 4, 0);

   // the "listen" socket must be in LISTENING status
   if (CUDTSocket::LISTENING != ls->m_Status)
      throw CUDTException(5, 6, 0);

   // no "accept" in rendezvous connection setup
   if (ls->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   // validate the output arguments before a connection is taken off the
   // queue, so a bad call cannot consume a connection it never returns
   if ((NULL != addr) && (NULL == addrlen))
      throw CUDTException(5, 3, 0);

   UDTSOCKET u = CUDT::INVALID_SOCK;
   bool accepted = false;

   // !!only one connection can be set up each time!!
   #ifndef WIN32
      while (!accepted)
      {
         pthread_mutex_lock(&(ls->m_AcceptLock));

         if (ls->m_pQueuedSockets->size() > 0)
         {
            // move the oldest queued socket to the accepted set
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());

            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
            accepted = true;
         else if (CUDTSocket::LISTENING == ls->m_Status)
            pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));

         // stop waiting once the listener has been closed
         if (CUDTSocket::LISTENING != ls->m_Status)
            accepted = true;

         pthread_mutex_unlock(&(ls->m_AcceptLock));
      }
   #else
      while (!accepted)
      {
         WaitForSingleObject(ls->m_AcceptLock, INFINITE);

         if (ls->m_pQueuedSockets->size() > 0)
         {
            // move the oldest queued socket to the accepted set
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());

            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
            accepted = true;

         ReleaseMutex(ls->m_AcceptLock);

         if (!accepted && (CUDTSocket::LISTENING == ls->m_Status))
            WaitForSingleObject(ls->m_AcceptCond, INFINITE);

         if (CUDTSocket::LISTENING != ls->m_Status)
         {
            // pass the wake-up on to any other accept() still waiting
            SetEvent(ls->m_AcceptCond);
            accepted = true;
         }
      }
   #endif

   if (u == CUDT::INVALID_SOCK)
   {
      // non-blocking receiving, no connection available
      if (!ls->m_pUDT->m_bSynRecving)
         throw CUDTException(6, 2, 0);

      // listening socket is closed
      throw CUDTException(5, 6, 0);
   }

   if (NULL != addr)
   {
      // the accepted socket can have been removed in the meantime
      CUDTSocket* as = locate(u);
      if ((NULL == as) || (NULL == as->m_pPeerAddr))
         throw CUDTException(5, 4, 0);

      if (AF_INET == as->m_iIPversion)
         *addrlen = sizeof(sockaddr_in);
      else
         *addrlen = sizeof(sockaddr_in6);

      // copy address information of peer node
      memcpy(addr, as->m_pPeerAddr, *addrlen);
   }

   return u;
}

// Connects socket "u" to the peer address "name" ("namelen" bytes long).
// Returns 0 on success.
// Throws CUDTException(5, 4, 0) if "u" is unknown, (5, 3, 0) if "namelen"
// does not match the socket's IP version, (5, 8, 0) for an unbound socket in
// rendezvous mode, and (5, 2, 0) if the socket is neither INIT nor OPENED.
__int32 CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, const __int32& namelen)
{
   CUDTSocket* sock = locate(u);
   if (NULL == sock)
      throw CUDTException(5, 4, 0);

   // the SOCKADDR structure must have exactly the size used by the IP version
   const bool ipv4 = (AF_INET == sock->m_iIPversion);
   const bool sizeok = ipv4 ? (namelen == sizeof(sockaddr_in)) : (namelen == sizeof(sockaddr_in6));
   if (!sizeok)
      throw CUDTException(5, 3, 0);

   // only INIT and OPENED sockets may connect; an INIT socket is opened
   // here first, which is not allowed in rendezvous mode
   if (CUDTSocket::INIT == sock->m_Status)
   {
      if (sock->m_pUDT->m_bRendezvous)
         throw CUDTException(5, 8, 0);

      sock->m_pUDT->open();
   }
   else if (CUDTSocket::OPENED != sock->m_Status)
      throw CUDTException(5, 2, 0);

   sock->m_pUDT->connect(name);
   sock->m_Status = CUDTSocket::CONNECTED;

   // remember the local address the channel actually got
   sock->m_pUDT->m_pChannel->getSockAddr(sock->m_pSelfAddr);

   // allocate the peer address buffer and fill it from the channel
   sock->m_pPeerAddr = ipv4 ? (sockaddr*)(new sockaddr_in) : (sockaddr*)(new sockaddr_in6);
   sock->m_pUDT->m_pChannel->getPeerAddr(sock->m_pPeerAddr);

   return 0;
}

// Closes socket "u". The socket record is not freed here: it is marked
// CLOSED and time-stamped, and checkBrokenSockets() removes it later.
// Returns 0 on success.
// Throws CUDTException(5, 4, 0) if "u" is not in the socket table.
__int32 CUDTUnited::close(const UDTSOCKET u)
{
   CUDTSocket* s = locate(u);

   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CUDTSocket::UDTSTATUS os;

   // synchronize with garbage collection.
   {
      CGuard cg(m_ControlLock);

      os = s->m_Status;

      // garbage collection should not try to remove this instance while it
      // is being closed. The marker is written under the same lock as the
      // CLOSED status, so the collector never sees CLOSED together with a
      // stale time stamp.
      s->m_TimeStamp.tv_sec = -1;
      s->m_Status = CUDTSocket::CLOSED;
   }

   // broadcast all "accept" waiting
   if (CUDTSocket::LISTENING == os)
   {
      #ifndef WIN32
         // taking and releasing the lock first lets an accept() that is
         // just about to wait reach pthread_cond_wait before the broadcast
         pthread_mutex_lock(&(s->m_AcceptLock));
         pthread_mutex_unlock(&(s->m_AcceptLock));
         pthread_cond_broadcast(&(s->m_AcceptCond));
      #else
         SetEvent(s->m_AcceptCond);
      #endif
   }

   CUDT* udt = s->m_pUDT;
   udt->close();

   // a socket will not be immediately removed when it is closed
   // in order to prevent other methods from accessing invalid address
   // a timer is started and the socket will be removed by garbage
   // collection once the time stamp is old enough
   gettimeofday(&s->m_TimeStamp, 0);

   return 0;
}

// Copies the peer address of connected socket "u" into "name" and stores
// the size of that address in "*namelen".
// Returns 0 on success.
// Throws CUDTException(5, 4, 0) if "u" is unknown, (5, 3, 0) if an output
// pointer is NULL, and (2, 2, 0) if the socket is not connected or has no
// peer address recorded.
__int32 CUDTUnited::getpeername(const UDTSOCKET u, sockaddr* name, __int32* namelen)
{
   CUDTSocket* s = locate(u);

   if (NULL == s)
      throw CUDTException(5, 4, 0);

   // both output arguments are written unconditionally below
   if ((NULL == name) || (NULL == namelen))
      throw CUDTException(5, 3, 0);

   // connect() records the peer address only after the UDT entity has
   // connected, so the buffer is checked as well as the flag
   if (!s->m_pUDT->m_bConnected || (NULL == s->m_pPeerAddr))
      throw CUDTException(2, 2, 0);

   if (AF_INET == s->m_iIPversion)
      *namelen = sizeof(sockaddr_in);
   else
      *namelen = sizeof(sockaddr_in6);

   // copy address information of peer node
   memcpy(name, s->m_pPeerAddr, *namelen);

   return 0;
}

// Copies the local address of socket "u" into "name" and stores the size of
// that address in "*namelen".
// Returns 0 on success.
// Throws CUDTException(5, 4, 0) if "u" is unknown and (5, 3, 0) if an
// output pointer is NULL or no local address buffer exists.
__int32 CUDTUnited::getsockname(const UDTSOCKET u, sockaddr* name, __int32* namelen)
{
   CUDTSocket* s = locate(u);

   if (NULL == s)
      throw CUDTException(5, 4, 0);

   // both output arguments are written unconditionally below
   if ((NULL == name) || (NULL == namelen) || (NULL == s->m_pSelfAddr))
      throw CUDTException(5, 3, 0);

   if (AF_INET == s->m_iIPversion)
      *namelen = sizeof(sockaddr_in);
   else
      *namelen = sizeof(sockaddr_in6);

   // copy address information of local node
   memcpy(name, s->m_pSelfAddr, *namelen);

   return 0;
}

// Polls the sockets in "readfds" and "writefds" until at least one of them
// is ready or "timeout" has elapsed (NULL means practically forever).
// A socket counts as readable when it is connected and has received data,
// when it is not listening and is broken or not connected, or when it is
// listening and has queued connections. It counts as writable when it is
// connected and its send buffer is below the send queue limit.
// When at least one socket is ready, each non-NULL set is replaced by its
// ready subset ("exceptfds" is always emptied, exceptions are not queried);
// otherwise the sets are left untouched.
// Returns the number of ready sockets, 0 on timeout.
// Throws CUDTException(5, 4, 0) if a set holds an unknown socket.
__int32 CUDTUnited::select(ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout)
{
   timeval entertime, currtime;

   gettimeofday(&entertime, 0);

   // timeout in microseconds; computed in 64 bits because tv_sec * 1000000
   // overflows where tv_sec is a 32-bit type
   __int64 to;
   if (NULL == timeout)
      to = (__int64)1 << 62;
   else
      to = (__int64)timeout->tv_sec * 1000000 + timeout->tv_usec;

   __int32 count = 0;
   __int64 elapsed = 0;

   ud_set rs, ws, es;

   do
   {
      CUDTSocket* s;

      // query read sockets
      if (NULL != readfds)
         for (ud_set::iterator i = readfds->begin(); i != readfds->end(); ++ i)
         {
            if (NULL == (s = locate(*i)))
               throw CUDTException(5, 4, 0);

            if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0))
               || (!s->m_pUDT->m_bListening && (s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected))
               || (s->m_pUDT->m_bListening && (NULL != s->m_pQueuedSockets) && !s->m_pQueuedSockets->empty()))
            {
               rs.insert(*i);
               ++ count;
            }
         }

      // query write sockets
      if (NULL != writefds)
         for (ud_set::iterator i = writefds->begin(); i != writefds->end(); ++ i)
         {
            if (NULL == (s = locate(*i)))
               throw CUDTException(5, 4, 0);

            if (s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndQueueLimit))
            {
               ws.insert(*i);
               ++ count;
            }
         }

      // query exceptions on sockets
      /*
      if (NULL != exceptfds)
         for (ud_set::iterator i = exceptfds->begin(); i != exceptfds->end(); ++ i)
         {
            if (NULL == (s = locate(*i)))
               throw CUDTException(5, 4, 0);

            // check connection request status
               es.insert(*i);
               ++ count;
         }
      */

      if (0 < count)
         break;

      // nothing ready yet: sleep briefly before polling again
      #ifndef WIN32
         usleep(10);
      #else
         Sleep(1);
      #endif

      gettimeofday(&currtime, 0);
      elapsed = (__int64)(currtime.tv_sec - entertime.tv_sec) * 1000000 + (currtime.tv_usec - entertime.tv_usec);

   } while (to > elapsed);

   if (0 < count)
   {
      if (NULL != readfds)
         *readfds = rs;

      if (NULL != writefds)
         *writefds = ws;

      if (NULL != exceptfds)
         *exceptfds = es;
   }

   return count;
}

// Returns the socket record registered for "u", or NULL if there is none.
CUDTSocket* CUDTUnited::locate(const UDTSOCKET u)
{
   // protects the m_Sockets structure
   CGuard cg(m_ControlLock);

   map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);

   if (i == m_Sockets.end())
      return NULL;

   return i->second;
}

CUDTSocket* CUDTUnited::locate(const UDTSOCKET u, const sockaddr* peer)
{
   CGuard cg(m_ControlLock);

   map::iterator i = m_Sockets.find(u);

   // look up the "peer" address in queued sockets set
   for (set::iterator j = i->second->m_pQueuedSockets->begin(); j != i->second->m_pQueuedSockets->end(); ++ j)
   {
      map::iterator k = m_Sockets.find(*j);

      if (AF_INET == i->second->m_iIPversion)
      {
         // compare IPv4 address
         if ((((sockaddr_in*)peer)->sin_port == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_port) && (((sockaddr_in*)peer)->sin_addr.s_addr == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_addr.s_addr))
            return k->second;
      }
      else
      {
         // compare IPv6 address
         if (((sockaddr_in6*)peer)->sin6_port == ((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_port)
         {
            __int32* addr1 = (__int32*)&(((sockaddr_in6*)peer)->sin6_addr);
            __int32* addr2 = (__int32*)&(((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_addr);

            __int32 m = 4;
            for (; m > 0; -- m)
               if (addr1[m] != addr2[m])
                  break;

            if (m > 0)
               return k->second;
         }
      }
   }

   // look up the "peer" address in accepted sockets
   for (set::iterator j = i->second->m_pAcceptSockets->begin(); j != i->second->m_pAcceptSockets->end(); ++ j)
   {
      map::iterator k = m_Sockets.find(*j);

      if (AF_INET == i->second->m_iIPversion)
      {
         // compare IPv4 address
         if ((((sockaddr_in*)peer)->sin_port == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_port) && (((sockaddr_in*)peer)->sin_addr.s_addr == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_addr.s_addr))
            return k->second;
      }
      else
      {
         // compare IPv6 address
         if (((sockaddr_in6*)peer)->sin6_port == ((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_port)
         {
            __int32* addr1 = (__int32*)&(((sockaddr_in6*)peer)->sin6_addr);
            __int32* addr2 = (__int32*)&(((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_addr);

            __int32 m = 4;
            for (; m > 0; -- m)
               if (addr1[m] != addr2[m])
                  break;

            if (m > 0)
               return k->second;
         }
      }
   }

   return NULL;
}

void CUDTUnited::checkBrokenSockets()
{
   CGuard cg(m_ControlLock);

   // set of sockets To Be Removed
   set tbr;

   for (map::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)
   {
      if (CUDTSocket::CLOSED != i->second->m_Status)
      {
         // garbage collection
         if ((i->second->m_pUDT->m_bBroken) && (0 == i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize()))
         {
            //close broken connections and start removal timer
            i->second->m_Status = CUDTSocket::CLOSED;
            gettimeofday(&i->second->m_TimeStamp, 0);

            // remove from listener's queue
            map::iterator j = m_Sockets.find(i->second->m_ListenSocket);
            if (j != m_Sockets.end())
               j->second->m_pQueuedSockets->erase(i->second->m_Socket);
         }
      }
      else
      {
         // if timeout, delete the socket
         timeval currtime;
         gettimeofday(&currtime, 0);
         // timeout 1-2 seconds to destroy a socket
         if ((i->second->m_TimeStamp.tv_sec >= 0) && (currtime.tv_sec - i->second->m_TimeStamp.tv_sec >= 2))
            tbr.insert(i->second->m_Socket);

         // sockets cannot be removed here because it will invalidate the map iterator
      }
   }

   // remove those timeout sockets
   for (set::iterator i = tbr.begin(); i != tbr.end(); ++ i)
      removeSocket(*i);
}

void CUDTUnited::removeSocket(const UDTSOCKET u)
{
   map::iterator i = m_Sockets.find(u);

   // invalid socket ID
   if (i == m_Sockets.end())
      return;

   if (0 != i->second->m_ListenSocket)
   {
      // if it is an accepted socket, remove it from the listener's queue
      map::iterator j = m_Sockets.find(i->second->m_ListenSocket);

      if (j != m_Sockets.end())
         j->second->m_pAcceptSockets->erase(u);
   }
   else if (NULL != i->second->m_pQueuedSockets)
   {
      // if it is a listener, remove all un-accepted sockets in its queue
      for (set::iterator j = i->second->m_pQueuedSockets->begin(); j != i->second->m_pQueuedSockets->end(); ++ j)
      {
         m_Sockets[*j]->m_pUDT->close();
         delete m_Sockets[*j];
         m_Sockets.erase(*j);
      }
   }

   // delete this one
   m_Sockets[u]->m_pUDT->close();
   delete m_Sockets[u];
   m_Sockets.erase(u);
}

// Stores "e" as the last error of the calling thread and destroys the
// error object that was stored before.
void CUDTUnited::setError(CUDTException* e)
{
   #ifdef WIN32
      CUDTException* previous = (CUDTException*)TlsGetValue(m_TLSError);
      delete previous;
      TlsSetValue(m_TLSError, e);
   #else
      CUDTException* previous = (CUDTException*)pthread_getspecific(m_TLSError);
      delete previous;
      pthread_setspecific(m_TLSError, e);
   #endif
}

// Returns the last error object of the calling thread. If the thread has
// none yet, a default-constructed CUDTException is stored first.
CUDTException* CUDTUnited::getError()
{
   #ifdef WIN32
      void* current = TlsGetValue(m_TLSError);
      if (NULL == current)
      {
         TlsSetValue(m_TLSError, new CUDTException);
         // read the slot back rather than trusting the store
         current = TlsGetValue(m_TLSError);
      }
      return (CUDTException*)current;
   #else
      void* current = pthread_getspecific(m_TLSError);
      if (NULL == current)
      {
         pthread_setspecific(m_TLSError, new CUDTException);
         // read the slot back rather than trusting the store
         current = pthread_getspecific(m_TLSError);
      }
      return (CUDTException*)current;
   #endif
}

////////////////////////////////////////////////////////////////////////////////

// API entry: creates a UDT socket (the third argument is ignored).
// Returns the new socket ID, or INVALID_SOCK after recording the failure as
// the calling thread's last error.
UDTSOCKET CUDT::socket(int af, int type, int)
{
   UDTSOCKET result = INVALID_SOCK;

   try
   {
      result = s_UDTUnited.newSocket(af, type);
   }
   catch (CUDTException& ex)
   {
      s_UDTUnited.setError(new CUDTException(ex));
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
   }

   return result;
}

// API entry: binds socket "u" to the local address "name".
// Returns the result of CUDTUnited::bind, or ERROR after recording the
// failure as the calling thread's last error.
int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen)
{
   int status = ERROR;

   try
   {
      status = s_UDTUnited.bind(u, name, namelen);
   }
   catch (CUDTException& ex)
   {
      s_UDTUnited.setError(new CUDTException(ex));
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
   }

   return status;
}

// API entry: puts socket "u" into listening state with the given backlog.
// Returns the result of CUDTUnited::listen, or ERROR after recording the
// failure as the calling thread's last error.
int CUDT::listen(UDTSOCKET u, int backlog)
{
   int status = ERROR;

   try
   {
      status = s_UDTUnited.listen(u, backlog);
   }
   catch (CUDTException& ex)
   {
      s_UDTUnited.setError(new CUDTException(ex));
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
   }

   return status;
}

// API entry: accepts a pending connection on listening socket "u".
// Returns the accepted socket ID, or INVALID_SOCK after recording the
// failure as the calling thread's last error.
UDTSOCKET CUDT::accept(UDTSOCKET u, sockaddr* addr, int* addrlen)
{
   try
   {
      return s_UDTUnited.accept(u, addr, addrlen);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return INVALID_SOCK;
   }
   catch (bad_alloc&)
   {
      // accept() inserts into the accepted-socket set, which may allocate;
      // report it the same way the other wrappers report bad_alloc
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return INVALID_SOCK;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return INVALID_SOCK;
   }
}

// API entry: connects socket "u" to the peer address "name".
// Returns the result of CUDTUnited::connect, or ERROR after recording the
// failure as the calling thread's last error.
int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen)
{
   try
   {
      return s_UDTUnited.connect(u, name, namelen);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: closes socket "u".
// Returns the result of CUDTUnited::close, or ERROR after recording the
// failure as the calling thread's last error.
int CUDT::close(UDTSOCKET u)
{
   try
   {
      return s_UDTUnited.close(u);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: retrieves the peer address of socket "u".
// Returns the result of CUDTUnited::getpeername, or ERROR after recording
// the failure as the calling thread's last error.
int CUDT::getpeername(UDTSOCKET u, sockaddr* name, int* namelen)
{
   try
   {
      return s_UDTUnited.getpeername(u, name, namelen);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: retrieves the local address of socket "u".
// Returns the result of CUDTUnited::getsockname, or ERROR after recording
// the failure as the calling thread's last error.
int CUDT::getsockname(UDTSOCKET u, sockaddr* name, int* namelen)
{
   try
   {
      return s_UDTUnited.getsockname(u, name, namelen);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: reads option "optname" of socket "u" into "optval"/"optlen"
// (the second argument is ignored).
// Returns 0 on success, or ERROR after recording the failure as the calling
// thread's last error.
int CUDT::getsockopt(UDTSOCKET u, int, UDTOpt optname, void* optval, int* optlen)
{
   try
   {
      // "optlen" is dereferenced below; reject a NULL pointer as an
      // invalid-parameter error instead of crashing
      if (NULL == optlen)
         throw CUDTException(5, 3, 0);

      CUDT* udt = s_UDTUnited.lookup(u);

      udt->getOpt(optname, optval, *optlen);

      return 0;
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: sets option "optname" of socket "u" from "optval"/"optlen"
// (the second argument is ignored).
// Returns 0 on success, or ERROR after recording the failure as the calling
// thread's last error.
int CUDT::setsockopt(UDTSOCKET u, int, UDTOpt optname, const void* optval, int optlen)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      udt->setOpt(optname, optval, optlen);

      return 0;
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: placeholder for shutdown. Both arguments are ignored and the
// call currently does nothing except return 0; the intended implementation
// is kept commented out below.
int CUDT::shutdown(UDTSOCKET, int)
{
   try
   {
      //CUDT* udt = s_UDTUnited.lookup(u);
      //udt->shutdown(how);
      return 0;
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: sends "len" bytes from "buf" on socket "u" (the flags argument
// is ignored); "handle" and "routine" are forwarded to the UDT entity.
// Returns the result of the UDT entity's send(), or ERROR after recording
// the failure as the calling thread's last error.
int CUDT::send(UDTSOCKET u, const char* buf, int len, int, int* handle, UDT_MEM_ROUTINE routine)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->send((char*)buf, len, handle, routine);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: receives up to "len" bytes into "buf" from socket "u" (the
// flags argument is ignored); "handle" and "routine" are forwarded to the
// UDT entity.
// Returns the result of the UDT entity's recv(), or ERROR after recording
// the failure as the calling thread's last error.
int CUDT::recv(UDTSOCKET u, char* buf, int len, int, int* handle, UDT_MEM_ROUTINE routine)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->recv(buf, len, handle, routine);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: sends the message in "buf" ("len" bytes) on socket "u"; "ttl"
// and "inorder" are forwarded to the UDT entity.
// Returns the result of the UDT entity's sendmsg(), or ERROR after
// recording the failure as the calling thread's last error.
int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->sendmsg((char*)buf, len, ttl, inorder);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: receives a message of up to "len" bytes into "buf" from
// socket "u".
// Returns the result of the UDT entity's recvmsg(), or ERROR after
// recording the failure as the calling thread's last error.
int CUDT::recvmsg(UDTSOCKET u, char* buf, int len)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->recvmsg(buf, len);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: sends data from the file stream "ifs" on socket "u"; "offset",
// "size" and "block" are forwarded to the UDT entity.
// Returns the result of the UDT entity's sendfile(), or ERROR after
// recording the failure as the calling thread's last error.
__int64 CUDT::sendfile(UDTSOCKET u, ifstream& ifs, const __int64& offset, __int64& size, const int& block)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->sendfile(ifs, offset, size, block);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// API entry: receives data from socket "u" into the file stream "ofs";
// "offset", "size" and "block" are forwarded to the UDT entity.
// Returns the result of the UDT entity's recvfile(), or ERROR after
// recording the failure as the calling thread's last error.
__int64 CUDT::recvfile(UDTSOCKET u, ofstream& ofs, const __int64& offset, __int64& size, const int& block)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->recvfile(ofs, offset, size, block);
   }
   catch (CUDTException& e)
   {
      s_UDTUnited.setError(new CUDTException(e));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// Static API wrapper around the per-socket getOverlappedResult().
// Resolves u via s_UDTUnited.lookup() and forwards handle, progress and wait.
//
// Return contract on the failure paths handled here:
//   false with progress == -1      -> an error occurred (see setError below)
//   false with a positive progress -> the I/O has not completed yet
// Both exception handlers therefore set progress to -1 before returning, so
// that a caller can always tell an error apart from an incomplete I/O.
bool CUDT::getoverlappedresult(UDTSOCKET u, int handle, int& progress, bool wait)
{
   try
   {
      CUDT* udt = s_UDTUnited.lookup(u);

      return udt->getOverlappedResult(handle, progress, wait);
   }
   catch (CUDTException e)
   {
      // false and -1 means an error; false and a positive value means incomplete IO.
      progress = -1;

      s_UDTUnited.setError(new CUDTException(e));
      return false;
   }
   catch (...)
   {
      // Unknown failures are errors too: flag them with -1 exactly like the
      // CUDTException path, otherwise progress keeps its previous value and
      // the result is indistinguishable from an incomplete IO.
      progress = -1;

      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return false;
   }
}

// Static API wrapper around s_UDTUnited.select().
// The first (unnamed) int parameter is not used by this function.
// If all three set pointers are NULL, CUDTException(5, 3, 0) is recorded and
// ERROR is returned without calling select. Otherwise the call is forwarded
// and any exception is recorded with s_UDTUnited.setError(), returning ERROR:
//   CUDTException -> a copy of the caught exception
//   bad_alloc     -> CUDTException(3, 2, 0)
//   anything else -> CUDTException(-1, 0, 0)
int CUDT::select(int, ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout)
{
   // At least one descriptor set must be supplied.
   const bool anySetGiven = (NULL != readfds) || (NULL != writefds) || (NULL != exceptfds);

   if (!anySetGiven)
   {
      s_UDTUnited.setError(new CUDTException(5, 3, 0));
      return ERROR;
   }

   try
   {
      return s_UDTUnited.select(readfds, writefds, exceptfds, timeout);
   }
   catch (CUDTException ex)
   {
      s_UDTUnited.setError(new CUDTException(ex));
      return ERROR;
   }
   catch (bad_alloc&)
   {
      s_UDTUnited.setError(new CUDTException(3, 2, 0));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// Returns a reference to the exception object that s_UDTUnited.getError()
// points to. The pointer is dereferenced without a NULL check.
CUDTException& CUDT::getlasterror()
{
   CUDTException* const lastError = s_UDTUnited.getError();

   return *lastError;
}

// Static API wrapper around the per-socket sample().
// Resolves u via s_UDTUnited.lookup(), calls sample(perf, clear) on the
// instance and returns 0. On failure the error is recorded with
// s_UDTUnited.setError() and ERROR is returned:
//   CUDTException -> a copy of the caught exception
//   anything else -> CUDTException(-1, 0, 0)
int CUDT::perfmon(UDTSOCKET u, CPerfMon* perf, bool clear)
{
   try
   {
      s_UDTUnited.lookup(u)->sample(perf, clear);
      return 0;
   }
   catch (CUDTException ex)
   {
      s_UDTUnited.setError(new CUDTException(ex));
      return ERROR;
   }
   catch (...)
   {
      s_UDTUnited.setError(new CUDTException(-1, 0, 0));
      return ERROR;
   }
}

// Reports whether u resolves to a UDT instance through s_UDTUnited.lookup().
// Returns true when lookup() yields a non-NULL pointer, false otherwise.
//
// NOTE(review): every other caller of lookup() in this file dereferences the
// result unchecked inside a try/catch for CUDTException, so lookup() appears
// to signal an unknown socket by throwing rather than by returning NULL --
// confirm against CUDTUnited::lookup. A predicate should answer "no" in that
// case instead of throwing, so the exception is translated to false here.
// The last-error state is deliberately left untouched: "not a UDT socket" is
// an answer, not a failure. Other exception types still propagate as before.
bool CUDT::isUSock(UDTSOCKET u)
{
   try
   {
      return (NULL != s_UDTUnited.lookup(u));
   }
   catch (const CUDTException&)
   {
      return false;
   }
}

// Returns the CUDT pointer that s_UDTUnited.lookup() yields for u.
// Nothing is caught here: whatever lookup() throws reaches the caller.
CUDT* CUDT::getUDTHandle(UDTSOCKET u)
{
   CUDT* const instance = s_UDTUnited.lookup(u);

   return instance;
}

////////////////////////////////////////////////////////////////////////////////

// Namespace-level API. Every function below is a thin forwarder: it passes
// its arguments unchanged to the static CUDT member of the same name and
// returns that member's result. No additional logic lives here.
namespace UDT
{
// Forwards to CUDT::socket.
UDTSOCKET socket(int af, int type, int protocol)
{
   return ::CUDT::socket(af, type, protocol);
}

// Forwards to CUDT::bind.
int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)
{
   return ::CUDT::bind(u, name, namelen);
}

// Forwards to CUDT::listen.
int listen(UDTSOCKET u, int backlog)
{
   return ::CUDT::listen(u, backlog);
}

// Forwards to CUDT::accept.
UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen)
{
   return ::CUDT::accept(u, addr, addrlen);
}

// Forwards to CUDT::connect.
int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)
{
   return ::CUDT::connect(u, name, namelen);
}

// Forwards to CUDT::close.
int close(UDTSOCKET u)
{
   return ::CUDT::close(u);
}

// Forwards to CUDT::getpeername.
int getpeername(UDTSOCKET u, struct sockaddr* name, int* namelen)
{
   return ::CUDT::getpeername(u, name, namelen);
}

// Forwards to CUDT::getsockname.
int getsockname(UDTSOCKET u, struct sockaddr* name, int* namelen)
{
   return ::CUDT::getsockname(u, name, namelen);
}

// Forwards to CUDT::getsockopt.
int getsockopt(UDTSOCKET u, int level, SOCKOPT optname, void* optval, int* optlen)
{
   return ::CUDT::getsockopt(u, level, optname, optval, optlen);
}

// Forwards to CUDT::setsockopt.
int setsockopt(UDTSOCKET u, int level, SOCKOPT optname, const void* optval, int optlen)
{
   return ::CUDT::setsockopt(u, level, optname, optval, optlen);
}

// Forwards to CUDT::shutdown.
int shutdown(UDTSOCKET u, int how)
{
   return ::CUDT::shutdown(u, how);
}

// Forwards to CUDT::send.
int send(UDTSOCKET u, const char* buf, int len, int flags, int* handle, UDT_MEM_ROUTINE routine)
{
   return ::CUDT::send(u, buf, len, flags, handle, routine);
}

// Forwards to CUDT::recv.
int recv(UDTSOCKET u, char* buf, int len, int flags, int* handle, UDT_MEM_ROUTINE routine)
{
   return ::CUDT::recv(u, buf, len, flags, handle, routine);
}

// Forwards to CUDT::sendmsg.
int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder)
{
   return ::CUDT::sendmsg(u, buf, len, ttl, inorder);
}

// Forwards to CUDT::recvmsg.
int recvmsg(UDTSOCKET u, char* buf, int len)
{
   return ::CUDT::recvmsg(u, buf, len);
}

// Forwards to CUDT::sendfile.
__int64 sendfile(UDTSOCKET u, ifstream& ifs, const __int64& offset, __int64& size, const int& block)
{
   return ::CUDT::sendfile(u, ifs, offset, size, block);
}

// Forwards to CUDT::recvfile.
__int64 recvfile(UDTSOCKET u, ofstream& ofs, const __int64& offset, __int64& size, const int& block)
{
   return ::CUDT::recvfile(u, ofs, offset, size, block);
}

// Forwards to CUDT::getoverlappedresult.
bool getoverlappedresult(UDTSOCKET u, int handle, int& progress, bool wait)
{
   return ::CUDT::getoverlappedresult(u, handle, progress, wait);
}

// Forwards to CUDT::select.
int select(int nfds, UDSET* readfds, UDSET* writefds, UDSET* exceptfds, const struct timeval* timeout)
{
   return ::CUDT::select(nfds, readfds, writefds, exceptfds, timeout);
}

// Returns, by value, the object referenced by CUDT::getlasterror().
ERRORINFO getlasterror()
{
   return ::CUDT::getlasterror();
}

// Forwards to CUDT::perfmon.
int perfmon(UDTSOCKET u, TRACEINFO* perf, bool clear)
{
   return ::CUDT::perfmon(u, perf, clear);
}

}