www.pudn.com > ThreadLibrary.zip > ThreadPool.cpp
/** * @file */ #include#include #include "ThreadPool.h" #include "ThreadRequest.h" #include using std::for_each; namespace { // Any other data that we wish to transfer to the thread for performance monitoring or whatever can // be added to this structure later struct ThreadData { ThreadPool *threadPool; ///< Pointer to this object unsigned int threadId; ///< Id of the thread }; /** * SuicideException used by the suicide pill to break out of the accept loop */ class SuicideException { }; /** * SuicidePill is a do nothing request that forces the thread to loop through once, checking * the alive flag, before shutting itself down cleanly. * It performs this by throwing a ThreadPoolShutdownException which actually terminates */ class SuicidePill : public ThreadRequest { public: void operator()(int) { throw SuicideException(); // Terminate this thread cleanly } }; } /** * Initialize the thread pool. * @param threadSize Number of threads to initialize the pool to. * @param queueSize Size of the queue before pool will stop entries. */ ThreadPool::ThreadPool(int threadSize, int queueSize) : queueGuard(), notFull(), queueAccess(queueSize, 0), alive(false), threads(threadSize) { // Initialize the queue notFull.signal(); // Queue is not full pool.reserve(threadSize); } /// Static function to fire off all of the accept handlers for each thread unsigned int __stdcall ThreadPool::internalThreadProc(void* lpParam) { // This is ALL running in a new thread... ThreadData *data = static_cast (lpParam); // convert the lpParam back to the data structure try { data->threadPool->acceptHandler(data->threadId); // And start this threads accept handler. } catch(...) { } delete data; _endthreadex(0); return 0; } /** * Internal function to handle the thread requests * @param threadId Id of the thread - debugging purposes only really * @warning This will NOT propagate exceptions out of the function. Exceptions must be caught * by the command object placed in the queue. */ void ThreadPool::acceptHandler(unsigned int threadId) throw() { onThreadStart(threadId); for(;;) { // Ensure that only one thread is actually waiting on the notEmpty signal at a time. // Eliminates race conditions to get at the queue ::WaitForSingleObject(queueAccess, INFINITE); try { queueGuard.enter(); if(!jobQueue.empty()) { std::auto_ptr request(jobQueue.front()); // Autopointer takes ownership of request jobQueue.pop(); notFull.signal(); // Ensure that it's not full, coz we just took something out! queueGuard.leave(); try { request->operator()(threadId); // Now do your stuff!!! } catch(SuicideException&) // Occurs if we've swallowed the suicide pill { break; // exit the acceptHandler loop to terminate thread } // The following exception should NEVER occur because the request->operator()() should NOT // be propogating requests. catch(...) { // Just swallow the exception and continue on. } } else { queueGuard.leave(); } } catch(...) { queueGuard.leave(); // Release the accessor and continue on. } } // for(;;) onThreadFinish(threadId); } /** * @name Thread Start Finish functions * ThreadSpecific Thread initialization and termination functions. * Provide thread specific initialization for EACH thread. * Override this function to provide thread specific initialization. For instance, to initialize a COM component * from each thread you would derive a new ThreadPool, and override onThreadStart() with a ::CoInitialize(NULL) and * override onThreadFinish() with a ::CoUninitialize() */ //@{ /** * Thread Start function. * Derived classes should override this to perform initialization for EACH thread. * @param threadId ThreadID for this threadstart routine. Useful for debugging. * @throw NOTHING Note that this function is declared as throwing no exceptions. You MUST handle the derived * function exceptions yourself. */ void ThreadPool::onThreadStart(int) throw() { } /** * Thread Finish function. * Derived classes should override this to cleanup any thread stuff that may have been initialized in onThreadStart() * @param threadId ThreadID for this threadstart routine. Useful for debugging. * @throw NOTHING Note that this function is declared as throwing no exceptions. You MUST handle the derived * function exceptions yourself. */ void ThreadPool::onThreadFinish(int) throw() { } //@} /** * Initialize all of the threads and then set them into action... * @return number of threads started */ int ThreadPool::accept() { if(!alive) // Cannot accept twice! { for(int i = 0 ; i < threads ; i++) { try { void *handle = NULL; ThreadData *data = new ThreadData; // Deleted on thread destruction in internalThreadProc assert(data); // ensure we've allocated it. data->threadPool = this; handle = reinterpret_cast (_beginthreadex(NULL, 0, internalThreadProc, data, CREATE_SUSPENDED, &(data->threadId))); if(handle != NULL) // Ignore threads that aren't created pool.push_back(handle); // and push the others onto the pool. } catch(...) { assert(false); // code shouldn't reach here! // I think we can ignore this. If the thread doesn't get created, we go on to the next. // this should be a pretty damn rare experience, and one that needs to be tested // and investigated further in case it does happen. } } for_each(pool.begin(), pool.end(), ::ResumeThread); alive=true; } return static_cast (pool.size()); } /** * Adds a request onto the queue for servicing by the thread pool. * @param request Pointer to an object derived from thread request, overriding operator() * @exception ThreadPoolShutdownException Thrown if the threadpool is closing down */ void ThreadPool::submitRequest(ThreadRequest *request) { if(!alive) throw ThreadPoolShutdownException(); ::WaitForSingleObject(notFull, INFINITE); try { queueGuard.enter(); // Need to ensure that the queue is locked for this - STL queue isn't threadsafe jobQueue.push(request); if(jobQueue.size() >= MAX_QUEUE_SIZE) notFull.reset(); // Clear the notfull signal to block the thread for future insertions queueAccess.release(); // Release 1 slot to allow a thread in to service the queue queueGuard.leave(); } catch(std::exception&) { queueGuard.leave(); // Must release the exception first... throw; // Then we can propogate it out. } } /** * Shutsdown all of the threads. * Forces each thread to swallow a suicide pill to terminate itself. Once the queue has * enough suicide pills for the number of active threads, the queue is marked as * no longer servicing requests. * Upon completion of the suicide pill, shutdown blocks for 10 seconds to ensure that all * threads have been terminated, and then goes and closes the handles. */ void ThreadPool::shutdown() { for(std::vector ::const_iterator j = pool.begin() ; j != pool.end() ; ++j) this->submitRequest(new SuicidePill()); // Feed thread a suicide pill alive = false; // Stop the queue receiving for(std::vector ::const_iterator i = pool.begin() ; i != pool.end() ; ++i) { switch(::WaitForSingleObject(*i, 10000)) { case WAIT_OBJECT_0: // This is what we WANT to happen. break; case WAIT_TIMEOUT: // Something bad happened - force the termination ::TerminateThread(*i, 1); break; case WAIT_ABANDONED: // The thread has been terminated already!? maybe... break; } ::CloseHandle(*i); } } ThreadPool::~ThreadPool(void) { }