// Source: www.pudn.com > ThreadPoolcw.rar > ThreadPool.cpp
#include "ThreadPool.h"
// NOTE(review): the header name on this line was lost when the file was
// extracted (only "#include" survived). assert() is used below, so <assert.h>
// is restored here -- confirm against the original source.
#include <assert.h>
#include "..\CommonInc\SSLog.h"

static TCHAR * LogType = _TEXT("ThreadPool");

// The single live pool. Set by the CThreadPool constructor, cleared by its
// destructor, and used by CThreadTask::Begin() and the worker threads.
static CThreadPool * gpThreadPool = NULL;

using namespace std;

/*++
CThreadTask Impl Begin ==================================================================
--*/

_TP_DLL_EXPORT CThreadTask::CThreadTask(void){
}

_TP_DLL_EXPORT CThreadTask::~CThreadTask(void){
}

// Queues this task on the global pool.
// Returns 0 when the task was handed to the pool, 1 when no pool exists.
_TP_DLL_EXPORT HRESULT CThreadTask::Begin(){
    if (gpThreadPool) {
        gpThreadPool->AddTask(this);
        return 0;
    }else{
        return 1;
    }
}

// Default task body: does nothing and returns 0.
_TP_DLL_EXPORT HRESULT CThreadTask::Run(){
    return 0;
}

// Default completion hook, called by the worker right after Run(); returns 0.
_TP_DLL_EXPORT HRESULT CThreadTask::OnEnd(){
    return 0;
}

/*++
CThreadTask Impl End =======================================================================
--*/

// Registers this object as the global pool and puts every thread slot, counter
// and event handle into its "not started" state. No threads are created here;
// that happens in Init().
_TP_DLL_EXPORT CThreadPool::CThreadPool(void){
    if (gpThreadPool) {
        // "0 && text" is always false, so the assert really fires in debug
        // builds (the previous "0 || text" form was always true).
        assert(0 && "Fatal Error Only Single Instance Of CThreadPool Should Exists");
    }else{
        gpThreadPool = this;
    }

    InitializeCriticalSection(&csLock);

    for(int i = 0;i < eMax;i++){
        store[i].dwThreadId = store[i].dwExitCode = 0;
        store[i].hThread = NULL;
        store[i].bRun = FALSE;
    }

    total = run = 0;
    hRunning = hQuit = NULL;
}

// Stops the workers via Release(), destroys the lock and unregisters the
// global pool pointer.
_TP_DLL_EXPORT CThreadPool::~CThreadPool(void){
    Release();
    DeleteCriticalSection(&csLock);

    if (gpThreadPool == this) {
        gpThreadPool = NULL;
    }else if (!gpThreadPool) {
        assert(0 && "Fatal Error ThreadPool Got Null Before");
    }
    // Otherwise another instance owns the global pointer; leave it untouched.
    return ;
}

// Creates the two control events (if not yet created) and starts ThreadNum
// worker threads, each in the first free slot of store[].
// Returns 0 on success. Returns 1 when ThreadNum is 0 or greater than eMax,
// when run > 0, when an event cannot be created, or when a thread cannot be
// started (no free slot, or CreateThread failed). Threads started before a
// failure are left in place; Release() cleans them up.
_TP_DLL_EXPORT HRESULT CThreadPool::Init(UINT ThreadNum){
    if (ThreadNum == 0 || ThreadNum > eMax) {
        return 1;
    }
    if (run > 0) {
        return 1;
    }

    total = ThreadNum;

    // Both events are manual-reset and initially non-signaled.
    if (!hRunning) {
        hRunning = CreateEvent(NULL,TRUE,FALSE,NULL);
        if (!hRunning) {
            return 1;
        }
    }
    if (!hQuit) {
        hQuit = CreateEvent(NULL,TRUE,FALSE,NULL);
        if (!hQuit) {   // previously re-tested hRunning, hiding this failure
            return 1;
        }
    }

    for (UINT i = 0;i < ThreadNum;i++) {
        BOOL bFail = TRUE;
        for (int j = 0;j < eMax;j++) {
            if (!store[j].hThread) {
                store[j].bRun = FALSE;
                // The slot index travels through the pointer-sized thread
                // argument; INT_PTR keeps the cast well-defined on 64-bit.
                store[j].hThread = CreateThread(NULL,0,WorkThread_Proc,(LPVOID)(INT_PTR)j,0,&store[j].dwThreadId);
                if (store[j].hThread) {
                    bFail = FALSE;
                }
                break;
            }
        }
        if (bFail) {
            return 1;
        }
    }
    return 0;
}

