// Source: www.pudn.com > cxThreadPool.zip > cxThreadPool.h


/****************************************************************** 
*  Thread Pool For Win32  
*  VC++ 6, BC++ 5.5(Free), GCC(Free) 
*  Update : 2004.6.9 llBird 
 
Use: 
1): 
void threadfunc(void *p) 
{ 
	//... 
} 
	cxThreadPool tp; 
	for(i=0; i<100; i++) 
		tp.Call(threadfunc); 
 
	cxThreadPool tp(20);// 20 is the initial number of pool threads 
	tp.Call(threadfunc, lpPara); 
	tp.AdjustSize(50);// add 50 threads 
	tp.AdjustSize(-30);// remove 30 threads 
 
 
2): 
class MyThreadJob : public cxThreadJob // job objects derive from cxThreadJob 
{ 
public: 
	virtual void DoJob(void *p)// user-defined override of the virtual function 
	{ 
		//.... 
	} 
}; 
	MyThreadJob mt[10]; 
	cxThreadPool tp; 
	for(i=0; i<10; i++) 
		tp.Call(mt + i);//tp.Call(mt + i, para); 
 
*******************************************************************/ 
#ifndef _CX_THREADPOOL_H_ 
#define _CX_THREADPOOL_H_ 
 
#pragma warning(disable: 4530) 
#pragma warning(disable: 4786) 
 
#include <windows.h>
#include <assert.h>
#include <algorithm>
#include <queue>
#include <vector>
 
 
// Abstract base class for job objects that can be submitted to the thread pool.
class cxThreadJob
{
public:
	// Virtual so that destroying a job through a cxThreadJob pointer also
	// runs the derived class's destructor.
	virtual ~cxThreadJob() { }

	// Work entry point invoked by the thread pool.
	// pPara: the opaque parameter pointer that was supplied with the job.
	virtual void DoJob(void *pPara) = 0;
};
 
class cxThreadPool 
{ 
private: 
 
	cxThreadPool(const cxThreadPool&); 
	operator = (const cxThreadPool&); 
 
public: 
	//dwNum 线程池规模 
	cxThreadPool(DWORD dwNum = 4) : _lThreadNum(0), _lRunningNum(0)  
	{ 
		InitializeCriticalSection(&_csThreadVector); 
		InitializeCriticalSection(&_csWorkQueue); 
 
		_EventComplete = CreateEvent(0, false, false, NULL); 
		_EventEnd = CreateEvent(0, true, false, NULL); 
		_SemaphoreCall = CreateSemaphore(0, 0,  0x7FFFFFFF, NULL); 
		_SemaphoreDel =  CreateSemaphore(0, 0,  0x7FFFFFFF, NULL); 
 
		assert(_SemaphoreCall != INVALID_HANDLE_VALUE); 
		assert(_EventComplete != INVALID_HANDLE_VALUE); 
		assert(_EventEnd != INVALID_HANDLE_VALUE); 
		assert(_SemaphoreDel != INVALID_HANDLE_VALUE); 
 
		AdjustSize(dwNum <= 0 ? 4 : dwNum); 
	} 
 
	virtual ~cxThreadPool() 
	{ 
		EndAndWait(100); 
 
		DeleteCriticalSection(&_csWorkQueue); 
 
		CloseHandle(_EventEnd); 
		CloseHandle(_EventComplete); 
		CloseHandle(_SemaphoreCall); 
		CloseHandle(_SemaphoreDel); 
		 
		vector::iterator iter; 
		for(iter = _ThreadVector.begin(); iter != _ThreadVector.end(); iter++) 
		{ 
			if(*iter) 
				delete *iter; 
		} 
 
		DeleteCriticalSection(&_csThreadVector); 
	} 
	//调整线程池规模 
	int AdjustSize(int iNum) 
	{ 
		if(iNum > 0) 
		{ 
			ThreadItem *pNew; 
			EnterCriticalSection(&_csThreadVector); 
			for(int _i=0; _i_Handle = CreateThread(NULL, 0, DefaultJobProc, pNew, 0, NULL); 
				assert(pNew->_Handle); 
			} 
			LeaveCriticalSection(&_csThreadVector); 
		} 
		else 
		{ 
			iNum *= -1; 
			ReleaseSemaphore(_SemaphoreDel,  iNum > _lThreadNum ? _lThreadNum : iNum, NULL); 
		} 
		return (int)_lThreadNum; 
	} 
	//调用线程池 
	void Call(void (*pFunc)(void  *), void *pPara = NULL) 
	{ 
		assert(pFunc); 
 
		EnterCriticalSection(&_csWorkQueue); 
		_JobQueue.push(new JobItem(pFunc, pPara)); 
		LeaveCriticalSection(&_csWorkQueue); 
 
		ReleaseSemaphore(_SemaphoreCall, 1, NULL); 
	} 
	//调用线程池 
	inline void Call(cxThreadJob * p, void *pPara = NULL) 
	{ 
		Call(CallProc, new CallProcPara(p, pPara)); 
	} 
	//结束线程池, 并同步等待 
	bool EndAndWait(DWORD dwWaitTime = INFINITE) 
	{ 
		SetEvent(_EventEnd); 
		return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0; 
	} 
	//结束线程池, 并同步等待 
	bool WaitFor(DWORD dwWaitTime = INFINITE) 
	{ 
		SetEvent(_EventEnd); 
		return WaitForSingleObject(_EventComplete, dwWaitTime) == WAIT_OBJECT_0; 
	} 
	//结束线程池 
	inline void End() 
	{ 
		SetEvent(_EventEnd); 
	} 
	inline DWORD Size() 
	{ 
		return (DWORD)_lThreadNum; 
	} 
	inline DWORD GetRunningSize() 
	{ 
		return (DWORD)_lRunningNum; 
	} 
	bool IsRunning() 
	{ 
		return _lRunningNum > 0; 
	} 
 
protected: 
 
