www.pudn.com > GGBT.rar > Connector.cpp
// Connector.cpp: implementation of the CConnector class.
//
//////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "testbt.h"
#include "Connector.h"
#include "Connection.h"
#include "Encrypter.h"
#include "Upload.h"
#include "Downloader.h"
#include "Choker.h"
#ifdef _DEBUG
#undef THIS_FILE
static char THIS_FILE[]=__FILE__;
#define new DEBUG_NEW
#endif
long toint(char* pBuf)
{
unsigned char* p = (unsigned char*) pBuf;
return p[0]*0x1000000 + p[1]*0x10000 + p[2]*0x100 + p[3];
}
char* tobinary(long lnum, char* pBuf, long length=4)
{
char* p = (char*)&lnum;
pBuf[0] = p[3];
pBuf[1] = p[2];
pBuf[2] = p[1];
pBuf[3] = p[0];
return pBuf;
}
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
CConnector::CConnector(long lMaxUploadRate)
{
m_lMaxUploadRate = lMaxUploadRate;
}
CConnector::~CConnector()
{
for (int i=0; im_pDownload;
delete m_connections[i]->m_pUpload;
delete m_connections[i];
}
}
void CConnector::connection_made(CEncryptedConnection *pEConnection)
{
CConnection* pnew = new CConnection(pEConnection, this);
m_connections.push_back(pnew);
pnew->m_pUpload = m_pUploader->make_upload(pnew);
pnew->m_pDownload = m_pDownloader->make_download(pnew);
m_pChoker->connection_made(pnew);
}
void CConnector::Create(CDownloader* pDownloader, CUploader* pUploader, CChoker* pchoker,
long lNumPieces, CStorageWrapper* pStorage, CMeasure* pUpTotalMeasure)
{
m_pDownloader = pDownloader;
m_pUploader = pUploader;
m_lNumPieces = lNumPieces;
m_pStorage = pStorage;
m_pUpTotalMeasure = pUpTotalMeasure;
m_pChoker = pchoker;
m_bEndgame = false;
m_bRateCapped = false;
m_tcap = 0;
m_tTimeUntilUncap = 0;
check_endgame();
}
void CConnector::connection_lost(CEncryptedConnection* pEConnection)
{
bool bfind = false;
for (int i=0; im_pEConnection == pEConnection)
{
m_pChoker->connection_lost(m_connections[i]);
m_connections[i]->m_pDownload->disconnected();
delete m_connections[i]->m_pDownload;
delete m_connections[i]->m_pUpload;
delete m_connections[i];
m_connections.erase(m_connections.begin() + i);
bfind = true;
break;
}
}
assert(bfind);
}
void CConnector::connection_flushed(CEncryptedConnection *pEConnection)
{
bool bfind = false;
for (int i=0; im_pEConnection == pEConnection)
{
m_connections[i]->m_pUpload->flushed();
bfind = true;
break;
}
}
assert(bfind);
}
long CConnector::how_many_connections()
{
return m_connections.size();
}
const CDownloader* const CConnector::GetDownloader() const
{
return m_pDownloader;
};
const CMeasure* const CConnector::GetUpTotalMeasure() const
{
return m_pUpTotalMeasure;
}
void CConnector::got_message(CEncryptedConnection* pEConnection, memstream& memMessage)
{
CConnection* pCon = 0;
for (int i=0; im_pEConnection == pEConnection)
{
pCon = m_connections[i];
break;
}
}
assert(pCon);
char* pBuf = memMessage;
long length = memMessage.size();
char t = (*pBuf);
if (t == BITFIELD && pCon->m_bGotAnything)
{
pEConnection->Close();
return;
}
pCon->m_bGotAnything = true;
if (t == CHOKE || t == UNCHOKE || t == INTERESTED || t == NOT_INTERESTED)
{
if (length != 1)
{
pEConnection->Close();
return;
}
}
switch (t)
{
case CHOKE:
{
pCon->m_pDownload->got_choke();
}
break;
case UNCHOKE:
{
pCon->m_pDownload->got_unchoke();
check_endgame();
}
break;
case INTERESTED:
{
pCon->m_pUpload->got_interested();
}
break;
case NOT_INTERESTED:
{
pCon->m_pUpload->got_not_interested();
}
break;
case HAVE:
{
if (length != 5)
{
pCon->Close();
return;
}
long index = toint(pBuf + 1);
if (index >= m_lNumPieces || index < 0)
{
pCon->Close();
return;
}
pCon->m_pDownload->got_have(index);
check_endgame();
}
break;
case BITFIELD:
{
memstream memBitfield;
memBitfield.write(pBuf+1, length - 1);
vector vHave;
if (!bitfield_to_booleans(memBitfield, m_lNumPieces, vHave))
{
pCon->Close();
return;
}
pCon->m_pDownload->got_have_bitfield(vHave);
check_endgame();
}
break;
case REQUEST:
{
if (length != 13)
{
pCon->Close();
return;
}
long index = toint(pBuf + 1);
if (index >= m_lNumPieces)
{
pCon->Close();
return;
}
pCon->m_pUpload->got_request(index, toint(pBuf + 5), toint(pBuf+9));
}
break;
case CANCEL:
{
if (length != 13)
{
pCon->Close();
return;
}
long index = toint(pBuf + 1);
if (index >= m_lNumPieces)
{
pCon->Close();
return;
}
pCon->m_pUpload->got_cancel(index, toint(pBuf + 5), toint(pBuf+9));
}
break;
case PIECE:
{
if (length <= 9)
{
pCon->Close();
return;
}
long index = toint(pBuf + 1);
long begin = toint(pBuf+5);
if (index >= m_lNumPieces)
{
pCon->Close();
return;
}
memMessage.TrimLeft(9);
// memstream memPiece;
// memPiece.write(pBuf+9, length - 9);
// if (pCon->m_pDownload->got_piece(index, toint(pBuf+5), memPiece))
if (pCon->m_pDownload->got_piece(index, begin, memMessage))
{
for (int i=0; isend_have(index);
}
}
check_endgame();
}
break;
default:
{
pCon->Close();
return;
}
break;
}
}
void CConnector::check_endgame()
{
if (!m_pDownloader->is_endgame() && m_pStorage->is_everything_pending())
{
m_bEndgame = true;
m_pDownloader->make_endgame();
}
}
void CConnector::_update_upload_rate(long lAmount)
{
m_pUpTotalMeasure->update_rate(lAmount);
if (m_lMaxUploadRate> 0 && m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate)
{
m_bRateCapped = true;
time(&m_tcap);
m_tTimeUntilUncap = m_pUpTotalMeasure->time_until_rate(m_lMaxUploadRate);
TRACE("rate capeped() wait time :(%d)\r\n", m_tTimeUntilUncap);
}
}
void CConnector::uncap()
{
if (!m_bRateCapped)
return;
// check the time to excute?
time_t t;
time(&t);
time_t tspan = t - m_tcap;
assert(tspan >= 0);
if (tspan < 0)
m_tcap = t;
if (tspan < m_tTimeUntilUncap)
return ;
TRACE("Rate uncap()\r\n");
// flush the smaller rate connection to up the rate, until rate are recapped or all data flushed.
m_bRateCapped = false;
while (!m_bRateCapped)
{
CUpload * pUp = 0;
long lminrate = -1;
for (int i=0; im_pUpload->is_choked() &&
m_connections[i]->m_pUpload->has_queries() &&
m_connections[i]->IsFlush())
{
long lrate = m_connections[i]->m_pUpload->get_rate();
if (lminrate == -1 || lrate < lminrate)
{
pUp = m_connections[i]->m_pUpload;
lminrate = lrate;
}
}
}
if (!pUp)
break;
TRACE("nRate uncap() excute flush() minrate(%d)\r\n", lminrate);
pUp->flushed();
if (m_pUpTotalMeasure->get_rate_noupdate() > m_lMaxUploadRate)
break;
}
}
bool CConnector::IsRatecap()
{
return m_bRateCapped;
}
void CConnector::SetMaxUploadRate(long lMaxUploadRate)
{
m_lMaxUploadRate = lMaxUploadRate;
}
void CConnector::OnIdle()
{
bool bSth = false;
vector vToPause, vToContinue, vToClose;
BOOL bRet = m_criticalSection.Lock();
if (!bRet) return;
if (!m_vToPause.empty() ||
!m_vToContinue.empty() ||
!m_vToClose.empty())
{
bSth = true;
vToPause = m_vToPause;
vToContinue = m_vToContinue;
vToClose = m_vToClose;
m_vToPause.erase(m_vToPause.begin(), m_vToPause.end());
m_vToContinue.erase(m_vToContinue.begin(), m_vToContinue.end());
m_vToClose.erase(m_vToClose.begin(), m_vToClose.end());
}
bRet = m_criticalSection.Unlock();
if (!bRet) return;
if (bSth)
_OnIdle(vToPause, vToContinue, vToClose);
}
void CConnector::_OnIdle(vector& vToPause, vector& vToContinue, vector& vToClose)
{
for (int i=0; iGetIP(lAddr, sPort);
if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
{
TRACE("user pause connection\r\n");
m_connections[j]->Pause(true);
break;
}
}
}
for (i=0; iGetIP(lAddr, sPort);
if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
{
TRACE("user unpause connection\r\n");
m_connections[j]->Pause(false);
break;
}
}
}
for (i=0; iGetIP(lAddr, sPort);
if (addrPort.m_lAddr == lAddr && addrPort.m_sPort == sPort)
{
TRACE("user close connection\r\n");
m_connections[j]->Close();
break;
}
}
}
}
void CConnector::PausePeer(CAddrPort addrPort, bool bPause)
{
BOOL bRet = m_criticalSection.Lock();
if (!bRet) return;
if (bPause)
m_vToPause.push_back(addrPort);
else
m_vToContinue.push_back(addrPort);
bRet = m_criticalSection.Unlock();
if (!bRet) return;
}
void CConnector::ClosePeer(CAddrPort addrPort)
{
BOOL bRet = m_criticalSection.Lock();
if (!bRet) return;
m_vToClose.push_back(addrPort);
bRet = m_criticalSection.Unlock();
if (!bRet) return;
}