// Source: www.pudn.com > UDP-based_Reliable_Data_Transfer_Library.zip > api.cpp
/***************************************************************************** Copyright © 2001 - 2006, The Board of Trustees of the University of Illinois. All Rights Reserved. UDP-based Data Transfer Library (UDT) version 3 Laboratory for Advanced Computing (LAC) National Center for Data Mining (NCDM) University of Illinois at Chicago http://www.lac.uic.edu/ This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. *****************************************************************************/ /***************************************************************************** This file contains the implementation of UDT API. 
reference: UDT programming manual and socket programming reference *****************************************************************************/ /***************************************************************************** written by Yunhong Gu [gu@lac.uic.edu], last updated 03/14/2006 *****************************************************************************/ #ifndef WIN32 #include#else #include #include #endif #include "api.h" #include "core.h" CUDTSocket::CUDTSocket(): m_pSelfAddr(NULL), m_pPeerAddr(NULL), m_pUDT(NULL), m_pQueuedSockets(NULL), m_pAcceptSockets(NULL) { #ifndef WIN32 pthread_mutex_init(&m_AcceptLock, NULL); pthread_cond_init(&m_AcceptCond, NULL); #else m_AcceptLock = CreateMutex(NULL, false, NULL); m_AcceptCond = CreateEvent(NULL, false, false, NULL); #endif } CUDTSocket::~CUDTSocket() { if (AF_INET == m_iIPversion) { if (m_pSelfAddr) delete (sockaddr_in*)m_pSelfAddr; if (m_pPeerAddr) delete (sockaddr_in*)m_pPeerAddr; } else { if (m_pSelfAddr) delete (sockaddr_in6*)m_pSelfAddr; if (m_pPeerAddr) delete (sockaddr_in6*)m_pPeerAddr; } if (m_pUDT) delete m_pUDT; if (m_pQueuedSockets) delete m_pQueuedSockets; if (m_pAcceptSockets) delete m_pAcceptSockets; #ifndef WIN32 pthread_mutex_destroy(&m_AcceptLock); pthread_cond_destroy(&m_AcceptCond); #else CloseHandle(m_AcceptLock); CloseHandle(m_AcceptCond); #endif } //////////////////////////////////////////////////////////////////////////////// CUDTUnited::CUDTUnited(): m_SocketID(1 << 30) { #ifndef WIN32 pthread_mutex_init(&m_ControlLock, NULL); pthread_mutex_init(&m_IDLock, NULL); #else m_ControlLock = CreateMutex(NULL, false, NULL); m_IDLock = CreateMutex(NULL, false, NULL); #endif #ifndef WIN32 pthread_key_create(&m_TLSError, TLSDestroy); #else m_TLSError = TlsAlloc(); #endif // Global initialization code #ifdef WIN32 WORD wVersionRequested; WSADATA wsaData; wVersionRequested = MAKEWORD(2, 2); if (0 != WSAStartup(wVersionRequested, &wsaData)) throw CUDTException(1, 0, WSAGetLastError()); #endif } 
CUDTUnited::~CUDTUnited() { #ifndef WIN32 pthread_mutex_destroy(&m_ControlLock); pthread_mutex_destroy(&m_IDLock); #else CloseHandle(m_ControlLock); CloseHandle(m_IDLock); #endif #ifndef WIN32 pthread_key_delete(m_TLSError); #else TlsFree(m_TLSError); #endif // Global destruction code #ifdef WIN32 WSACleanup(); #endif } UDTSOCKET CUDTUnited::newSocket(const __int32& af, const __int32& type) { // garbage collection before a new socket is created checkBrokenSockets(); if ((type != SOCK_STREAM) && (type != SOCK_DGRAM)) throw CUDTException(5, 3, 0); CUDTSocket* ns = NULL; try { ns = new CUDTSocket; ns->m_pUDT = new CUDT; if (AF_INET == af) ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in); else ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6); } catch (...) { delete ns; throw CUDTException(3, 2, 0); } #ifndef WIN32 pthread_mutex_lock(&m_IDLock); #else WaitForSingleObject(m_IDLock, INFINITE); #endif ns->m_Socket = -- m_SocketID; #ifndef WIN32 pthread_mutex_unlock(&m_IDLock); #else ReleaseMutex(m_IDLock); #endif ns->m_Status = CUDTSocket::INIT; ns->m_ListenSocket = 0; ns->m_pUDT->m_SocketID = ns->m_Socket; ns->m_pUDT->m_iSockType = type; ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af; // protect the m_Sockets structure. #ifndef WIN32 pthread_mutex_lock(&m_ControlLock); #else WaitForSingleObject(m_ControlLock, INFINITE); #endif try { m_Sockets[ns->m_Socket] = ns; } catch (...) 
{ //failure and rollback delete ns; ns = NULL; } #ifndef WIN32 pthread_mutex_unlock(&m_ControlLock); #else ReleaseMutex(m_ControlLock); #endif if (NULL == ns) throw CUDTException(3, 2, 0); return ns->m_Socket; } int CUDTUnited::newConnection(const UDTSOCKET listen, const sockaddr* peer, CHandShake* hs) { // garbage collection before a new socket is created checkBrokenSockets(); CUDTSocket* ns; CUDTSocket* ls = locate(listen); // if this connection has already been processed if (NULL != (ns = locate(listen, peer))) { if (ns->m_pUDT->m_bBroken) { // last connection from the "peer" address has been broken ns->m_Status = CUDTSocket::CLOSED; gettimeofday(&ns->m_TimeStamp, 0); #ifndef WIN32 pthread_mutex_lock(&(ls->m_AcceptLock)); #else WaitForSingleObject(ls->m_AcceptLock, INFINITE); #endif ls->m_pQueuedSockets->erase(ns->m_Socket); ls->m_pAcceptSockets->erase(ns->m_Socket); #ifndef WIN32 pthread_mutex_unlock(&(ls->m_AcceptLock)); #else ReleaseMutex(ls->m_AcceptLock); #endif } else if (hs->m_iISN == ns->m_pUDT->m_iPeerISN) { // connection already exist, this is a repeated connection request // pass the hand shake packet to the UDT entity CPacket cr; cr.pack(0, NULL, hs, sizeof(CHandShake)); ns->m_pUDT->processCtrl(cr); return 0; //except for this situation a new connection should be started } } // exceeding backlog, refuse the connection request if (ls->m_pQueuedSockets->size() >= ls->m_uiBackLog) return -1; try { ns = new CUDTSocket; ns->m_pUDT = new CUDT(*(ls->m_pUDT)); if (AF_INET == ls->m_iIPversion) { ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in); ns->m_pPeerAddr = (sockaddr*)(new sockaddr_in); memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in)); } else { ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6); ns->m_pPeerAddr = (sockaddr*)(new sockaddr_in6); memcpy(ns->m_pPeerAddr, peer, sizeof(sockaddr_in6)); } } catch (...) 
{ delete ns; return -1; } #ifndef WIN32 pthread_mutex_lock(&m_IDLock); #else WaitForSingleObject(m_IDLock, INFINITE); #endif ns->m_Socket = -- m_SocketID; #ifndef WIN32 pthread_mutex_unlock(&m_IDLock); #else ReleaseMutex(m_IDLock); #endif ns->m_Status = CUDTSocket::INIT; ns->m_ListenSocket = listen; ns->m_iIPversion = ls->m_iIPversion; ns->m_pUDT->m_SocketID = ns->m_Socket; int error = 0; try { ns->m_pUDT->open(); ns->m_pUDT->connect(peer, hs); } catch (...) { error = 1; goto ERR_ROLLBACK; } // copy address information of local node ns->m_pUDT->m_pChannel->getSockAddr(ns->m_pSelfAddr); // protect the m_Sockets structure. #ifndef WIN32 pthread_mutex_lock(&m_ControlLock); #else WaitForSingleObject(m_ControlLock, INFINITE); #endif try { m_Sockets[ns->m_Socket] = ns; } catch (...) { error = 2; } #ifndef WIN32 pthread_mutex_unlock(&m_ControlLock); #else ReleaseMutex(m_ControlLock); #endif #ifndef WIN32 pthread_mutex_lock(&(ls->m_AcceptLock)); #else WaitForSingleObject(ls->m_AcceptLock, INFINITE); #endif try { ls->m_pQueuedSockets->insert(ns->m_Socket); } catch (...) 
{ error = 3; } #ifndef WIN32 pthread_mutex_unlock(&(ls->m_AcceptLock)); #else ReleaseMutex(ls->m_AcceptLock); #endif ERR_ROLLBACK: if (error > 0) { ns->m_pUDT->close(); if (error > 1) m_Sockets.erase(ns->m_Socket); delete ns; return -1; } //connection is ready to complete, send hs to peer via the new UDT socket CPacket initpkt; initpkt.pack(0, NULL, hs, sizeof(CHandShake)); *(ns->m_pUDT->m_pChannel) << initpkt; // wake up a waiting accept() call #ifndef WIN32 pthread_cond_signal(&(ls->m_AcceptCond)); #else SetEvent(ls->m_AcceptCond); #endif return 1; } CUDT* CUDTUnited::lookup(const UDTSOCKET u) { // protects the m_Sockets structure CGuard cg(m_ControlLock); map ::iterator i = m_Sockets.find(u); if (i == m_Sockets.end()) throw CUDTException(5, 4, 0); return i->second->m_pUDT; } __int32 CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, const __int32& namelen) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); // check the size of SOCKADDR structure if (AF_INET == s->m_iIPversion) { if (namelen != sizeof(sockaddr_in)) throw CUDTException(5, 3, 0); } else { if (namelen != sizeof(sockaddr_in6)) throw CUDTException(5, 3, 0); } // cannot bind a socket more than once if (CUDTSocket::INIT != s->m_Status) throw CUDTException(5, 0, 0); s->m_pUDT->open(name); s->m_Status = CUDTSocket::OPENED; // copy address information of local node s->m_pUDT->m_pChannel->getSockAddr(s->m_pSelfAddr); return 0; } __int32 CUDTUnited::listen(const UDTSOCKET u, const __int32& backlog) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); // listen is not supported in rendezvous connection setup if (s->m_pUDT->m_bRendezvous) throw CUDTException(5, 7, 0); if (backlog <= 0) throw CUDTException(5, 3, 0); s->m_uiBackLog = backlog; // do nothing if the socket is already listening if (CUDTSocket::LISTENING == s->m_Status) return 0; // a socket can listen only if is in OPENED status if (CUDTSocket::OPENED != s->m_Status) throw CUDTException(5, 5, 0); 
s->m_pUDT->listen(); try { s->m_pQueuedSockets = new set ; s->m_pAcceptSockets = new set ; } catch (...) { delete s->m_pQueuedSockets; throw CUDTException(3, 2, 0); } s->m_Status = CUDTSocket::LISTENING; return 0; } UDTSOCKET CUDTUnited::accept(const UDTSOCKET listen, sockaddr* addr, __int32* addrlen) { CUDTSocket* ls = locate(listen); if (ls == NULL) throw CUDTException(5, 4, 0); // the "listen" socket must be in LISTENING status if (CUDTSocket::LISTENING != ls->m_Status) throw CUDTException(5, 6, 0); // no "accept" in rendezvous connection setup if (ls->m_pUDT->m_bRendezvous) throw CUDTException(5, 7, 0); UDTSOCKET u = CUDT::INVALID_SOCK; bool accepted = false; // !!only one conection can be set up each time!! #ifndef WIN32 while (!accepted) { pthread_mutex_lock(&(ls->m_AcceptLock)); if (ls->m_pQueuedSockets->size() > 0) { u = *(ls->m_pQueuedSockets->begin()); ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u); ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin()); accepted = true; } else if (!ls->m_pUDT->m_bSynRecving) accepted = true; else if (CUDTSocket::LISTENING == ls->m_Status) pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock)); if (CUDTSocket::LISTENING != ls->m_Status) accepted = true; pthread_mutex_unlock(&(ls->m_AcceptLock)); } #else while (!accepted) { WaitForSingleObject(ls->m_AcceptLock, INFINITE); if (ls->m_pQueuedSockets->size() > 0) { u = *(ls->m_pQueuedSockets->begin()); ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u); ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin()); accepted = true; } else if (!ls->m_pUDT->m_bSynRecving) accepted = true; ReleaseMutex(ls->m_AcceptLock); if (!accepted & (CUDTSocket::LISTENING == ls->m_Status)) WaitForSingleObject(ls->m_AcceptCond, INFINITE); if (CUDTSocket::LISTENING != ls->m_Status) { SetEvent(ls->m_AcceptCond); accepted = true; } } #endif if (u == CUDT::INVALID_SOCK) { // non-blocking receiving, no connection available if (!ls->m_pUDT->m_bSynRecving) throw 
CUDTException(6, 2, 0); // listening socket is closed throw CUDTException(5, 6, 0); } if (NULL != addr) { if (NULL == addrlen) throw CUDTException(5, 3, 0); if (AF_INET == locate(u)->m_iIPversion) *addrlen = sizeof(sockaddr_in); else *addrlen = sizeof(sockaddr_in6); // copy address information of peer node memcpy(addr, locate(u)->m_pPeerAddr, *addrlen); } return u; } __int32 CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, const __int32& namelen) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); // check the size of SOCKADDR structure if (AF_INET == s->m_iIPversion) { if (namelen != sizeof(sockaddr_in)) throw CUDTException(5, 3, 0); } else { if (namelen != sizeof(sockaddr_in6)) throw CUDTException(5, 3, 0); } // a socket can "connect" only if it is in INIT or OPENED status if (CUDTSocket::INIT == s->m_Status) { if (!s->m_pUDT->m_bRendezvous) s->m_pUDT->open(); else throw CUDTException(5, 8, 0); } else if (CUDTSocket::OPENED != s->m_Status) throw CUDTException(5, 2, 0); s->m_pUDT->connect(name); s->m_Status = CUDTSocket::CONNECTED; // copy address information of local node s->m_pUDT->m_pChannel->getSockAddr(s->m_pSelfAddr); // record peer address if (AF_INET == s->m_iIPversion) s->m_pPeerAddr = (sockaddr*)(new sockaddr_in); else s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6); s->m_pUDT->m_pChannel->getPeerAddr(s->m_pPeerAddr); return 0; } __int32 CUDTUnited::close(const UDTSOCKET u) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); CUDTSocket::UDTSTATUS os = s->m_Status; // synchronize with garbage collection. 
#ifndef WIN32 pthread_mutex_lock(&m_ControlLock); #else WaitForSingleObject(m_ControlLock, INFINITE); #endif s->m_Status = CUDTSocket::CLOSED; #ifndef WIN32 pthread_mutex_unlock(&m_ControlLock); #else ReleaseMutex(m_ControlLock); #endif // broadcast all "accept" waiting if (CUDTSocket::LISTENING == os) { #ifndef WIN32 pthread_mutex_lock(&(s->m_AcceptLock)); pthread_mutex_unlock(&(s->m_AcceptLock)); pthread_cond_broadcast(&(s->m_AcceptCond)); #else SetEvent(s->m_AcceptCond); #endif } // garbage collection should not try to close this instance s->m_TimeStamp.tv_sec = -1; CUDT* udt = s->m_pUDT; udt->close(); // a socket will not be immediated removed when it is closed // in order to prevent other methods from accessing invalid address // a timer is started and the socket will be removed after approximately 1 second gettimeofday(&s->m_TimeStamp, 0); return 0; } __int32 CUDTUnited::getpeername(const UDTSOCKET u, sockaddr* name, __int32* namelen) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); if (!s->m_pUDT->m_bConnected) throw CUDTException(2, 2, 0); if (AF_INET == s->m_iIPversion) *namelen = sizeof(sockaddr_in); else *namelen = sizeof(sockaddr_in6); // copy address information of peer node memcpy(name, s->m_pPeerAddr, *namelen); return 0; } __int32 CUDTUnited::getsockname(const UDTSOCKET u, sockaddr* name, __int32* namelen) { CUDTSocket* s = locate(u); if (NULL == s) throw CUDTException(5, 4, 0); if (AF_INET == s->m_iIPversion) *namelen = sizeof(sockaddr_in); else *namelen = sizeof(sockaddr_in6); // copy address information of local node memcpy(name, s->m_pSelfAddr, *namelen); return 0; } __int32 CUDTUnited::select(ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout) { timeval entertime, currtime; gettimeofday(&entertime, 0); __int64 to; if (NULL == timeout) to = (__int64)1 << 62; else to = timeout->tv_sec * 1000000 + timeout->tv_usec; __int32 count = 0; set rs, ws, es; do { CUDTSocket* s; // query read sockets if 
(NULL != readfds) for (set ::iterator i = readfds->begin(); i != readfds->end(); ++ i) { if (NULL == (s = locate(*i))) throw CUDTException(5, 4, 0); if ((s->m_pUDT->m_bConnected && (s->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0)) || (!s->m_pUDT->m_bListening && (s->m_pUDT->m_bBroken || !s->m_pUDT->m_bConnected)) || (s->m_pUDT->m_bListening && (s->m_pQueuedSockets->size() > 0))) { rs.insert(*i); ++ count; } } // query write sockets if (NULL != writefds) for (set ::iterator i = writefds->begin(); i != writefds->end(); ++ i) { if (NULL == (s = locate(*i))) throw CUDTException(5, 4, 0); if (s->m_pUDT->m_bConnected && (s->m_pUDT->m_pSndBuffer->getCurrBufSize() < s->m_pUDT->m_iSndQueueLimit)) { ws.insert(*i); ++ count; } } // query expections on sockets /* if (NULL != exceptfds) for (set ::iterator i = exceptfds->begin(); i != exceptfds->end(); ++ i) { if (NULL == (s = locate(*i))) throw CUDTException(5, 4, 0); // check connection request status es.insert(*i); ++ count; } */ if (0 < count) break; #ifndef WIN32 usleep(10); #else Sleep(1); #endif gettimeofday(&currtime, 0); } while (to > ((currtime.tv_sec - entertime.tv_sec) * 1000000 + currtime.tv_usec - entertime.tv_usec)); if (0 < count) { if (NULL != readfds) *readfds = rs; if (NULL != writefds) *writefds = ws; if (NULL != exceptfds) *exceptfds = es; } return count; } CUDTSocket* CUDTUnited::locate(const UDTSOCKET u) { CGuard cg(m_ControlLock); map ::iterator i = m_Sockets.find(u); if (i == m_Sockets.end()) return NULL; else return i->second; } CUDTSocket* CUDTUnited::locate(const UDTSOCKET u, const sockaddr* peer) { CGuard cg(m_ControlLock); map ::iterator i = m_Sockets.find(u); // look up the "peer" address in queued sockets set for (set ::iterator j = i->second->m_pQueuedSockets->begin(); j != i->second->m_pQueuedSockets->end(); ++ j) { map ::iterator k = m_Sockets.find(*j); if (AF_INET == i->second->m_iIPversion) { // compare IPv4 address if ((((sockaddr_in*)peer)->sin_port == 
((sockaddr_in*)k->second->m_pPeerAddr)->sin_port) && (((sockaddr_in*)peer)->sin_addr.s_addr == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_addr.s_addr)) return k->second; } else { // compare IPv6 address if (((sockaddr_in6*)peer)->sin6_port == ((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_port) { __int32* addr1 = (__int32*)&(((sockaddr_in6*)peer)->sin6_addr); __int32* addr2 = (__int32*)&(((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_addr); __int32 m = 4; for (; m > 0; -- m) if (addr1[m] != addr2[m]) break; if (m > 0) return k->second; } } } // look up the "peer" address in accepted sockets for (set ::iterator j = i->second->m_pAcceptSockets->begin(); j != i->second->m_pAcceptSockets->end(); ++ j) { map ::iterator k = m_Sockets.find(*j); if (AF_INET == i->second->m_iIPversion) { // compare IPv4 address if ((((sockaddr_in*)peer)->sin_port == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_port) && (((sockaddr_in*)peer)->sin_addr.s_addr == ((sockaddr_in*)k->second->m_pPeerAddr)->sin_addr.s_addr)) return k->second; } else { // compare IPv6 address if (((sockaddr_in6*)peer)->sin6_port == ((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_port) { __int32* addr1 = (__int32*)&(((sockaddr_in6*)peer)->sin6_addr); __int32* addr2 = (__int32*)&(((sockaddr_in6*)k->second->m_pPeerAddr)->sin6_addr); __int32 m = 4; for (; m > 0; -- m) if (addr1[m] != addr2[m]) break; if (m > 0) return k->second; } } } return NULL; } void CUDTUnited::checkBrokenSockets() { CGuard cg(m_ControlLock); // set of sockets To Be Removed set tbr; for (map ::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i) { if (CUDTSocket::CLOSED != i->second->m_Status) { // garbage collection if ((i->second->m_pUDT->m_bBroken) && (0 == i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize())) { //close broken connections and start removal timer i->second->m_Status = CUDTSocket::CLOSED; gettimeofday(&i->second->m_TimeStamp, 0); // remove from listener's queue map ::iterator j = m_Sockets.find(i->second->m_ListenSocket); if (j 
!= m_Sockets.end()) j->second->m_pQueuedSockets->erase(i->second->m_Socket); } } else { // if timeout, delete the socket timeval currtime; gettimeofday(&currtime, 0); // timeout 1-2 seconds to destroy a socket if ((i->second->m_TimeStamp.tv_sec >= 0) && (currtime.tv_sec - i->second->m_TimeStamp.tv_sec >= 2)) tbr.insert(i->second->m_Socket); // sockets cannot be removed here because it will invalidate the map iterator } } // remove those timeout sockets for (set ::iterator i = tbr.begin(); i != tbr.end(); ++ i) removeSocket(*i); } void CUDTUnited::removeSocket(const UDTSOCKET u) { map ::iterator i = m_Sockets.find(u); // invalid socket ID if (i == m_Sockets.end()) return; if (0 != i->second->m_ListenSocket) { // if it is an accepted socket, remove it from the listener's queue map ::iterator j = m_Sockets.find(i->second->m_ListenSocket); if (j != m_Sockets.end()) j->second->m_pAcceptSockets->erase(u); } else if (NULL != i->second->m_pQueuedSockets) { // if it is a listener, remove all un-accepted sockets in its queue for (set ::iterator j = i->second->m_pQueuedSockets->begin(); j != i->second->m_pQueuedSockets->end(); ++ j) { m_Sockets[*j]->m_pUDT->close(); delete m_Sockets[*j]; m_Sockets.erase(*j); } } // delete this one m_Sockets[u]->m_pUDT->close(); delete m_Sockets[u]; m_Sockets.erase(u); } void CUDTUnited::setError(CUDTException* e) { #ifndef WIN32 delete (CUDTException*)pthread_getspecific(m_TLSError); pthread_setspecific(m_TLSError, e); #else delete (CUDTException*)TlsGetValue(m_TLSError); TlsSetValue(m_TLSError, e); #endif } CUDTException* CUDTUnited::getError() { #ifndef WIN32 if(NULL == pthread_getspecific(m_TLSError)) pthread_setspecific(m_TLSError, new CUDTException); return (CUDTException*)pthread_getspecific(m_TLSError); #else if(NULL == TlsGetValue(m_TLSError)) TlsSetValue(m_TLSError, new CUDTException); return (CUDTException*)TlsGetValue(m_TLSError); #endif } //////////////////////////////////////////////////////////////////////////////// UDTSOCKET 
CUDT::socket(int af, int type, int) { try { return s_UDTUnited.newSocket(af, type); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return INVALID_SOCK; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return INVALID_SOCK; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return INVALID_SOCK; } } int CUDT::bind(UDTSOCKET u, const sockaddr* name, int namelen) { try { return s_UDTUnited.bind(u, name, namelen); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::listen(UDTSOCKET u, int backlog) { try { return s_UDTUnited.listen(u, backlog); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } UDTSOCKET CUDT::accept(UDTSOCKET u, sockaddr* addr, int* addrlen) { try { return s_UDTUnited.accept(u, addr, addrlen); } catch (CUDTException& e) { s_UDTUnited.setError(new CUDTException(e)); return INVALID_SOCK; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return INVALID_SOCK; } } int CUDT::connect(UDTSOCKET u, const sockaddr* name, int namelen) { try { return s_UDTUnited.connect(u, name, namelen); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::close(UDTSOCKET u) { try { return s_UDTUnited.close(u); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) 
{ s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::getpeername(UDTSOCKET u, sockaddr* name, int* namelen) { try { return s_UDTUnited.getpeername(u, name, namelen); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::getsockname(UDTSOCKET u, sockaddr* name, int* namelen) { try { return s_UDTUnited.getsockname(u, name, namelen);; } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::getsockopt(UDTSOCKET u, int, UDTOpt optname, void* optval, int* optlen) { try { CUDT* udt = s_UDTUnited.lookup(u); udt->getOpt(optname, optval, *optlen); return 0; } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::setsockopt(UDTSOCKET u, int, UDTOpt optname, const void* optval, int optlen) { try { CUDT* udt = s_UDTUnited.lookup(u); udt->setOpt(optname, optval, optlen); return 0; } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::shutdown(UDTSOCKET, int) { try { //CUDT* udt = s_UDTUnited.lookup(u); //udt->shutdown(how); return 0; } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) 
{ s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::send(UDTSOCKET u, const char* buf, int len, int, int* handle, UDT_MEM_ROUTINE routine) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->send((char*)buf, len, handle, routine); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::recv(UDTSOCKET u, char* buf, int len, int, int* handle, UDT_MEM_ROUTINE routine) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->recv(buf, len, handle, routine); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->sendmsg((char*)buf, len, ttl, inorder); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->recvmsg(buf, len); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } __int64 CUDT::sendfile(UDTSOCKET u, ifstream& ifs, const __int64& offset, __int64& size, const int& block) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->sendfile(ifs, offset, size, block); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) 
{ s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } __int64 CUDT::recvfile(UDTSOCKET u, ofstream& ofs, const __int64& offset, __int64& size, const int& block) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->recvfile(ofs, offset, size, block); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } bool CUDT::getoverlappedresult(UDTSOCKET u, int handle, int& progress, bool wait) { try { CUDT* udt = s_UDTUnited.lookup(u); return udt->getOverlappedResult(handle, progress, wait); } catch (CUDTException e) { // false and -1 means an error; false and positive value means incompleted IO. progress = -1; s_UDTUnited.setError(new CUDTException(e)); return false; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return false; } } int CUDT::select(int, ud_set* readfds, ud_set* writefds, ud_set* exceptfds, const timeval* timeout) { if ((NULL == readfds) && (NULL == writefds) && (NULL == exceptfds)) { s_UDTUnited.setError(new CUDTException(5, 3, 0)); return ERROR; } try { return s_UDTUnited.select(readfds, writefds, exceptfds, timeout); } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (bad_alloc&) { s_UDTUnited.setError(new CUDTException(3, 2, 0)); return ERROR; } catch (...) { s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } CUDTException& CUDT::getlasterror() { return *s_UDTUnited.getError(); } int CUDT::perfmon(UDTSOCKET u, CPerfMon* perf, bool clear) { try { CUDT* udt = s_UDTUnited.lookup(u); udt->sample(perf, clear); return 0; } catch (CUDTException e) { s_UDTUnited.setError(new CUDTException(e)); return ERROR; } catch (...) 
{ s_UDTUnited.setError(new CUDTException(-1, 0, 0)); return ERROR; } } bool CUDT::isUSock(UDTSOCKET u) { return (NULL != s_UDTUnited.lookup(u)); } CUDT* CUDT::getUDTHandle(UDTSOCKET u) { return s_UDTUnited.lookup(u); } //////////////////////////////////////////////////////////////////////////////// namespace UDT { UDTSOCKET socket(int af, int type, int protocol) { return CUDT::socket(af, type, protocol); } int bind(UDTSOCKET u, const struct sockaddr* name, int namelen) { return CUDT::bind(u, name, namelen); } int listen(UDTSOCKET u, int backlog) { return CUDT::listen(u, backlog); } UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) { return CUDT::accept(u, addr, addrlen); } int connect(UDTSOCKET u, const struct sockaddr* name, int namelen) { return CUDT::connect(u, name, namelen); } int close(UDTSOCKET u) { return CUDT::close(u); } int getpeername(UDTSOCKET u, struct sockaddr* name, int* namelen) { return CUDT::getpeername(u, name, namelen); } int getsockname(UDTSOCKET u, struct sockaddr* name, int* namelen) { return CUDT::getsockname(u, name, namelen); } int getsockopt(UDTSOCKET u, int level, SOCKOPT optname, void* optval, int* optlen) { return CUDT::getsockopt(u, level, optname, optval, optlen); } int setsockopt(UDTSOCKET u, int level, SOCKOPT optname, const void* optval, int optlen) { return CUDT::setsockopt(u, level, optname, optval, optlen); } int shutdown(UDTSOCKET u, int how) { return CUDT::shutdown(u, how); } int send(UDTSOCKET u, const char* buf, int len, int flags, int* handle, UDT_MEM_ROUTINE routine) { return CUDT::send(u, buf, len, flags, handle, routine); } int recv(UDTSOCKET u, char* buf, int len, int flags, int* handle, UDT_MEM_ROUTINE routine) { return CUDT::recv(u, buf, len, flags, handle, routine); } int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl, bool inorder) { return CUDT::sendmsg(u, buf, len, ttl, inorder); } int recvmsg(UDTSOCKET u, char* buf, int len) { return CUDT::recvmsg(u, buf, len); } __int64 
sendfile(UDTSOCKET u, ifstream& ifs, const __int64& offset, __int64& size, const int& block) { return CUDT::sendfile(u, ifs, offset, size, block); } __int64 recvfile(UDTSOCKET u, ofstream& ofs, const __int64& offset, __int64& size, const int& block) { return CUDT::recvfile(u, ofs, offset, size, block); } bool getoverlappedresult(UDTSOCKET u, int handle, int& progress, bool wait) { return CUDT::getoverlappedresult(u, handle, progress, wait); } int select(int nfds, UDSET* readfds, UDSET* writefds, UDSET* exceptfds, const struct timeval* timeout) { return CUDT::select(nfds, readfds, writefds, exceptfds, timeout); } ERRORINFO getlasterror() { return CUDT::getlasterror(); } int perfmon(UDTSOCKET u, TRACEINFO* perf, bool clear) { return CUDT::perfmon(u, perf, clear); } }