www.pudn.com > 2003090514065121890.zip > spConnPool.h
#ifndef __CONNPOOL_H_
#define __CONNPOOL_H_
//2003/05/13 添加长连接池管理功能
/*
2003/06/23
添加关于命名管道的长连接实现
关于Socket方式连接的错误检查,请参考文档中的附录部分
*/
#include "spIPCComm.h"
#include "spBaseLock.h"
#include "spServer.h"
namespace spBase
{//
class CCPBase;
class CConnImplement;
class CConnImplement //连接实现类必须实现的函数
{
public:
CConnImplement(){};
~CConnImplement(){};
virtual BOOL Connect(const CCPBase* pCP){return FALSE;};
virtual BOOL IsOK(void){return TRUE;};
virtual BOOL Close(void){return FALSE;};
};
class CCPBase //连接参数类必须实现的函数
{
friend CConnImplement;
public:
CCPBase(){};
~CCPBase(){};
CCPBase& operator = (const CCPBase& cp){return *this;};
};
//用于Socket连接的连接参数类
class CSocketConnPara : public CCPBase
{
public:
CSocketConnPara():m_szLocalAddr(""),m_szRemoteAddr(""),m_iLocalPort(0),m_iRemotePort(0){};
~CSocketConnPara(){};
CSocketConnPara& operator = (const CSocketConnPara& cp)
{
m_szLocalAddr=cp.m_szLocalAddr;
m_szRemoteAddr=cp.m_szRemoteAddr;
m_iLocalPort=cp.m_iLocalPort;
m_iRemotePort=cp.m_iRemotePort;
return *this;
};
public:
CString m_szLocalAddr,m_szRemoteAddr;
int m_iLocalPort,m_iRemotePort;
};
//实现Socket连接的长连接类
class CSocketImplement : public CConnImplement
{
public:
CSocketImplement();
~CSocketImplement();
virtual BOOL Connect(const CSocketConnPara* pCP);
virtual BOOL IsOK(void);
virtual BOOL Close(void);
commIPC::CTCPSocket* GetSocket(void){return m_psockConn;};
protected:
// CSocketConnPara m_cpConnection;
commIPC::CTCPSocket *m_psockConn; //Socket连接对象
};
template class CAliveConnectionPool
{//客户端网络连接池,保存所有的到服务器方的网络连接
//连接一定数量的SOCKET,并且将连接情况保存在数组中
//每个连接都有自己对应的服务器地址和端口,如果没有单独指定则使用默认地址和端口
//关于SocketID的说明:SocketID 对应着建立连接时所对应的顺序号码,一个连接的 SocketID = 0
public:
enum enumSocketStatus
{
SS_ERROR=0, //线路故障
SS_FREE, //线路可用
SS_USING, //线路正在被使用
SS_CONNECTING, //线路正在被连接
SS_PICKOUT //线路被取出进行检查
};
CAliveConnectionPool(LPCSTR pszSvrName="wacp",COutputDisplay* pDis=NULL):m_lockMap(pszSvrName,"maplk"),m_seqAccess(pszSvrName,"mapsq")
{
m_pConnection = NULL;
m_peUsedMap = NULL;
m_pdisOutput= pDis;
}
~CAliveConnectionPool()
{
for(int i=0;iPutLine(COutputDisplay::ErrorLevel::Message,szMsg);
}
}
protected:
//连接类
T *m_pConnection;
//显示类
COutputDisplay* m_pdisOutput;
//使用情况表:0:无效,1:空闲,2:使用中 3:连接中 4:已被取出
enumSocketStatus *m_peUsedMap;
CRWAccessLock m_lockMap;
CSequenceGenerator m_seqAccess;
int m_iMaxConnection;
};
template class CAliveConnectionPoolAdmin;
//客户端Socket连接池管理类
template
class CAliveConnectionPoolAdmin
{
public:
CAliveConnectionPoolAdmin(LPCSTR pszSvrName="dfspl",COutputDisplay* pDis=NULL,int iWaitInt=10):m_poolSocket(pszSvrName,pDis)
{
m_pConnPara = NULL;
m_hReConnHandle = m_hCheckHandle = NULL;
m_fRunning = FALSE;
m_iSleepInterval = iWaitInt;
}
~CAliveConnectionPoolAdmin()
{
StopDaemonThread();
if(m_pConnPara)
delete []m_pConnPara;
}
public:
//初始化所有连接数据,并建立连接
//必须第一个调用,pCP为每个连接所需要的参数数组
BOOL InitAllConnectionPara(int iMaxConn,CP* pCP)
{
if(m_pConnPara)
delete []m_pConnPara;
m_pConnPara = new CP[iMaxConn];
for(int i=0;i::enumSocketStatus* peMap,int iMaxLen)
{
return m_poolSocket.GetSocketUseMap(peMap,iMaxLen);
};
//得到最大连接数
int GetMaxConnection(void){return m_poolSocket.GetMaxConnection();};
//得到显示类指针
COutputDisplay* GetOutput(void){return m_poolSocket.GetOutput();};
//显示功能
void PrintfX(const char *pszFormat,...)
{
if(m_poolSocket.GetOutput())
{
char szMsg[1024*4];
va_list arg_ptr;
va_start(arg_ptr,pszFormat);
memset(szMsg,0,sizeof(szMsg));
vsprintf(szMsg,pszFormat,arg_ptr);
m_poolSocket.GetOutput()->PutLine(COutputDisplay::ErrorLevel::Message,szMsg);
}
}
protected:
CP *m_pConnPara;//连接参数数组
CAliveConnectionPool m_poolSocket; //连接池
HANDLE m_hReConnHandle,m_hCheckHandle;
BOOL m_fRunning;//是否继续运行
int m_iSleepInterval;//守护线程休息的时间,秒
};
// Socket连接方式,守护线程
DWORD SPConnPool_ReConnectThread_Socket(void* pNULL);
DWORD SPConnPool_CheckConnectionThread_Socket(void* pNULL);
// 对于其他连接方式,需要定义其他得守护线程函数,或者从 CAliveConnectionPoolAdmin 派生新类,并重载 CreateDaemonThread
template
class CSocketAliveConnectionPoolAdmin : public CAliveConnectionPoolAdmin
{
public:
CSocketAliveConnectionPoolAdmin(LPCSTR pszSvrName="dfsspl",COutputDisplay* pDis=NULL,int iWaitInt=10):CAliveConnectionPoolAdmin(pszSvrName,pDis,iWaitInt)
{
}
//创建相关守护线程,包括重连线程和检查线程
//如果创建失败,需要调用:StopDaemonThread 对可能创建的线程进行停止
virtual BOOL CreateDaemonThread(void) //创建相关的线程
{
DWORD dwID;
m_fRunning = TRUE;//开始运行
ASSERT(m_hReConnHandle==NULL);
if(threadReConn)
m_hReConnHandle= CreateThread(NULL,0,threadReConn,(void*)this,0,&dwID);
ASSERT(m_hCheckHandle==NULL);
if(threadCheck)
m_hCheckHandle= CreateThread(NULL,0,threadCheck,(void*)this,0,&dwID);
return (m_hReConnHandle!=NULL && m_hReConnHandle!=NULL);
}
//停止相关守护线程,直到线程停止退出后才返回
virtual BOOL StopDaemonThread(void)
{
if(m_fRunning)
{
m_fRunning = FALSE;
if(m_hReConnHandle)
{
WaitForSingleObject(m_hReConnHandle,INFINITE);
CloseHandle(m_hReConnHandle);
}
if(m_hCheckHandle)
{
WaitForSingleObject(m_hCheckHandle,INFINITE);
CloseHandle(m_hCheckHandle);
}
m_hReConnHandle = m_hCheckHandle = NULL;
}
return TRUE;
}
};
typedef CAliveConnectionPool CSocketPool;
typedef CSocketAliveConnectionPoolAdmin CSocketPoolAdmin;
//守护线程实现 Socket
//对错误 Socket 连接进行重连的线程
inline DWORD SPConnPool_ReConnectThread_Socket(void* pNULL)
{
CSocketPoolAdmin* pD = (CSocketPoolAdmin* )pNULL;
while(pD->IsRunning())
{
//find error connection
// & reconnect it
int iSocketID = 0;
if(pD->GetErrorSocket(iSocketID))
{
pD->PrintfX("[%d] CSocketPoolAdmin 检查到连接错误,准备进行重连接",iSocketID);
CSocketConnPara cpErr = pD->GetConnectionPara(iSocketID);
BOOL fReConn = pD->ReConnectSocket(iSocketID,&cpErr);
pD->PrintfX("[%d] CSocketPoolAdmin 错误连接重连接完成 ret= %s ",iSocketID,(fReConn)?"OK":"Failed");
}
Sleep(pD->GetSleepInterval()*1000);
}
return 0;
}
//检查连接是否正常的线程
inline DWORD SPConnPool_CheckConnectionThread_Socket(void* pNULL)
{
CSocketPoolAdmin* pD = (CSocketPoolAdmin* )pNULL;
while(pD->IsRunning())
{
int iSocketID;
CSocketImplement* pSock = pD->GetFreeSocket(iSocketID);
if(pSock)
{
if(pSock->IsOK())
{//正常
pD->FreeSocket(iSocketID);
}
else
{//连接出现错误
pD->SetErrorToSocket(iSocketID);
pD->PrintfX("[%d] CSocketPoolAdmin 检查到连接出现错误",iSocketID);
}
}
Sleep(pD->GetSleepInterval()*1000);
}
return 0;
}
//关于命名管道长连接的相关实现2003/06/23
//分别实现参数类,连接类,利用模板定义新的连接管理类和重定义守护线程
//用于命名管道连接的连接参数类
class CPipeConnPara : public CCPBase
{
public:
CPipeConnPara():m_szPipeName(""),m_iTimeout(0){};
~CPipeConnPara(){};
CPipeConnPara& operator = (const CPipeConnPara& cp)
{
m_szPipeName=cp.m_szPipeName;
m_iTimeout=cp.m_iTimeout;
return *this;
};
public:
CString m_szPipeName;
int m_iTimeout;
};
//实现命名管道连接的长连接类
class CPipeImplement : public CConnImplement
{
public:
CPipeImplement();
~CPipeImplement();
virtual BOOL Connect(const CPipeConnPara* pCP);
virtual BOOL IsOK(void){return TRUE;};//在外部对管道状态进行检查
virtual BOOL Close(void);
commIPC::CIPCNamedPipe* GetPipe(void){return m_ppipeConn;};
protected:
commIPC::CIPCNamedPipe *m_ppipeConn; //Socket连接对象
};
// Pipe连接方式,守护线程
DWORD SPConnPool_ReConnectThread_Pipe(void* pNULL);
DWORD SPConnPool_CheckConnectionThread_Pipe(void* pNULL);
typedef CAliveConnectionPool CPipePool;
typedef CSocketAliveConnectionPoolAdmin CPipePoolAdmin;
//守护线程实现 Pipe
//对错误 Pipe 连接进行重连的线程
inline DWORD SPConnPool_ReConnectThread_Pipe(void* pNULL)
{
CPipePoolAdmin* pD = (CPipePoolAdmin* )pNULL;
while(pD->IsRunning())
{
//find error connection
// & reconnect it
int iSocketID = 0;
if(pD->GetErrorSocket(iSocketID))
{
pD->PrintfX("[%d] CPipePoolAdmin 检查到连接错误,准备进行重连接",iSocketID);
CPipeConnPara cpErr = pD->GetConnectionPara(iSocketID);
BOOL fReConn = pD->ReConnectSocket(iSocketID,&cpErr);
pD->PrintfX("[%d] CPipePoolAdmin 错误连接重连接完成 ret= %s ",iSocketID,(fReConn)?"OK":"Failed");
}
Sleep(pD->GetSleepInterval()*1000);
}
return 0;
}
//检查连接是否正常的线程
inline DWORD SPConnPool_CheckConnectionThread_Pipe(void* pNULL)
{
CPipePoolAdmin* pD = (CPipePoolAdmin* )pNULL;
while(pD->IsRunning())
{
int iSocketID;
CPipeImplement* pSock = pD->GetFreeSocket(iSocketID);
if(pSock)
{
if(pSock->IsOK())
{//正常
pD->FreeSocket(iSocketID);
}
else
{//连接出现错误
pD->SetErrorToSocket(iSocketID);
pD->PrintfX("[%d] CPipePoolAdmin 检查到连接出现错误",iSocketID);
}
}
Sleep(pD->GetSleepInterval()*1000);
}
return 0;
}
}
#endif