// www.pudn.com > udt.rar > common.cpp
/***************************************************************************** This file contains implementation of UDT common routines of timer, mutex facility, ACK window, packet time window, and exception processing. CTimer is a high precision timing facility, which uses the CPU clock cycle as the minimum time unit. CGuard is mutex facility that can automatically lock a method. CACKWindow is the window management of UDT ACK packet. (reference: UDT header definition: packet.h) CPktTimeWindow is used to record and process packet sending and arrival timing information. CUDTException is used for UDT exception processing, which is the only method to catch and handle UDT errors and exceptions. *****************************************************************************/ #ifndef WIN32 #include#include #include #include #else #include #include #endif #include #include "udt.h" using namespace std; #ifdef WIN32 int gettimeofday(timeval *tv, void*) { LARGE_INTEGER ccf; if (QueryPerformanceFrequency(&ccf)) { LARGE_INTEGER cc; QueryPerformanceCounter(&cc); tv->tv_sec = (long)(cc.QuadPart / ccf.QuadPart); tv->tv_usec = (long)((cc.QuadPart % ccf.QuadPart) / (ccf.QuadPart / 1000000)); } else { unsigned __int64 ft; GetSystemTimeAsFileTime((FILETIME *)&ft); tv->tv_sec = (long)(ft / 10000000); tv->tv_usec = (long)((ft % 10000000) / 10); } return 0; } int readv(SOCKET s, const iovec* vector, int count) { DWORD rsize = 0; DWORD flag = 0; WSARecv(s, (LPWSABUF)vector, count, &rsize, &flag, NULL, NULL); return rsize; } int writev(SOCKET s, const iovec* vector, int count) { DWORD ssize = 0; WSASend(s, (LPWSABUF)vector, count, &ssize, 0, NULL, NULL); return ssize; } #endif unsigned __int64 CTimer::s_ullCPUFrequency = CTimer::readCPUFrequency(); void CTimer::rdtsc(unsigned __int64 &x) { #ifdef WIN32 if (!QueryPerformanceCounter((LARGE_INTEGER *)&x)) { timeval t; gettimeofday(&t, 0); x = t.tv_sec * 1000000 + t.tv_usec; } #elif IA32 // read CPU clock with RDTSC instruction on IA32 acrh 
__asm__ volatile (".byte 0x0f, 0x31" : "=A" (x)); // on Windows /* unsigned __int32 a, b; __asm { __emit 0x0f __emit 0x31 mov a, eax mov b, ebx } x = b; x = (x << 32) + a; */ #elif IA64 __asm__ volatile ("mov %0=ar.itc" : "=r"(x) :: "memory"); #elif AMD64 unsigned __int32 lval, hval; __asm__ volatile ("rdtsc" : "=a" (lval), "=d" (hval)); x = hval; x = (x << 32) | lval; #else // use system call to read time clock for other archs timeval t; gettimeofday(&t, 0); x = t.tv_sec * 1000000 + t.tv_usec; #endif } unsigned __int64 CTimer::readCPUFrequency() { #ifdef WIN32 __int64 ccf; if (QueryPerformanceFrequency((LARGE_INTEGER *)&ccf)) return ccf / 1000000; else return 1; #elif IA32 || IA64 || AMD64 // alternative: read /proc/cpuinfo unsigned __int64 t1, t2; rdtsc(t1); usleep(100000); rdtsc(t2); // CPU clocks per microsecond return (t2 - t1) / 100000; #else return 1; #endif } unsigned __int64 CTimer::getCPUFrequency() { return s_ullCPUFrequency; } void CTimer::sleep(const unsigned __int64& interval) { unsigned __int64 t; rdtsc(t); // sleep next "interval" time sleepto(t + interval); } void CTimer::sleepto(const unsigned __int64& nexttime) { // Use class member such that the method can be interrupted by others m_ullSchedTime = nexttime; unsigned __int64 t; rdtsc(t); while (t < m_ullSchedTime) { #ifdef IA32 //__asm__ volatile ("nop; nop; nop; nop; nop;"); __asm__ volatile ("pause; rep; nop; nop; nop; nop; nop;"); #elif IA64 __asm__ volatile ("nop 0; nop 0; nop 0; nop 0; nop 0;"); #elif AMD64 __asm__ volatile ("nop; nop; nop; nop; nop;"); #endif // TODO: use high precision timer if it is available rdtsc(t); } } void CTimer::interrupt() { // schedule the sleepto time to the current CCs, so that it will stop rdtsc(m_ullSchedTime); } // // Automatically lock in constructor CGuard::CGuard(pthread_mutex_t& lock): m_Mutex(lock) { #ifndef WIN32 m_iLocked = pthread_mutex_lock(&m_Mutex); #else m_iLocked = WaitForSingleObject(m_Mutex, INFINITE); #endif } // Automatically unlock in 
destructor CGuard::~CGuard() { #ifndef WIN32 if (0 == m_iLocked) pthread_mutex_unlock(&m_Mutex); #else if (WAIT_FAILED != m_iLocked) ReleaseMutex(m_Mutex); #endif } // CACKWindow::CACKWindow(): m_iSize(1024), m_iHead(0), m_iTail(0) { m_piACKSeqNo = new __int32[m_iSize]; m_piACK = new __int32[m_iSize]; m_pTimeStamp = new timeval[m_iSize]; m_piACKSeqNo[0] = -1; } CACKWindow::CACKWindow(const __int32& size): m_iSize(size), m_iHead(0), m_iTail(0) { m_piACKSeqNo = new __int32[m_iSize]; m_piACK = new __int32[m_iSize]; m_pTimeStamp = new timeval[m_iSize]; m_piACKSeqNo[0] = -1; } CACKWindow::~CACKWindow() { delete [] m_piACKSeqNo; delete [] m_piACK; delete [] m_pTimeStamp; } void CACKWindow::store(const __int32& seq, const __int32& ack) { m_piACKSeqNo[m_iHead] = seq; m_piACK[m_iHead] = ack; gettimeofday(m_pTimeStamp + m_iHead, 0); m_iHead = (m_iHead + 1) % m_iSize; // overwrite the oldest ACK since it is not likely to be acknowledged if (m_iHead == m_iTail) m_iTail = (m_iTail + 1) % m_iSize; } __int32 CACKWindow::acknowledge(const __int32& seq, __int32& ack) { if (m_iHead >= m_iTail) { // Head has not exceeded the physical boundary of the window for (__int32 i = m_iTail, n = m_iHead; i <= n; ++ i) // looking for indentical ACK Seq. No. if (seq == m_piACKSeqNo[i]) { // return the Data ACK it carried ack = m_piACK[i]; // calculate RTT timeval currtime; gettimeofday(&currtime, 0); __int32 rtt = (currtime.tv_sec - m_pTimeStamp[i].tv_sec) * 1000000 + currtime.tv_usec - m_pTimeStamp[i].tv_usec; if (i == m_iHead) { m_iTail = m_iHead = 0; m_piACKSeqNo[0] = -1; } else m_iTail = (i + 1) % m_iSize; return rtt; } // Bad input, the ACK node has been overwritten return -1; } // Head has exceeded the physical window boundary, so it is behind tail for (__int32 i = m_iTail, n = m_iHead + m_iSize; i <= n; ++ i) // looking for indentical ACK seq. no. 
if (seq == m_piACKSeqNo[i % m_iSize]) { // return Data ACK i %= m_iSize; ack = m_piACK[i]; // calculate RTT timeval currtime; gettimeofday((timeval *)&currtime, 0); __int32 rtt = (currtime.tv_sec - m_pTimeStamp[i].tv_sec) * 1000000 + currtime.tv_usec - m_pTimeStamp[i].tv_usec; if (i == m_iHead) { m_iTail = m_iHead = 0; m_piACKSeqNo[0] = -1; } else m_iTail = (i + 1) % m_iSize; return rtt; } // bad input, the ACK node has been overwritten return -1; } // CPktTimeWindow::CPktTimeWindow(): m_iAWSize(16), m_iRWSize(16), m_iPWSize(16) { m_piPktWindow = new __int32[m_iAWSize]; m_piRTTWindow = new __int32[m_iRWSize]; m_piPCTWindow = new __int32[m_iRWSize]; m_piPDTWindow = new __int32[m_iRWSize]; m_piProbeWindow = new __int32[m_iPWSize]; m_iPktWindowPtr = 0; m_iRTTWindowPtr = 0; m_iProbeWindowPtr = 0; gettimeofday(&m_LastSentTime, 0); gettimeofday(&m_LastArrTime, 0); m_iMinPktSndInt = 1000000; for (__int32 i = 0; i < m_iAWSize; ++ i) m_piPktWindow[i] = 1; for (__int32 j = 0; j < m_iRWSize; ++ j) m_piRTTWindow[j] = m_piPCTWindow[j] = m_piPDTWindow[j] = 0; for (__int32 k = 0; k < m_iPWSize; ++ k) m_piProbeWindow[k] = 1000; } CPktTimeWindow::CPktTimeWindow(const __int32& s1, const __int32& s2, const __int32& s3): m_iAWSize(s1), m_iRWSize(s2), m_iPWSize(s3) { m_piPktWindow = new __int32[m_iAWSize]; m_piRTTWindow = new __int32[m_iRWSize]; m_piPCTWindow = new __int32[m_iRWSize]; m_piPDTWindow = new __int32[m_iRWSize]; m_piProbeWindow = new __int32[m_iPWSize]; m_iPktWindowPtr = 0; m_iRTTWindowPtr = 0; m_iProbeWindowPtr = 0; gettimeofday(&m_LastSentTime, 0); gettimeofday(&m_LastArrTime, 0); m_iMinPktSndInt = 1000000; for (__int32 i = 0; i < m_iAWSize; ++ i) m_piPktWindow[i] = 1; for (__int32 j = 0; j < m_iRWSize; ++ j) m_piRTTWindow[j] = m_piPCTWindow[j] = m_piPDTWindow[j] = 0; for (__int32 k = 0; k < m_iPWSize; ++ k) m_piProbeWindow[k] = 1000; } CPktTimeWindow::~CPktTimeWindow() { delete [] m_piPktWindow; delete [] m_piRTTWindow; delete [] m_piPCTWindow; delete [] m_piPDTWindow; 
delete [] m_piProbeWindow; } __int32 CPktTimeWindow::getMinPktSndInt() const { return m_iMinPktSndInt; } __int32 CPktTimeWindow::getPktRcvSpeed() const { // sorting __int32 temp; for (__int32 i = 0, n = (m_iAWSize >> 1) + 1; i < n; ++ i) for (__int32 j = i, m = m_iAWSize; j < m; ++ j) if (m_piPktWindow[i] > m_piPktWindow[j]) { temp = m_piPktWindow[i]; m_piPktWindow[i] = m_piPktWindow[j]; m_piPktWindow[j] = temp; } // read the median value __int32 median = (m_piPktWindow[(m_iAWSize >> 1) - 1] + m_piPktWindow[m_iAWSize >> 1]) >> 1; __int32 count = 0; __int32 sum = 0; __int32 upper = median << 3; __int32 lower = median >> 3; // median filtering for (__int32 k = 0, z = m_iAWSize; k < z; ++ k) if ((m_piPktWindow[k] < upper) && (m_piPktWindow[k] > lower)) { ++ count; sum += m_piPktWindow[k]; } // claculate speed, or return 0 if not enough valid value if (count > (m_iAWSize >> 1)) return (__int32)ceil(1000000.0 / (sum / count)); else return 0; } bool CPktTimeWindow::getDelayTrend() const { double pct = 0.0; double pdt = 0.0; for (__int32 i = 0, n = m_iRWSize; i < n; ++ i) if (i != m_iRTTWindowPtr) { pct += m_piPCTWindow[i]; pdt += m_piPDTWindow[i]; } // calculate PCT and PDT value pct /= m_iRWSize - 1; if (0 != pdt) pdt = (m_piRTTWindow[(m_iRTTWindowPtr - 1 + m_iRWSize) % m_iRWSize] - m_piRTTWindow[m_iRTTWindowPtr]) / pdt; // PCT/PDT judgement // reference: M. Jain, C. 
Dovrolis, Pathload: a measurement tool for end-to-end available bandwidth return ((pct > 0.66) && (pdt > 0.45)) || ((pct > 0.54) && (pdt > 0.55)); } __int32 CPktTimeWindow::getBandwidth() const { // sorting __int32 temp; for (__int32 i = 0, n = (m_iPWSize >> 1) + 1; i < n; ++ i) for (__int32 j = i, m = m_iPWSize; j < m; ++ j) if (m_piProbeWindow[i] > m_piProbeWindow[j]) { temp = m_piProbeWindow[i]; m_piProbeWindow[i] = m_piProbeWindow[j]; m_piProbeWindow[j] = temp; } // read the median value __int32 median = (m_piProbeWindow[(m_iPWSize >> 1) - 1] + m_piProbeWindow[m_iPWSize >> 1]) >> 1; __int32 count = 1; __int32 sum = median; __int32 upper = median << 3; __int32 lower = median >> 3; // median filtering for (__int32 k = 0, z = m_iPWSize; k < z; ++ k) if ((m_piProbeWindow[k] < upper) && (m_piProbeWindow[k] > lower)) { ++ count; sum += m_piProbeWindow[k]; } return (__int32)ceil(1000000.0 / (double(sum) / double(count))); } void CPktTimeWindow::onPktSent(const timeval& currtime) { __int32 interval = (currtime.tv_sec - m_LastSentTime.tv_sec) * 1000000 + currtime.tv_usec - m_LastSentTime.tv_usec; if ((interval < m_iMinPktSndInt) && (interval > 0)) m_iMinPktSndInt = interval; m_LastSentTime = currtime; } void CPktTimeWindow::onPktArrival() { gettimeofday(&m_CurrArrTime, 0); // record the packet interval between the current and the last one m_piPktWindow[m_iPktWindowPtr] = (m_CurrArrTime.tv_sec - m_LastArrTime.tv_sec) * 1000000 + m_CurrArrTime.tv_usec - m_LastArrTime.tv_usec; // the window is logically circular m_iPktWindowPtr = (m_iPktWindowPtr + 1) % m_iAWSize; // remember last packet arrival time m_LastArrTime = m_CurrArrTime; } void CPktTimeWindow::ack2Arrival(const __int32& rtt) { // record RTT, comparison (1 or 0), and absolute difference m_piRTTWindow[m_iRTTWindowPtr] = rtt; m_piPCTWindow[m_iRTTWindowPtr] = (rtt > m_piRTTWindow[(m_iRTTWindowPtr - 1 + m_iRWSize) % m_iRWSize]) ? 
1 : 0; m_piPDTWindow[m_iRTTWindowPtr] = abs(rtt - m_piRTTWindow[(m_iRTTWindowPtr - 1 + m_iRWSize) % m_iRWSize]); // the window is logically circular m_iRTTWindowPtr = (m_iRTTWindowPtr + 1) % m_iRWSize; } void CPktTimeWindow::probe1Arrival() { gettimeofday(&m_ProbeTime, 0); } void CPktTimeWindow::probe2Arrival() { gettimeofday(&m_CurrArrTime, 0); // record the probing packets interval m_piProbeWindow[m_iProbeWindowPtr] = (m_CurrArrTime.tv_sec - m_ProbeTime.tv_sec) * 1000000 + m_CurrArrTime.tv_usec - m_ProbeTime.tv_usec; // the window is logically circular m_iProbeWindowPtr = (m_iProbeWindowPtr + 1) % m_iPWSize; } // CCC::CCC(): m_dPktSndPeriod(1.0), m_dCWndSize(16.0), m_iACKPeriod(10), m_iACKInterval(0), m_iRTO(-1) { } void CCC::setACKTimer(const __int32& msINT) { m_iACKPeriod = msINT; } void CCC::setACKInterval(const __int32& pktINT) { m_iACKInterval = pktINT; } void CCC::setRTO(const __int32& usRTO) { m_iRTO = usRTO; } void CCC::sendCustomMsg(CPacket& pkt) const { if (NULL != m_pUDT) *m_pUDT->m_pChannel << pkt; } const CPerfMon* CCC::getPerfInfo() { if (NULL != m_pUDT) m_pUDT->sample(&m_PerfInfo, false); return &m_PerfInfo; } // CUDTException::CUDTException(__int32 major, __int32 minor, __int32 err): m_iMajor(major), m_iMinor(minor) { if (-1 == err) #ifndef WIN32 m_iErrno = errno; #else m_iErrno = GetLastError(); #endif else m_iErrno = err; } CUDTException::CUDTException(const CUDTException& e): m_iMajor(e.m_iMajor), m_iMinor(e.m_iMinor), m_iErrno(e.m_iErrno) { } CUDTException::~CUDTException() { } const char* CUDTException::getErrorMessage() { // translate "Major:Minor" code into text message. 
switch (m_iMajor) { case 0: strcpy(m_pcMsg, "Success"); break; case 1: strcpy(m_pcMsg, "Couldn't set up network connection"); switch (m_iMinor) { case 1: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "connection time out"); break; case 2: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "connection rejected"); break; case 3: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "unable to create new threads"); break; default: break; } break; case 2: switch (m_iMinor) { case 1: strcpy(m_pcMsg, "Connection broken"); break; case 2: strcpy(m_pcMsg, "Connection does not exist"); break; default: break; } break; case 3: strcpy(m_pcMsg, "Memory exceptions occurs"); break; case 4: strcpy(m_pcMsg, "File exceptions occurs"); switch (m_iMinor) { case 1: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "cannot seek read position"); break; case 2: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "failure in read"); break; case 3: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "cannot seek write position"); break; case 4: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "failure in write"); break; default: break; } break; case 5: strcpy(m_pcMsg, "Operation not supported"); switch (m_iMinor) { case 1: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Cannot do this operation on a BOUND socket"); break; case 2: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Cannot do this operation on a CONNECTED socket"); break; case 3: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Bad parameters"); break; case 4: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Invalid socket ID"); break; case 5: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Cannot do this operation on an 
UNBOUND socket"); break; case 6: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Socket is not in listening state"); break; default: break; } break; case 6: strcpy(m_pcMsg, "Non-blocking call failed"); switch (m_iMinor) { case 1: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "No buffer available for sending"); break; case 2: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "No data available for reading"); break; case 3: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "No buffer available for overlapped reading"); break; case 4: strcpy(m_pcMsg + strlen(m_pcMsg), ": "); strcpy(m_pcMsg + strlen(m_pcMsg), "Non-blocking overlapped recv is on going"); break; default: break; } break; default: strcpy(m_pcMsg, "Error"); } // Adding "errno" information if (0 < m_iErrno) { strcpy(m_pcMsg + strlen(m_pcMsg), ": "); #ifndef WIN32 strncpy(m_pcMsg + strlen(m_pcMsg), strerror(m_iErrno), 1024 - strlen(m_pcMsg) - 2); #else LPVOID lpMsgBuf; FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, m_iErrno, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), (LPTSTR)&lpMsgBuf, 0, NULL); strncpy(m_pcMsg + strlen(m_pcMsg), (char*)lpMsgBuf, 1024 - strlen(m_pcMsg) - 2); LocalFree(lpMsgBuf); #endif } // period #ifndef WIN32 strcpy(m_pcMsg + strlen(m_pcMsg), "."); #endif return m_pcMsg; } const __int32 CUDTException::getErrorCode() const { return m_iMajor * 1000 + m_iMinor; }