www.pudn.com > GGBT.rar > Connector.cpp


// Connector.cpp: implementation of the CConnector class. 
// 
////////////////////////////////////////////////////////////////////// 
 
#include "stdafx.h" 
#include "testbt.h" 
#include "Connector.h" 
 
 
#include "Connection.h" 
#include "Encrypter.h" 
#include "Upload.h" 
#include "Downloader.h" 
#include "Choker.h" 
 
#ifdef _DEBUG 
#undef THIS_FILE 
static char THIS_FILE[]=__FILE__; 
#define new DEBUG_NEW 
#endif 
 
long toint(char* pBuf) 
{ 
	unsigned char* p = (unsigned char*) pBuf; 
	return p[0]*0x1000000 + p[1]*0x10000 + p[2]*0x100 + p[3]; 
} 
 
char* tobinary(long lnum, char* pBuf, long length=4) 
{ 
	char* p = (char*)&lnum; 
	pBuf[0] = p[3]; 
	pBuf[1] = p[2]; 
	pBuf[2] = p[1]; 
	pBuf[3] = p[0]; 
	return pBuf; 
} 
 
////////////////////////////////////////////////////////////////////// 
// Construction/Destruction 
////////////////////////////////////////////////////////////////////// 
 
CConnector::CConnector(long lMaxUploadRate) 
{ 
	m_lMaxUploadRate = lMaxUploadRate; 
} 
 
 
CConnector::~CConnector() 
{ 
	for (int i=0;  im_pDownload; 
		delete m_connections[i]->m_pUpload; 
		delete m_connections[i]; 
	} 
} 
 
 
void CConnector::connection_made(CEncryptedConnection *pEConnection) 
{ 
	CConnection* pnew = new CConnection(pEConnection, this); 
	m_connections.push_back(pnew); 
	pnew->m_pUpload = m_pUploader->make_upload(pnew); 
	pnew->m_pDownload = m_pDownloader->make_download(pnew); 
	m_pChoker->connection_made(pnew); 
 
} 
 
void CConnector::Create(CDownloader* pDownloader, CUploader* pUploader, CChoker* pchoker,  
		 long lNumPieces, CStorageWrapper* pStorage,  CMeasure* pUpTotalMeasure) 
{ 
	m_pDownloader = pDownloader; 
	m_pUploader = pUploader; 
	m_lNumPieces = lNumPieces; 
	m_pStorage = pStorage; 
	m_pUpTotalMeasure = pUpTotalMeasure; 
	m_pChoker = pchoker; 
	m_bEndgame = false; 
	m_bRateCapped = false; 
	m_tcap = 0; 
	m_tTimeUntilUncap = 0;	 
 
	check_endgame(); 
} 
 
 
void CConnector::connection_lost(CEncryptedConnection* pEConnection) 
{ 
	bool bfind = false; 
	for (int i=0;  im_pEConnection == pEConnection) 
		{ 
			m_pChoker->connection_lost(m_connections[i]); 
			m_connections[i]->m_pDownload->disconnected(); 
			delete m_connections[i]->m_pDownload; 
			delete m_connections[i]->m_pUpload; 
			delete m_connections[i]; 
			m_connections.erase(m_connections.begin() + i); 
			bfind = true; 
			break; 
		} 
	} 
	assert(bfind); 
} 
 
void CConnector::connection_flushed(CEncryptedConnection *pEConnection) 
{	 
	bool bfind = false; 
	for (int i=0;  im_pEConnection == pEConnection) 
		{ 
			m_connections[i]->m_pUpload->flushed(); 
			bfind = true; 
			break; 
		} 
	} 
	assert(bfind); 
} 
 
long CConnector::how_many_connections() 
{ 
	return m_connections.size(); 
} 
const CDownloader* const CConnector::GetDownloader() const 
{ 
	return m_pDownloader; 
}; 
 
const CMeasure* const CConnector::GetUpTotalMeasure() const 
{ 
	return m_pUpTotalMeasure; 
} 
 
