www.pudn.com > 2003090514065121890.zip > spConnPool.h


#ifndef __CONNPOOL_H_ 
#define __CONNPOOL_H_ 
 
//2003/05/13 添加长连接池管理功能 
/* 
2003/06/23 
添加关于命名管道的长连接实现 
关于Socket方式连接的错误检查,请参考文档中的附录部分 
*/ 
 
#include "spIPCComm.h" 
#include "spBaseLock.h" 
#include "spServer.h" 
 
namespace spBase 
{// 
	 
	class CCPBase; 
	class CConnImplement; 
	 
	class CConnImplement //连接实现类必须实现的函数 
	{ 
	public: 
		CConnImplement(){}; 
		~CConnImplement(){}; 
		virtual BOOL Connect(const CCPBase* pCP){return FALSE;}; 
		virtual BOOL IsOK(void){return TRUE;}; 
		virtual BOOL Close(void){return FALSE;}; 
	}; 
	 
	class CCPBase //连接参数类必须实现的函数 
	{ 
		friend CConnImplement; 
	public: 
		CCPBase(){}; 
		~CCPBase(){}; 
		CCPBase& operator = (const CCPBase& cp){return *this;}; 
	}; 
	 
	//用于Socket连接的连接参数类 
	class CSocketConnPara : public CCPBase 
	{ 
	public: 
		CSocketConnPara():m_szLocalAddr(""),m_szRemoteAddr(""),m_iLocalPort(0),m_iRemotePort(0){}; 
		~CSocketConnPara(){}; 
		CSocketConnPara& operator = (const CSocketConnPara& cp) 
		{ 
			m_szLocalAddr=cp.m_szLocalAddr; 
			m_szRemoteAddr=cp.m_szRemoteAddr; 
			m_iLocalPort=cp.m_iLocalPort; 
			m_iRemotePort=cp.m_iRemotePort; 
			return *this; 
		}; 
	public: 
		CString m_szLocalAddr,m_szRemoteAddr; 
		int m_iLocalPort,m_iRemotePort; 
	}; 
	//实现Socket连接的长连接类 
	class CSocketImplement : public CConnImplement 
	{ 
	public: 
		CSocketImplement(); 
		~CSocketImplement(); 
		virtual BOOL Connect(const CSocketConnPara* pCP); 
		virtual BOOL IsOK(void); 
		virtual BOOL Close(void); 
		commIPC::CTCPSocket* GetSocket(void){return m_psockConn;}; 
	protected: 
		//	CSocketConnPara m_cpConnection; 
		commIPC::CTCPSocket *m_psockConn; //Socket连接对象 
	}; 
	 
	template  class CAliveConnectionPool 
	{//客户端网络连接池,保存所有的到服务器方的网络连接 
		//连接一定数量的SOCKET,并且将连接情况保存在数组中 
		//每个连接都有自己对应的服务器地址和端口,如果没有单独指定则使用默认地址和端口 
		//关于SocketID的说明:SocketID 对应着建立连接时所对应的顺序号码,一个连接的 SocketID = 0 
	public: 
		enum enumSocketStatus 
		{ 
			SS_ERROR=0, //线路故障 
				SS_FREE, //线路可用 
				SS_USING, //线路正在被使用 
				SS_CONNECTING, //线路正在被连接 
				SS_PICKOUT //线路被取出进行检查 
		}; 
		CAliveConnectionPool(LPCSTR pszSvrName="wacp",COutputDisplay* pDis=NULL):m_lockMap(pszSvrName,"maplk"),m_seqAccess(pszSvrName,"mapsq") 
		{ 
			m_pConnection = NULL; 
			m_peUsedMap = NULL; 
			m_pdisOutput= pDis; 
		} 
		~CAliveConnectionPool() 
		{ 
			for(int i=0;iPutLine(COutputDisplay::ErrorLevel::Message,szMsg); 
			} 
		} 
	protected: 
		//连接类 
		T *m_pConnection; 
		//显示类 
		COutputDisplay* m_pdisOutput; 
		//使用情况表:0:无效,1:空闲,2:使用中 3:连接中 4:已被取出 
		enumSocketStatus *m_peUsedMap; 
		CRWAccessLock m_lockMap; 
		CSequenceGenerator m_seqAccess; 
		int m_iMaxConnection; 
	}; 
 
	template  class CAliveConnectionPoolAdmin; 
 
