www.pudn.com > UDP-based_Reliable_Data_Transfer_Library.zip > buffer.cpp
/***************************************************************************** Copyright © 2001 - 2006, The Board of Trustees of the University of Illinois. All Rights Reserved. UDP-based Data Transfer Library (UDT) version 3 Laboratory for Advanced Computing (LAC) National Center for Data Mining (NCDM) University of Illinois at Chicago http://www.lac.uic.edu/ This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. *****************************************************************************/ /***************************************************************************** This file contains the implementation of UDT sending and receiving buffer management modules. The sending buffer is a linked list of application data to be sent. The receiving buffer is a logically circular memeory block. *****************************************************************************/ /***************************************************************************** written by Yunhong Gu [gu@lac.uic.edu], last updated 02/14/2006 *****************************************************************************/ #include#include #include "common.h" #include "buffer.h" CSndBuffer::CSndBuffer(const __int32& mss): m_pBlock(NULL), m_pLastBlock(NULL), m_pCurrSendBlk(NULL), m_pCurrAckBlk(NULL), m_iCurrBufSize(0), m_iCurrSendPnt(0), m_iCurrAckPnt(0), m_iNextMsgNo(0), m_iMSS(mss) { #ifndef WIN32 pthread_mutex_init(&m_BufLock, NULL); #else m_BufLock = CreateMutex(NULL, false, NULL); #endif } CSndBuffer::~CSndBuffer() { Block* pb = m_pBlock; // Release allocated data structure if there is any while (NULL != m_pBlock) { pb = pb->m_next; // process user data according with the routine provided by applications if (NULL != m_pBlock->m_pMemRoutine) m_pBlock->m_pMemRoutine(m_pBlock->m_pcData, m_pBlock->m_iLength); delete m_pBlock; m_pBlock = pb; } #ifndef WIN32 pthread_mutex_destroy(&m_BufLock); #else CloseHandle(m_BufLock); #endif } void CSndBuffer::addBuffer(const char* data, const __int32& len, const __int32& handle, const UDT_MEM_ROUTINE func, const __int32& ttl, const __int32& seqno, const bool& order) { CGuard bufferguard(m_BufLock); if (NULL == m_pBlock) { // Insert a block to the empty list m_pBlock = new Block; m_pBlock->m_pcData = const_cast (data); m_pBlock->m_iLength = len; gettimeofday(&m_pBlock->m_OriginTime, 0); m_pBlock->m_iTTL = ttl; m_pBlock->m_iMsgNo = m_iNextMsgNo; m_pBlock->m_iSeqNo = seqno; m_pBlock->m_iInOrder = order; m_pBlock->m_iInOrder <<= 29; m_pBlock->m_iHandle = handle; m_pBlock->m_pMemRoutine = func; m_pBlock->m_next = NULL; m_pLastBlock = m_pBlock; m_pCurrSendBlk = m_pBlock; m_iCurrSendPnt = 0; m_pCurrAckBlk = m_pBlock; m_iCurrAckPnt = 0; } else { // Insert a new block to the tail of the list __int32 lastseq = m_pLastBlock->m_iSeqNo; __int32 offset = m_pLastBlock->m_iLength; m_pLastBlock->m_next = new Block; m_pLastBlock = m_pLastBlock->m_next; m_pLastBlock->m_pcData = const_cast (data); m_pLastBlock->m_iLength = len; gettimeofday(&m_pLastBlock->m_OriginTime, 0); m_pLastBlock->m_iTTL = ttl; m_pLastBlock->m_iMsgNo = m_iNextMsgNo; m_pLastBlock->m_iSeqNo = lastseq + (__int32)ceil(double(offset) / m_iMSS); m_pLastBlock->m_iInOrder = order; m_pLastBlock->m_iInOrder <<= 29; m_pLastBlock->m_iHandle = handle; m_pLastBlock->m_pMemRoutine = func; m_pLastBlock->m_next = NULL; if (NULL == m_pCurrSendBlk) m_pCurrSendBlk = m_pLastBlock; } m_iCurrBufSize += len; m_iNextMsgNo = CMsgNo::incmsg(m_iNextMsgNo); } __int32 CSndBuffer::readData(char** data, const __int32& len, __int32& msgno) { CGuard bufferguard(m_BufLock); // No data to read if (NULL == m_pCurrSendBlk) return 0; // read data in the current sending block if (m_iCurrSendPnt + len < m_pCurrSendBlk->m_iLength) { *data = m_pCurrSendBlk->m_pcData + m_iCurrSendPnt; msgno = m_pCurrSendBlk->m_iMsgNo | m_pCurrSendBlk->m_iInOrder; if (0 == m_iCurrSendPnt) msgno |= 0x80000000; if (m_pCurrSendBlk->m_iLength == m_iCurrSendPnt + len) msgno |= 0x40000000; m_iCurrSendPnt += len; return len; } // Not enough data to read. // Read an irregular packet and move the current sending block pointer to the next block __int32 readlen = m_pCurrSendBlk->m_iLength - m_iCurrSendPnt; *data = m_pCurrSendBlk->m_pcData + m_iCurrSendPnt; if (0 == m_iCurrSendPnt) msgno = m_pCurrSendBlk->m_iMsgNo | 0xC0000000 | m_pCurrSendBlk->m_iInOrder; else msgno = m_pCurrSendBlk->m_iMsgNo | 0x40000000 | m_pCurrSendBlk->m_iInOrder; m_pCurrSendBlk = m_pCurrSendBlk->m_next; m_iCurrSendPnt = 0; return readlen; } __int32 CSndBuffer::readData(char** data, const __int32 offset, const __int32& len, __int32& msgno, __int32& seqno, __int32& msglen) { CGuard bufferguard(m_BufLock); Block* p = m_pCurrAckBlk; // No data to read if (NULL == p) return 0; // Locate to the data position by the offset __int32 loffset = offset + m_iCurrAckPnt; while (p->m_iLength <= loffset) { loffset -= p->m_iLength; loffset -= len - ((0 == p->m_iLength % len) ? len : (p->m_iLength % len)); p = p->m_next; if (NULL == p) return 0; } if (p->m_iTTL >= 0) { timeval currtime; gettimeofday(&currtime, 0); __int32 e = (currtime.tv_sec - p->m_OriginTime.tv_sec) * 1000000 + currtime.tv_usec - p->m_OriginTime.tv_usec; if (e > p->m_iTTL) { msgno = p->m_iMsgNo; seqno = p->m_iSeqNo; msglen = p->m_iLength; return -1; } } // Read a regular data if (loffset + len <= p->m_iLength) { *data = p->m_pcData + loffset; msgno = p->m_iMsgNo | p->m_iInOrder; if (0 == loffset) msgno |= 0x80000000; if (p->m_iLength == loffset + len) msgno |= 0x40000000; return len; } // Read an irrugular data at the end of a block *data = p->m_pcData + loffset; msgno = p->m_iMsgNo | p->m_iInOrder; if (0 == loffset) msgno |= 0xC0000000; else msgno |= 0x40000000; return p->m_iLength - loffset; } void CSndBuffer::ackData(const __int32& len, const __int32& payloadsize) { CGuard bufferguard(m_BufLock); m_iCurrAckPnt += len; // Remove the block if it is acknowledged while (m_iCurrAckPnt >= m_pCurrAckBlk->m_iLength) { m_iCurrAckPnt -= m_pCurrAckBlk->m_iLength; // Update the size error between regular and irregular packets if (0 != m_pCurrAckBlk->m_iLength % payloadsize) m_iCurrAckPnt -= payloadsize - m_pCurrAckBlk->m_iLength % payloadsize; m_iCurrBufSize -= m_pCurrAckBlk->m_iLength; m_pCurrAckBlk = m_pCurrAckBlk->m_next; // process user data according with the routine provided by applications if (NULL != m_pBlock->m_pMemRoutine) m_pBlock->m_pMemRoutine(m_pBlock->m_pcData, m_pBlock->m_iLength); delete m_pBlock; m_pBlock = m_pCurrAckBlk; if (NULL == m_pBlock) break; } } __int32 CSndBuffer::getCurrBufSize() const { return m_iCurrBufSize - m_iCurrAckPnt; } bool CSndBuffer::getOverlappedResult(const __int32& handle, __int32& progress) { CGuard bufferguard(m_BufLock); if (NULL != m_pCurrAckBlk) { if (handle == m_pCurrAckBlk->m_iHandle) { progress = m_iCurrAckPnt; return false; } else { __int32 end = (m_pLastBlock->m_iHandle >= m_pCurrAckBlk->m_iHandle) ? m_pLastBlock->m_iHandle : m_pLastBlock->m_iHandle + (1 << 30); __int32 h = (handle >= m_pCurrAckBlk->m_iHandle) ? handle : handle + (1 << 30); if ((h > m_pCurrAckBlk->m_iHandle) && (h <= end)) { progress = 0; return false; } } } progress = 0; return true; } void CSndBuffer::releaseBuffer(char* buf, int) { delete [] buf; } //////////////////////////////////////////////////////////////////////////////// CRcvBuffer::CRcvBuffer(const __int32& mss): m_pcData(NULL), m_iSize(40960000), m_iStartPos(0), m_iLastAckPos(0), m_iMaxOffset(0), m_pcUserBuf(NULL), m_iUserBufSize(0), m_pPendingBlock(NULL), m_pLastBlock(NULL), m_iPendingSize(0), m_pMessageList(NULL), m_iMSS(mss) { m_pcData = new char [m_iSize]; #ifndef WIN32 pthread_mutex_init(&m_MsgLock, NULL); #else m_MsgLock = CreateMutex(NULL, false, NULL); #endif } CRcvBuffer::CRcvBuffer(const __int32& mss, const __int32& bufsize): m_pcData(NULL), m_iSize(bufsize), m_iStartPos(0), m_iLastAckPos(0), m_iMaxOffset(0), m_pcUserBuf(NULL), m_iUserBufSize(0), m_pPendingBlock(NULL), m_pLastBlock(NULL), m_iPendingSize(0), m_pMessageList(NULL), m_iMSS(mss) { m_pcData = new char [m_iSize]; #ifndef WIN32 pthread_mutex_init(&m_MsgLock, NULL); #else m_MsgLock = CreateMutex(NULL, false, NULL); #endif } CRcvBuffer::~CRcvBuffer() { delete [] m_pcData; Block* p = m_pPendingBlock; while (NULL != p) { m_pPendingBlock = m_pPendingBlock->m_next; delete p; p = m_pPendingBlock; } if (NULL != m_pMessageList) delete [] m_pMessageList; #ifndef WIN32 pthread_mutex_destroy(&m_MsgLock); #else CloseHandle(m_MsgLock); #endif } bool CRcvBuffer::nextDataPos(char** data, __int32 offset, const __int32& len) { // Search the user data block first if (NULL != m_pcUserBuf) { if (m_iUserBufAck + offset + len <= m_iUserBufSize) { // find a position in user buffer *data = m_pcUserBuf + m_iUserBufAck + offset; return true; } else if (m_iUserBufAck + offset < m_iUserBufSize) { // Meet the end of the user buffer and there is not enough space for a regular packet return false; } else // offset is larger than user buffer size offset -= m_iUserBufSize - m_iUserBufAck; } // Remember the position of the furthest "dirty" data __int32 origoff = m_iMaxOffset; if (offset + len > m_iMaxOffset) m_iMaxOffset = offset + len; if (m_iLastAckPos >= m_iStartPos) if (m_iLastAckPos + offset + len <= m_iSize) { *data = m_pcData + m_iLastAckPos + offset; return true; } else if ((m_iLastAckPos + offset > m_iSize) && (offset - (m_iSize - m_iLastAckPos) + len <= m_iStartPos)) { *data = m_pcData + offset - (m_iSize - m_iLastAckPos); return true; } if (m_iLastAckPos + offset + len <= m_iStartPos) { *data = m_pcData + m_iLastAckPos + offset; return true; } // recover this pointer if no space is found m_iMaxOffset = origoff; return false; } bool CRcvBuffer::addData(char** data, __int32 offset, __int32 len) { // Check the user buffer first if (NULL != m_pcUserBuf) { if (m_iUserBufAck + offset + len <= m_iUserBufSize) { // write data into the user buffer memcpy(m_pcUserBuf + m_iUserBufAck + offset, *data, len); return true; } else if (m_iUserBufAck + offset < m_iUserBufSize) { // write part of the data to the user buffer memcpy(m_pcUserBuf + m_iUserBufAck + offset, *data, m_iUserBufSize - (m_iUserBufAck + offset)); *data += m_iUserBufSize - (m_iUserBufAck + offset); len -= m_iUserBufSize - (m_iUserBufAck + offset); offset = 0; } else // offset is larger than size of user buffer offset -= m_iUserBufSize - m_iUserBufAck; } // Record this value in case that the method is failed __int32 origoff = m_iMaxOffset; if (offset + len > m_iMaxOffset) m_iMaxOffset = offset + len; if (m_iLastAckPos >= m_iStartPos) if (m_iLastAckPos + offset + len <= m_iSize) { memcpy(m_pcData + m_iLastAckPos + offset, *data, len); *data = m_pcData + m_iLastAckPos + offset; return true; } else if ((m_iLastAckPos + offset < m_iSize) && (len - (m_iSize - m_iLastAckPos - offset) <= m_iStartPos)) { memcpy(m_pcData + m_iLastAckPos + offset, *data, m_iSize - m_iLastAckPos - offset); memcpy(m_pcData, *data + m_iSize - m_iLastAckPos - offset, len - (m_iSize - m_iLastAckPos - offset)); *data = m_pcData + m_iLastAckPos + offset; return true; } else if ((m_iLastAckPos + offset >= m_iSize) && (offset - (m_iSize - m_iLastAckPos) + len <= m_iStartPos)) { memcpy(m_pcData + offset - (m_iSize - m_iLastAckPos), *data, len); *data = m_pcData + offset - (m_iSize - m_iLastAckPos); return true; } if (m_iLastAckPos + offset + len <= m_iStartPos) { memcpy(m_pcData + m_iLastAckPos + offset, *data, len); *data = m_pcData + m_iLastAckPos + offset; return true; } // recover the offset pointer since the write is failed m_iMaxOffset = origoff; return false; } void CRcvBuffer::moveData(__int32 offset, const __int32& len) { // check the user buffer first if (NULL != m_pcUserBuf) { if (m_iUserBufAck + offset + len < m_iUserBufSize) { // move data in user buffer memmove(m_pcUserBuf + m_iUserBufAck + offset, m_pcUserBuf + m_iUserBufAck + offset + len, m_iUserBufSize - (m_iUserBufAck + offset + len)); // move data from protocol buffer if (m_iMaxOffset > 0) { __int32 reallen = len; if (m_iMaxOffset < len) reallen = m_iMaxOffset; if (m_iSize < m_iLastAckPos + reallen) { memcpy(m_pcUserBuf + m_iUserBufSize - len, m_pcData + m_iLastAckPos, m_iSize - m_iLastAckPos); memcpy(m_pcUserBuf + m_iUserBufSize - len + m_iSize - m_iLastAckPos, m_pcData, m_iLastAckPos + reallen - m_iSize); } else memcpy(m_pcUserBuf + m_iUserBufSize - len, m_pcData + m_iLastAckPos, reallen); } offset = 0; } else if (m_iUserBufAck + offset < m_iUserBufSize) { if (m_iMaxOffset > m_iUserBufAck + offset + len - m_iUserBufSize) { __int32 reallen = m_iUserBufSize - (m_iUserBufAck + offset); __int32 startpos = m_iLastAckPos + len - reallen; if (m_iMaxOffset < len) reallen -= len - m_iMaxOffset; // Be sure that the m_iSize is at least 1 packet size, whereas len cannot be greater than this value, checked in setOpt(). if (m_iSize < startpos) memcpy(m_pcUserBuf + m_iUserBufAck + offset, m_pcData + startpos - m_iSize, reallen); else if (m_iSize < startpos + reallen) { memcpy(m_pcUserBuf + m_iUserBufAck + offset, m_pcData + startpos, m_iSize - startpos); memcpy(m_pcUserBuf + m_iUserBufAck + offset + m_iSize - startpos, m_pcData, startpos + reallen - m_iSize); } else memcpy(m_pcUserBuf + m_iUserBufAck + offset, m_pcData + startpos, reallen); } offset = 0; } else // offset is larger than size of user buffer offset -= m_iUserBufSize - m_iUserBufAck; } // No data to move if (m_iMaxOffset - offset < len) { m_iMaxOffset = offset; return; } // Move data in protocol buffer. if (m_iLastAckPos + m_iMaxOffset <= m_iSize) memmove(m_pcData + m_iLastAckPos + offset, m_pcData + m_iLastAckPos + offset + len, m_iMaxOffset - offset - len); else if (m_iLastAckPos + offset > m_iSize) memmove(m_pcData + (m_iLastAckPos + offset) % m_iSize, m_pcData + (m_iLastAckPos + offset + len) % m_iSize, m_iMaxOffset - offset - len); else if (m_iLastAckPos + offset + len <= m_iSize) { memmove(m_pcData + m_iLastAckPos + offset, m_pcData + m_iLastAckPos + offset + len, m_iSize - m_iLastAckPos - offset - len); memmove(m_pcData + m_iSize - len, m_pcData, len); memmove(m_pcData, m_pcData + len, m_iLastAckPos + m_iMaxOffset - m_iSize - len); } else { memmove(m_pcData + m_iLastAckPos + offset, m_pcData + len - (m_iSize - m_iLastAckPos - offset), m_iSize - m_iLastAckPos - offset); memmove(m_pcData, m_pcData + len, m_iLastAckPos + m_iMaxOffset - m_iSize - len); } // Update the offset pointer m_iMaxOffset -= len; } bool CRcvBuffer::readBuffer(char* data, const __int32& len) { if (m_iStartPos + len <= m_iLastAckPos) { // Simplest situation, read "len" data from start position memcpy(data, m_pcData + m_iStartPos, len); m_iStartPos += len; return true; } else if (m_iLastAckPos < m_iStartPos) { if (m_iStartPos + len < m_iSize) { // Data is not cover the ohysical boundary of the buffer memcpy(data, m_pcData + m_iStartPos, len); m_iStartPos += len; return true; } if (len - (m_iSize - m_iStartPos) <= m_iLastAckPos) { // data length exceeds the physical boundary, read twice memcpy(data, m_pcData + m_iStartPos, m_iSize - m_iStartPos); memcpy(data + m_iSize - m_iStartPos, m_pcData, len - (m_iSize - m_iStartPos)); m_iStartPos = len - (m_iSize - m_iStartPos); return true; } } // Not enough data to read return false; } __int32 CRcvBuffer::ackData(const __int32& len) { __int32 ret = 0; if (NULL != m_pcUserBuf) if (m_iUserBufAck + len < m_iUserBufSize) { // update user buffer ACK pointer m_iUserBufAck += len; return 0; } else { // user buffer is fulfilled // update protocol ACK pointer m_iLastAckPos += m_iUserBufAck + len - m_iUserBufSize; m_iMaxOffset -= m_iUserBufAck + len - m_iUserBufSize; // the overlapped IO is completed, a pending buffer should be activated m_pcUserBuf = NULL; m_iUserBufSize = 0; if (NULL != m_pPendingBlock) { registerUserBuf(m_pPendingBlock->m_pcData, m_pPendingBlock->m_iLength, m_pPendingBlock->m_iHandle, m_pPendingBlock->m_pMemRoutine); m_iPendingSize -= m_pPendingBlock->m_iLength; m_pPendingBlock = m_pPendingBlock->m_next; if (NULL == m_pPendingBlock) m_pLastBlock = NULL; } // returned value is 1 means user buffer is fulfilled ret = 1; } else { // there is no user buffer m_iLastAckPos += len; m_iMaxOffset -= len; } m_iLastAckPos %= m_iSize; return ret; } __int32 CRcvBuffer::registerUserBuf(char* buf, const __int32& len, const __int32& handle, const UDT_MEM_ROUTINE func) { if (NULL != m_pcUserBuf) { // there is ongoing recv, new buffer is put into pending list. Block *nb = new Block; nb->m_pcData = buf; nb->m_iLength = len; nb->m_iHandle = handle; nb->m_pMemRoutine = func; nb->m_next = NULL; if (NULL == m_pPendingBlock) m_pLastBlock = m_pPendingBlock = nb; else m_pLastBlock->m_next = nb; m_iPendingSize += len; return 0; } m_iUserBufAck = 0; m_iUserBufSize = len; m_pcUserBuf = buf; m_iHandle = handle; // find the furthest "dirty" data that need to be copied __int32 currwritepos = (m_iLastAckPos + m_iMaxOffset) % m_iSize; // copy data from protocol buffer into user buffer if (m_iStartPos <= currwritepos) if (currwritepos - m_iStartPos <= len) { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, currwritepos - m_iStartPos); m_iMaxOffset = 0; } else { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, len); m_iMaxOffset -= len; } else if (m_iSize - (m_iStartPos - currwritepos) <= len) { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, m_iSize - m_iStartPos); memcpy(m_pcUserBuf + m_iSize - m_iStartPos, m_pcData, currwritepos); m_iMaxOffset = 0; } else { if (m_iSize - m_iStartPos <= len) { memcpy(m_pcUserBuf, m_pcData + m_iStartPos, m_iSize - m_iStartPos); memcpy(m_pcUserBuf + m_iSize - m_iStartPos, m_pcData, len - (m_iSize - m_iStartPos)); } else memcpy(m_pcUserBuf, m_pcData + m_iStartPos, len); m_iMaxOffset -= len; } // Update the user buffer pointer if (m_iStartPos <= m_iLastAckPos) m_iUserBufAck += m_iLastAckPos - m_iStartPos; else m_iUserBufAck += m_iSize - m_iStartPos + m_iLastAckPos; // update the protocol buffer pointer m_iStartPos = (m_iStartPos + len) % m_iSize; m_iLastAckPos = m_iStartPos; return m_iUserBufAck; } void CRcvBuffer::removeUserBuf() { m_pcUserBuf = NULL; m_iUserBufAck = 0; } __int32 CRcvBuffer::getAvailBufSize() const { __int32 bs = m_iSize; bs -= m_iLastAckPos - m_iStartPos; if (m_iLastAckPos < m_iStartPos) bs -= m_iSize; if (NULL != m_pcUserBuf) bs += m_iUserBufSize - m_iUserBufAck; return bs; } __int32 CRcvBuffer::getRcvDataSize() const { return (m_iLastAckPos - m_iStartPos + m_iSize) % m_iSize; } bool CRcvBuffer::getOverlappedResult(const __int32& handle, __int32& progress) { if ((NULL != m_pcUserBuf) && (handle == m_iHandle)) { progress = m_iUserBufAck; return false; } progress = 0; if (NULL != m_pPendingBlock) { __int32 end = (m_pLastBlock->m_iHandle >= m_pPendingBlock->m_iHandle) ? m_pLastBlock->m_iHandle : m_pLastBlock->m_iHandle + (1 << 30); __int32 h = (handle >= m_pPendingBlock->m_iHandle) ? handle : handle + (1 << 30); if ((h >= m_pPendingBlock->m_iHandle) && (h <= end)) return false; } return true; } __int32 CRcvBuffer::getPendingQueueSize() const { return m_iPendingSize + m_iUserBufSize; } void CRcvBuffer::initMsgList() { // the message list should contain the most possible number of messages: when each packet is a message m_iMsgInfoSize = m_iSize / m_iMSS + 1; m_pMessageList = new MsgInfo[m_iMsgInfoSize]; m_iPtrFirstMsg = -1; m_iPtrRecentACK = -1; m_iLastMsgNo = 0; m_iValidMsgCount = 0; for (int i = 0; i < m_iMsgInfoSize; ++ i) { m_pMessageList[i].m_pcData = NULL; m_pMessageList[i].m_iMsgNo = -1; m_pMessageList[i].m_iStartSeq = -1; m_pMessageList[i].m_iEndSeq = -1; m_pMessageList[i].m_iSizeDiff = 0; m_pMessageList[i].m_bValid = false; m_pMessageList[i].m_bDropped = false; m_pMessageList[i].m_bInOrder = false; m_pMessageList[i].m_iMsgNo = -1; } } void CRcvBuffer::checkMsg(const __int32& type, const __int32& msgno, const __int32& seqno, const char* ptr, const bool& inorder, const __int32& diff) { CGuard msgguard(m_MsgLock); __int32 pos; if (-1 == m_iPtrFirstMsg) { pos = m_iPtrFirstMsg = 0; m_iPtrRecentACK = -1; } else { pos = m_iPtrFirstMsg + CMsgNo::msgoff(m_pMessageList[m_iPtrFirstMsg].m_iMsgNo, msgno); if (pos >= m_iMsgInfoSize) pos -= m_iMsgInfoSize; else if (pos < 0) { pos += m_iMsgInfoSize; m_iPtrFirstMsg = pos; } } MsgInfo* p = m_pMessageList + pos; p->m_iMsgNo = msgno; switch (type) { case 3: // single packet message p->m_pcData = (char*)ptr; p->m_iStartSeq = p->m_iEndSeq = seqno; p->m_bInOrder = inorder; p->m_iSizeDiff = diff; break; case 2: // first packet of the message p->m_pcData = (char*)ptr; p->m_iStartSeq = seqno; p->m_bInOrder = inorder; break; case 1: // last packet of the message p->m_iEndSeq = seqno; p->m_iSizeDiff = diff; break; } // update the largest msg no so far if (CMsgNo::msgcmp(m_iLastMsgNo, msgno) < 0) m_iLastMsgNo = msgno; } bool CRcvBuffer::ackMsg(const __int32& ack, const CRcvLossList* rll) { CGuard msgguard(m_MsgLock); // no message exist, return if (-1 == m_iPtrFirstMsg) { // also means no message is valid m_iStartPos = m_iLastAckPos; return false; } __int32 ptr; __int32 len; if (-1 == m_iPtrRecentACK) { // all messages are new, check from the start ptr = m_iPtrFirstMsg; len = CMsgNo::msglen(m_pMessageList[ptr].m_iMsgNo, m_iLastMsgNo); } else { // check from the last ACK point ptr = m_iPtrRecentACK + 1; if (ptr == m_iMsgInfoSize) ptr = 0; len = CMsgNo::msglen(m_pMessageList[ptr].m_iMsgNo, m_iLastMsgNo); } for (__int32 i = 0; i < len; ++ i) { if ((m_pMessageList[ptr].m_iStartSeq != -1) && (m_pMessageList[ptr].m_iEndSeq != -1) && (!m_pMessageList[ptr].m_bDropped) && (!(rll->find(m_pMessageList[ptr].m_iStartSeq, m_pMessageList[ptr].m_iEndSeq))) && (!m_pMessageList[ptr].m_bInOrder || CSeqNo::seqcmp(m_pMessageList[ptr].m_iEndSeq, ack) <= 0)) { m_pMessageList[ptr].m_bValid = true; m_pMessageList[ptr].m_iLength = CSeqNo::seqlen(m_pMessageList[ptr].m_iStartSeq, m_pMessageList[ptr].m_iEndSeq) * m_iMSS - m_pMessageList[ptr].m_iSizeDiff; ++ m_iValidMsgCount; } if ((m_pMessageList[ptr].m_iEndSeq != -1) && (CSeqNo::seqcmp(m_pMessageList[ptr].m_iEndSeq, ack) <= 0)) m_iPtrRecentACK = ptr; ++ ptr; if (ptr == m_iMsgInfoSize) ptr = 0; } return (m_iValidMsgCount > 0); } void CRcvBuffer::dropMsg(const __int32& msgno) { CGuard msgguard(m_MsgLock); // no message exist, return if (-1 == m_iPtrFirstMsg) return; __int32 ptr = m_iPtrFirstMsg + CMsgNo::msglen(m_pMessageList[m_iPtrFirstMsg].m_iMsgNo, msgno); if (ptr >= m_iMsgInfoSize) ptr -= m_iMsgInfoSize; m_pMessageList[ptr].m_iMsgNo = msgno; m_pMessageList[ptr].m_bDropped = true; // update the largest msg no so far if (CMsgNo::msgcmp(m_iLastMsgNo, msgno) < 0) m_iLastMsgNo = msgno; } __int32 CRcvBuffer::readMsg(char* data, const __int32& len) { CGuard msgguard(m_MsgLock); // no message exist, return if (-1 == m_iPtrFirstMsg) return 0; __int32 ptr = m_iPtrFirstMsg; // searching first valid message while (m_pMessageList[ptr].m_iMsgNo != m_iLastMsgNo) { if (m_pMessageList[ptr].m_bValid) break; ++ ptr; if (ptr == m_iMsgInfoSize) ptr = 0; } __int32 size = 0; if (m_pMessageList[ptr].m_bValid) { if ((m_pMessageList[ptr].m_bInOrder) || (CSeqNo::seqcmp(m_pMessageList[ptr].m_iEndSeq, m_pMessageList[m_iPtrRecentACK].m_iEndSeq) <= 0)) { m_iStartPos = m_pMessageList[ptr].m_pcData + CSeqNo::seqlen(m_pMessageList[ptr].m_iStartSeq, m_pMessageList[ptr].m_iEndSeq) * m_iMSS - m_pcData; if (m_iStartPos > m_iSize) m_iStartPos -= m_iSize; } else { if (NULL != m_pMessageList[m_iPtrRecentACK].m_pcData) m_iStartPos = m_pMessageList[m_iPtrRecentACK].m_pcData - m_pcData; } size = (len > m_pMessageList[ptr].m_iLength) ? m_pMessageList[ptr].m_iLength : len; if (m_pMessageList[ptr].m_pcData - m_pcData + size < m_iSize) { memcpy(data, m_pMessageList[ptr].m_pcData, size); } else { __int32 partial = m_pMessageList[ptr].m_pcData - m_pcData + size - m_iSize; memcpy(data, m_pMessageList[ptr].m_pcData, size - partial); memcpy(data + size - partial, m_pcData, partial); } m_pMessageList[ptr].m_bValid = false; -- m_iValidMsgCount; } // all messages prior to the first valid message before the recent ACK point are permanently invalid if (CMsgNo::msgcmp(m_pMessageList[ptr].m_iMsgNo, m_pMessageList[m_iPtrRecentACK].m_iMsgNo) <= 0) { while (ptr != m_iPtrRecentACK) { ptr ++; if (ptr == m_iMsgInfoSize) ptr = 0; if (m_pMessageList[ptr].m_bValid) break; } } else ptr = m_iPtrRecentACK; // release the invalid message items while (m_iPtrFirstMsg != ptr) { m_pMessageList[m_iPtrFirstMsg].m_pcData = NULL; m_pMessageList[m_iPtrFirstMsg].m_iMsgNo = -1; m_pMessageList[m_iPtrFirstMsg].m_iStartSeq = -1; m_pMessageList[m_iPtrFirstMsg].m_iEndSeq = -1; m_pMessageList[m_iPtrFirstMsg].m_iLength = -1; m_pMessageList[m_iPtrFirstMsg].m_bValid = false; m_pMessageList[m_iPtrFirstMsg].m_bDropped = false; m_pMessageList[m_iPtrFirstMsg].m_bInOrder = false; ++ m_iPtrFirstMsg; if (m_iPtrFirstMsg == m_iMsgInfoSize) m_iPtrFirstMsg = 0; } // all messages are invalid, re-init the message list if ((m_pMessageList[m_iPtrFirstMsg].m_iMsgNo == m_iLastMsgNo) && !(m_pMessageList[m_iPtrFirstMsg].m_bValid)) { m_pMessageList[m_iPtrFirstMsg].m_pcData = NULL; m_pMessageList[m_iPtrFirstMsg].m_iMsgNo = -1; m_pMessageList[m_iPtrFirstMsg].m_iStartSeq = -1; m_pMessageList[m_iPtrFirstMsg].m_iEndSeq = -1; m_pMessageList[m_iPtrFirstMsg].m_iLength = -1; m_pMessageList[m_iPtrFirstMsg].m_bValid = false; m_pMessageList[m_iPtrFirstMsg].m_bDropped = false; m_pMessageList[m_iPtrFirstMsg].m_bInOrder = false; m_iPtrFirstMsg = -1; m_iPtrRecentACK = -1; m_iStartPos = m_iLastAckPos; } return size; } __int32 CRcvBuffer::getValidMsgCount() { CGuard msgguard(m_MsgLock); return m_iValidMsgCount; }