	//工作线程 
	static DWORD WINAPI DefaultJobProc(LPVOID lpParameter = NULL) 
	{ 
		ThreadItem *pThread = static_cast(lpParameter); 
		assert(pThread); 
 
		cxThreadPool *pThreadPoolObj = pThread->_pThis; 
		assert(pThreadPoolObj); 
 
		InterlockedIncrement(&pThreadPoolObj->_lThreadNum); 
 
		HANDLE hWaitHandle[3]; 
		hWaitHandle[0] = pThreadPoolObj->_SemaphoreCall; 
		hWaitHandle[1] = pThreadPoolObj->_SemaphoreDel; 
		hWaitHandle[2] = pThreadPoolObj->_EventEnd; 
 
		JobItem *pJob; 
		bool fHasJob; 
		 
		for(;;) 
		{ 
			DWORD wr = WaitForMultipleObjects(3, hWaitHandle, false, INFINITE); 
 
			//响应删除线程信号 
			if(wr == WAIT_OBJECT_0 + 1)	  
				break; 
			 
			//从队列里取得用户作业 
			EnterCriticalSection(&pThreadPoolObj->_csWorkQueue); 
			if(fHasJob = !pThreadPoolObj->_JobQueue.empty()) 
			{ 
				pJob = pThreadPoolObj->_JobQueue.front(); 
				pThreadPoolObj->_JobQueue.pop(); 
				assert(pJob); 
			} 
			LeaveCriticalSection(&pThreadPoolObj->_csWorkQueue); 
 
			//受到结束线程信号 确定是否结束线程(结束线程信号 && 是否还有工作) 
			if(wr == WAIT_OBJECT_0 + 2 && !fHasJob)	  
				break; 
 
			if(fHasJob && pJob) 
			{ 
				InterlockedIncrement(&pThreadPoolObj->_lRunningNum); 
				pThread->_dwLastBeginTime = GetTickCount(); 
				pThread->_dwCount++; 
				pThread->_fIsRunning = true; 
				pJob->_pFunc(pJob->_pPara); //运行用户作业 
				delete pJob;  
				pThread->_fIsRunning = false; 
				InterlockedDecrement(&pThreadPoolObj->_lRunningNum); 
			} 
		} 
 
		//删除自身结构 
		EnterCriticalSection(&pThreadPoolObj->_csThreadVector); 
		pThreadPoolObj->_ThreadVector.erase(find(pThreadPoolObj->_ThreadVector.begin(), pThreadPoolObj->_ThreadVector.end(), pThread)); 
		LeaveCriticalSection(&pThreadPoolObj->_csThreadVector); 
 
		delete pThread; 
 
		InterlockedDecrement(&pThreadPoolObj->_lThreadNum); 
 
		if(!pThreadPoolObj->_lThreadNum)  //所有线程结束 
			SetEvent(pThreadPoolObj->_EventComplete); 
 
		return 0; 
	} 
	//调用用户对象虚函数 
	static void CallProc(void *pPara)  
	{ 
		CallProcPara *cp = static_cast(pPara); 
		assert(cp); 
		if(cp) 
		{ 
			cp->_pObj->DoJob(cp->_pPara); 
			delete cp; 
		} 
	} 
	//用户对象结构 
	struct CallProcPara   
	{ 
		cxThreadJob* _pObj;//用户对象  
		void *_pPara;//用户参数 
		CallProcPara(cxThreadJob* p, void *pPara) : _pObj(p), _pPara(pPara) { }; 
	}; 
	//用户函数结构 
	struct JobItem  
	{ 
		void (*_pFunc)(void  *);//函数 
		void *_pPara; //参数 
		JobItem(void (*pFunc)(void  *) = NULL, void *pPara = NULL) : _pFunc(pFunc), _pPara(pPara) { }; 
	}; 
	//线程池中的线程结构 
	struct ThreadItem 
	{ 
		HANDLE _Handle;	//线程句柄 
		cxThreadPool *_pThis;	 //线程池的指针 
		DWORD _dwLastBeginTime; //最后一次运行开始时间 
		DWORD _dwCount; //运行次数 
		bool _fIsRunning; 
		ThreadItem(cxThreadPool *pthis) : _pThis(pthis), _Handle(NULL), _dwLastBeginTime(0), _dwCount(0), _fIsRunning(false) { }; 
		~ThreadItem() 
		{ 
			if(_Handle) 
			{ 
				CloseHandle(_Handle); 
				_Handle = NULL; 
			} 
		} 
	}; 
	 
	std::queue _JobQueue;	 //工作队列 
	std::vector  _ThreadVector; //线程数据 
 
	CRITICAL_SECTION _csThreadVector, _csWorkQueue; //工作队列临界, 线程数据临界 
 
	HANDLE	_EventEnd, _EventComplete, _SemaphoreCall, _SemaphoreDel;//结束通知, 完成事件, 工作信号, 删除线程信号 
	long _lThreadNum, _lRunningNum; //线程数, 运行的线程数 
 
}; 
 
#endif //_CX_THREADPOOL_H_