www.pudn.com > DCPlusPlus-src.zip > ConnectionManager.cpp
/*
* Copyright (C) 2001-2004 Jacek Sieka, j_s at telia com
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
#include "stdinc.h"
#include "DCPlusPlus.h"
#include "ConnectionManager.h"
#include "DownloadManager.h"
#include "UploadManager.h"
#include "CryptoManager.h"
#include "ClientManager.h"
#include "QueueManager.h"
#include "UserConnection.h"
ConnectionManager::ConnectionManager() : floodCounter(0), shuttingDown(false) {
TimerManager::getInstance()->addListener(this);
socket.addListener(this);
features.push_back(UserConnection::FEATURE_MINISLOTS);
features.push_back(UserConnection::FEATURE_XML_BZLIST);
features.push_back(UserConnection::FEATURE_ADCGET);
features.push_back(UserConnection::FEATURE_TTHL);
features.push_back(UserConnection::FEATURE_TTHF);
};
/**
* Request a connection for downloading.
* DownloadManager::addConnection will be called as soon as the connection is ready
* for downloading.
* @param aUser The user to connect to.
*/
void ConnectionManager::getDownloadConnection(const User::Ptr& aUser) {
dcassert((bool)aUser);
ConnectionQueueItem* cqi = NULL;
{
Lock l(cs);
// Check the download pool
if(find(downPool.begin(), downPool.end(), aUser) != downPool.end()) {
if(find(pendingAdd.begin(), pendingAdd.end(), aUser) == pendingAdd.end())
pendingAdd.push_back(aUser);
return;
}
// See if we're already trying to connect
if(find(pendingDown.begin(), pendingDown.end(), aUser) != pendingDown.end())
return;
// Check if we have an active download connection already
for(ConnectionQueueItem::Iter j = active.begin(); j != active.end(); ++j) {
dcassert((*j)->getConnection());
if((*j == aUser) && ((*j)->getConnection()->isSet(UserConnection::FLAG_DOWNLOAD)))
return;
}
// Add it to the pending...
cqi = new ConnectionQueueItem(aUser);
cqi->setState(ConnectionQueueItem::WAITING);
pendingDown.push_back(cqi);
fire(ConnectionManagerListener::Added(), cqi);
}
}
void ConnectionManager::putDownloadConnection(UserConnection* aSource, bool reuse /* = false */) {
// Pool it for later usage...
if(reuse) {
aSource->addListener(this);
{
Lock l(cs);
aSource->getCQI()->setState(ConnectionQueueItem::IDLE);
dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end());
active.erase(find(active.begin(), active.end(), aSource->getCQI()));
downPool.push_back(aSource->getCQI());
}
dcdebug("ConnectionManager::putDownloadConnection Pooling reusable connection %p to %s\n", aSource, aSource->getUser()->getNick().c_str());
} else {
if(QueueManager::getInstance()->hasDownload(aSource->getCQI()->getUser())) {
aSource->removeListeners();
aSource->disconnect();
Lock l(cs);
ConnectionQueueItem* cqi = aSource->getCQI();
dcassert(cqi);
// Remove the userconnection, don't need it any more
dcassert(find(userConnections.begin(), userConnections.end(), aSource) != userConnections.end());
userConnections.erase(find(userConnections.begin(), userConnections.end(), aSource));
pendingDelete.push_back(aSource);
cqi->setConnection(NULL);
cqi->setState(ConnectionQueueItem::WAITING);
dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end());
active.erase(find(active.begin(), active.end(), aSource->getCQI()));
cqi->setLastAttempt(GET_TICK());
pendingDown.push_back(cqi);
} else {
{
Lock l(cs);
dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end());
active.erase(find(active.begin(), active.end(), aSource->getCQI()));
}
putConnection(aSource);
}
}
}
void ConnectionManager::putUploadConnection(UserConnection* aSource) {
{
Lock l(cs);
dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end());
active.erase(find(active.begin(), active.end(), aSource->getCQI()));
}
putConnection(aSource);
}
void ConnectionManager::putConnection(UserConnection* aConn) {
aConn->removeListeners();
aConn->disconnect();
ConnectionQueueItem* cqi = NULL;
{
Lock l(cs);
cqi = aConn->getCQI();
dcassert(find(userConnections.begin(), userConnections.end(), aConn) != userConnections.end());
userConnections.erase(find(userConnections.begin(), userConnections.end(), aConn));
pendingDelete.push_back(aConn);
}
if(cqi) {
fire(ConnectionManagerListener::Removed(), cqi);
delete cqi;
}
}
void ConnectionManager::on(TimerManagerListener::Second, u_int32_t aTick) throw() {
ConnectionQueueItem::List failPassive;
ConnectionQueueItem::List connecting;
ConnectionQueueItem::List removed;
User::List getDown;
{
Lock l(cs);
{
for(User::Iter k = pendingAdd.begin(); k != pendingAdd.end(); ++k) {
ConnectionQueueItem::Iter i = find(downPool.begin(), downPool.end(), *k);
if(i == downPool.end()) {
// Hm, connection must have failed before it could be collected...
getDown.push_back(*k);
} else {
ConnectionQueueItem* cqi = *i;
downPool.erase(i);
dcassert(find(active.begin(), active.end(), cqi) == active.end());
active.push_back(cqi);
dcassert(cqi->getConnection());
dcassert(cqi->getConnection()->getCQI() == cqi);
cqi->getConnection()->removeListener(this);
DownloadManager::getInstance()->addConnection(cqi->getConnection());
}
}
pendingAdd.clear();
}
bool tooMany = ((SETTING(DOWNLOAD_SLOTS) != 0) && DownloadManager::getInstance()->getDownloads() >= (size_t)SETTING(DOWNLOAD_SLOTS));
bool tooFast = ((SETTING(MAX_DOWNLOAD_SPEED) != 0 && DownloadManager::getInstance()->getAverageSpeed() >= (SETTING(MAX_DOWNLOAD_SPEED)*1024)));
bool startDown = !tooMany && !tooFast;
int attempts = 0;
ConnectionQueueItem::Iter i = pendingDown.begin();
while(i != pendingDown.end()) {
ConnectionQueueItem* cqi = *i;
dcassert(cqi->getUser());
if(!cqi->getUser()->isOnline()) {
// Not online anymore...remove him from the pending...
i = pendingDown.erase(i);
removed.push_back(cqi);
continue;
}
if( ((cqi->getLastAttempt() + 60*1000) < aTick) && (attempts < 2) ) {
cqi->setLastAttempt(aTick);
if(!QueueManager::getInstance()->hasDownload(cqi->getUser())) {
i = pendingDown.erase(i);
removed.push_back(cqi);
continue;
}
if(cqi->getUser()->isSet(User::PASSIVE) && (SETTING(CONNECTION_TYPE) != SettingsManager::CONNECTION_ACTIVE)) {
i = pendingDown.erase(i);
failPassive.push_back(cqi);
continue;
}
// Always start high-priority downloads unless we have 3 more than maxdownslots already...
if(!startDown) {
bool extraFull = (SETTING(DOWNLOAD_SLOTS) != 0) && (DownloadManager::getInstance()->getDownloads() >= (size_t)(SETTING(DOWNLOAD_SLOTS)+3));
startDown = !extraFull && QueueManager::getInstance()->hasDownload(cqi->getUser(), QueueItem::HIGHEST);
}
if(cqi->getState() == ConnectionQueueItem::WAITING) {
if(startDown) {
cqi->setState(ConnectionQueueItem::CONNECTING);
cqi->getUser()->connect();
fire(ConnectionManagerListener::StatusChanged(), cqi);
attempts++;
} else {
cqi->setState(ConnectionQueueItem::NO_DOWNLOAD_SLOTS);
fire(ConnectionManagerListener::Failed(), cqi, STRING(ALL_DOWNLOAD_SLOTS_TAKEN));
}
} else if(cqi->getState() == ConnectionQueueItem::NO_DOWNLOAD_SLOTS && startDown) {
cqi->setState(ConnectionQueueItem::WAITING);
}
} else if(((cqi->getLastAttempt() + 50*1000) < aTick) && (cqi->getState() == ConnectionQueueItem::CONNECTING)) {
fire(ConnectionManagerListener::Failed(), cqi, STRING(CONNECTION_TIMEOUT));
cqi->setState(ConnectionQueueItem::WAITING);
}
++i;
}
}
ConnectionQueueItem::Iter m;
for(m = removed.begin(); m != removed.end(); ++m) {
fire(ConnectionManagerListener::Removed(), *m);
delete *m;
}
for(m = failPassive.begin(); m != failPassive.end(); ++m) {
QueueManager::getInstance()->removeSources((*m)->getUser(), QueueItem::Source::FLAG_PASSIVE);
fire(ConnectionManagerListener::Removed(), *m);
delete *m;
}
for(User::Iter n = getDown.begin(); n != getDown.end(); ++n) {
getDownloadConnection(*n);
}
}
void ConnectionManager::on(TimerManagerListener::Minute, u_int32_t aTick) throw() {
Lock l(cs);
{
for(UserConnection::Iter j = userConnections.begin(); j != userConnections.end(); ++j) {
if(((*j)->getLastActivity() + 180*1000) < aTick) {
(*j)->disconnect();
}
}
}
for_each(pendingDelete.begin(), pendingDelete.end(), DeleteFunction());
pendingDelete.clear();
}
static const u_int32_t FLOOD_TRIGGER = 10000;
static const u_int32_t FLOOD_ADD = 2000;
/**
* Someone's connecting, accept the connection and wait for identification...
* It's always the other fellow that starts sending if he made the connection.
*/
void ConnectionManager::on(ServerSocketListener::IncomingConnection) throw() {
UserConnection* uc = NULL;
u_int32_t now = GET_TICK();
if(now > floodCounter) {
floodCounter = now + FLOOD_ADD;
} else {
if(now + FLOOD_TRIGGER < floodCounter) {
Socket s;
try {
s.accept(socket);
} catch(const SocketException&) {
// ...
}
dcdebug("Connection flood detected!\n");
return;
} else {
floodCounter += 2000;
}
}
try {
uc = getConnection();
uc->setFlag(UserConnection::FLAG_INCOMING);
uc->setState(UserConnection::STATE_NICK);
uc->setLastActivity(GET_TICK());
uc->accept(socket);
} catch(const SocketException& e) {
dcdebug("ConnectionManager::OnIncomingConnection caught: %s\n", e.getError().c_str());
if(uc)
putConnection(uc);
}
}
void ConnectionManager::connect(const string& aServer, short aPort, const string& aNick) {
if(shuttingDown)
return;
UserConnection* uc = NULL;
try {
uc = getConnection();
uc->setNick(aNick);
uc->setState(UserConnection::STATE_CONNECT);
uc->connect(aServer, aPort);
} catch(const SocketException&) {
if(uc)
putConnection(uc);
}
}
void ConnectionManager::on(UserConnectionListener::Connected, UserConnection* aSource) throw() {
dcassert(aSource->getState() == UserConnection::STATE_CONNECT);
aSource->myNick(aSource->getNick());
aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk());
aSource->setState(UserConnection::STATE_NICK);
}
/**
* Nick received. If it's a downloader, fine, otherwise it must be an uploader.
*/
void ConnectionManager::on(UserConnectionListener::MyNick, UserConnection* aSource, const string& aNick) throw() {
if(aSource->getState() != UserConnection::STATE_NICK) {
// Already got this once, ignore...
dcdebug("CM::onMyNick %p sent nick twice\n", aSource);
return;
}
dcassert(aNick.size() > 0);
dcdebug("ConnectionManager::onMyNick %p, %s\n", aSource, aNick.c_str());
dcassert(!aSource->getUser());
// First, we try looking in the pending downloads...hopefully it's one of them...
{
Lock l(cs);
for(ConnectionQueueItem::Iter i = pendingDown.begin(); i != pendingDown.end(); ++i) {
ConnectionQueueItem* cqi = *i;
if(cqi->getUser()->getNick() == aNick) {
aSource->setUser(cqi->getUser());
// Indicate that we're interested in this file...
aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
}
}
}
if(!aSource->getUser()) {
// Make sure we know who it is, i e that he/she is connected...
if(!ClientManager::getInstance()->isOnline(aNick)) {
dcdebug("CM::onMyNick Incoming connection from unknown user %s\n", aNick.c_str());
putConnection(aSource);
return;
}
aSource->setUser(ClientManager::getInstance()->getUser(aNick));
// We don't need this connection for downloading...make it an upload connection instead...
aSource->setFlag(UserConnection::FLAG_UPLOAD);
}
if( aSource->isSet(UserConnection::FLAG_INCOMING) ) {
aSource->myNick(aSource->getUser()->getClientNick());
aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk());
}
aSource->setState(UserConnection::STATE_LOCK);
}
void ConnectionManager::on(UserConnectionListener::CLock, UserConnection* aSource, const string& aLock, const string& aPk) throw() {
if(aSource->getState() != UserConnection::STATE_LOCK) {
dcdebug("CM::onLock %p received lock twice, ignoring\n", aSource);
return;
}
if( CryptoManager::getInstance()->isExtended(aLock) ) {
// Alright, we have an extended protocol, set a user flag for this user and refresh his info...
if( (aPk.find("DCPLUSPLUS") != string::npos) && aSource->getUser()) {
aSource->getUser()->setFlag(User::DCPLUSPLUS);
User::updated(aSource->getUser());
}
StringList defFeatures = features;
if(BOOLSETTING(COMPRESS_TRANSFERS)) {
defFeatures.push_back(UserConnection::FEATURE_GET_ZBLOCK);
defFeatures.push_back(UserConnection::FEATURE_ZLIB_GET);
}
aSource->supports(defFeatures);
}
aSource->setState(UserConnection::STATE_DIRECTION);
aSource->direction(aSource->getDirectionString(), aSource->getNumber());
aSource->key(CryptoManager::getInstance()->makeKey(aLock));
}
void ConnectionManager::on(UserConnectionListener::Direction, UserConnection* aSource, const string& dir, const string& num) throw() {
if(aSource->getState() != UserConnection::STATE_DIRECTION) {
dcdebug("CM::onDirection %p received direction twice, ignoring\n", aSource);
return;
}
dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
if(dir == "Upload") {
// Fine, the other fellow want's to send us data...make sure we really want that...
if(aSource->isSet(UserConnection::FLAG_UPLOAD)) {
// Huh? Strange...disconnect...
putConnection(aSource);
return;
}
} else {
if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
int number = Util::toInt(num);
// Damn, both want to download...the one with the highest number wins...
if(aSource->getNumber() < number) {
// Damn! We lost!
aSource->unsetFlag(UserConnection::FLAG_DOWNLOAD);
aSource->setFlag(UserConnection::FLAG_UPLOAD);
} else if(aSource->getNumber() == number) {
putConnection(aSource);
return;
}
}
}
dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD));
aSource->setState(UserConnection::STATE_KEY);
}
void ConnectionManager::on(UserConnectionListener::Key, UserConnection* aSource, const string&/* aKey*/) throw() {
if(aSource->getState() != UserConnection::STATE_KEY) {
dcdebug("CM::onKey Bad state, ignoring");
return;
}
// We don't want any messages while the Up/DownloadManagers are working...
aSource->removeListener(this);
dcassert(aSource->getUser());
{
Lock l(cs);
// Only one connection / user & direction...
for(ConnectionQueueItem::Iter k = active.begin(); k != active.end(); ++k) {
bool sameDirection = (*k)->getConnection()->isSet(UserConnection::FLAG_UPLOAD) == aSource->isSet(UserConnection::FLAG_UPLOAD);
if( sameDirection && (*k == aSource->getUser()) ) {
putConnection(aSource);
return;
}
}
ConnectionQueueItem* cqi = NULL;
if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
// See if we have a matching user in the pending connections...
ConnectionQueueItem::Iter i = find(pendingDown.begin(), pendingDown.end(), aSource->getUser());
if(i == pendingDown.end()) {
putConnection(aSource);
return;
}
cqi = *i;
pendingDown.erase(i);
cqi->setConnection(aSource);
} else {
dcassert(aSource->isSet(UserConnection::FLAG_UPLOAD));
cqi = new ConnectionQueueItem(aSource->getUser());
cqi->setConnection(aSource);
fire(ConnectionManagerListener::Added(), cqi);
}
aSource->setCQI(cqi);
dcassert(find(active.begin(), active.end(), cqi) == active.end());
active.push_back(cqi);
fire(ConnectionManagerListener::Connected(), cqi);
if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
dcdebug("ConnectionManager::onKey, leaving to downloadmanager\n");
DownloadManager::getInstance()->addConnection(aSource);
} else {
dcassert(aSource->isSet(UserConnection::FLAG_UPLOAD));
dcdebug("ConnectionManager::onKey, leaving to uploadmanager\n");
UploadManager::getInstance()->addConnection(aSource);
}
}
}
void ConnectionManager::on(UserConnectionListener::Failed, UserConnection* aSource, const string& /*aError*/) throw() {
if(aSource->isSet(UserConnection::FLAG_DOWNLOAD) && aSource->getCQI()) {
{
Lock l(cs);
for(ConnectionQueueItem::Iter i = downPool.begin(); i != downPool.end(); ++i) {
dcassert((*i)->getConnection());
if((*i)->getConnection() == aSource) {
dcdebug("ConnectionManager::onError Removing connection %p to %s from active pool\n", aSource, aSource->getUser()->getNick().c_str());
downPool.erase(i);
break;
}
}
}
}
putConnection(aSource);
}
void ConnectionManager::removeConnection(const User::Ptr& aUser, int isDownload) {
Lock l(cs);
for(UserConnection::Iter i = userConnections.begin(); i != userConnections.end(); ++i) {
UserConnection* uc = *i;
if(uc->getUser() == aUser && uc->isSet(isDownload ? UserConnection::FLAG_DOWNLOAD : UserConnection::FLAG_UPLOAD)) {
uc->disconnect();
break;
}
}
}
void ConnectionManager::shutdown() {
shuttingDown = true;
socket.removeListener(this);
socket.disconnect();
{
Lock l(cs);
for(UserConnection::Iter j = userConnections.begin(); j != userConnections.end(); ++j) {
(*j)->disconnect();
}
}
// Wait until all connections have died out...
while(true) {
{
Lock l(cs);
if(userConnections.empty()) {
break;
}
}
Thread::sleep(50);
}
}
// UserConnectionListener
void ConnectionManager::on(UserConnectionListener::Supports, UserConnection* conn, const StringList& feat) throw() {
for(StringList::const_iterator i = feat.begin(); i != feat.end(); ++i) {
if(*i == UserConnection::FEATURE_GET_ZBLOCK)
conn->setFlag(UserConnection::FLAG_SUPPORTS_GETZBLOCK);
else if(*i == UserConnection::FEATURE_MINISLOTS)
conn->setFlag(UserConnection::FLAG_SUPPORTS_MINISLOTS);
else if(*i == UserConnection::FEATURE_XML_BZLIST)
conn->setFlag(UserConnection::FLAG_SUPPORTS_XML_BZLIST);
else if(*i == UserConnection::FEATURE_ADCGET)
conn->setFlag(UserConnection::FLAG_SUPPORTS_ADCGET);
else if(*i == UserConnection::FEATURE_ZLIB_GET)
conn->setFlag(UserConnection::FLAG_SUPPORTS_ZLIB_GET);
else if(*i == UserConnection::FEATURE_TTHL)
conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHL);
else if(*i == UserConnection::FEATURE_TTHF)
conn->setFlag(UserConnection::FLAG_SUPPORTS_TTHF);
}
}
/**
* @file
* $Id: ConnectionManager.cpp,v 1.78 2004/09/09 09:27:35 arnetheduck Exp $
*/