// Source listing: www.pudn.com > udt.rar > udt.cpp


/*****************************************************************************
This file contains the implementation of main algorithms of UDT protocol and
the implementation of core UDT interfaces.

*****************************************************************************/

#ifndef WIN32
   #include 
   #include 
   #include 
   #include 
   #include 
   #include 
#else
   #include 
   #include 
#endif

#include 
#include "udt.h"

using namespace std;

// Static member of CCC; starts at 0. (How it is advanced is not visible in this part of the file.)
__int32 CCC::m_iCCID = 0; 
 
// Sentinel values handed back through the socket API: both the "invalid socket"
// descriptor and the generic API error code are -1 on every platform.
#ifndef WIN32 
UDTSOCKET CUDT::INVALID_SOCK = -1;	// invalid socket descriptor 
int CUDT::ERROR = -1;                 // socket api error returned value 
#else 
UDTSOCKET CUDT::INVALID_SOCK= -1; 
// NOTE(review): presumably a Windows header defines an ERROR macro that would
// otherwise clash with the member name -- confirm.
#undef ERROR 
int CUDT::ERROR = -1; 
#endif 

// The UDT class exposes the same two sentinels, copied from CUDT.
UDTSOCKET UDT::INVALID_SOCK = CUDT::INVALID_SOCK;
int UDT::ERROR = CUDT::ERROR;
 
// The single CUDTUnited instance shared by all CUDT objects
// (used by listenHandler() to hand over incoming connections).
CUDTUnited CUDT::s_UDTUnited; 
//CUDTUnited CUDT::s_UDTUnited;   //wrh
// Default constructor.
// Produces a socket that is neither opened nor connected, carrying the
// library's default configuration. No channel, buffer, loss list, timer or
// window object is allocated here.
CUDT::CUDT():
//
// Protocol constants taken from the UDT specification. They MUST NOT be changed!
//
m_iVersion(2),
m_iSYNInterval(10000),
m_iSelfClockInterval(64),
m_iMaxSeqNo(1 << 30),
m_iSeqNoTH(1 << 29),
m_iMaxAckSeqNo(1 << 16),
m_iProbeInterval(16),
m_iQuickStartPkts(16)
{
   // Socket life-cycle flags: nothing has happened yet.
   m_bOpened = m_bConnected = m_bBroken = false;

   // Every owned helper object starts out absent.
   m_pChannel = NULL;
   m_pTimer = NULL;
   m_pSndBuffer = NULL;
   m_pRcvBuffer = NULL;
   m_pSndLossList = NULL;
   m_pRcvLossList = NULL;
   m_pIrrPktList = NULL;
   m_pACKWindow = NULL;
   m_pSndTimeWindow = NULL;
   m_pRcvTimeWindow = NULL;
   m_pCC = NULL;

   // Set up the mutexes and condition variables.
   initSynch();

   // --- default configuration --------------------------------------------
   // addressing / socket flavour
   m_iIPversion = AF_INET;
   m_iSockType = SOCK_STREAM;

   // blocking behaviour of send and recv
   m_bSynSending = true;
   m_bSynRecving = true;

   // packet, window and buffer sizes
   m_iMSS = 1500;
   m_iFlightFlagSize = 25600;
   m_iSndQueueLimit = 40960000;
   m_iUDTBufSize = 40960000;
   m_iUDPSndBufSize = 65536;
   m_iUDPRcvBufSize = 4 * 1024 * 1024;

   // message limits
   m_iMaxMsg = 9000;
   m_iMsgTTL = -1;

   // linger enabled, 180 second limit
   m_Linger.l_onoff = 1;
   m_Linger.l_linger = 180;

   // Congestion control factory exists only in CUSTOM_CC builds.
   #ifdef CUSTOM_CC
      m_pCCFactory = new CCCFactory;
   #else
      m_pCCFactory = NULL;
   #endif

   // Starting RTT estimate is ten SYN intervals; its variance is half of that.
   m_iRTT = m_iSYNInterval * 10;
   m_iRTTVar = m_iRTT / 2;
   m_ullCPUFrequency = CTimer::getCPUFrequency();
}

// Copy constructor.
// Clones the *configuration* of `ancestor` only: the new object owns no
// channel, buffers, lists, timer or windows, and starts out unopened,
// unconnected and unbroken regardless of the ancestor's state.
CUDT::CUDT(const CUDT& ancestor):
m_iVersion(ancestor.m_iVersion),
m_iSYNInterval(ancestor.m_iSYNInterval),
m_iSelfClockInterval(ancestor.m_iSelfClockInterval),
m_iMaxSeqNo(ancestor.m_iMaxSeqNo),
m_iSeqNoTH(ancestor.m_iSeqNoTH),
m_iMaxAckSeqNo(ancestor.m_iMaxAckSeqNo),
m_iProbeInterval(ancestor.m_iProbeInterval),
m_iQuickStartPkts(ancestor.m_iQuickStartPkts)
{
   // Life-cycle flags are never inherited.
   m_bOpened = m_bConnected = m_bBroken = false;

   // Runtime helper objects are never shared with the ancestor.
   m_pChannel = NULL;
   m_pTimer = NULL;
   m_pSndBuffer = NULL;
   m_pRcvBuffer = NULL;
   m_pSndLossList = NULL;
   m_pRcvLossList = NULL;
   m_pIrrPktList = NULL;
   m_pACKWindow = NULL;
   m_pSndTimeWindow = NULL;
   m_pRcvTimeWindow = NULL;
   m_pCC = NULL;

   // This object gets its own mutexes and condition variables.
   initSynch();

   // --- configuration copied from the ancestor ----------------------------
   m_iIPversion = ancestor.m_iIPversion;
   m_iSockType = ancestor.m_iSockType;

   m_bSynSending = ancestor.m_bSynSending;
   m_bSynRecving = ancestor.m_bSynRecving;

   m_iMSS = ancestor.m_iMSS;
   m_iFlightFlagSize = ancestor.m_iFlightFlagSize;
   m_iSndQueueLimit = ancestor.m_iSndQueueLimit;
   m_iUDTBufSize = ancestor.m_iUDTBufSize;
   m_iUDPSndBufSize = ancestor.m_iUDPSndBufSize;
   m_iUDPRcvBufSize = ancestor.m_iUDPRcvBufSize;

   m_iMaxMsg = ancestor.m_iMaxMsg;
   m_iMsgTTL = ancestor.m_iMsgTTL;

   m_Linger = ancestor.m_Linger;

   // CUSTOM_CC builds duplicate the ancestor's congestion control factory.
   #ifdef CUSTOM_CC
      m_pCCFactory = ancestor.m_pCCFactory->clone();
   #else
      m_pCCFactory = NULL;
   #endif

   // RTT estimates and the measured CPU frequency carry over as well.
   m_iRTT = ancestor.m_iRTT;
   m_iRTTVar = ancestor.m_iRTTVar;
   m_ullCPUFrequency = ancestor.m_ullCPUFrequency;
}

// Destructor: tears down the synchronization primitives and frees every
// helper object that is still owned.
CUDT::~CUDT()
{
   // mutexes / condition variables go first
   destroySynch();

   // Deleting a null pointer is a no-op, so no explicit guards are needed.
   delete m_pChannel;
   delete m_pSndBuffer;
   delete m_pRcvBuffer;
   delete m_pSndLossList;
   delete m_pRcvLossList;
   delete m_pTimer;
   delete m_pIrrPktList;
   delete m_pACKWindow;
   delete m_pSndTimeWindow;
   delete m_pRcvTimeWindow;
   delete m_pCCFactory;
   delete m_pCC;
}

// Sets one configuration option of this socket.
//
// Parameters:
//    optName: the option to change.
//    optval:  pointer to the new value; its pointee type depends on optName
//             (__int32, bool, linger, or CCCVirtualFactory for UDT_CC).
//    (third):  option length, unused.
//
// Throws CUDTException:
//    (5, 0, 0)  unknown option, or UDT_CC in a build without CUSTOM_CC;
//    (5, 1, 0)  option cannot be changed once the socket is opened;
//    (5, 2, 0)  option cannot be changed once the socket is connected;
//    (5, 3, *)  optval is NULL or the supplied value is out of range.
//
// Every value is validated BEFORE it is stored, so a rejected call leaves the
// current configuration untouched.
void CUDT::setOpt(UDTOpt optName, const void* optval, const __int32&)
{
   CGuard cg(m_ConnectionLock);
   CGuard sendguard(m_SendLock);
   CGuard recvguard(m_RecvLock);

   // every option below dereferences optval
   if (NULL == optval)
      throw CUDTException(5, 3, 0);

   switch (optName)
   {
   case UDT_MSS:
   {
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      const __int32 mss = *(const __int32 *)optval;
      if (mss < 28)
         throw CUDTException(5, 3, 0);

      m_iMSS = mss;
      break;
   }

   case UDT_SNDSYN:
      m_bSynSending = *(const bool *)optval;
      break;

   case UDT_RCVSYN:
      m_bSynRecving = *(const bool *)optval;
      break;

   case UDT_CC:
      #ifndef CUSTOM_CC
         throw CUDTException(5, 0, 0);
      #else
         if (m_bOpened)
            throw CUDTException(5, 1, 0);
         if (NULL != m_pCCFactory)
            delete m_pCCFactory;
         m_pCCFactory = ((CCCVirtualFactory *)optval)->clone();
      #endif

      break;

   case UDT_FC:
   {
      if (m_bConnected)
         throw CUDTException(5, 2, 0);

      const __int32 flightsize = *(const __int32 *)optval;
      if (flightsize < 1)
         throw CUDTException(5, 3);

      m_iFlightFlagSize = flightsize;
      break;
   }

   case UDT_SNDBUF:
   {
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      const __int32 sndlimit = *(const __int32 *)optval;
      if (sndlimit <= 0)
         throw CUDTException(5, 3, 0);

      m_iSndQueueLimit = sndlimit;
      break;
   }

   case UDT_RCVBUF:
   {
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      // the receive buffer must be able to hold at least 16 packets of the current MSS
      const __int32 rcvsize = *(const __int32 *)optval;
      if (rcvsize < (m_iMSS - 28) * 16)
         throw CUDTException(5, 3, 0);

      m_iUDTBufSize = rcvsize;
      break;
   }

   case UDT_LINGER:
      m_Linger = *(const linger*)optval;
      break;

   case UDP_SNDBUF:
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      m_iUDPSndBufSize = *(const __int32 *)optval;
      break;

   case UDP_RCVBUF:
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      m_iUDPRcvBufSize = *(const __int32 *)optval;
      break;

   case UDT_MAXMSG:
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      m_iMaxMsg = *(const __int32 *)optval;
      break;

   case UDT_MSGTTL:
      if (m_bOpened)
         throw CUDTException(5, 1, 0);

      m_iMsgTTL = *(const __int32 *)optval;
      break;

   default:
      throw CUDTException(5, 0, 0);
   }
}

// Reads one configuration option of this socket.
//
// Parameters:
//    optName: the option to read.
//    optval:  output buffer; receives the value (type depends on optName).
//    optlen:  on return, the size in bytes of the value written. For
//             UDT_LINGER it is also read on entry and must be at least
//             sizeof(linger).
//
// Throws CUDTException:
//    (5, 0, 0)  unknown option, or UDT_CC in a build without CUSTOM_CC;
//    (5, 3, 0)  optval is NULL, or the UDT_LINGER buffer is too small;
//    (5, 5, 0)  UDT_CC requested before the socket is opened.
void CUDT::getOpt(UDTOpt optName, void* optval, __int32& optlen)
{
   CGuard cg(m_ConnectionLock);

   // every option below writes through optval
   if (NULL == optval)
      throw CUDTException(5, 3, 0);

   switch (optName)
   {
   case UDT_MSS:
      *(__int32 *)optval = m_iMSS;
      optlen = sizeof(__int32);
      break;

   case UDT_SNDSYN:
      *(bool *)optval = m_bSynSending;
      optlen = sizeof(bool);
      break;

   case UDT_RCVSYN:
      *(bool *)optval = m_bSynRecving;
      optlen = sizeof(bool);
      break;

   case UDT_CC:
      #ifndef CUSTOM_CC
         throw CUDTException(5, 0, 0);
      #else
         if (!m_bOpened)
            throw CUDTException(5, 5, 0);
         *(CCC**)optval = m_pCC;
         optlen = sizeof(CCC*);
      #endif

      break;

   case UDT_FC:
      *(__int32 *)optval = m_iFlightFlagSize;
      optlen = sizeof(__int32);
      break;

   case UDT_SNDBUF:
      // setOpt(UDT_SNDBUF) stores into m_iSndQueueLimit, so that is what must be reported here
      *(__int32 *)optval = m_iSndQueueLimit;
      optlen = sizeof(__int32);
      break;

   case UDT_RCVBUF:
      *(__int32 *)optval = m_iUDTBufSize;
      optlen = sizeof(__int32);
      break;

   case UDT_LINGER:
      if (optlen < (__int32)(sizeof(linger)))
         throw CUDTException(5, 3, 0);

      *(linger*)optval = m_Linger;
      optlen = sizeof(linger);
      break;

   case UDP_SNDBUF:
      *(__int32 *)optval = m_iUDPSndBufSize;
      optlen = sizeof(__int32);
      break;

   case UDP_RCVBUF:
      *(__int32 *)optval = m_iUDPRcvBufSize;
      optlen = sizeof(__int32);
      break;

   case UDT_MAXMSG:
      *(__int32 *)optval = m_iMaxMsg;
      optlen = sizeof(__int32);
      break;

   case UDT_MSGTTL:
      *(__int32 *)optval = m_iMsgTTL;
      optlen = sizeof(__int32);
      break;

   default:
      throw CUDTException(5, 0, 0);
   }
}

