// Source: www.pudn.com > ThreadPoolcw.rar > ThreadPool.cpp


#include "ThreadPool.h"
#include <assert.h>

#include "..\CommonInc\SSLog.h"
 
// Module name string "ThreadPool". It is not referenced by any code visible in
// this file; presumably it is the category tag for the SSLog helpers -- TODO confirm.
static TCHAR * LogType = _TEXT("ThreadPool");
// The registered pool instance: set by the CThreadPool constructor, reset to NULL
// by its destructor, and read by CThreadTask::Begin() and WorkThread_Proc().
static CThreadPool * gpThreadPool = NULL;

// Pulls all of namespace std into this translation unit.
using namespace std;
/*++ 
	CThreadTask Impl Begin ================================================================== 
 --*/ 
/*++
	Default constructor. Performs no work of its own.
 --*/
_TP_DLL_EXPORT CThreadTask::CThreadTask(void)
{
}
 
/*++
	Destructor. Performs no work of its own.
 --*/
_TP_DLL_EXPORT CThreadTask::~CThreadTask(void)
{
}
 
/*++
	Queues this task on the registered thread pool.

	Returns 0 when the task was handed to the pool's AddTask(), or 1 when no
	pool is currently registered in gpThreadPool.
 --*/
_TP_DLL_EXPORT HRESULT CThreadTask::Begin()
{
	// Without a registered pool there is nowhere to queue the task.
	if (!gpThreadPool) {
		return 1;
	}

	gpThreadPool->AddTask(this);
	return 0;
}
 
/*++
	Base implementation of the task body: does nothing and returns 0.
	WorkThread_Proc() calls this for every task it dequeues.
 --*/
_TP_DLL_EXPORT HRESULT CThreadTask::Run()
{
	return 0;
}
 
/*++
	Base implementation of the completion hook: does nothing and returns 0.
	WorkThread_Proc() calls this right after Run() for every task.
 --*/
_TP_DLL_EXPORT HRESULT CThreadTask::OnEnd()
{
	return 0;
}
/*++ 
	CThreadTask Impl End ======================================================================= 
 --*/ 
 
/*++
	Constructs an idle pool and registers it as the process-wide instance.

	If another pool is already registered in gpThreadPool, a debug assertion
	fires and the existing registration is left untouched. In every case the
	critical section is initialised, all thread slots are cleared, the counters
	are zeroed and both event handles start out NULL. No threads are created
	here; that is done by Init().
 --*/
_TP_DLL_EXPORT CThreadPool::CThreadPool(void){
	if (gpThreadPool) {
		// Must be "0 && msg": a string literal is never null, so the former
		// "0 || msg" was always true and this assertion could never fire.
		assert(0 && "Fatal Error Only Single Instance Of CThreadPool Should Exists");
	}else{
		gpThreadPool = this;
	}
	InitializeCriticalSection(&csLock);

	// Mark every slot as unused.
	for(int i = 0;i < eMax;i++){
		store[i].dwThreadId = store[i].dwExitCode = 0;
		store[i].hThread = NULL;
		store[i].bRun = FALSE;
	}
	total = run = 0;
	hRunning = hQuit = NULL;
}
 
/*++
	Shuts the pool down and unregisters it.

	Calls Release() to stop the workers and free the handles, deletes the
	critical section, and clears gpThreadPool only when it refers to this
	object, so that destroying a stray second instance cannot unregister the
	pool that is actually in use. If the global is already NULL a debug
	assertion fires.
 --*/
_TP_DLL_EXPORT CThreadPool::~CThreadPool(void){
	Release();

	DeleteCriticalSection(&csLock);
	if (gpThreadPool == this) {
		gpThreadPool = NULL;
	}else if (!gpThreadPool) {
		// Must be "0 && msg": with "0 || msg" the expression was always true
		// and the assertion was dead code.
		assert(0 && "Fatal Error ThreadPool Got Null Before");
	}
}
 
/*++
	Creates the pool's two events (if not yet created) and starts ThreadNum
	worker threads.

	ThreadNum - number of workers to start; must be in the range 1..eMax.

	Returns 0 on success and 1 on any failure: ThreadNum out of range,
	run > 0 (run is incremented by GetNext() each time a task is dequeued),
	event creation failure, no free slot, or CreateThread failure. On failure,
	events and threads created so far are left in place; Release() cleans
	them up.
 --*/
_TP_DLL_EXPORT HRESULT CThreadPool::Init(UINT ThreadNum){
	if (ThreadNum == 0 || ThreadNum > eMax) {
		return 1;
	}
	if (run > 0) {
		return 1;
	}
	total = ThreadNum;

	// Both events are manual-reset and start non-signalled.
	if (!hRunning) {
		hRunning = CreateEvent(NULL,TRUE,FALSE,NULL);
		if (!hRunning) {
			return 1;
		}
	}
	if (!hQuit) {
		hQuit = CreateEvent(NULL,TRUE,FALSE,NULL);
		// Check the handle that was just created; the previous code re-tested
		// hRunning here, so a failed hQuit creation went unnoticed.
		if (!hQuit) {
			return 1;
		}
	}

	for (UINT started = 0;started < ThreadNum;++started) {
		// Locate the first slot that holds no thread handle.
		int slot = 0;
		while (slot < eMax && store[slot].hThread) {
			++slot;
		}
		if (slot == eMax) {
			return 1;
		}

		store[slot].bRun = FALSE;
		// The slot index travels in the thread parameter; widen it through
		// INT_PTR so the int-to-pointer conversion is well-defined on 64-bit.
		store[slot].hThread = CreateThread(NULL,0,WorkThread_Proc,(LPVOID)(INT_PTR)slot,0,&store[slot].dwThreadId);
		if (!store[slot].hThread) {
			return 1;
		}
	}
	return 0;
}
 