// Signals the quit event, waits for every worker thread to exit, closes all
// thread and event handles and empties the task queue. Always returns 0.
// Workers drain the queue before they honor the quit event (see
// WorkThread_Proc), so this call blocks until queued tasks have run.
_TP_DLL_EXPORT HRESULT CThreadPool::Release(){
    if (hQuit) {
        SetEvent(hQuit);
    }

    HANDLE hWait[eMax];
    int cnt = 0;
    for (int i = 0;i < eMax;i++) {
        if (store[i].hThread) {
            hWait[cnt++] = store[i].hThread;
        }
    }

    // WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS handles per
    // call, so wait in batches. With bWaitAll=TRUE and an infinite timeout this
    // is equivalent to one wait over the whole set.
    BOOL bAllExited = TRUE;
    for (int off = 0;off < cnt;off += MAXIMUM_WAIT_OBJECTS) {
        int remaining = cnt - off;
        DWORD batch = (DWORD)(remaining < MAXIMUM_WAIT_OBJECTS ? remaining : MAXIMUM_WAIT_OBJECTS);
        DWORD ret = WaitForMultipleObjects(batch,hWait + off,TRUE,INFINITE);
        if (ret >= WAIT_OBJECT_0 + batch) {
            // WAIT_FAILED / abandoned / anything that is not "all signaled".
            bAllExited = FALSE;
        }
    }

    for (int i = 0;i < eMax;i++) {
        if (store[i].hThread) {
            if (!bAllExited) {
                // Last resort only: reached when a wait failed, so a worker
                // may still be alive while its handle is about to be closed.
                TerminateThread(store[i].hThread,0);
            }
            CloseHandle(store[i].hThread);
            store[i].hThread = NULL;
        }
    }

    // hRunning and the queue are touched by AddTask()/GetNext() under the
    // lock, so tear them down under the same lock.
    Lock();
    if (hRunning) {
        CloseHandle(hRunning);
        hRunning = NULL;
    }
    task.clear();
    UnLock();

    if (hQuit) {
        CloseHandle(hQuit);
        hQuit = NULL;
    }
    return 0;
}

// Pops the next queued task, or returns NULL when the queue is empty.
// While holding the lock it also resets hRunning once the queue is empty, so
// the event is signaled exactly while tasks are waiting and a concurrent
// AddTask() can never have its wake-up erased.
// NOTE(review): run is incremented here and never decremented in this file,
// so Init() refuses to run again once any task has been dequeued -- confirm
// that this is intended.
CThreadTask * CThreadPool::GetNext(){
    CThreadTask * pTask = NULL;
    Lock();
    if (task.size()) {
        pTask = task.front();
        task.pop_front();
        run++;
    }
    if (task.size() == 0 && hRunning) {
        ResetEvent(hRunning);
    }
    UnLock();
    return pTask;
}

// Appends pTask to the queue and signals hRunning so idle workers wake up.
// A NULL task is ignored. The event is set under the lock and on every call:
// the previous "only if some worker looks idle" test read bRun without the
// lock and could leave a task stranded in the queue.
void CThreadPool::AddTask(CThreadTask * pTask){
    if (!pTask) {
        return ;
    }
    Lock();
    task.push_back(pTask);
    if (hRunning) {
        SetEvent(hRunning);
    }
    UnLock();
    return ;
}

// Returns the record of thread slot idx, or NULL when idx is out of range.
ThreadRecord * CThreadPool::GetThread(int idx){
    if (idx < 0 || idx >= eMax) {
        return NULL;
    }else{
        return &store[idx];
    }
}

// Worker thread entry point. lParam carries the index of this thread's slot
// in store[]. The thread waits on the handles returned by GetRH() and GetQH()
// (presumably hRunning and hQuit -- the accessors are declared outside this
// file); index 0 wins when both are signaled, so the queue is drained before
// the quit request is honored. Always returns 0.
DWORD WINAPI CThreadPool::WorkThread_Proc(LPVOID lParam){
    CThreadPool * pool = gpThreadPool;
    if (!pool) {
        return 0;
    }

    ThreadRecord * self = pool->GetThread((int)(INT_PTR)lParam);
    HANDLE hEvent[2] = {pool->GetRH(),pool->GetQH()};
    if (!self || !hEvent[0] || !hEvent[1]) {
        return 0;
    }

    while (TRUE) {
        DWORD ret = WaitForMultipleObjects(2,hEvent,FALSE,INFINITE);
        if (ret != WAIT_OBJECT_0 + 0) {
            // Quit event signaled, or the wait itself failed.
            break;
        }
        // GetNext() resets the "tasks available" event once the queue is
        // empty, so no ResetEvent is needed (or safe) here.
        while (CThreadTask * pTask = pool->GetNext()) {
            self->bRun = TRUE;
            pTask->Run();
            pTask->OnEnd();
            self->bRun = FALSE;
        }
    }

    // Return instead of calling ExitThread(): returning from the thread
    // function ends the thread the same way and lets C++ cleanup run.
    return 0;
}

// Enters the pool's critical section.
void CThreadPool::Lock(){
    EnterCriticalSection(&csLock);
    return ;
}

// Leaves the pool's critical section.
void CThreadPool::UnLock(){
    LeaveCriticalSection(&csLock);
    return ;
}