void CUDT::open(const sockaddr* addr)
{
   CGuard cg(m_ConnectionLock);

   // Initial status
   m_bClosing = false;
   m_bShutdown = false;
   m_bListening = false;
   m_iEXPCount = 1;

   // Initial sequence number, loss, acknowledgement, etc.
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
   m_iISN = 0;
   m_iPeerISN = 0;
 
   m_bLoss = false;
   gettimeofday(&m_LastSYNTime, 0);

   m_iSndLastAck = 0;
   m_iSndLastDataAck = 0;
   m_iSndCurrSeqNo = -1;

   m_iRcvLastAck = 0;
   m_iRcvLastAckAck = 0;
   m_ullLastAckTime = 0;
   m_iRcvCurrSeqNo = -1;
   m_iNextExpect = 0;
   m_bReadBuf = false;

   m_iLastDecSeq = -1;
   m_iNAKCount = 0;
   m_iDecRandom = 1;
   m_iAvgNAKNum = 1;

   m_iBandwidth = 1;
   m_bSndSlowStart = true;
   m_bRcvSlowStart = true;
   m_bFreeze = false;

   m_iAckSeqNo = 0;

   m_iSndHandle = (1 << 30);
   m_iRcvHandle = -(1 << 30);

   // Initial sending rate = 1us
   m_ullInterval = m_ullCPUFrequency;
   m_ullTimeDiff = 0;
   m_ullLastDecRate = m_ullCPUFrequency;

   // default congestion window size = infinite
   m_dCongestionWindow = 1 << 30;

   // Initial Window Size = 16 packets
   m_iFlowWindowSize = 16;
   m_iFlowControlWindow = 16;
   m_iMaxFlowWindowSize = m_iFlightFlagSize;

   #ifdef CUSTOM_CC
      m_pCC = m_pCCFactory->create();
      m_pCC->m_UDT = m_SocketID;
      m_pCC->m_pUDT = this;
      m_ullInterval = (unsigned __int64)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
      m_dCongestionWindow = m_pCC->m_dCWndSize;
   #endif

   #ifdef TRACE
      // trace information
      gettimeofday(&m_StartTime, 0);
      m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;
      gettimeofday(&m_LastSampleTime, 0);
      m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
   #endif

   // Construct and open a channel
   try
   {
      m_pChannel = new CChannel(m_iIPversion);

      m_pChannel->setSndBufSize(m_iUDPSndBufSize);
      m_pChannel->setRcvBufSize(m_iUDPRcvBufSize);

      m_pChannel->open(addr);
   }
   catch(CUDTException e)
   {
      // Let applications to process this exception
      throw CUDTException(e);
   }

   // Now UDT is opened.
   m_bOpened = true;
}

#ifndef WIN32
void* CUDT::listenHandler(void* listener)
#else
DWORD WINAPI CUDT::listenHandler(LPVOID listener)
#endif
{
   CUDT* self = static_cast(listener);

   // Type 0 (handshake) control packet
   CPacket initpkt;
   char* initdata = new char [self->m_iPayloadSize];
   CHandShake* hs = (CHandShake *)initdata;
   initpkt.pack(0, NULL, initdata, sizeof(CHandShake));

   sockaddr* addr;
   if (AF_INET == self->m_iIPversion)
      addr = (sockaddr*)(new sockaddr_in);
   else
      addr = (sockaddr*)(new sockaddr_in6);

   while (!self->m_bClosing)
   {
      // Listening to the port...
      initpkt.setLength(self->m_iPayloadSize);
      if (self->m_pChannel->recvfrom(initpkt, addr) <= 0)
         continue;

      // When a peer side connects in...
      if ((1 == initpkt.getFlag()) && (0 == initpkt.getType()))
         s_UDTUnited.newConnection(self->m_SocketID, addr, hs);
   }

   if (AF_INET == self->m_iIPversion)
      delete (sockaddr_in*)addr;
   else
      delete (sockaddr_in6*)addr;

   #ifndef WIN32
      return NULL;
   #else
      return 0;
   #endif
}

// Puts the socket into listening state by starting the listener thread.
//
// Throws CUDTException:
//    (5, 0, 0)  the socket has not been opened;
//    (5, 2, 0)  the socket is already connected;
//    (7, 0, *)  the listener thread could not be created (third field is the
//               system error code).
//
// Calling listen() again on a socket that is already listening is a no-op.
void CUDT::listen()
{
   CGuard connguard(m_ConnectionLock);

   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   if (m_bConnected)
      throw CUDTException(5, 2, 0);

   if (!m_bListening)
   {
      // first call: spawn the thread that waits for handshake requests
      #ifndef WIN32
         if (0 != pthread_create(&m_ListenThread, NULL, CUDT::listenHandler, this))
            throw CUDTException(7, 0, errno);
      #else
         m_ListenThread = CreateThread(NULL, 0, CUDT::listenHandler, this, 0, NULL);
         if (NULL == m_ListenThread)
            throw CUDTException(7, 0, GetLastError());
      #endif

      m_bListening = true;
   }
}

// Connects this (client side) socket to a listening peer.
//
// Parameters:
//    serv_addr: address of the listening peer.
//
// Sends a handshake carrying the local configuration, waits for the peer's
// handshake response (re-sending the request until one arrives), adopts the
// negotiated values, allocates the protocol structures and starts the
// receiving thread.
//
// Throws CUDTException:
//    (5, 0, 0)  the socket has not been opened;
//    (5, 2, 0)  the socket is listening or already connected;
//    (1, 1, 0)  no handshake response within 3 seconds;
//    (1, 3, *)  the receiving thread could not be created.
// Exceptions raised by the channel are passed on unchanged. The temporary
// handshake buffer and peer address are released on every exit path.
void CUDT::connect(const sockaddr* serv_addr)
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)
      throw CUDTException(5, 0, 0);

   if (m_bListening)
      throw CUDTException(5, 2, 0);

   if (m_bConnected)
      throw CUDTException(5, 2, 0);

   // I will connect to an Initiator, so I am NOT an initiator.
   m_bInitiator = false;

   CPacket initpkt;
   char* initdata = new char [m_iPayloadSize];
   CHandShake* hs = (CHandShake *)initdata;

   // This is my current configurations.
   hs->m_iVersion = m_iVersion;
   hs->m_iMSS = m_iMSS;
   hs->m_iFlightFlagSize = m_iFlightFlagSize;

   // Random Initial Sequence Number
   timeval currtime;
   gettimeofday(&currtime, 0);
   srand(currtime.tv_usec);
   m_iISN = hs->m_iISN = (__int32)(double(rand()) * m_iMaxSeqNo / (RAND_MAX + 1.0));

   m_iLastDecSeq = hs->m_iISN - 1;
   m_iSndLastAck = hs->m_iISN;
   m_iSndLastDataAck = hs->m_iISN;
   m_iSndCurrSeqNo = hs->m_iISN - 1;

   initpkt.pack(0, NULL, initdata, sizeof(CHandShake));

   sockaddr* peer_addr = NULL;

   try
   {
      // Inform the initiator my configurations.
      m_pChannel->sendto(initpkt, serv_addr);

      if (AF_INET == m_iIPversion)
         peer_addr = (sockaddr*)(new sockaddr_in);
      else
         peer_addr = (sockaddr*)(new sockaddr_in6);

      // Wait for the negotiated configurations from the peer side.
      initpkt.setLength(m_iPayloadSize);
      m_pChannel->recvfrom(initpkt, peer_addr);

      // give up after 3 seconds (value in microseconds)
      const __int32 timeo = 3000000;

      timeval entertime;
      gettimeofday(&entertime, 0);

      // Keep re-sending the request until a handshake control packet (type 0) comes back.
      while ((initpkt.getLength() <= 0) || (1 != initpkt.getFlag()) || (0 != initpkt.getType()))
      {
         initpkt.setLength(sizeof(CHandShake));
         m_pChannel->sendto(initpkt, serv_addr);

         initpkt.setLength(m_iPayloadSize);
         m_pChannel->recvfrom(initpkt, peer_addr);

         gettimeofday(&currtime, 0);
         if ((currtime.tv_sec - entertime.tv_sec) * 1000000 + (currtime.tv_usec - entertime.tv_usec) > timeo)
            throw CUDTException(1, 1, 0);
      }

      m_pChannel->connect(peer_addr);
   }
   catch (...)
   {
      // Timeout or channel failure: free the temporaries, then report the error as before.
      if (AF_INET == m_iIPversion)
         delete (sockaddr_in*)peer_addr;
      else
         delete (sockaddr_in6*)peer_addr;
      delete [] initdata;

      throw;
   }

   if (AF_INET == m_iIPversion)
      delete (sockaddr_in*)peer_addr;
   else
      delete (sockaddr_in6*)peer_addr;

   // Got it. Re-configure according to the negotiated values.
   m_iMSS = hs->m_iMSS;
   m_iMaxFlowWindowSize = hs->m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;

   m_iPeerISN = hs->m_iISN;

   m_iRcvLastAck = hs->m_iISN;
   m_iRcvLastAckAck = hs->m_iISN;
   m_iRcvCurrSeqNo = hs->m_iISN - 1;
   m_iNextExpect = hs->m_iISN;

   m_iUserBufBorder = m_iRcvLastAck + (__int32)ceil(double(m_iUDTBufSize) / m_iPayloadSize);

   // hs points into initdata, so it must not be used past this line
   delete [] initdata;

   // Prepare all structures
   m_pTimer = new CTimer;
   m_pSndBuffer = new CSndBuffer;
   m_pRcvBuffer = new CRcvBuffer(m_iUDTBufSize);

   // after introducing lite ACK, the sndlosslist may not be cleared in time, so it requires twice space.
   m_pSndLossList = new CSndLossList(m_iMaxFlowWindowSize * 2, m_iSeqNoTH, m_iMaxSeqNo);

   m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize, m_iSeqNoTH, m_iMaxSeqNo);
   m_pIrrPktList = new CIrregularPktList(m_iFlightFlagSize, m_iSeqNoTH, m_iMaxSeqNo);
   m_pACKWindow = new CACKWindow(4096);
   m_pRcvTimeWindow = new CPktTimeWindow(m_iQuickStartPkts, 16, 64);

   #ifdef CUSTOM_CC
      m_pCC->init();
   #endif

   // Now I am also running, a little while after the Initiator was running.
   #ifndef WIN32
      m_bSndThrStart = false;
      if (0 != pthread_create(&m_RcvThread, NULL, CUDT::rcvHandler, this))
         throw CUDTException(1, 3, errno);
   #else
      m_SndThread = NULL;
      if (NULL == (m_RcvThread = CreateThread(NULL, 0, CUDT::rcvHandler, this, 0, NULL)))
         throw CUDTException(1, 3, GetLastError());
   #endif

   // And, I am connected too.
   m_bConnected = true;
}

void CUDT::connect(const sockaddr* peer, const CHandShake* hs)
{
   // This UDT entity is an Initiator, since it is started at the server side.
   m_bInitiator = true;

   // Type 0 (handshake) control packet
   CPacket initpkt;
   CHandShake ci;
   memcpy(&ci, hs, sizeof(CHandShake));
   initpkt.pack(0, NULL, &ci, sizeof(CHandShake));

   // Uses the smaller MSS between the peers        
   if (ci.m_iMSS > m_iMSS)
      ci.m_iMSS = m_iMSS;
   else
      m_iMSS = ci.m_iMSS;

   // exchange info for maximum flow window size
   m_iMaxFlowWindowSize = ci.m_iFlightFlagSize;
   ci.m_iFlightFlagSize = m_iFlightFlagSize;

   m_iPeerISN = ci.m_iISN;

   m_iRcvLastAck = ci.m_iISN;
   m_iRcvLastAckAck = ci.m_iISN;
   m_iRcvCurrSeqNo = ci.m_iISN - 1;
   m_iNextExpect = ci.m_iISN;

   m_pChannel->connect(peer);

   // Random Initial Sequence Number
   timeval currtime;
   gettimeofday(&currtime, 0);
   srand(currtime.tv_usec);
   ci.m_iISN = m_iISN = (__int32)(double(rand()) * m_iMaxSeqNo / (RAND_MAX + 1.0));

   m_iLastDecSeq = m_iISN - 1;
   m_iSndLastAck = m_iISN;
   m_iSndLastDataAck = m_iISN;
   m_iSndCurrSeqNo = m_iISN - 1;

   // Send back the negotiated configurations.
   *m_pChannel << initpkt;
  
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;

   m_iUserBufBorder = m_iRcvLastAck + (__int32)ceil(double(m_iUDTBufSize) / m_iPayloadSize);

   // Prepare all structures
   m_pTimer = new CTimer;
   m_pSndBuffer = new CSndBuffer;
   m_pRcvBuffer = new CRcvBuffer(m_iUDTBufSize);
   m_pSndLossList = new CSndLossList(m_iMaxFlowWindowSize * 2, m_iSeqNoTH, m_iMaxSeqNo);
   m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize, m_iSeqNoTH, m_iMaxSeqNo);
   m_pIrrPktList = new CIrregularPktList(m_iFlightFlagSize, m_iSeqNoTH, m_iMaxSeqNo);
   m_pACKWindow = new CACKWindow(4096);
   m_pRcvTimeWindow = new CPktTimeWindow(m_iQuickStartPkts, 16, 64);

   #ifdef CUSTOM_CC
      m_pCC->init();
   #endif

   // UDT is now running...
   #ifndef WIN32
      m_bSndThrStart = false;
      if (0 != pthread_create(&m_RcvThread, NULL, CUDT::rcvHandler, this))
         throw CUDTException(1, 3, errno);
   #else
      m_SndThread = NULL;
      if (NULL == (m_RcvThread = CreateThread(NULL, 0, CUDT::rcvHandler, this, 0, NULL)))
         throw CUDTException(1, 3, GetLastError());
   #endif

   // And of course, it is connected.
   m_bConnected = true;
}