void CConnector::got_message(CEncryptedConnection* pEConnection, memstream& memMessage) 
{ 
	CConnection* pCon = 0; 
	for (int i=0;  im_pEConnection == pEConnection) 
		{ 
			pCon = m_connections[i]; 
			break; 
		} 
	} 
	assert(pCon); 
 
 
	char* pBuf = memMessage; 
	long length = memMessage.size(); 
 
	char t = (*pBuf); 
 
	if (t == BITFIELD && pCon->m_bGotAnything) 
	{ 
		pEConnection->Close(); 
		return; 
	} 
	pCon->m_bGotAnything = true; 
 
	if (t == CHOKE || t == UNCHOKE || t == INTERESTED || t == NOT_INTERESTED) 
	{ 
		if (length != 1) 
		{ 
			pEConnection->Close(); 
			return; 
		} 
	} 
 
	switch (t) 
	{ 
	case CHOKE: 
		{ 
			pCon->m_pDownload->got_choke(); 
		} 
		break; 
		 
	case UNCHOKE: 
		{ 
			pCon->m_pDownload->got_unchoke(); 
			check_endgame(); 
		} 
		break; 
		 
	case INTERESTED: 
		{ 
			pCon->m_pUpload->got_interested(); 
		} 
		break; 
		 
	case NOT_INTERESTED: 
		{ 
			pCon->m_pUpload->got_not_interested(); 
		} 
		break; 
		 
	case HAVE: 
		{ 
			if (length != 5) 
			{ 
				pCon->Close(); 
				return; 
			} 
			long index = toint(pBuf + 1); 
			if (index >= m_lNumPieces || index < 0) 
			{ 
				pCon->Close(); 
				return; 
			} 
			pCon->m_pDownload->got_have(index); 
			check_endgame(); 
		} 
		break; 
		 
	case BITFIELD: 
		{ 
			memstream memBitfield; 
			memBitfield.write(pBuf+1, length - 1); 
			vector vHave; 
 
			if (!bitfield_to_booleans(memBitfield, m_lNumPieces, vHave)) 
			{ 
				pCon->Close(); 
				return; 
			} 
			pCon->m_pDownload->got_have_bitfield(vHave); 
			check_endgame(); 
		} 
		break; 
		 
	case REQUEST: 
		{ 
			if (length != 13) 
			{ 
				pCon->Close(); 
				return; 
			} 
 
			long index = toint(pBuf + 1); 
			if (index >= m_lNumPieces) 
			{				 
				pCon->Close(); 
				return; 
			} 
			pCon->m_pUpload->got_request(index, toint(pBuf + 5), toint(pBuf+9)); 
		} 
		break; 
 
	case CANCEL: 
		{ 
			if (length != 13) 
			{ 
				pCon->Close(); 
				return; 
			} 
 
			long index = toint(pBuf + 1); 
			if (index >= m_lNumPieces) 
			{				 
				pCon->Close(); 
				return; 
			} 
			pCon->m_pUpload->got_cancel(index, toint(pBuf + 5), toint(pBuf+9)); 
 
		} 
		break; 
		 
	case PIECE: 
		{ 
			if (length <= 9) 
			{ 
				pCon->Close(); 
				return; 
			}			 
			long index = toint(pBuf + 1); 
			long begin = toint(pBuf+5); 
			if (index >= m_lNumPieces) 
			{ 
				pCon->Close(); 
				return; 
			} 
 
			memMessage.TrimLeft(9); 
			// memstream memPiece; 
			// memPiece.write(pBuf+9, length - 9); 
			// if (pCon->m_pDownload->got_piece(index, toint(pBuf+5), memPiece)) 
			if (pCon->m_pDownload->got_piece(index, begin, memMessage)) 
			{ 
				for (int i=0;  isend_have(index); 
				} 
			} 
			check_endgame(); 
		} 
		break; 
	default: 
		{ 
			pCon->Close(); 
			return; 
		} 
		break; 
	}	 
 
} 
 
void CConnector::check_endgame() 
{ 
	if (!m_pDownloader->is_endgame() && m_pStorage->is_everything_pending()) 
	{ 
		m_bEndgame = true; 
		m_pDownloader->make_endgame(); 
	} 
} 
 
void CConnector::_update_upload_rate(long lAmount) 
{ 
	m_pUpTotalMeasure->update_rate(lAmount); 
	if (m_lMaxUploadRate> 0 && m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate) 
	{ 
		m_bRateCapped = true; 
		time(&m_tcap); 
		m_tTimeUntilUncap = m_pUpTotalMeasure->time_until_rate(m_lMaxUploadRate); 
 
		TRACE("rate capeped() wait time :(%d)\r\n", m_tTimeUntilUncap); 
	} 
} 
 