	//客户端Socket连接池管理类 
	template  
	class CAliveConnectionPoolAdmin 
	{ 
	public: 
		CAliveConnectionPoolAdmin(LPCSTR pszSvrName="dfspl",COutputDisplay* pDis=NULL,int iWaitInt=10):m_poolSocket(pszSvrName,pDis) 
		{ 
			m_pConnPara = NULL; 
			m_hReConnHandle = m_hCheckHandle = NULL; 
			m_fRunning = FALSE; 
			m_iSleepInterval = iWaitInt; 
		} 
		~CAliveConnectionPoolAdmin() 
		{ 
			StopDaemonThread(); 
			if(m_pConnPara) 
				delete []m_pConnPara; 
		} 
	public: 
		//初始化所有连接数据,并建立连接 
		//必须第一个调用,pCP为每个连接所需要的参数数组 
		BOOL InitAllConnectionPara(int iMaxConn,CP* pCP) 
		{ 
			if(m_pConnPara) 
				delete []m_pConnPara; 
			 
			m_pConnPara = new CP[iMaxConn]; 
			for(int i=0;i::enumSocketStatus* peMap,int iMaxLen) 
		{ 
			return m_poolSocket.GetSocketUseMap(peMap,iMaxLen); 
		}; 
		//得到最大连接数 
		int GetMaxConnection(void){return m_poolSocket.GetMaxConnection();}; 
		//得到显示类指针 
		COutputDisplay* GetOutput(void){return m_poolSocket.GetOutput();}; 
		//显示功能 
		void PrintfX(const char *pszFormat,...) 
		{ 
			if(m_poolSocket.GetOutput()) 
			{ 
				char szMsg[1024*4]; 
				va_list arg_ptr; 
				va_start(arg_ptr,pszFormat); 
				memset(szMsg,0,sizeof(szMsg)); 
				vsprintf(szMsg,pszFormat,arg_ptr); 
				m_poolSocket.GetOutput()->PutLine(COutputDisplay::ErrorLevel::Message,szMsg); 
			} 
		} 
	protected: 
		CP *m_pConnPara;//连接参数数组 
		CAliveConnectionPool m_poolSocket; //连接池 
		HANDLE m_hReConnHandle,m_hCheckHandle; 
		BOOL m_fRunning;//是否继续运行 
		int m_iSleepInterval;//守护线程休息的时间,秒 
	}; 
 
	// Socket连接方式,守护线程 
	DWORD SPConnPool_ReConnectThread_Socket(void* pNULL); 
	DWORD SPConnPool_CheckConnectionThread_Socket(void* pNULL); 
	// 对于其他连接方式,需要定义其他得守护线程函数,或者从 CAliveConnectionPoolAdmin 派生新类,并重载 CreateDaemonThread 
 
	template   
	class CSocketAliveConnectionPoolAdmin : public CAliveConnectionPoolAdmin 
	{ 
	public: 
		CSocketAliveConnectionPoolAdmin(LPCSTR pszSvrName="dfsspl",COutputDisplay* pDis=NULL,int iWaitInt=10):CAliveConnectionPoolAdmin(pszSvrName,pDis,iWaitInt) 
		{ 
		} 
		//创建相关守护线程,包括重连线程和检查线程 
		//如果创建失败,需要调用:StopDaemonThread 对可能创建的线程进行停止 
		virtual BOOL CreateDaemonThread(void) //创建相关的线程 
		{ 
			DWORD dwID; 
			m_fRunning = TRUE;//开始运行 
			 
			ASSERT(m_hReConnHandle==NULL); 
			if(threadReConn) 
				m_hReConnHandle= CreateThread(NULL,0,threadReConn,(void*)this,0,&dwID); 
			 
			ASSERT(m_hCheckHandle==NULL); 
			if(threadCheck) 
				m_hCheckHandle= CreateThread(NULL,0,threadCheck,(void*)this,0,&dwID); 
			 
			return (m_hReConnHandle!=NULL && m_hReConnHandle!=NULL); 
		} 
		//停止相关守护线程,直到线程停止退出后才返回 
		virtual BOOL StopDaemonThread(void) 
		{ 
			if(m_fRunning) 
			{ 
				m_fRunning = FALSE; 
				if(m_hReConnHandle) 
				{ 
					WaitForSingleObject(m_hReConnHandle,INFINITE); 
					CloseHandle(m_hReConnHandle); 
				} 
				if(m_hCheckHandle) 
				{ 
					WaitForSingleObject(m_hCheckHandle,INFINITE); 
					CloseHandle(m_hCheckHandle); 
				} 
				m_hReConnHandle = m_hCheckHandle = NULL; 
			} 
			return TRUE; 
		} 
	}; 
 
	typedef CAliveConnectionPool CSocketPool; 
	typedef CSocketAliveConnectionPoolAdmin CSocketPoolAdmin; 
 