// Closes the socket.
//
// Does nothing if the socket is not opened. Otherwise: optionally lingers
// until the send buffer is drained, stops and joins the worker threads, sends
// a shutdown control packet to the peer (unless a shutdown was already seen),
// destroys the channel and releases every protocol structure.
void CUDT::close()
{
   CGuard connguard(m_ConnectionLock);

   if (!m_bOpened)
      return;

   // Linger: give pending data a chance to leave before tearing things down.
   if (0 != m_Linger.l_onoff)
   {
      timeval lingerstart, now;
      gettimeofday(&lingerstart, 0);
      now = lingerstart;

      while (!m_bBroken && m_bConnected)
      {
         // nothing left to send
         if (m_pSndBuffer->getCurrBufSize() <= 0)
            break;

         // linger period used up
         if ((now.tv_sec - lingerstart.tv_sec - 1) >= m_Linger.l_linger)
            break;

         #ifndef WIN32
            usleep(10);
         #else
            Sleep(1);
         #endif

         gettimeofday(&now, 0);
      }
   }

   #ifdef CUSTOM_CC
      m_pCC->close();
   #endif

   // Tell the thread handlers to stop ...
   m_bClosing = true;
   m_bBroken = true;

   // ... and wake up any sender/receiver that is blocked waiting for data.
   releaseSynch();

   // Wait for the listener thread, if there is one.
   if (m_bListening)
   {
      #ifndef WIN32
         pthread_join(m_ListenThread, NULL);
      #else
         WaitForSingleObject(m_ListenThread, INFINITE);
      #endif

      m_bListening = false;
   }

   // Wait for the sending (if it was ever started) and receiving threads.
   if (m_bConnected)
   {
      m_pTimer->interrupt();

      #ifndef WIN32
         if (m_bSndThrStart)
         {
            pthread_join(m_SndThread, NULL);
            m_bSndThrStart = false;
         }
         pthread_join(m_RcvThread, NULL);
      #else
         if (NULL != m_SndThread)
         {
            WaitForSingleObject(m_SndThread, INFINITE);
            m_SndThread = NULL;
         }
         WaitForSingleObject(m_RcvThread, INFINITE);
      #endif

      m_bConnected = false;
   }

   // Block until every in-progress send and recv call has returned.
   CGuard sendguard(m_SendLock);
   CGuard recvguard(m_RecvLock);

   // Tear down the channel.
   if (m_pChannel)
   {
      // notify the peer with a "shutdown" packet unless a shutdown was already recorded
      if (!m_bShutdown)
         sendCtrl(5);

      m_pChannel->disconnect();
      delete m_pChannel;
      m_pChannel = NULL;
   }

   // Release the protocol structures (deleting a null pointer is a no-op) ...
   delete m_pSndBuffer;
   delete m_pRcvBuffer;
   delete m_pSndLossList;
   delete m_pRcvLossList;
   delete m_pTimer;
   delete m_pIrrPktList;
   delete m_pACKWindow;
   delete m_pSndTimeWindow;
   delete m_pRcvTimeWindow;
   delete m_pCCFactory;
   delete m_pCC;

   // ... and forget them, so the destructor does not free them a second time.
   m_pSndBuffer = NULL;
   m_pRcvBuffer = NULL;
   m_pSndLossList = NULL;
   m_pRcvLossList = NULL;
   m_pTimer = NULL;
   m_pIrrPktList = NULL;
   m_pACKWindow = NULL;
   m_pSndTimeWindow = NULL;
   m_pRcvTimeWindow = NULL;
   m_pCCFactory = NULL;
   m_pCC = NULL;

   // CLOSED.
   m_bOpened = false;
}

#ifndef WIN32
void* CUDT::sndHandler(void* sender)
#else
DWORD WINAPI CUDT::sndHandler(LPVOID sender)
#endif
{
   CUDT* self = static_cast(sender);

   CPacket datapkt;
   __int32 payload;
   __int32 offset;

   #ifdef CUSTOM_CC
      __int32 cwnd;
   #endif

   bool probe = false;

   unsigned __int64 entertime;
   unsigned __int64 targettime;
   #ifdef NO_BUSY_WAITING
      unsigned __int64 currtime;
      //__int32 burst = 0;
   #endif

   timeval now;
   #ifndef WIN32
      timespec timeout;
   #endif

   while (!self->m_bClosing)
   {
      // Remember the time the last packet is sent.
      self->m_pTimer->rdtsc(entertime);

      // Loss retransmission always has higher priority.
      if ((datapkt.m_iSeqNo = self->m_pSndLossList->getLostSeq()) >= 0)
      {
         // protect m_iSndLastDataAck from updating by ACK processing
         CGuard ackguard(self->m_AckLock);

         if ((datapkt.m_iSeqNo >= self->m_iSndLastDataAck) && (datapkt.m_iSeqNo < self->m_iSndLastDataAck + self->m_iSeqNoTH))
            offset = (datapkt.m_iSeqNo - self->m_iSndLastDataAck) * self->m_iPayloadSize;
         else if (datapkt.m_iSeqNo < self->m_iSndLastDataAck - self->m_iSeqNoTH)
            offset = (datapkt.m_iSeqNo + self->m_iMaxSeqNo - self->m_iSndLastDataAck) * self->m_iPayloadSize;
         else
            continue;

         if ((payload = self->m_pSndBuffer->readData(&(datapkt.m_pcData), offset, self->m_iPayloadSize)) == 0)
            continue;

         #ifdef TRACE
            ++ self->m_iTraceRetrans;
         #endif
      }
      // If no loss, pack a new packet.
      else
      {
         #ifndef CUSTOM_CC
            if (self->m_iFlowWindowSize <= ((self->m_iSndCurrSeqNo - self->m_iSndLastAck + 1 + self->m_iMaxSeqNo) % self->m_iMaxSeqNo))
         #else
            cwnd = (self->m_iFlowWindowSize < (__int32)self->m_dCongestionWindow) ? self->m_iFlowWindowSize : (__int32)self->m_dCongestionWindow;
            if (cwnd <= ((self->m_iSndCurrSeqNo - self->m_iSndLastAck + 1 + self->m_iMaxSeqNo) % self->m_iMaxSeqNo))
         #endif
         {
            //wait here for ACK, NAK, or EXP (i.e, some data to sent)
            #ifndef WIN32
               gettimeofday(&now, 0);
               if (now.tv_usec < 990000)
               {
                  timeout.tv_sec = now.tv_sec;
                  timeout.tv_nsec = (now.tv_usec + 10000) * 1000;
               }
               else
               {
                  timeout.tv_sec = now.tv_sec + 1;
                  timeout.tv_nsec = now.tv_usec * 1000;
               }
               pthread_cond_timedwait(&self->m_WindowCond, &self->m_WindowLock, &timeout);
            #else
               WaitForSingleObject(self->m_WindowCond, 1);
            #endif

            #ifdef NO_BUSY_WAITING
               // the waiting time should not be counted in. clear the time diff to zero.
               self->m_ullTimeDiff = 0;
            #endif

            continue;
         }

         if (0 == (payload = self->m_pSndBuffer->readData(&(datapkt.m_pcData), self->m_iPayloadSize)))
         {
            //check if the sender buffer is empty
            if (0 == self->m_pSndBuffer->getCurrBufSize())
            {
               // If yes, sleep here until a signal comes.
               #ifndef WIN32
                  pthread_mutex_lock(&(self->m_SendDataLock));
                  while ((0 == self->m_pSndBuffer->getCurrBufSize()) && (!self->m_bClosing))
                     pthread_cond_wait(&(self->m_SendDataCond), &(self->m_SendDataLock));
                  pthread_mutex_unlock(&(self->m_SendDataLock));
               #else
                  WaitForSingleObject(self->m_SendDataLock, INFINITE);
                  while ((0 == self->m_pSndBuffer->getCurrBufSize()) && (!self->m_bClosing))
                  {
                     ReleaseMutex(self->m_SendDataLock);
                     WaitForSingleObject(self->m_SendDataCond, INFINITE);
                     WaitForSingleObject(self->m_SendDataLock, INFINITE);
                  }
                  ReleaseMutex(self->m_SendDataLock);
               #endif
            }

            #ifdef NO_BUSY_WAITING
               // the waiting time should not be counted in. clear the time diff to zero.
               self->m_ullTimeDiff = 0;
            #endif

            continue;
         }

         self->m_iSndCurrSeqNo = (self->m_iSndCurrSeqNo + 1) % self->m_iMaxSeqNo;
         datapkt.m_iSeqNo = self->m_iSndCurrSeqNo;
 
         if (0 == self->m_iSndCurrSeqNo % self->m_iProbeInterval)
            probe = true;
      }

      gettimeofday(&now, 0);
      datapkt.m_iTimeStamp = (now.tv_sec - self->m_StartTime.tv_sec) * 1000000 + now.tv_usec - self->m_StartTime.tv_usec;

      // Now sending.
      datapkt.setLength(payload);
      *(self->m_pChannel) << datapkt;

      self->m_pSndTimeWindow->onPktSent(now);

      #ifdef CUSTOM_CC
         self->m_pCC->onPktSent(&datapkt);
      #endif

      #ifdef TRACE
         ++ self->m_llTraceSent;
      #endif

      if (probe)
      {
         // sends out probing packet pair
         self->m_pTimer->rdtsc(targettime);
         probe = false;
      }
      else if (self->m_bFreeze)
      {
         // sending is fronzen!
         targettime = entertime + self->m_iSYNInterval * self->m_ullCPUFrequency + self->m_ullInterval;
         self->m_bFreeze = false;
      }
      else
         targettime = entertime + self->m_ullInterval;

      // wait for an inter-packet time.
      #ifndef NO_BUSY_WAITING
         self->m_pTimer->sleepto(targettime);
      #else
         self->m_pTimer->rdtsc(currtime);

         if (currtime >= targettime)
            continue;

         while (currtime + self->m_ullTimeDiff < targettime)
         {
            //burst = 0;

            #ifndef WIN32
               gettimeofday(&now, 0);
               if (now.tv_usec < 990000)
               {
                  timeout.tv_sec = now.tv_sec;
                  timeout.tv_nsec = (now.tv_usec + 10000) * 1000;
               }
               else
               {
                  timeout.tv_sec = now.tv_sec + 1;
                  timeout.tv_nsec = now.tv_usec * 1000;
               }
               if (0 == pthread_cond_timedwait(&self->m_WindowCond, &self->m_WindowLock, &timeout))
                  break;
            #else
               if (WAIT_TIMEOUT != WaitForSingleObject(self->m_WindowCond, 1))
                  break;
            #endif
            self->m_pTimer->rdtsc(currtime);
         }

         self->m_pTimer->rdtsc(currtime);
         if (currtime >= targettime)
            self->m_ullTimeDiff += currtime - targettime;
         else if (self->m_ullTimeDiff > targettime - currtime)
            self->m_ullTimeDiff -= targettime - currtime;
         else
            self->m_ullTimeDiff = 0;
      #endif
   }

   #ifndef WIN32
      return NULL;
   #else
      return 0;
   #endif
}