/*++
	Stops all worker threads and frees the pool's kernel objects.

	Sequence: signal hQuit, wait without timeout until every thread that owns
	a handle has exited, terminate/close each thread handle and clear its
	slot, close both events, and finally empty the task queue. Always
	returns 0.

	NOTE(review): WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS
	handles; whether eMax stays within that limit is not visible here.
	NOTE(review): the TerminateThread call runs even after a successful wait,
	where the threads have already exited.
 --*/
_TP_DLL_EXPORT HRESULT CThreadPool::Release()
{
	// Tell the workers to leave their wait loop.
	if (hQuit) {
		SetEvent(hQuit);
	}

	// Pack the handles of all existing workers into a dense array.
	HANDLE live[eMax] = { NULL };
	int liveCount = 0;
	for (int slot = 0; slot < eMax; ++slot) {
		if (store[slot].hThread) {
			live[liveCount] = store[slot].hThread;
			++liveCount;
		}
	}

	// Block until every one of them has finished.
	if (liveCount > 0) {
		WaitForMultipleObjects(liveCount, live, TRUE, INFINITE);
		Sleep(0);
	}

	// Dispose of the thread handles and mark the slots free again.
	for (int slot = 0; slot < eMax; ++slot) {
		if (!store[slot].hThread) {
			continue;
		}
		TerminateThread(store[slot].hThread, 0);
		CloseHandle(store[slot].hThread);
		store[slot].hThread = NULL;
	}

	if (hRunning) {
		CloseHandle(hRunning);
		hRunning = NULL;
	}
	if (hQuit) {
		CloseHandle(hQuit);
		hQuit = NULL;
	}

	// Tasks still queued are dropped without being run.
	task.clear();
	return 0;
}
 
/*++
	Removes and returns the task at the front of the queue, or NULL when the
	queue is empty. The queue access and the increment of the run counter
	happen under the pool's critical section.
 --*/
CThreadTask * CThreadPool::GetNext()
{
	Lock();

	CThreadTask * pNext = NULL;
	if (!task.empty()) {
		pNext = task.front();
		task.pop_front();
		++run;
	}

	UnLock();
	return pNext;
}
 
/*++
	Appends a task to the queue and wakes the workers.

	pTask - task to enqueue; a NULL pointer is ignored.

	The running event is signalled after every enqueue. The earlier version
	signalled only when an unsynchronised count of busy workers was below
	total; if the last busy worker went idle between that count and the
	enqueue, no signal was sent and the task stayed in the queue until some
	later AddTask() call. A redundant signal is harmless: a woken worker
	that finds the queue empty simply resets the event and waits again.
 --*/
void CThreadPool::AddTask(CThreadTask * pTask){
	if (!pTask) {
		return ;
	}

	Lock();
	task.push_back(pTask);
	UnLock();

	// Signal outside the lock so woken workers do not immediately contend
	// for it. hRunning is NULL until Init() has created the event.
	if (hRunning) {
		SetEvent(hRunning);
	}
	return ;
}
 
/*++
	Returns the address of the thread record in slot idx, or NULL when idx
	lies outside the range 0..eMax-1.
 --*/
ThreadRecord * CThreadPool::GetThread(int idx)
{
	const bool inRange = (idx >= 0) && (idx < eMax);
	if (inRange) {
		return &store[idx];
	}
	return NULL;
}
 
/*++
	Entry point of every worker thread.

	lParam - the worker's slot index in the pool's thread table, packed into
	         the pointer-sized thread parameter by Init().

	The thread waits on the pool's running event (index 0) and quit event
	(index 1). When the running event fires it runs queued tasks, calling
	Run() then OnEnd() on each, until GetNext() returns NULL, flagging its
	slot as busy around each task. It exits when the quit event fires, the
	wait fails, or the pool, slot or events are unavailable at start-up.
	Always returns 0 as the thread exit code.
 --*/
DWORD WINAPI CThreadPool::WorkThread_Proc(LPVOID lParam){
	CThreadPool * pool = gpThreadPool;
	if (!pool) {
		// No registered pool: nothing to serve.
		return 0;
	}

	// Narrow through INT_PTR so the pointer-to-int conversion is valid on
	// 64-bit builds.
	ThreadRecord * self = pool->GetThread((int)(INT_PTR)lParam);
	HANDLE hEvent[2] = {pool->GetRH(),pool->GetQH()};
	if (!self || !hEvent[0] || !hEvent[1]) {
		return 0;
	}

	while (TRUE) {
		DWORD ret = WaitForMultipleObjects(2,hEvent,FALSE,INFINITE);
		if (ret != WAIT_OBJECT_0 + 0) {
			// Quit event, WAIT_FAILED or any other result ends the thread.
			break;
		}

		// Clear the signal BEFORE draining. Resetting afterwards could wipe
		// out the signal for a task queued between the last GetNext() and
		// the reset, leaving that task stranded in the queue.
		ResetEvent(hEvent[0]);

		while (CThreadTask * pTask = pool->GetNext()) {
			self->bRun = TRUE;
			pTask->Run();
			pTask->OnEnd();
			self->bRun = FALSE;
		}
	}

	// Returning from the thread function ends the thread with this exit
	// code; an explicit ExitThread() is unnecessary and made the former
	// trailing return unreachable.
	return 0;
}
 
/*++
	Enters the pool's critical section (csLock).
 --*/
void CThreadPool::Lock()
{
	EnterCriticalSection(&csLock);
}
/*++
	Leaves the pool's critical section (csLock).
 --*/
void CThreadPool::UnLock()
{
	LeaveCriticalSection(&csLock);
}