	//守护线程实现 Socket 
	//对错误 Socket 连接进行重连的线程 
	inline DWORD SPConnPool_ReConnectThread_Socket(void* pNULL) 
	{ 
		CSocketPoolAdmin* pD = (CSocketPoolAdmin* )pNULL; 
		while(pD->IsRunning()) 
		{ 
			//find error connection 
			// & reconnect it 
			int iSocketID = 0; 
			if(pD->GetErrorSocket(iSocketID)) 
			{ 
				pD->PrintfX("[%d] CSocketPoolAdmin 检查到连接错误,准备进行重连接",iSocketID); 
				CSocketConnPara cpErr = pD->GetConnectionPara(iSocketID); 
				BOOL fReConn = pD->ReConnectSocket(iSocketID,&cpErr); 
				pD->PrintfX("[%d] CSocketPoolAdmin 错误连接重连接完成 ret= %s ",iSocketID,(fReConn)?"OK":"Failed"); 
			} 
			Sleep(pD->GetSleepInterval()*1000); 
		} 
		return 0; 
	} 
	//检查连接是否正常的线程 
	inline DWORD SPConnPool_CheckConnectionThread_Socket(void* pNULL) 
	{ 
		CSocketPoolAdmin* pD = (CSocketPoolAdmin* )pNULL; 
		while(pD->IsRunning()) 
		{ 
			int iSocketID; 
			CSocketImplement* pSock = pD->GetFreeSocket(iSocketID); 
			if(pSock) 
			{ 
				if(pSock->IsOK()) 
				{//正常 
					pD->FreeSocket(iSocketID); 
				} 
				else 
				{//连接出现错误 
					pD->SetErrorToSocket(iSocketID); 
					pD->PrintfX("[%d] CSocketPoolAdmin 检查到连接出现错误",iSocketID); 
				} 
			} 
			Sleep(pD->GetSleepInterval()*1000); 
		} 
		return 0; 
	} 
 
	//关于命名管道长连接的相关实现2003/06/23 
	//分别实现参数类,连接类,利用模板定义新的连接管理类和重定义守护线程 
	//用于命名管道连接的连接参数类 
	class CPipeConnPara : public CCPBase 
	{ 
	public: 
		CPipeConnPara():m_szPipeName(""),m_iTimeout(0){}; 
		~CPipeConnPara(){}; 
		CPipeConnPara& operator = (const CPipeConnPara& cp) 
		{ 
			m_szPipeName=cp.m_szPipeName; 
			m_iTimeout=cp.m_iTimeout; 
			return *this; 
		}; 
	public: 
		CString m_szPipeName; 
		int m_iTimeout; 
	}; 
	//实现命名管道连接的长连接类 
	class CPipeImplement : public CConnImplement 
	{ 
	public: 
		CPipeImplement(); 
		~CPipeImplement(); 
		virtual BOOL Connect(const CPipeConnPara* pCP); 
		virtual BOOL IsOK(void){return TRUE;};//在外部对管道状态进行检查 
		virtual BOOL Close(void); 
		commIPC::CIPCNamedPipe* GetPipe(void){return m_ppipeConn;}; 
	protected: 
		commIPC::CIPCNamedPipe *m_ppipeConn; //Socket连接对象 
	}; 
	// Pipe连接方式,守护线程 
	DWORD SPConnPool_ReConnectThread_Pipe(void* pNULL); 
	DWORD SPConnPool_CheckConnectionThread_Pipe(void* pNULL); 
	 
	typedef CAliveConnectionPool CPipePool; 
	typedef CSocketAliveConnectionPoolAdmin CPipePoolAdmin; 
 
	//守护线程实现 Pipe 
	//对错误 Pipe 连接进行重连的线程 
	inline DWORD SPConnPool_ReConnectThread_Pipe(void* pNULL) 
	{ 
		CPipePoolAdmin* pD = (CPipePoolAdmin* )pNULL; 
		while(pD->IsRunning()) 
		{ 
			//find error connection 
			// & reconnect it 
			int iSocketID = 0; 
			if(pD->GetErrorSocket(iSocketID)) 
			{ 
				pD->PrintfX("[%d] CPipePoolAdmin 检查到连接错误,准备进行重连接",iSocketID); 
				CPipeConnPara cpErr = pD->GetConnectionPara(iSocketID); 
				BOOL fReConn = pD->ReConnectSocket(iSocketID,&cpErr); 
				pD->PrintfX("[%d] CPipePoolAdmin 错误连接重连接完成 ret= %s ",iSocketID,(fReConn)?"OK":"Failed"); 
			} 
			Sleep(pD->GetSleepInterval()*1000); 
		} 
		return 0; 
	} 
	//检查连接是否正常的线程 
	inline DWORD SPConnPool_CheckConnectionThread_Pipe(void* pNULL) 
	{ 
		CPipePoolAdmin* pD = (CPipePoolAdmin* )pNULL; 
		while(pD->IsRunning()) 
		{ 
			int iSocketID; 
			CPipeImplement* pSock = pD->GetFreeSocket(iSocketID); 
			if(pSock) 
			{ 
				if(pSock->IsOK()) 
				{//正常 
					pD->FreeSocket(iSocketID); 
				} 
				else 
				{//连接出现错误 
					pD->SetErrorToSocket(iSocketID); 
					pD->PrintfX("[%d] CPipePoolAdmin 检查到连接出现错误",iSocketID); 
				} 
			} 
			Sleep(pD->GetSleepInterval()*1000); 
		} 
		return 0; 
	} 
} 
#endif