// Receiver thread entry point (pthread start routine, or Win32 thread procedure under WIN32).
//
// recver: the CUDT instance this thread serves, passed as an untyped thread argument.
//
// The loop runs until m_bClosing is set. On every pass it
//   1. services a pending overlapped "recv"/"recvfile" request (m_bReadBuf),
//   2. checks the ACK, NAK and EXP timers and sends the matching control packets,
//   3. reads one packet from the channel and handles it as a control or a data packet.
// After the loop, a final ACK is sent if the receiver buffer still holds data.
//
// Returns NULL (0 under WIN32).
#ifndef WIN32
void* CUDT::rcvHandler(void* recver)
#else
DWORD WINAPI CUDT::rcvHandler(LPVOID recver)
#endif
{
   // NOTE(review): the template argument of this cast (and of the const_cast calls on m_pcTempData below)
   // appears to be missing in this copy of the file - presumably CUDT* here; confirm against the upstream source.
   CUDT* self = static_cast(recver);

   CPacket packet;
   // Scratch buffer used as the packet's data area when no slot in the receiver buffer can be speculated.
   char* payload = new char [self->m_iPayloadSize];
   // true when the packet was received directly into the speculated slot of the receiver buffer
   bool nextslotfound;
   __int32 offset;
   // first sequence number in the receiver loss list; negative values are treated as "no loss" below
   __int32 loss;
   #if defined (CUSTOM_CC) || defined (NO_BUSY_WAITING)
      // number of data packets processed since the last ACK (full or light) was sent
      __int32 pktcount = 0;
   #endif

   // time
   unsigned __int64 currtime;
   unsigned __int64 nextacktime;
   unsigned __int64 nextnaktime;
   unsigned __int64 nextexptime;

   // SYN interval, in clock cycles
   const unsigned __int64 ullsynint = self->m_iSYNInterval * self->m_ullCPUFrequency;

   // ACK, NAK, and EXP intervals, in clock cycles
   unsigned __int64 ullackint = ullsynint;
   #ifdef CUSTOM_CC
      // a custom congestion control object supplies its own ACK period
      ullackint = self->m_pCC->m_iACKPeriod * 1000 * self->m_ullCPUFrequency;
   #endif
   unsigned __int64 ullnakint = (self->m_iRTT + 4 * self->m_iRTTVar) * self->m_ullCPUFrequency;
   unsigned __int64 ullexpint = (self->m_iRTT + 4 * self->m_iRTTVar) * self->m_ullCPUFrequency + ullsynint;

   // Set up the timers.
   self->m_pTimer->rdtsc(nextacktime);
   nextacktime += ullackint;
   self->m_pTimer->rdtsc(nextnaktime);
   nextnaktime += ullnakint;
   self->m_pTimer->rdtsc(nextexptime);
   nextexptime += ullexpint;

   while (!self->m_bClosing)
   {
      #ifdef NO_BUSY_WAITING
         // signal sleeping sender
         #ifndef WIN32
            pthread_cond_signal(&self->m_WindowCond);
         #else
            SetEvent(self->m_WindowCond);
         #endif
      #endif

      #ifdef CUSTOM_CC
         // update CC parameters
         // (the packet sending period is converted to clock cycles; the window is copied as is)
         self->m_ullInterval = (unsigned __int64)(self->m_pCC->m_dPktSndPeriod * self->m_ullCPUFrequency);
         self->m_dCongestionWindow = self->m_pCC->m_dCWndSize;
      #endif

      // "recv"/"recvfile" is called, overlapped mode is activated, and not enough received data in the protocol buffer
      if (self->m_bReadBuf)
      {
         // Check if there is enough data now.
         #ifndef WIN32
            pthread_mutex_lock(&(self->m_OverlappedRecvLock));
            self->m_bReadBuf = self->m_pRcvBuffer->readBuffer(const_cast(self->m_pcTempData), const_cast<__int32&>(self->m_iTempLen));
            pthread_mutex_unlock(&(self->m_OverlappedRecvLock));
         #else
            WaitForSingleObject(self->m_OverlappedRecvLock, INFINITE);
            self->m_bReadBuf = self->m_pRcvBuffer->readBuffer(const_cast(self->m_pcTempData), const_cast<__int32&>(self->m_iTempLen));
            ReleaseMutex(self->m_OverlappedRecvLock);
         #endif

         // Still no?! Register the application buffer.
         if (!self->m_bReadBuf)
         {
            offset = self->m_pRcvBuffer->registerUserBuf(const_cast(self->m_pcTempData), const_cast<__int32&>(self->m_iTempLen), self->m_iRcvHandle, self->m_iTempRoutine);
            // there is no seq. wrap for user buffer border. If it exceeds the max. seq., we just ignore it.
            self->m_iUserBufBorder = self->m_iRcvLastAck + (__int32)ceil(double(self->m_iTempLen - offset) / self->m_iPayloadSize);
         }

         // Otherwise, inform the blocked "recv"/"recvfile" call that the expected data has arrived.
         // or returns immediately in non-blocking IO mode.
         if (self->m_bReadBuf || !self->m_bSynRecving)
         {
            // the request is finished from this thread's point of view; clear the flag before waking the caller
            self->m_bReadBuf = false;
            #ifndef WIN32
               pthread_mutex_lock(&(self->m_OverlappedRecvLock));
               pthread_cond_signal(&(self->m_OverlappedRecvCond));
               pthread_mutex_unlock(&(self->m_OverlappedRecvLock));
            #else
               SetEvent(self->m_OverlappedRecvCond);
            #endif
         }
      }

      self->m_pTimer->rdtsc(currtime);
      loss = self->m_pRcvLossList->getFirstLostSeq();

      // Query the timers if any of them is expired.
      // A full ACK is also forced when the first loss lies at or beyond the user buffer border, or when
      // there is no loss and the largest received sequence number has reached the border.
      if ((currtime > nextacktime) || (loss >= self->m_iUserBufBorder) || ((self->m_iRcvCurrSeqNo >= self->m_iUserBufBorder - 1) && (loss < 0)))
      {
         // ACK timer expired, or user buffer is fulfilled.
         self->sendCtrl(2);

         self->m_pTimer->rdtsc(currtime);
         nextacktime = currtime + ullackint;

         #if defined(CUSTOM_CC) || defined (NO_BUSY_WAITING)
            pktcount = 0;
         #endif
      }
      //send a "light" ACK
      // (requested by passing a size of 2 * sizeof(__int32) to sendCtrl, once enough packets have been counted)
      #if defined (CUSTOM_CC)
         else if ((self->m_pCC->m_iACKInterval > 0) && (self->m_pCC->m_iACKInterval <= pktcount))
         {
            self->sendCtrl(2, NULL, NULL, 2 * sizeof(__int32));
            pktcount = 0;
         }
      #elif defined (NO_BUSY_WAITING)
         else if (self->m_iSelfClockInterval <= pktcount)
         {
            self->sendCtrl(2, NULL, NULL, 2 * sizeof(__int32));
            pktcount = 0;
         }
      #endif

      if ((loss >= 0) && (currtime > nextnaktime))
      {
         // NAK timer expired, and there is loss to be reported.
         self->sendCtrl(3);

         self->m_pTimer->rdtsc(currtime);
         nextnaktime = currtime + ullnakint;
      }
      else if ((currtime > nextexptime) && (0 == self->m_pSndLossList->getLossLength()))
      {
         // Haven't received any information from the peer, is it dead?!
         // timeout: at least 16 expirations and must be greater than 3 seconds and be less than 30 seconds
         // (the compared value is in microseconds: 3000000 and 30000000)
         if (((self->m_iEXPCount > 16) && 
             (self->m_iEXPCount * ((self->m_iEXPCount - 1) * (self->m_iRTT + 4 * self->m_iRTTVar) / 2 + self->m_iSYNInterval) > 3000000))
             || (self->m_iEXPCount * ((self->m_iEXPCount - 1) * (self->m_iRTT + 4 * self->m_iRTTVar) / 2 + self->m_iSYNInterval) > 30000000))
         {
            //
            // Connection is broken. 
            // UDT does not signal any information about this instead of to stop quietly.
            // Application will detect this when it calls any UDT methods next time.
            //
            self->m_bClosing = true;
            self->m_bBroken = true;

            self->releaseSynch();

            // m_bClosing is now set, so this ends the loop at the next condition check
            continue;
         }

         // sender: Insert all the packets sent after last received acknowledgement into the sender loss list.
         // recver: Send out a keep-alive packet
         if (((self->m_iSndCurrSeqNo + 1) % self->m_iMaxSeqNo) != self->m_iSndLastAck)
         {
            // there are unacknowledged packets: schedule [m_iSndLastAck, m_iSndCurrSeqNo] for retransmission
            __int32 csn = self->m_iSndCurrSeqNo;
            self->m_pSndLossList->insert(const_cast<__int32&>(self->m_iSndLastAck), csn);

            #ifdef CUSTOM_CC
               self->m_pCC->onTimeout();
            #endif
         }
         else
            self->sendCtrl(1);

         if (self->m_pSndBuffer->getCurrBufSize() > 0)
         {
            // Wake up the waiting sender (avoiding deadlock on an infinite sleeping)
            self->m_pTimer->interrupt();

            #ifndef WIN32
               pthread_cond_signal(&self->m_WindowCond);
            #else
               SetEvent(self->m_WindowCond);
            #endif
         }

         ++ self->m_iEXPCount;

         // the EXP interval grows with the number of consecutive expirations
         ullexpint = (self->m_iEXPCount * (self->m_iRTT + 4 * self->m_iRTTVar) + self->m_iSYNInterval) * self->m_ullCPUFrequency;

         #ifdef CUSTOM_CC
            // a positive RTO from the custom congestion control overrides the computed interval
            if (self->m_pCC->m_iRTO > 0)
               ullexpint = self->m_pCC->m_iRTO * self->m_ullCPUFrequency;
         #endif

         self->m_pTimer->rdtsc(nextexptime);
         nextexptime += ullexpint;
      }

      ////////////////////////////////////////////////////////////////////////////////////////////
      // Below is the packet receiving/processing part.

      packet.setLength(self->m_iPayloadSize);

      // distance, in packets, between the next expected packet and the last acknowledged one;
      // a strongly negative difference means the sequence number wrapped around
      offset = self->m_iNextExpect - self->m_iRcvLastAck;
      if (offset < -self->m_iSeqNoTH)
         offset += self->m_iMaxSeqNo;

      // Look for a slot for the speculated data.
      // If none is found, the packet is received into the scratch buffer instead.
      if (!(self->m_pRcvBuffer->nextDataPos(&(packet.m_pcData), offset * self->m_iPayloadSize - self->m_pIrrPktList->currErrorSize(self->m_iNextExpect), self->m_iPayloadSize)))
      {
         packet.m_pcData = payload;
         nextslotfound = false;
      }
      else
         nextslotfound = true;

      // Receiving...
      *(self->m_pChannel) >> packet;

      // Got nothing?
      if (packet.getLength() <= 0)
         continue;

      // Just heard from the peer, reset the expiration count.
      self->m_iEXPCount = 1;
      ullexpint = (self->m_iRTT + 4 * self->m_iRTTVar) * self->m_ullCPUFrequency + ullsynint;
      #ifdef CUSTOM_CC
         if (self->m_pCC->m_iRTO > 0)
            ullexpint = self->m_pCC->m_iRTO * self->m_ullCPUFrequency;
      #endif
      // restart the EXP timer here only when every sent packet has been acknowledged
      if (((self->m_iSndCurrSeqNo + 1) % self->m_iMaxSeqNo) == self->m_iSndLastAck)
      {
         self->m_pTimer->rdtsc(nextexptime);
         nextexptime += ullexpint;
      }

      // But this is control packet, process it!
      if (packet.getFlag())
      {
         self->processCtrl(packet);

         // ACK (2) and ACK-of-ACK (6) may have changed the RTT estimate: recompute the NAK interval
         if ((2 == packet.getType()) || (6 == packet.getType()))
         {
            ullnakint = (self->m_iRTT + 4 * self->m_iRTTVar) * self->m_ullCPUFrequency;
            //do not resend the loss report within too short period
            if (ullnakint < ullsynint)
               ullnakint = ullsynint;
         }

         // control types 2 to 4 (ACK, NAK, congestion warning) restart the EXP timer
         self->m_pTimer->rdtsc(currtime);
         if ((2 <= packet.getType()) && (4 >= packet.getType()))
            nextexptime = currtime + ullexpint;

         continue;
      }

      // update time/delay information
      self->m_pRcvTimeWindow->onPktArrival();

      // check if it is probing packet pair
      // (sequence numbers congruent to 0 and 1 modulo m_iProbeInterval form the pair)
      if (packet.m_iSeqNo % self->m_iProbeInterval < 2)
      {
         if (0 == packet.m_iSeqNo % self->m_iProbeInterval)
            self->m_pRcvTimeWindow->probe1Arrival();
         else
            self->m_pRcvTimeWindow->probe2Arrival();
      }

      #ifdef TRACE
         ++ self->m_llTraceRecv;
      #endif

      // position of the received packet relative to the last acknowledged one, corrected for wrap-around
      offset = packet.m_iSeqNo - self->m_iRcvLastAck;
      if (offset < -self->m_iSeqNoTH)
         offset += self->m_iMaxSeqNo;

      // Data is too old, discard it!
      // (packets at or beyond m_iFlightFlagSize ahead of the last ACK are dropped as well)
      if ((offset >= self->m_iFlightFlagSize) || (offset < 0))
         continue;

      // Oops, the speculation is wrong...
      if ((packet.m_iSeqNo != self->m_iNextExpect) || (!nextslotfound))
      {
         // Put the received data explicitly into the right slot.
         if (!(self->m_pRcvBuffer->addData(packet.m_pcData, offset * self->m_iPayloadSize - self->m_pIrrPktList->currErrorSize(packet.m_iSeqNo), packet.getLength())))
            continue;

         // Loss detection.
         // The packet is ahead of the largest received sequence number by more than one, either directly
         // or across a sequence number wrap (the second half of the condition).
         if (((packet.m_iSeqNo > self->m_iRcvCurrSeqNo + 1) && (packet.m_iSeqNo - self->m_iRcvCurrSeqNo < self->m_iSeqNoTH)) || (packet.m_iSeqNo < self->m_iRcvCurrSeqNo - self->m_iSeqNoTH))
         {
            // If loss found, insert them to the receiver loss list
            self->m_pRcvLossList->insert(self->m_iRcvCurrSeqNo + 1, packet.m_iSeqNo - 1);

            // pack loss list for NAK
            // (first word: start of the range with the top bit set; second word: end of the range)
            __int32 lossdata[2];
            lossdata[0] = (self->m_iRcvCurrSeqNo + 1) | 0x80000000;
            lossdata[1] = packet.m_iSeqNo - 1;

            // Generate loss report immediately.
            // The last argument is 1 when a single packet is missing and 2 for a range.
            self->sendCtrl(3, NULL, lossdata, (self->m_iRcvCurrSeqNo + 1 == packet.m_iSeqNo - 1) ? 1 : 2);

            #ifdef TRACE
               self->m_iTraceRcvLoss += (packet.m_iSeqNo - self->m_iRcvCurrSeqNo - 1 + self->m_iMaxSeqNo) % self->m_iMaxSeqNo;
            #endif
         }
      }

      // This is not a regular fixed size packet...
      // Record by how many bytes it falls short of a full payload.
      if (packet.getLength() != self->m_iPayloadSize)
         self->m_pIrrPktList->addIrregularPkt(packet.m_iSeqNo, self->m_iPayloadSize - packet.getLength());

      // Update the current largest sequence number that has been received.
      if (((packet.m_iSeqNo > self->m_iRcvCurrSeqNo) && (packet.m_iSeqNo - self->m_iRcvCurrSeqNo < self->m_iSeqNoTH)) || (packet.m_iSeqNo < self->m_iRcvCurrSeqNo - self->m_iSeqNoTH))
      {
         self->m_iRcvCurrSeqNo = packet.m_iSeqNo;

         // Speculate next packet.
         self->m_iNextExpect = (self->m_iRcvCurrSeqNo + 1) % self->m_iMaxSeqNo;
      }
      else
      {
         // Or it is a retransmitted packet, remove it from receiver loss list.
         // rearrange receiver buffer if it is a first-come irregular packet

         if (self->m_pRcvLossList->remove(packet.m_iSeqNo) && (packet.getLength() < self->m_iPayloadSize))
            self->m_pRcvBuffer->moveData(offset * self->m_iPayloadSize - self->m_pIrrPktList->currErrorSize(packet.m_iSeqNo) + packet.getLength(), self->m_iPayloadSize - packet.getLength());
      }

      #ifdef CUSTOM_CC
         self->m_pCC->onPktReceived(&packet);
      #endif

      #if defined (CUSTOM_CC) || defined (NO_BUSY_WAITING)
         pktcount ++;
      #endif
   }

   // acknowledge those possible unacknowledged data, if there is any
   if (0 != self->m_pRcvBuffer->getRcvDataSize())
      self->sendCtrl(2);

   delete [] payload;

   #ifndef WIN32
      return NULL;
   #else
      return 0;
   #endif
}

