www.pudn.com > udt.rar > buffer.cpp
/***************************************************************************** This file contains the implementation of UDT sending and receiving buffer management modules. The sending buffer is a linked list of application data to be sent. The receiving buffer is a logically circular memeory block. *****************************************************************************/ #include#include "udt.h" using namespace std; CSndBuffer::CSndBuffer(): m_pBlock(NULL), m_pLastBlock(NULL), m_pCurrSendBlk(NULL), m_pCurrAckBlk(NULL), m_iCurrBufSize(0), m_iCurrSendPnt(0), m_iCurrAckPnt(0) { #ifndef WIN32 pthread_mutex_init(&m_BufLock, NULL); #else m_BufLock = CreateMutex(NULL, false, NULL); #endif } CSndBuffer::~CSndBuffer() { Block* pb = m_pBlock; // Release allocated data structure if there is any while (NULL != m_pBlock) { pb = pb->m_next; // process user data according with the routine provided by applications if (NULL != m_pBlock->m_pMemRoutine) m_pBlock->m_pMemRoutine(m_pBlock->m_pcData, m_pBlock->m_iLength); delete m_pBlock; m_pBlock = pb; } #ifndef WIN32 pthread_mutex_destroy(&m_BufLock); #else CloseHandle(m_BufLock); #endif } void CSndBuffer::addBuffer(const char* data, const __int32& len, const __int32& handle, const UDT_MEM_ROUTINE func) { CGuard bufferguard(m_BufLock); if (NULL == m_pBlock) { // Insert a block to the empty list m_pBlock = new Block; m_pBlock->m_pcData = const_cast (data); m_pBlock->m_iLength = len; m_pBlock->m_iHandle = handle; m_pBlock->m_pMemRoutine = func; m_pBlock->m_next = NULL; m_pLastBlock = m_pBlock; m_pCurrSendBlk = m_pBlock; m_iCurrSendPnt = 0; m_pCurrAckBlk = m_pBlock; m_iCurrAckPnt = 0; } else { // Insert a new block to the tail of the list m_pLastBlock->m_next = new Block; m_pLastBlock = m_pLastBlock->m_next; m_pLastBlock->m_pcData = const_cast (data); m_pLastBlock->m_iLength = len; m_pLastBlock->m_iHandle = handle; m_pLastBlock->m_pMemRoutine = func; m_pLastBlock->m_next = NULL; if (NULL == m_pCurrSendBlk) m_pCurrSendBlk = m_pLastBlock; } m_iCurrBufSize += len; } __int32 CSndBuffer::readData(char** data, const __int32& len) { CGuard bufferguard(m_BufLock); // No data to read if (NULL == m_pCurrSendBlk) return 0; // read data in the current sending block if (m_iCurrSendPnt + len < m_pCurrSendBlk->m_iLength) { *data = m_pCurrSendBlk->m_pcData + m_iCurrSendPnt; m_iCurrSendPnt += len; return len; } // Not enough data to read. // Read an irregular packet and move the current sending block pointer to the next block __int32 readlen = m_pCurrSendBlk->m_iLength - m_iCurrSendPnt; *data = m_pCurrSendBlk->m_pcData + m_iCurrSendPnt; m_pCurrSendBlk = m_pCurrSendBlk->m_next; m_iCurrSendPnt = 0; return readlen; } __int32 CSndBuffer::readData(char** data, const __int32 offset, const __int32& len) { CGuard bufferguard(m_BufLock); Block* p = m_pCurrAckBlk; // No data to read if (NULL == p) return 0; // Locate to the data position by the offset __int32 loffset = offset + m_iCurrAckPnt; while (p->m_iLength <= loffset) { loffset -= p->m_iLength; loffset -= len - ((0 == p->m_iLength % len) ? len : (p->m_iLength % len)); p = p->m_next; if (NULL == p) return 0; } // Read a regular data if (loffset + len <= p->m_iLength) { *data = p->m_pcData + loffset; return len; } // Read an irrugular data at the end of a block *data = p->m_pcData + loffset; return p->m_iLength - loffset; } void CSndBuffer::ackData(const __int32& len, const __int32& payloadsize) { CGuard bufferguard(m_BufLock); m_iCurrAckPnt += len; // Remove the block if it is acknowledged while (m_iCurrAckPnt >= m_pCurrAckBlk->m_iLength) { m_iCurrAckPnt -= m_pCurrAckBlk->m_iLength; // Update the size error between regular and irregular packets if (0 != m_pCurrAckBlk->m_iLength % payloadsize) m_iCurrAckPnt -= payloadsize - m_pCurrAckBlk->m_iLength % payloadsize; m_iCurrBufSize -= m_pCurrAckBlk->m_iLength; m_pCurrAckBlk = m_pCurrAckBlk->m_next; // process user data according with the routine provided by applications if (NULL != m_pBlock->m_pMemRoutine) m_pBlock->m_pMemRoutine(m_pBlock->m_pcData, m_pBlock->m_iLength); delete m_pBlock; m_pBlock = m_pCurrAckBlk; if (NULL == m_pBlock) break; } } __int32 CSndBuffer::getCurrBufSize() const { return m_iCurrBufSize - m_iCurrAckPnt; } bool CSndBuffer::getOverlappedResult(const __int32& handle, __int32& progress) { CGuard bufferguard(m_BufLock); if (NULL != m_pCurrAckBlk) { if (handle == m_pCurrAckBlk->m_iHandle) { progress = m_iCurrAckPnt; return false; } else { __int32 end = (m_pLastBlock->m_iHandle >= m_pCurrAckBlk->m_iHandle) ? m_pLastBlock->m_iHandle : m_pLastBlock->m_iHandle + (1 << 30); __int32 h = (handle >= m_pCurrAckBlk->m_iHandle) ? handle : handle + (1 << 30); if ((h > m_pCurrAckBlk->m_iHandle) && (h <= end)) { progress = 0; return false; } } } progress = 0; return true; } void CSndBuffer::releaseBuffer(char* buf, int) { delete [] buf; } // CRcvBuffer::CRcvBuffer(): m_iSize(40960000), m_iStartPos(0), m_iLastAckPos(0), m_iMaxOffset(0), m_pcUserBuf(NULL), m_iUserBufSize(0), m_pPendingBlock(NULL), m_pLastBlock(NULL), m_iPendingSize(0) { m_pcData = new char [m_iSize]; } CRcvBuffer::CRcvBuffer(const __int32& bufsize): m_iSize(bufsize), m_iStartPos(0), m_iLastAckPos(0), m_iMaxOffset(0), m_pcUserBuf(NULL), m_iUserBufSize(0), m_pPendingBlock(NULL), m_pLastBlock(NULL), m_iPendingSize(0) { m_pcData = new char [m_iSize]; } CRcvBuffer::~CRcvBuffer() { delete [] m_pcData; Block* p = m_pPendingBlock; while (NULL != p) { m_pPendingBlock = m_pPendingBlock->m_next; delete p; p = m_pPendingBlock; } } bool CRcvBuffer::nextDataPos(char** data, __int32 offset, const __int32& len) { // Search the user data block first if (NULL != m_pcUserBuf) { if (m_iUserBufAck + offset + len <= m_iUserBufSize) { // find a position in user buffer *data = m_pcUserBuf + m_iUserBufAck + offset; return true; } else if (m_iUserBufAck + offset < m_iUserBufSize) { // Meet the end of the user buffer and there is not enough space for a regular packet return false; } else // offset is larger than user buffer size offset -= m_iUserBufSize - m_iUserBufAck; } // Remember the position of the furthest "dirty" data __int32 origoff = m_iMaxOffset; if (offset + len > m_iMaxOffset) m_iMaxOffset = offset + len; if (m_iLastAckPos >= m_iStartPos) if (m_iLastAckPos + offset + len <= m_iSize) { *data = m_pcData + m_iLastAckPos + offset; return true; } else if ((m_iLastAckPos + offset > m_iSize) && (offset - (m_iSize - m_iLastAckPos) + len <= m_iStartPos)) { *data = m_pcData + offset - (m_iSize - m_iLastAckPos); return true; } if (m_iLastAckPos + offset + len <= m_iStartPos) { *data = m_pcData + m_iLastAckPos + offset; return true; } // recover this pointer if no space is found m_iMaxOffset = origoff; return false; } bool CRcvBuffer::addData(char* data, __int32 offset, __int32 len) { // Check the user buffer first if (NULL != m_pcUserBuf) { if (m_iUserBufAck + offset + len <= m_iUserBufSize) { // write data into the user buffer memcpy(m_pcUserBuf + m_iUserBufAck + offset, data, len); return true; } else if (m_iUserBufAck + offset < m_iUserBufSize) { // write part of the data to the user buffer memcpy(m_pcUserBuf + m_iUserBufAck + offset, data, m_iUserBufSize - (m_iUserBufAck + offset)); data += m_iUserBufSize - (m_iUserBufAck + offset); len -= m_iUserBufSize - (m_iUserBufAck + offset); offset = 0; } else // offset is larger than size of user buffer offset -= m_iUserBufSize - m_iUserBufAck; } // Record this value in case that the method is failed __int32 origoff = m_iMaxOffset; if (offset + len > m_iMaxOffset) m_iMaxOffset = offset + len; if (m_iLastAckPos >= m_iStartPos) if (m_iLastAckPos + offset + len <= m_iSize) { memcpy(m_pcData + m_iLastAckPos + offset, data, len); return true; } else if ((m_iLastAckPos + offset < m_iSize) && (len - (m_iSize - m_iLastAckPos - offset) <= m_iStartPos)) { memcpy(m_pcData + m_iLastAckPos + offset, data, m_iSize - m_iLastAckPos - offset); memcpy(m_pcData, data + m_iSize - m_iLastAckPos - offset, len - (m_iSize - m_iLastAckPos - offset)); return true; } else if ((m_iLastAckPos + offset >= m_iSize) && (offset - (m_iSize - m_iLastAckPos) + len <= m_iStartPos)) { memcpy(m_pcData + offset - (m_iSize - m_iLastAckPos), data, len); return true; } if (m_iLastAckPos + offset + len <= m_iStartPos) { memcpy(m_pcData + m_iLastAckPos + offset, data, len); return true; } // recover the offset pointer since the write is failed m_iMaxOffset = origoff; return false; } void CRcvBuffer::moveData(__int32 offset, const __int32& len) { // check the user buffer first if (NULL != m_pcUserBuf) { if (m_iUserBufAck + offset + len < m_iUserBufSize) { // move data in user buffer memmove(m_pcUserBuf + m_iUserBufAck + offset, m_pcUserBuf + m_iUserBufAck + offset + len, m_iUserBufSize - (m_iUserBufAck + offset + len)); // move data from protocol buffer if (m_iMaxOffset > 0) { __int32 reallen = len; if (m_iMaxOffset < len) reallen = m_iMaxOffset; if (m_iSize < m_iLastAckPos + reallen) { memcpy(m_pcUserBuf + m_iUserBufSize - len, m_pcData + m_iLastAckPos, m_iSize - m_iLastAckPos); memcpy(m_pcUserBuf + m_iUserBufSize - len + m_iSize - m_iLastAckPos, m_pcData, m_iLastAckPos + reallen - m_iSize); } else memcpy(m_pcUserBuf + m_iUserBufSize - len, m_pcData + m_iLastAckPos, reallen); } offset = 0; } else if (m_iUserBufAck + offset < m_iUserBufSize) { if (m_iMaxOffset > m_iUserBufAck + offset + len - m_iUserBufSize) { __int32 reallen = m_iUserBufSize - (m_iUserBufAck + offset); __int32 startpos = m_iLastAckPos + len - reallen; if (m_iMaxOffset < len) reallen -= len - m_iMaxOffset; // Be sure that the m_iSize is at least 1 packet size, whereas len cannot be greater than this value, checked in setOpt(). if (m_iSize < startpos) memcpy(m_pcUserBuf + m_iUserBufAck + offset, m_pcData + startpos - m_iSize, reallen); else if (m_iSize < startpos + reallen) { memcpy(m_pcUserBuf + m_iUserBufAck + offset, m_pcData + startpos, m_iSize - startpos); memcpy(m_pcUserBuf + m_iUserBufAck + offset + m_iSize - startpos, m_pcData, startpos + reallen - m_iSize); } else memcpy(m_pcUserBuf + m_iUserBufAck + offset, m_pcData + startpos, reallen); } offset = 0; } else // offset is larger than size of user buffer offset -= m_iUserBufSize - m_iUserBufAck; } // No data to move if (m_iMaxOffset - offset < len) { m_iMaxOffset = offset; return; } // Move data in protocol buffer. if (m_iLastAckPos + m_iMaxOffset <= m_iSize) memmove(m_pcData + m_iLastAckPos + offset, m_pcData + m_iLastAckPos + offset + len, m_iMaxOffset - offset - len); else if (m_iLastAckPos + offset > m_iSize) memmove(m_pcData + (m_iLastAckPos + offset) % m_iSize, m_pcData + (m_iLastAckPos + offset + len) % m_iSize, m_iMaxOffset - offset - len); else if (m_iLastAckPos + offset + len <= m_iSize) { memmove(m_pcData + m_iLastAckPos + offset, m_pcData + m_iLastAckPos + offset + len, m_iSize - m_iLastAckPos - offset - len); memmove(m_pcData + m_iSize - len, m_pcData, len); memmove(m_pcData, m_pcData + len, m_iLastAckPos + m_iMaxOffset - m_iSize - len); } else { memmove(m_pcData + m_iLastAckPos + offset, m_pcData + len - (m_iSize - m_iLastAckPos - offset), m_iSize - m_iLastAckPos - offset); memmove(m_pcData, m_pcData + len, m_iLastAckPos + m_iMaxOffset - m_iSize - len); } // Update the offset pointer m_iMaxOffset -= len; } bool CRcvBuffer::readBuffer(char* data, const __int32& len) { if (m_iStartPos + len <= m_iLastAckPos) { // Simplest situation, read "len" data from start position memcpy(data, m_pcData + m_iStartPos, len); m_iStartPos += len; return true; } else if (m_iLastAckPos < m_iStartPos) { if (m_iStartPos + len < m_iSize) { // Data is not cover the ohysical boundary of the buffer memcpy(data, m_pcData + m_iStartPos, len); m_iStartPos += len; return true; } if (len - (m_iSize - m_iStartPos) <= m_iLastAckPos) { // data length exceeds the physical boundary, read twice memcpy(data, m_pcData + m_iStartPos, m_iSize - m_iStartPos); memcpy(data + m_iSize - m_iStartPos, m_pcData, len - (m_iSize - m_iStartPos)); m_iStartPos = len - (m_iSize - m_iStartPos); return true; } } // Not enough data to read return false; } __int32 CRcvBuffer::ackData(const __int32& len) { __int32 ret = 0; if (NULL != m_pcUserBuf) if (m_iUserBufAck + len < m_iUserBufSize) { // update user buffer ACK pointer m_iUserBufAck += len; return 0; } else { // user buffer is fulfilled // update protocol ACK pointer m_iLastAckPos += m_iUserBufAck + len - m_iUserBufSize; m_iMaxOffset -= m_iUserBufAck + len - m_iUserBufSize; // the overlapped IO is completed, a pending buffer should be activated m_pcUserBuf = NULL; m_iUserBufSize = 0; if (NULL != m_pPendingBlock) { registerUserBuf(m_pPendingBlock->m_pcData, m_pPendingBlock->m_iLength, m_pPendingBlock->m_iHandle, m_pPendingBlock->m_pMemRoutine); m_iPendingSize -= m_pPendingBlock->m_iLength; m_pPendingBlock = m_pPendingBlock->m_next; if (NULL == m_pPendingBlock) m_pLastBlock = NULL; } // returned value is 1 means user buffer is fulfilled ret = 1; } else { // there is no user buffer m_iLastAckPos += len; m_iMaxOffset -= len; } m_iLastAckPos %= m_iSize; return ret; } __int32 CRcvBuffer::registerUserBuf(char* buf, const __int32& len, const __int32& handle, const UDT_MEM_ROUTINE func) { if (NULL != m_pcUserBuf) { // there is ongoing recv, new buffer is put into pending list. Block *nb = new Block; nb->m_pcData = buf; nb->m_iLength = len; nb->m_iHandle = handle; nb->m_pMemRoutine = func; nb->m_next = NULL; if (NULL == m_pPendingBlock) m_pLastBlock = m_pPendingBlock = nb; else m_pLastBlock->m_next = nb; m_iPendingSize += len; return 0; } m_iUserBufAck = 0; m_iUserBufSize = len; m_pcUserBuf = buf; m_iHandle = handle; // find the furthest "dirty" data that need to be copied __int32 currwritepos = (m_iLastAckPos + m_iMaxOffset) % m_iSize; // copy data from protocol buffer into user buffer if (m_iStartPos <= currwritepos) if (currwritepos - m_iStartPos <= len) { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, currwritepos - m_iStartPos); m_iMaxOffset = 0; } else { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, len); m_iMaxOffset -= len; } else if (m_iSize - (m_iStartPos - currwritepos) <= len) { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, m_iSize - m_iStartPos); memcpy(m_pcUserBuf + m_iSize - m_iStartPos, m_pcData, currwritepos); m_iMaxOffset = 0; } else { if (m_iSize - m_iStartPos <= len) { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, m_iSize - m_iStartPos); memcpy(m_pcUserBuf + m_iSize - m_iStartPos, m_pcData, len - (m_iSize - m_iStartPos)); } else memcpy(m_pcUserBuf, m_pcData + m_iStartPos, len); m_iMaxOffset -= len; } // Update the user buffer pointer if (m_iStartPos <= m_iLastAckPos) m_iUserBufAck += m_iLastAckPos - m_iStartPos; else m_iUserBufAck += m_iSize - m_iStartPos + m_iLastAckPos; // update the protocol buffer pointer m_iStartPos = (m_iStartPos + len) % m_iSize; m_iLastAckPos = m_iStartPos; return m_iUserBufAck; } void CRcvBuffer::removeUserBuf() { m_pcUserBuf = NULL; m_iUserBufAck = 0; } __int32 CRcvBuffer::getAvailBufSize() const { __int32 bs = m_iSize; bs -= m_iLastAckPos - m_iStartPos; if (m_iLastAckPos < m_iStartPos) bs -= m_iSize; if (NULL != m_pcUserBuf) bs += m_iUserBufSize - m_iUserBufAck; return bs; } __int32 CRcvBuffer::getRcvDataSize() const { return (m_iLastAckPos - m_iStartPos + m_iSize) % m_iSize; } bool CRcvBuffer::getOverlappedResult(const __int32& handle, __int32& progress) { if ((NULL != m_pcUserBuf) && (handle == m_iHandle)) { progress = m_iUserBufAck; return false; } progress = 0; if (NULL != m_pPendingBlock) { __int32 end = (m_pLastBlock->m_iHandle >= m_pPendingBlock->m_iHandle) ? m_pLastBlock->m_iHandle : m_pLastBlock->m_iHandle + (1 << 30); __int32 h = (handle >= m_pPendingBlock->m_iHandle) ? handle : handle + (1 << 30); if ((h >= m_pPendingBlock->m_iHandle) && (h <= end)) return false; } return true; } __int32 CRcvBuffer::getPendingQueueSize() const { return m_iPendingSize + m_iUserBufSize; }