void CConnector::uncap() 
{ 
	if (!m_bRateCapped) 
		return; 
	 
	// check the time to excute? 
	time_t t; 
	time(&t); 
	time_t tspan = t - m_tcap; 
	assert(tspan >= 0); 
	if (tspan < 0) 
		m_tcap = t; 
	if (tspan < m_tTimeUntilUncap) 
		return ; 
 
 
	TRACE("Rate uncap()\r\n"); 
 
	// flush the smaller rate connection to up the rate, until rate are recapped or all data flushed. 
	m_bRateCapped = false; 
	while (!m_bRateCapped) 
	{ 
		CUpload * pUp = 0; 
		long lminrate = -1; 
 
		for (int i=0; im_pUpload->is_choked() &&  
				m_connections[i]->m_pUpload->has_queries() && 
				m_connections[i]->IsFlush()) 
			{ 
				long lrate = m_connections[i]->m_pUpload->get_rate(); 
				if (lminrate == -1 || lrate < lminrate) 
				{ 
					pUp = m_connections[i]->m_pUpload; 
					lminrate = lrate; 
				} 
			} 
		} 
 
		if (!pUp)  
			break; 
 
		TRACE("nRate uncap() excute flush() minrate(%d)\r\n", lminrate); 
		pUp->flushed(); 
 
		if (m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate) 
			break; 
	} 
 
} 
 
bool CConnector::IsRatecap() 
{ 
	return m_bRateCapped; 
} 
 
void CConnector::SetMaxUploadRate(long lMaxUploadRate) 
{ 
	m_lMaxUploadRate = lMaxUploadRate; 
} 
 
void CConnector::OnIdle() 
{ 
	bool bSth = false; 
	vector vToPause, vToContinue, vToClose; 
	 
	BOOL bRet = m_criticalSection.Lock(); 
	if (!bRet) return; 
	 
	if (!m_vToPause.empty() || 
		!m_vToContinue.empty() ||  
		!m_vToClose.empty()) 
	{ 
		bSth = true; 
		vToPause = m_vToPause; 
		vToContinue = m_vToContinue; 
		vToClose = m_vToClose; 
 
		m_vToPause.erase(m_vToPause.begin(), m_vToPause.end()); 
		m_vToContinue.erase(m_vToContinue.begin(), m_vToContinue.end()); 
		m_vToClose.erase(m_vToClose.begin(), m_vToClose.end()); 
	} 
 
	bRet = m_criticalSection.Unlock();	 
	if (!bRet) return; 
 
	if (bSth) 
		_OnIdle(vToPause, vToContinue, vToClose); 
} 
 
void CConnector::_OnIdle(vector& vToPause, vector& vToContinue, vector& vToClose) 
{ 
	for (int i=0; iGetIP(lAddr, sPort); 
			 
			if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort) 
			{ 
				TRACE("user pause connection\r\n"); 
				m_connections[j]->Pause(true); 
				break; 
			} 
		} 
	}	 
	 
	for (i=0; iGetIP(lAddr, sPort); 
			 
			if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort) 
			{ 
				TRACE("user unpause connection\r\n"); 
				m_connections[j]->Pause(false); 
				break; 
			} 
		} 
	} 
 
	for (i=0; iGetIP(lAddr, sPort); 
			 
			if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort) 
			{ 
				TRACE("user close connection\r\n"); 
				m_connections[j]->Close(); 
				break; 
			} 
		} 
	} 
} 
 
void CConnector::PausePeer(CAddrPort addrPort, bool bPause) 
{ 
	BOOL bRet = m_criticalSection.Lock(); 
	if (!bRet) return; 
 
	if (bPause) 
		m_vToPause.push_back(addrPort); 
	else 
		m_vToContinue.push_back(addrPort); 
 
	bRet = m_criticalSection.Unlock(); 
	if (!bRet) return; 
} 
 
void CConnector::ClosePeer(CAddrPort addrPort) 
{ 
	BOOL bRet = m_criticalSection.Lock(); 
	if (!bRet) return; 
 
	m_vToClose.push_back(addrPort); 
	 
	bRet = m_criticalSection.Unlock(); 
	if (!bRet) return; 
}