void CUDT::sendCtrl(const __int32& pkttype, void* lparam, void* rparam, const __int32& size)
{
   CPacket ctrlpkt;

   switch (pkttype)
   {
   case 2: //010 - Acknowledgement
      {
      __int32 ack;

      // If there is no loss, the ACK is the current largest sequence number plus 1;
      // Otherwise it is the smallest sequence number in the receiver loss list.
      if (0 == m_pRcvLossList->getLossLength())
         ack = (m_iRcvCurrSeqNo + 1) % m_iMaxSeqNo;
      else
         ack = m_pRcvLossList->getFirstLostSeq();

      #if defined (CUSTOM_CC) || defined (NO_BUSY_WAITING)
         // send out a lite ACK
         // to save time on buffer processing and bandwidth/AS measurement, a lite ACK only feeds back an ACK number
         if (size == 2 * sizeof(__int32))
         {
            ctrlpkt.pack(2, NULL, &ack, 2 * sizeof(__int32));
            *m_pChannel << ctrlpkt;
               
            break;
         }
      #endif

      unsigned __int64 currtime;
      m_pTimer->rdtsc(currtime);

      // There is new received packet to acknowledge, update related information.
      if (((ack > m_iRcvLastAck) && (ack - m_iRcvLastAck < m_iSeqNoTH)) || (ack < m_iRcvLastAck - m_iSeqNoTH))
      {
         __int32 acksize = (ack - m_iRcvLastAck + m_iMaxSeqNo) % m_iMaxSeqNo;
         m_iRcvLastAck = ack;

         if (m_pRcvBuffer->ackData(acksize * m_iPayloadSize - m_pIrrPktList->currErrorSize(m_iRcvLastAck)))
         {
            //singal an blocking overlapped IO. 
            #ifndef WIN32
               pthread_mutex_lock(&m_OverlappedRecvLock);
               pthread_cond_signal(&m_OverlappedRecvCond);
               pthread_mutex_unlock(&m_OverlappedRecvLock);
            #else
               SetEvent(m_OverlappedRecvCond);
            #endif
         }

         m_iUserBufBorder = m_iRcvLastAck + (__int32)ceil(double(m_pRcvBuffer->getAvailBufSize()) / m_iPayloadSize);

         // signal a waiting "recv" call if there is any data available
         #ifndef WIN32
            pthread_mutex_lock(&m_RecvDataLock);
            if ((m_bSynRecving) && (0 != m_pRcvBuffer->getRcvDataSize()))
               pthread_cond_signal(&m_RecvDataCond);
            pthread_mutex_unlock(&m_RecvDataLock);
         #else
            if ((m_bSynRecving) && (0 != m_pRcvBuffer->getRcvDataSize()))
               SetEvent(m_RecvDataCond);
         #endif

         m_pIrrPktList->deleteIrregularPkt(m_iRcvLastAck);
      }
      else if (ack == m_iRcvLastAck)
      {
         #ifdef CUSTOM_CC
            break;
         #endif

         if ((currtime - m_ullLastAckTime) < ((m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency))
            break;
      }
      else
         break;

      // Send out the ACK only if has not been received by the sender before
      if (((m_iRcvLastAck > m_iRcvLastAckAck) && (m_iRcvLastAck - m_iRcvLastAckAck < m_iSeqNoTH)) || (m_iRcvLastAck < m_iRcvLastAckAck - m_iSeqNoTH))
      {
         __int32* data = new __int32 [5];

         m_iAckSeqNo = (m_iAckSeqNo + 1) % m_iMaxAckSeqNo;
         data[0] = m_iRcvLastAck;
         data[1] = m_iRTT;
         data[2] = m_iRTTVar;

         flowControl(m_pRcvTimeWindow->getPktRcvSpeed());
         data[3] = m_iFlowControlWindow;
         if (data[3] > (__int32)(m_pRcvBuffer->getAvailBufSize() / m_iPayloadSize))
            data[3] = (__int32)(m_pRcvBuffer->getAvailBufSize() / m_iPayloadSize);
         if (data[3] < 2)
            data[3] = 2;

         data[4] = m_bRcvSlowStart? 0 : m_pRcvTimeWindow->getBandwidth();

         ctrlpkt.pack(2, &m_iAckSeqNo, data, 5 * sizeof(__int32));
         *m_pChannel << ctrlpkt;

         m_pACKWindow->store(m_iAckSeqNo, m_iRcvLastAck);

         m_pTimer->rdtsc(m_ullLastAckTime);

         #ifdef TRACE
            ++ m_iSentACK;
         #endif

         delete [] data;
      }

      break;
      }

   case 6: //110 - Acknowledgement of Acknowledgement
      ctrlpkt.pack(6, lparam);

      *m_pChannel << ctrlpkt;

      break;

   case 3: //011 - Loss Report
      if (NULL != rparam)
      {
         if (1 == size)
         {
            // only 1 loss packet
            ctrlpkt.pack(3, NULL, (__int32 *)rparam + 1, sizeof(__int32));
         }
         else
         {
            // more than 1 loss packets
            ctrlpkt.pack(3, NULL, rparam, 2 * sizeof(__int32));
         }

         *m_pChannel << ctrlpkt;

         //Slow Start Stopped, if it is not
         m_bRcvSlowStart = false;

         #ifdef TRACE
            ++ m_iSentNAK;
         #endif
      }
      else if (m_pRcvLossList->getLossLength() > 0)
      {
         // this is periodically NAK report

         // read loss list from the local receiver loss list
         __int32* data = new __int32 [m_iPayloadSize];
         __int32 losslen;
         m_pRcvLossList->getLossArray(data, losslen, m_iPayloadSize / sizeof(__int32), m_iRTT + 4 * m_iRTTVar);

         if (0 < losslen)
         {
            ctrlpkt.pack(3, NULL, data, losslen * sizeof(__int32));
            *m_pChannel << ctrlpkt;

            //Slow Start Stopped, if it is not
            m_bRcvSlowStart = false;

            #ifdef TRACE
               ++ m_iSentNAK;
            #endif
         }

         delete [] data;
      }

      break;

   case 4: //100 - Congestion Warning
      ctrlpkt.pack(4);
      *m_pChannel << ctrlpkt;

      //Slow Start Stopped, if it is not
      m_bRcvSlowStart = false;

      m_pTimer->rdtsc(m_ullLastWarningTime);

      break;

   case 1: //001 - Keep-alive
      ctrlpkt.pack(1);
      *m_pChannel << ctrlpkt;
      
      break;

   case 0: //000 - Handshake
      ctrlpkt.pack(0, NULL, rparam, sizeof(CHandShake));
      *m_pChannel << ctrlpkt;

      break;

   case 5: //101 - Shutdown
      ctrlpkt.pack(5);
      *m_pChannel << ctrlpkt;

      break;

   case 7: //111 - Resevered for future use
      break;

   default:
      break;
   }
}

// Handles one control packet received from the peer.
//
// ctrlpkt: the received control packet; its type selects the action:
//   2 ACK           - advance the send-side ACK points, release acknowledged data, update RTT,
//                     flow window and bandwidth estimate, and reply with an ACK-of-ACK (full ACK only);
//   6 ACK-of-ACK    - update the RTT estimate and the last ACK known to be received by the peer;
//   3 NAK           - apply congestion control and insert the reported losses into the sender loss list;
//   4 delay warning - slow down (built-in congestion control only);
//   1 keep-alive    - nothing to do;
//   0 handshake     - resend our handshake if the peer apparently never received it;
//   5 shutdown      - mark the connection closed and release waiting threads;
//   7 user defined  - forwarded to the custom congestion control object, if compiled in.
//
// The packet content comes from the network, so list lengths are checked before indexing.
void CUDT::processCtrl(CPacket& ctrlpkt)
{
   switch (ctrlpkt.getType())
   {
   case 2: //010 - Acknowledgement
      {
      __int32 ack;

      // process a lite ACK
      // (recognised by its length of exactly two words; only the first word, the ACK number, is used)
      if (ctrlpkt.getLength() == 2 * sizeof(__int32))
      {
         ack = *(__int32 *)ctrlpkt.m_pcData;
         if (((ack > m_iSndLastAck) && (ack - m_iSndLastAck < m_iSeqNoTH)) || (ack < m_iSndLastAck - m_iSeqNoTH))
            m_iSndLastAck = ack;

         #ifdef CUSTOM_CC
            m_pCC->onACK(*(__int32 *)ctrlpkt.m_pcData);
         #endif

         break;
      }

      // read ACK seq. no.
      ack = ctrlpkt.getAckSeqNo();

      // send ACK acknowledgement
      sendCtrl(6, &ack);

      // Got data ACK
      ack = *(__int32 *)ctrlpkt.m_pcData;

      // move the ACK point forward only; the second half of the condition handles sequence wrap-around
      if (((ack > m_iSndLastAck) && (ack - m_iSndLastAck < m_iSeqNoTH)) || (ack < m_iSndLastAck - m_iSeqNoTH))
         m_iSndLastAck = ack;

      // protect packet retransmission
      #ifndef WIN32
         pthread_mutex_lock(&m_AckLock);
      #else
         WaitForSingleObject(m_AckLock, INFINITE);
      #endif

      // acknowledge the sending buffer
      if ((ack > m_iSndLastDataAck) && (ack - m_iSndLastDataAck < m_iSeqNoTH))
         m_pSndBuffer->ackData((ack - m_iSndLastDataAck) * m_iPayloadSize, m_iPayloadSize);
      else if (ack < m_iSndLastDataAck - m_iSeqNoTH)
         m_pSndBuffer->ackData((ack - m_iSndLastDataAck + m_iMaxSeqNo) * m_iPayloadSize, m_iPayloadSize);
      else
      {
         // discard it if it is a repeated ACK
         #ifndef WIN32
            pthread_mutex_unlock(&m_AckLock);
         #else
            ReleaseMutex(m_AckLock);
         #endif

         break;
      }

      #ifdef CUSTOM_CC
         m_pCC->onACK(*(__int32 *)ctrlpkt.m_pcData);
      #endif

      // update sending variables
      m_iSndLastDataAck = ack;
      m_pSndLossList->remove((m_iSndLastDataAck - 1 + m_iMaxSeqNo) % m_iMaxSeqNo);

      // release the lock, then wake the sending thread and any blocked "send" call
      #ifndef WIN32
         pthread_mutex_unlock(&m_AckLock);

         pthread_cond_signal(&m_WindowCond);

         pthread_mutex_lock(&m_SendBlockLock);
         if (m_bSynSending)
            pthread_cond_signal(&m_SendBlockCond);
         pthread_mutex_unlock(&m_SendBlockLock);
      #else
         ReleaseMutex(m_AckLock);

         SetEvent(m_WindowCond);

         if (m_bSynSending)
            SetEvent(m_SendBlockCond);
      #endif

      // Update RTT
      // (words 1 and 2 of the ACK payload carry the peer's RTT and RTT variance)
      m_iRTT = *((__int32 *)ctrlpkt.m_pcData + 1);
      m_iRTTVar = *((__int32 *)ctrlpkt.m_pcData + 2);

      // Update Flow Window Size
      m_iFlowWindowSize = *((__int32 *)ctrlpkt.m_pcData + 3);

      #ifndef CUSTOM_CC
         // quick start
         // (the first ACK carrying a positive bandwidth estimate ends slow start and seeds the packet interval)
         if ((m_bSndSlowStart) && (*((__int32 *)ctrlpkt.m_pcData + 4) > 0))
         {
            m_bSndSlowStart = false;
            m_ullInterval = m_iFlowWindowSize * m_ullCPUFrequency / (m_iRTT + m_iSYNInterval);
         }
      #endif

      // Update Estimated Bandwidth
      // (moving average: 7/8 of the old value plus 1/8 of the new sample)
      if (*((__int32 *)ctrlpkt.m_pcData + 4) > 0)
         m_iBandwidth = (m_iBandwidth * 7 + *((__int32 *)ctrlpkt.m_pcData + 4)) >> 3;

      #ifndef CUSTOM_CC
         // an ACK may activate rate control
         // (at most once per SYN interval, measured in wall-clock microseconds)
         timeval currtime;
         gettimeofday(&currtime, 0);

         if (((currtime.tv_sec - m_LastSYNTime.tv_sec) * 1000000 + currtime.tv_usec - m_LastSYNTime.tv_usec) >= m_iSYNInterval)
         {
            m_LastSYNTime = currtime;

            rateControl();
         }
      #endif

      // Wake up the waiting sender and correct the sending rate
      m_pTimer->interrupt();

      #ifdef TRACE
         ++ m_iRecvACK;
      #endif

      break;
      }

   case 6: //110 - Acknowledgement of Acknowledgement
      {
      __int32 ack;
      __int32 rtt = -1;
      //timeval currtime;

      // update RTT
      // (the ACK window returns the measured RTT and, through "ack", the data sequence number of that ACK)
      rtt = m_pACKWindow->acknowledge(ctrlpkt.getAckSeqNo(), ack);

      // no usable RTT sample: ignore this packet
      if (rtt <= 0)
         break;

      //
      // Well, I decide to temporarily disable the use of delay.
      // a good idea but the algorithm to detect it is not good enough.
      // I'll come back later...
      //

      //m_pRcvTimeWindow->ack2Arrival(rtt);

      // check packet delay trend
      //m_pTimer->rdtsc(currtime);
      //if (m_pRcvTimeWindow->getDelayTrend() && (currtime - m_ullLastWarningTime > (m_iRTT + 4 * m_iRTTVar) * m_ullCPUFrequency))
      //   sendCtrl(4);

      // RTT EWMA
      // (variance: weight 1/4 for the new deviation; RTT: weight 1/8 for the new sample)
      m_iRTTVar = (m_iRTTVar * 3 + abs(rtt - m_iRTT)) >> 2;
      m_iRTT = (m_iRTT * 7 + rtt) >> 3;

      // update last ACK that has been received by the sender
      if (((m_iRcvLastAckAck < ack) && (ack - m_iRcvLastAckAck < m_iSeqNoTH)) || (m_iRcvLastAckAck > ack + m_iSeqNoTH))
         m_iRcvLastAckAck = ack;

      break;
      }

   case 3: //011 - Loss Report
      {
      #ifndef CUSTOM_CC
         //Slow Start Stopped, if it is not
         m_bSndSlowStart = false;
      #endif

      // The payload is a list of 32-bit words: a word with the top bit set starts a range and is
      // followed by the range end; a word without it is a single lost sequence number.
      __int32* losslist = (__int32 *)(ctrlpkt.m_pcData);

      #ifndef CUSTOM_CC
         // Congestion Control on Loss
         // The first reported loss is newer than the last decrease point: a new congestion period starts.
         if ((((losslist[0] & 0x7FFFFFFF) > m_iLastDecSeq) && ((losslist[0] & 0x7FFFFFFF) - m_iLastDecSeq < m_iSeqNoTH)) || ((losslist[0] & 0x7FFFFFFF) < m_iLastDecSeq - m_iSeqNoTH))
         {
            m_bFreeze = true;

            #ifndef NOISY_LINK
               // remember the interval before the decrease, then lengthen it by 1/8
               m_ullLastDecRate = m_ullInterval;
               m_ullInterval = (unsigned __int64)ceil((signed __int64)m_ullInterval * 1.125);
            #endif

            m_iAvgNAKNum = (__int32)ceil((double)m_iAvgNAKNum * 0.875 + (double)m_iNAKCount * 0.125) + 1;
            m_iNAKCount = 1;

            m_iLastDecSeq = m_iSndCurrSeqNo;

            // remove global synchronization using randomization
            srand(m_iLastDecSeq);
            m_iDecRandom = (__int32)(rand() * double(m_iAvgNAKNum) / (RAND_MAX + 1.0)) + 1;
         }
         else if (0 == (++ m_iNAKCount % m_iDecRandom))
         {
            // same congestion period: decrease again only on every m_iDecRandom-th NAK
            m_ullInterval = (unsigned __int64)ceil((signed __int64)m_ullInterval * 1.125);

            m_iLastDecSeq = m_iSndCurrSeqNo;
         }
      #else
         m_pCC->onLoss(losslist, ctrlpkt.getLength());
      #endif

      // decode loss list message and insert loss into the sender loss list
      for (__int32 i = 0, n = (__int32)(ctrlpkt.getLength() / sizeof(__int32)); i < n; ++ i)
      {
         if (0 != (losslist[i] & 0x80000000))
         {
            // A range entry needs a second word. If the list ends here the report is truncated or
            // malformed; stop decoding instead of reading past the received words.
            if (i + 1 >= n)
               break;

            // NOTE(review): when the range start is already acknowledged the end word is not skipped
            // and is examined as a single entry on the next iteration - confirm this is intended.
            if (((losslist[i] & 0x7FFFFFFF) >= m_iSndLastAck) || ((losslist[i] & 0x7FFFFFFF) < m_iSndLastAck - m_iSeqNoTH))
            {
               #ifdef TRACE
                  m_iTraceSndLoss += m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);
               #else
                  m_pSndLossList->insert(losslist[i] & 0x7FFFFFFF, losslist[i + 1]);
               #endif
               // the range end has been consumed together with the start
               ++ i;
            }
         }
         else if ((losslist[i] >= m_iSndLastAck) || (losslist[i] < m_iSndLastAck - m_iSeqNoTH))
         {
            #ifdef TRACE
               m_iTraceSndLoss += m_pSndLossList->insert(losslist[i], losslist[i]);
            #else
               m_pSndLossList->insert(losslist[i], losslist[i]);
            #endif
         }
      }

      // Wake up the waiting sender (avoiding deadlock on an infinite sleeping)
      // (the first unacknowledged packet is put on the loss list as well)
      m_pSndLossList->insert(const_cast<__int32&>(m_iSndLastAck), const_cast<__int32&>(m_iSndLastAck));
      m_pTimer->interrupt();

      #ifndef WIN32
         pthread_cond_signal(&m_WindowCond);
      #else
         SetEvent(m_WindowCond);
      #endif

      // loss received during this SYN
      m_bLoss = true;

      #ifdef TRACE
         ++ m_iRecvNAK;
      #endif

      break;
      }

   case 4: //100 - Delay Warning
      #ifndef CUSTOM_CC
         //Slow Start Stopped, if it is not
         m_bSndSlowStart = false;

         // One way packet delay is increasing, so decrease the sending rate
         m_ullInterval = (unsigned __int64)ceil((signed __int64)m_ullInterval * 1.125);

         m_iLastDecSeq = m_iSndCurrSeqNo;
      #endif

      break;

   case 1: //001 - Keep-alive
      // The only purpose of keep-alive packet is to tell that the peer is still alive
      // nothing needs to be done.

      break;

   case 0: //000 - Handshake
      // Nothing has been received or acknowledged yet on this side, so the connection is still in its initial state.
      if ((m_bInitiator) && (m_iPeerISN - 1 == m_iRcvCurrSeqNo) && (m_iISN == m_iSndLastAck))
      {
         // The peer side has not received the handshake message, so it keeps querying
         // resend the handshake packet

         CHandShake initdata;
         initdata.m_iISN = m_iISN;
         initdata.m_iMSS = m_iMSS;
         initdata.m_iFlightFlagSize = m_iFlightFlagSize;
         sendCtrl(0, NULL, (char *)&initdata, sizeof(CHandShake));
      }

      // I am not an initiator, so both the initiator and I must had received the message before I came here

      break;

   case 5: //101 - Shutdown
      m_bShutdown = true;
      m_bClosing = true;
      m_bBroken = true;

      // Signal the sender and recver if they are waiting for data.
      releaseSynch();

      break;

   case 7: //111 - reserved and user defined messages
      #ifdef CUSTOM_CC
         m_pCC->processCustomMsg(&ctrlpkt);
      #endif

      break;

   default:
      break;
   }
}

void CUDT::rateControl()
{
   // During Slow Start, no rate increase
   if (m_bSndSlowStart)
      return;

   if (m_bLoss)
   {
      m_bLoss = false;
      return;
   }

   __int32 B = (__int32)(m_iBandwidth - 1000000.0 / (signed __int64)(m_ullInterval * m_ullCPUFrequency));
   if ((m_ullInterval > m_ullLastDecRate) && ((m_iBandwidth / 9) < B))
      B = m_iBandwidth / 9;

   double inc;

   if (B <= 0)
      inc = 1.0 / m_iMSS;
   else
   {
      // inc = max(10 ^ ceil(log10( B * MSS * 8 ) * Beta / MSS, 1/MSS)
      // Beta = 1.5 * 10^(-6)

      inc = pow(10.0, ceil(log10(B * m_iMSS * 8.0))) * 0.0000015 / m_iMSS;

      if (inc < 1.0/m_iMSS)
         inc = 1.0/m_iMSS;
   }

   m_ullInterval = (unsigned __int64)(((m_ullInterval * (unsigned __int64)m_iSYNInterval * m_ullCPUFrequency)) / ((m_ullInterval * (unsigned __int64)inc + m_iSYNInterval * m_ullCPUFrequency)));

   // correct the sending interval, which should not be less than the minimum sending interval of the system
   if (m_ullInterval < (unsigned __int64)(m_ullCPUFrequency * (unsigned __int64)(m_pSndTimeWindow->getMinPktSndInt() * 0.9)))
      m_ullInterval = (unsigned __int64)(m_ullCPUFrequency * (unsigned __int64)(m_pSndTimeWindow->getMinPktSndInt() * 0.9));
}

// Flow control: recomputes m_iFlowControlWindow from the measured receiving
// rate "recvrate"; a value <= 0 means no usable measurement.
//  - In receiver slow start the window is the sequence distance between
//    m_iRcvLastAck and m_iPeerISN (wrapped by m_iMaxSeqNo). Once a rate is
//    available and the window has reached m_iQuickStartPkts, slow start ends
//    and the window is set from recvrate * (RTT + SYN) / 10^6, plus 16.
//  - Otherwise the window is smoothed: 7/8 of the old value plus 1/8 of
//    recvrate * (RTT + SYN) / 10^6, rounded up, plus 16.
// The result is always capped at m_iFlightFlagSize; hitting the cap also
// ends slow start.
void CUDT::flowControl(const __int32& recvrate)
{
   if (!m_bRcvSlowStart)
   {
      // steady state: only update when a rate sample is available
      if (recvrate > 0)
         m_iFlowControlWindow = (__int32)ceil(m_iFlowControlWindow * 0.875 + recvrate / 1000000.0 * (m_iRTT + m_iSYNInterval) * 0.125) + 16;
   }
   else
   {
      // slow start: window follows the amount acknowledged so far
      if (m_iRcvLastAck > m_iPeerISN)
         m_iFlowControlWindow = m_iRcvLastAck - m_iPeerISN;
      else
         m_iFlowControlWindow = m_iRcvLastAck - m_iPeerISN + m_iMaxSeqNo;

      if ((recvrate > 0) && (m_iFlowControlWindow >= m_iQuickStartPkts))
      {
         // quick start
         m_bRcvSlowStart = false;
         m_iFlowControlWindow = (__int32)((__int64)recvrate * (m_iRTT + m_iSYNInterval) / 1000000) + 16;
      }
   }

   // never exceed the configured maximum flight size
   if (m_iFlightFlagSize < m_iFlowControlWindow)
   {
      m_iFlowControlWindow = m_iFlightFlagSize;
      m_bRcvSlowStart = false;
   }
}

// Queues "len" bytes from "data" for sending and returns "len" (UDT either
// queues everything or nothing); returns 0 when len <= 0.
//
//  data        user data to send.
//  len         number of bytes in "data".
//  overlapped  NULL: the data is copied into a private buffer that is released
//              through CSndBuffer::releaseBuffer. Non-NULL: the user buffer is
//              queued directly, "func" is passed on as its release routine and
//              a positive handle is written to *overlapped.
//  func        release routine handed to the send buffer in overlapped mode.
//
// Throws CUDTException(2, 1, 0) if the connection is broken,
//        CUDTException(2, 2, 0) if the socket is not connected,
//        CUDTException(6, 1, 0) if the send queue is over its limit in non-blocking mode,
//        CUDTException(7, 0, err) if the sending thread cannot be created.
// The whole call runs under m_SendLock.
__int32 CUDT::sendX(char* data, const __int32& len, __int32* overlapped, const UDT_MEM_ROUTINE func)
{
   CGuard sendguard(m_SendLock);

   // throw an exception if not connected
   if (m_bBroken)
      throw CUDTException(2, 1, 0);
   else if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   if (len <= 0)
      return 0;

   // lazy snd thread creation
   #ifndef WIN32
      if (!m_bSndThrStart)
   #else
      if (NULL == m_SndThread)
   #endif
   {
      m_pSndTimeWindow = new CPktTimeWindow();

      #ifndef WIN32
         // pthread_create() reports failure through its return value, not errno.
         const int rc = pthread_create(&m_SndThread, NULL, CUDT::sndHandler, this);
         if (0 != rc)
         {
            // undo the allocation so that a later retry does not leak it
            delete m_pSndTimeWindow;
            m_pSndTimeWindow = NULL;
            throw CUDTException(7, 0, rc);
         }
         m_bSndThrStart = true;
      #else
         if (NULL == (m_SndThread = CreateThread(NULL, 0, CUDT::sndHandler, this, 0, NULL)))
         {
            // read the error code before any cleanup can overwrite it
            const DWORD err = GetLastError();
            delete m_pSndTimeWindow;
            m_pSndTimeWindow = NULL;
            throw CUDTException(7, 0, err);
         }
      #endif
   }

   if (m_pSndBuffer->getCurrBufSize() > m_iSndQueueLimit)
   {
      if (!m_bSynSending)
         throw CUDTException(6, 1, 0);
      else
      {
         // wait here during a blocking sending
         #ifndef WIN32
            pthread_mutex_lock(&m_SendBlockLock);
            while (!m_bBroken && m_bConnected && (m_iSndQueueLimit < m_pSndBuffer->getCurrBufSize()))
               pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
            pthread_mutex_unlock(&m_SendBlockLock);
         #else
            while (!m_bBroken && m_bConnected && (m_iSndQueueLimit < m_pSndBuffer->getCurrBufSize()))
               WaitForSingleObject(m_SendBlockCond, INFINITE);
         #endif

         // check the connection status
         if (m_bBroken)
            throw CUDTException(2, 1, 0);
      }
   }

   char* buf;
   __int32 handle = 0;
   UDT_MEM_ROUTINE r = func;

   if (NULL == overlapped)
   {
      // regular send: queue a private copy so the caller may reuse "data" at once
      buf = new char[len];
      memcpy(buf, data, len);
      data = buf;
      r = CSndBuffer::releaseBuffer;
   }
   else
   {
      #ifndef WIN32
         pthread_mutex_lock(&m_HandleLock);
      #else
         WaitForSingleObject(m_HandleLock, INFINITE);
      #endif
      // wrap around before the handle would reach 0
      if (1 == m_iSndHandle)
         m_iSndHandle = 1 << 30;
      // "send" handle descriptor is POSITIVE and DECREASING
      *overlapped = handle = -- m_iSndHandle;
      #ifndef WIN32
         pthread_mutex_unlock(&m_HandleLock);
      #else
         ReleaseMutex(m_HandleLock);
      #endif
   }

   // insert the user buffer into the sending list and signal the sending
   // thread in case that it is waiting (one critical section instead of
   // releasing and immediately re-acquiring m_SendDataLock)
   #ifndef WIN32
      pthread_mutex_lock(&m_SendDataLock);
      m_pSndBuffer->addBuffer(data, len, handle, r);
      pthread_cond_signal(&m_SendDataCond);
      pthread_mutex_unlock(&m_SendDataLock);

      pthread_cond_signal(&m_WindowCond);
   #else
      WaitForSingleObject(m_SendDataLock, INFINITE);
      m_pSndBuffer->addBuffer(data, len, handle, r);
      ReleaseMutex(m_SendDataLock);

      SetEvent(m_SendDataCond);
      SetEvent(m_WindowCond);
   #endif

   // UDT either sends nothing or sends all 
   return len;
}

// Receives up to "len" bytes into "data" and returns the number of bytes
// delivered; returns 0 when len <= 0.
//
//  data        destination buffer.
//  len         requested number of bytes.
//  overlapped  NULL: regular receive, returns whatever is available (at most
//              "len"), blocking first in blocking mode if nothing is there.
//              Non-NULL: overlapped receive; if "len" bytes are not yet
//              available the user buffer is handed over for direct filling
//              and a negative handle is written to *overlapped.
//  func        routine stored with the user buffer in overlapped mode.
//
// Throws CUDTException(2, 2, 0) if the socket is not connected,
//        CUDTException(2, 1, 0) if the connection is broken and no data is left,
//        CUDTException(6, 2, 0) if no data is available in non-blocking mode,
//        CUDTException(6, 3, 0) if the pending overlapped queue has reached m_iUDTBufSize,
//        CUDTException(6, 4, 0) if a regular/blocking receive is attempted while
//                               overlapped buffers are still pending.
// The whole call runs under m_RecvLock.
__int32 CUDT::recvX(char* data, const __int32& len, __int32* overlapped, UDT_MEM_ROUTINE func)
{
   CGuard recvguard(m_RecvLock);

   // throw an exception if not connected
   if (!m_bConnected)
      throw CUDTException(2, 2, 0);
   else if ((m_bBroken) && (0 == m_pRcvBuffer->getRcvDataSize()))
      throw CUDTException(2, 1, 0);
   else if ((m_bSynRecving || (NULL == overlapped)) && (0 < m_pRcvBuffer->getPendingQueueSize()))
      throw CUDTException(6, 4, 0);

   if (len <= 0)
      return 0;

   if ((NULL == overlapped) && (0 == m_pRcvBuffer->getRcvDataSize()))
   {
      if (!m_bSynRecving)
         throw CUDTException(6, 2, 0);
      else
      {
         // blocking receive: wait until data arrives or the connection breaks
         #ifndef WIN32
            pthread_mutex_lock(&m_RecvDataLock);
            while (!m_bBroken && (0 == m_pRcvBuffer->getRcvDataSize()))
               pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
            pthread_mutex_unlock(&m_RecvDataLock);
         #else
            // m_RecvDataCond is an auto-reset event and may still be signaled
            // from data that has already been consumed, so re-check the
            // condition after every wake-up, as the POSIX branch does.
            while (!m_bBroken && (0 == m_pRcvBuffer->getRcvDataSize()))
               WaitForSingleObject(m_RecvDataCond, INFINITE);
         #endif
      }
   }

   // regular receive (or blocking overlapped receive on a broken connection):
   // hand out what is available, at most "len" bytes
   if ((NULL == overlapped) || (m_bSynRecving && m_bBroken))
   {
      __int32 avail = m_pRcvBuffer->getRcvDataSize();
      if (len <= avail)
         avail = len;

      m_pRcvBuffer->readBuffer(data, avail);
      return avail;
   }

   // Overlapped IO begins.
   if (!m_bSynRecving && m_bBroken)
      throw CUDTException(2, 1, 0);
   else if (m_iUDTBufSize <= m_pRcvBuffer->getPendingQueueSize())
      throw CUDTException(6, 3, 0);

   #ifndef WIN32
      pthread_mutex_lock(&m_OverlappedRecvLock);
   #else
      WaitForSingleObject(m_OverlappedRecvLock, INFINITE);
   #endif

   // enough data is already buffered: satisfy the request immediately
   if (len <= m_pRcvBuffer->getRcvDataSize())
   {
      m_pRcvBuffer->readBuffer(data, len);

      #ifndef WIN32
         pthread_mutex_unlock(&m_OverlappedRecvLock);
      #else
         ReleaseMutex(m_OverlappedRecvLock);
      #endif

      return len;
   }

   // publish the user buffer; m_bReadBuf flags that a request is pending
   // NOTE(review): presumably picked up by the receiving thread - confirm
   m_pcTempData = data;
   m_iTempLen = len;
   m_iTempRoutine = func;
   m_bReadBuf = true;

   #ifndef WIN32
      pthread_mutex_lock(&m_HandleLock);
   #else
      WaitForSingleObject(m_HandleLock, INFINITE);
   #endif
   // wrap around before the handle would reach 0
   if (-1 == m_iRcvHandle)
      m_iRcvHandle = -(1 << 30);
   // "recv" handle descriptor is NEGATIVE and INCREASING
   *overlapped = ++ m_iRcvHandle;
   #ifndef WIN32
      pthread_mutex_unlock(&m_HandleLock);
   #else
      ReleaseMutex(m_HandleLock);
   #endif

   // wait for m_OverlappedRecvCond to be signaled (releaseSynch() also signals it)
   // NOTE(review): single wait without a predicate; the condition that should
   // be re-checked is not visible in this function
   #ifndef WIN32
      pthread_cond_wait(&m_OverlappedRecvCond, &m_OverlappedRecvLock);
      pthread_mutex_unlock(&m_OverlappedRecvLock);
   #else
      ReleaseMutex(m_OverlappedRecvLock);
      WaitForSingleObject(m_OverlappedRecvCond, INFINITE);
   #endif

   // non-blocking overlapped receive: progress is queried via the handle
   if (!m_bSynRecving)
      return 0;

   // check if the receiving is successful or the connection is broken
   if (m_bBroken)
   {
      // remove incompleted overlapped recv buffer
      m_pRcvBuffer->removeUserBuf();

      // connection broken and and no data received, report error
      if (0 == m_pRcvBuffer->getRcvDataSize())
        throw CUDTException(2, 1, 0);

      return (len <= m_pRcvBuffer->getRcvDataSize()) ? len : m_pRcvBuffer->getRcvDataSize();
   }

   return len;
}

// Queries the state of an overlapped IO request.
//
//  handle    request handle: positive values are looked up in the send
//            buffer, all other values in the receive buffer.
//  progress  filled in by the buffer's own getOverlappedResult().
//  wait      if true, keep polling until the buffer reports completion or
//            the connection breaks.
//
// Returns the last result reported by the buffer.
// Throws CUDTException(2, 1, 0) if the connection is broken and no received
// data is left, CUDTException(2, 2, 0) if the socket is not connected.
bool CUDT::getOverlappedResult(const __int32& handle, __int32& progress, const bool& wait)
{
   // throw an exception if not connected
   if (m_bBroken && (0 == m_pRcvBuffer->getRcvDataSize()))
      throw CUDTException(2, 1, 0);
   if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   if (handle <= 0)
   {
      // receiving side: polled with m_RecvLock held
      CGuard recvguard(m_RecvLock);

      bool finished = m_pRcvBuffer->getOverlappedResult(handle, progress);
      for (; wait && !finished && !m_bBroken; finished = m_pRcvBuffer->getOverlappedResult(handle, progress))
      {
         #ifndef WIN32
            usleep(1);
         #else
            Sleep(1);
         #endif
      }
      return finished;
   }

   // sending side: polled without taking any lock here
   bool finished = m_pSndBuffer->getOverlappedResult(handle, progress);
   for (; wait && !finished && !m_bBroken; finished = m_pSndBuffer->getOverlappedResult(handle, progress))
   {
      #ifndef WIN32
         usleep(1);
      #else
         Sleep(1);
      #endif
   }
   return finished;
}

// Sends "size" bytes of the stream "ifs", starting at "offset", in pieces of
// at most "block" bytes, then waits until the send buffer has drained.
// Returns "size" (or 0 when size <= 0).
//
//  ifs     input stream to read from.
//  offset  stream position to start reading at.
//  size    number of bytes to send.
//  block   size of each piece queued into the send buffer; expected to be
//          positive (a non-positive value is not handled: the loop cannot
//          make progress).
//
// Throws CUDTException(2, 1, 0) if the connection is or becomes broken,
//        CUDTException(2, 2, 0) if the socket is not connected,
//        CUDTException(4, 1) if positioning the stream throws,
//        CUDTException(4, 2) if reading the stream throws,
//        CUDTException(7, 0, err) if the sending thread cannot be created.
// The whole call runs under m_SendLock.
__int64 CUDT::sendfile(ifstream& ifs, const __int64& offset, const __int64& size, const __int32& block)
{
   CGuard sendguard(m_SendLock);

   if (m_bBroken)
      throw CUDTException(2, 1, 0);
   else if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   if (size <= 0)
      return 0;

   // lazy snd thread creation
   #ifndef WIN32
      if (!m_bSndThrStart)
   #else
      if (NULL == m_SndThread)
   #endif
   {
      m_pSndTimeWindow = new CPktTimeWindow();

      #ifndef WIN32
         // pthread_create() reports failure through its return value, not errno.
         const int rc = pthread_create(&m_SndThread, NULL, CUDT::sndHandler, this);
         if (0 != rc)
         {
            // undo the allocation so that a later retry does not leak it
            delete m_pSndTimeWindow;
            m_pSndTimeWindow = NULL;
            throw CUDTException(7, 0, rc);
         }
         m_bSndThrStart = true;
      #else
         if (NULL == (m_SndThread = CreateThread(NULL, 0, CUDT::sndHandler, this, 0, NULL)))
         {
            // read the error code before any cleanup can overwrite it
            const DWORD err = GetLastError();
            delete m_pSndTimeWindow;
            m_pSndTimeWindow = NULL;
            throw CUDTException(7, 0, err);
         }
      #endif
   }

   // positioning...
   try
   {
      ifs.seekg((streamoff)offset);
   }
   catch (...)
   {
      throw CUDTException(4, 1);
   }

   const __int32 unitsize = block;
   __int64 remaining = size;

   // sending block by block; the last piece may be shorter than "block"
   while (remaining > 0)
   {
      const __int32 chunk = (remaining < (__int64)unitsize) ? (__int32)remaining : unitsize;
      char* tempbuf = new char[chunk];

      try
      {
         ifs.read(tempbuf, (streamsize)chunk);
      }
      catch (...)
      {
         // the buffer has not been handed to the send buffer yet
         delete [] tempbuf;
         throw CUDTException(4, 2);
      }

      // wait for room in the send queue, then queue the piece; from here on
      // the buffer is released through CSndBuffer::releaseBuffer
      #ifndef WIN32
         pthread_mutex_lock(&m_SendDataLock);
         while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() >= m_iSndQueueLimit))
            usleep(10);
         m_pSndBuffer->addBuffer(tempbuf, chunk, -1, CSndBuffer::releaseBuffer);
         pthread_cond_signal(&m_SendDataCond);
         pthread_mutex_unlock(&m_SendDataLock);
      #else
         WaitForSingleObject(m_SendDataLock, INFINITE);
         while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() >= m_iSndQueueLimit))
            Sleep(1);
         m_pSndBuffer->addBuffer(tempbuf, chunk, -1, CSndBuffer::releaseBuffer);
         SetEvent(m_SendDataCond);
         ReleaseMutex(m_SendDataLock);
      #endif

      if (m_bBroken)
         throw CUDTException(2, 1, 0);

      remaining -= chunk;
   }

   // Wait until all the data is sent out
   while ((!m_bBroken) && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0))
      #ifndef WIN32
         usleep(10);
      #else
         Sleep(1);
      #endif

   if (m_bBroken && (m_pSndBuffer->getCurrBufSize() > 0))
      throw CUDTException(2, 1, 0);

   return size;
}

// Receives up to "size" bytes and writes them to the stream "ofs", starting
// at "offset", in pieces of at most "block" bytes. Returns the number of
// bytes written: "size", or less if a receive came back short (0 when
// size <= 0).
//
//  ofs     output stream to write to.
//  offset  stream position to start writing at.
//  size    number of bytes to receive.
//  block   size of the temporary buffer and of each receive request; expected
//          to be positive (a non-positive value is not handled).
//
// The socket is switched to blocking receive mode for the duration of the
// call; the previous mode and the temporary buffer are restored/released on
// every exit path, including exceptions.
//
// Throws CUDTException(2, 1, 0) if the connection is broken and no data is left,
//        CUDTException(2, 2, 0) if the socket is not connected,
//        CUDTException(4, 3) if positioning the stream throws,
//        CUDTException(4, 4) if receiving/writing throws anything other than
//                            a CUDTException (those are passed on unchanged).
__int64 CUDT::recvfile(ofstream& ofs, const __int64& offset, const __int64& size, const __int32& block)
{
   if ((m_bBroken) && (0 == m_pRcvBuffer->getRcvDataSize()))
      throw CUDTException(2, 1, 0);
   else if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   if (size <= 0)
      return 0;

   const __int32 unitsize = block;
   char* tempbuf = new char[unitsize];

   // positioning...
   try
   {
      ofs.seekp((streamoff)offset);
   }
   catch (...)
   {
      delete [] tempbuf;
      throw CUDTException(4, 3);
   }

   // "recvfile" is always blocking.
   const bool syn = m_bSynRecving;
   m_bSynRecving = true;

   __int64 received = 0;
   __int32 overlapid;

   // receiving...
   try
   {
      while (received < size)
      {
         const __int64 remaining = size - received;
         const __int32 want = (remaining < (__int64)unitsize) ? (__int32)remaining : unitsize;

         const __int32 recvsize = recvX(tempbuf, want, &overlapid);
         ofs.write(tempbuf, recvsize);
         received += recvsize;

         // a short receive ends the transfer early
         if (recvsize < want)
            break;
      }
   }
   catch (CUDTException&)
   {
      // UDT errors are passed on unchanged, after cleaning up
      m_bSynRecving = syn;
      delete [] tempbuf;
      throw;
   }
   catch (...)
   {
      m_bSynRecving = syn;
      delete [] tempbuf;
      throw CUDTException(4, 4);
   }

   // recover the original receiving mode
   m_bSynRecving = syn;

   delete [] tempbuf;

   return received;
}

// Fills "perf" with the current performance trace of this connection.
// Compiled to a no-op unless TRACE is defined.
//
//  perf   structure receiving the accumulated totals, the counters of the
//         current sampling period and the instantaneous measurements.
//  clear  if true, the per-period counters are reset and a new sampling
//         period starts at the current time.
//
// The buffer availability fields are only read when m_ConnectionLock can be
// taken without blocking; otherwise they are reported as 0.
void CUDT::sample(CPerfMon* perf, bool clear)
{
#ifdef TRACE
   timeval currtime;
   gettimeofday(&currtime, 0);

   perf->msTimeStamp = (currtime.tv_sec - m_StartTime.tv_sec) * 1000 + (currtime.tv_usec - m_StartTime.tv_usec) / 1000;

   // fold the current period into the running totals
   m_llSentTotal += m_llTraceSent;
   m_llRecvTotal += m_llTraceRecv;
   m_iSndLossTotal += m_iTraceSndLoss;
   m_iRcvLossTotal += m_iTraceRcvLoss;
   m_iRetransTotal += m_iTraceRetrans;
   m_iSentACKTotal += m_iSentACK;
   m_iRecvACKTotal += m_iRecvACK;
   m_iSentNAKTotal += m_iSentNAK;
   m_iRecvNAKTotal += m_iRecvNAK;

   perf->pktSentTotal = m_llSentTotal;
   perf->pktRecvTotal = m_llRecvTotal;
   perf->pktSndLossTotal = m_iSndLossTotal;
   perf->pktRcvLossTotal = m_iRcvLossTotal;
   perf->pktRetransTotal = m_iRetransTotal;
   perf->pktSentACKTotal = m_iSentACKTotal;
   perf->pktRecvACKTotal = m_iRecvACKTotal;
   perf->pktSentNAKTotal = m_iSentNAKTotal;
   perf->pktRecvNAKTotal = m_iRecvNAKTotal;

   // counters of the current sampling period
   perf->pktSent = m_llTraceSent;
   perf->pktRecv = m_llTraceRecv;
   perf->pktSndLoss = m_iTraceSndLoss;
   perf->pktRcvLoss = m_iTraceRcvLoss;
   perf->pktRetrans = m_iTraceRetrans;
   perf->pktSentACK = m_iSentACK;
   perf->pktRecvACK = m_iRecvACK;
   perf->pktSentNAK = m_iSentNAK;
   perf->pktRecvNAK = m_iRecvNAK;

   // length of the current sampling period in microseconds
   double interval = (currtime.tv_sec - m_LastSampleTime.tv_sec) * 1000000.0 + currtime.tv_usec - m_LastSampleTime.tv_usec;

   // bits per microsecond, i.e. Mbit/s
   perf->mbpsSendRate = double(m_llTraceSent) * m_iPayloadSize * 8.0 / interval;
   perf->mbpsRecvRate = double(m_llTraceRecv) * m_iPayloadSize * 8.0 / interval;

   perf->usPktSndPeriod = m_ullInterval / double(m_ullCPUFrequency);
   perf->pktFlowWindow = m_iFlowWindowSize;
   perf->pktCongestionWindow = (__int32)m_dCongestionWindow;
   perf->pktFlightSize = (m_iSndCurrSeqNo - m_iSndLastAck + 1 + m_iMaxSeqNo) % m_iMaxSeqNo;
   perf->msRTT = m_iRTT/1000.0;
   perf->mbpsBandwidth = m_iBandwidth * m_iPayloadSize * 8.0;

   #ifndef WIN32
      if (0 == pthread_mutex_trylock(&m_ConnectionLock))
   #else
      if (WAIT_OBJECT_0 == WaitForSingleObject(m_ConnectionLock, 0))
   #endif
   {
      perf->byteAvailSndBuf = (NULL == m_pSndBuffer) ? 0 : m_iSndQueueLimit - m_pSndBuffer->getCurrBufSize();
      perf->byteAvailRcvBuf = (NULL == m_pRcvBuffer) ? 0 : m_pRcvBuffer->getAvailBufSize();

      // the try-lock succeeded, so the lock is owned here and must be given
      // back; it was previously left locked after the first sample
      #ifndef WIN32
         pthread_mutex_unlock(&m_ConnectionLock);
      #else
         ReleaseMutex(m_ConnectionLock);
      #endif
   }
   else
   {
      perf->byteAvailSndBuf = 0;
      perf->byteAvailRcvBuf = 0;
   }

   if (clear)
   {
      // reset every per-period counter (m_iTraceRcvLoss was previously
      // skipped because m_iTraceSndLoss was listed twice)
      m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;
      m_LastSampleTime = currtime;
   }
#endif   
}

void CUDT::initSynch()
{
   #ifndef WIN32
      pthread_mutex_init(&m_SendDataLock, NULL);
      pthread_cond_init(&m_SendDataCond, NULL);
      pthread_mutex_init(&m_SendBlockLock, NULL);
      pthread_cond_init(&m_SendBlockCond, NULL);
      pthread_mutex_init(&m_RecvDataLock, NULL);
      pthread_cond_init(&m_RecvDataCond, NULL);
      pthread_mutex_init(&m_OverlappedRecvLock, NULL);
      pthread_cond_init(&m_OverlappedRecvCond, NULL);
      pthread_mutex_init(&m_SendLock, NULL);
      pthread_mutex_init(&m_RecvLock, NULL);
      pthread_mutex_init(&m_AckLock, NULL);
      pthread_mutex_init(&m_ConnectionLock, NULL);
      pthread_mutex_init(&m_WindowLock, NULL);
      pthread_cond_init(&m_WindowCond, NULL);
      pthread_mutex_init(&m_HandleLock, NULL);
   #else
      m_SendDataLock = CreateMutex(NULL, false, NULL);
      m_SendDataCond = CreateEvent(NULL, false, false, NULL);
      m_SendBlockLock = CreateMutex(NULL, false, NULL);
      m_SendBlockCond = CreateEvent(NULL, false, false, NULL);
      m_RecvDataLock = CreateMutex(NULL, false, NULL);
      m_RecvDataCond = CreateEvent(NULL, false, false, NULL);
      m_OverlappedRecvLock = CreateMutex(NULL, false, NULL);
      m_OverlappedRecvCond = CreateEvent(NULL, false, false, NULL);
      m_SendLock = CreateMutex(NULL, false, NULL);
      m_RecvLock = CreateMutex(NULL, false, NULL);
      m_AckLock = CreateMutex(NULL, false, NULL);
      m_ConnectionLock = CreateMutex(NULL, false, NULL);
      m_WindowLock = CreateMutex(NULL, false, NULL);
      m_WindowCond = CreateEvent(NULL, false, false, NULL);
      m_HandleLock = CreateMutex(NULL, false, NULL);
   #endif
}

void CUDT::destroySynch()
{
   #ifndef WIN32
      pthread_mutex_destroy(&m_SendDataLock);
      pthread_cond_destroy(&m_SendDataCond);
      pthread_mutex_destroy(&m_SendBlockLock);
      pthread_cond_destroy(&m_SendBlockCond);
      pthread_mutex_destroy(&m_RecvDataLock);
      pthread_cond_destroy(&m_RecvDataCond);
      pthread_mutex_destroy(&m_OverlappedRecvLock);
      pthread_cond_destroy(&m_OverlappedRecvCond);
      pthread_mutex_destroy(&m_SendLock);
      pthread_mutex_destroy(&m_RecvLock);
      pthread_mutex_destroy(&m_AckLock);
      pthread_mutex_destroy(&m_ConnectionLock);
      pthread_mutex_destroy(&m_WindowLock);
      pthread_cond_destroy(&m_WindowCond);
      pthread_mutex_destroy(&m_HandleLock);
   #else
      CloseHandle(m_SendDataLock);
      CloseHandle(m_SendDataCond);
      CloseHandle(m_SendBlockLock);
      CloseHandle(m_SendBlockCond);
      CloseHandle(m_RecvDataLock);
      CloseHandle(m_RecvDataCond);
      CloseHandle(m_OverlappedRecvLock);
      CloseHandle(m_OverlappedRecvCond);
      CloseHandle(m_SendLock);
      CloseHandle(m_RecvLock);
      CloseHandle(m_AckLock);
      CloseHandle(m_ConnectionLock);
      CloseHandle(m_WindowLock);
      CloseHandle(m_WindowCond);
      CloseHandle(m_HandleLock);
   #endif
}

// Wakes up everything that may be blocked on this socket's synchronization
// objects: signals the window, send-data, send-block, recv-data and
// overlapped-recv conditions/events, and acquires and immediately releases
// m_SendLock and m_RecvLock. The shutdown handling sets the broken/closing
// flags before calling this, so that woken waiters can see the new state.
void CUDT::releaseSynch()
{
   #ifndef WIN32
      // wake up sending thread
      pthread_cond_signal(&m_WindowCond);

      pthread_mutex_lock(&m_SendDataLock);
      pthread_cond_signal(&m_SendDataCond);
      pthread_mutex_unlock(&m_SendDataLock);

      // wake up user calls
      pthread_mutex_lock(&m_SendBlockLock);
      pthread_cond_signal(&m_SendBlockCond);
      pthread_mutex_unlock(&m_SendBlockLock);

      // m_SendLock is held by sendX()/sendfile() for the whole call, so this
      // lock/unlock pair returns only after a send call in progress has left
      pthread_mutex_lock(&m_SendLock);
      pthread_mutex_unlock(&m_SendLock);

      // wake up a blocking recvX() waiting for data
      pthread_mutex_lock(&m_RecvDataLock);
      pthread_cond_signal(&m_RecvDataCond);
      pthread_mutex_unlock(&m_RecvDataLock);

      // wake up an overlapped recvX() waiting for completion
      pthread_mutex_lock(&m_OverlappedRecvLock);
      pthread_cond_signal(&m_OverlappedRecvCond);
      pthread_mutex_unlock(&m_OverlappedRecvLock);

      // m_RecvLock is held by recvX() for the whole call; wait for it to leave
      pthread_mutex_lock(&m_RecvLock);
      pthread_mutex_unlock(&m_RecvLock);
   #else
      // same sequence with Win32 events; no lock is taken around SetEvent
      SetEvent(m_WindowCond);
      SetEvent(m_SendDataCond);

      SetEvent(m_SendBlockCond);
      WaitForSingleObject(m_SendLock, INFINITE);
      ReleaseMutex(m_SendLock);
      SetEvent(m_RecvDataCond);
      SetEvent(m_OverlappedRecvCond);
      WaitForSingleObject(m_RecvLock, INFINITE);
      ReleaseMutex(m_RecvLock);
   #endif
}