// Source: www.pudn.com > ACEProactor.rar > proactortask.cpp, change:2006-01-23, size:4435b


// ProactorTask.cpp: implementation of the ProactorTask class. 
// 
////////////////////////////////////////////////////////////////////// 
 
#include "ProactorTask.h" 
#include <ace/WIN32_Proactor.h> 
 
////////////////////////////////////////////////////////////////////// 
// Signal helper and ProactorTask member functions 
////////////////////////////////////////////////////////////////////// 
/**
 * Blocks every signal numbered sigmin through sigmax (inclusive) for the
 * calling thread.
 *
 * When ACE_WIN32 is not defined, a signal set holding the requested range
 * is built and installed with SIG_BLOCK via ACE_OS::pthread_sigmask.
 * When ACE_WIN32 is defined, both arguments are ignored and nothing is
 * blocked.
 *
 * @param sigmin  first signal number to add to the blocked set
 * @param sigmax  last signal number to add to the blocked set (inclusive);
 *                if it is smaller than sigmin the set stays empty
 * @return always 1; failures are only reported through ACE_ERROR.
 */
int disable_signal (int sigmin, int sigmax) 
{ 
#ifndef ACE_WIN32 
	 
	sigset_t signal_set; 
	if (sigemptyset (&signal_set) == - 1) 
		ACE_ERROR ((LM_ERROR, 
		ACE_TEXT ("Error: (%P|%t):%p\n"), 
		ACE_TEXT ("sigemptyset failed"))); 
	 
	// The bound must be a comparison: an assignment (`i = sigmax`) here
	// would reset i on every pass and the loop would never terminate for
	// any non-zero sigmax.
	for (int i = sigmin; i <= sigmax; i++) 
		sigaddset (&signal_set, i); 
	 
	// Add <signal_set> to the calling thread's blocked signals. 
	if (ACE_OS::pthread_sigmask (SIG_BLOCK, &signal_set, 0) != 0) 
		ACE_ERROR ((LM_ERROR, 
		ACE_TEXT ("Error: (%P|%t):%p\n"), 
		ACE_TEXT ("pthread_sigmask failed"))); 
#else 
	ACE_UNUSED_ARG (sigmin); 
	ACE_UNUSED_ARG (sigmax); 
#endif /* ACE_WIN32 */ 
	 
	return 1; 
} 
 
int ProactorTask::delete_proactor () 
{ 
	ACE_GUARD_RETURN ( ACE_SYNCH_RECURSIVE_MUTEX , monitor, this->lock_, -1); 
	 
	ACE_DEBUG ((LM_DEBUG, 
				ACE_TEXT ("(%t) Delete Proactor\n"))); 
	 
	ACE_Proactor::close_singleton (); 
	this->proactor_ = 0; 
	 
	return 0; 
} 
 
/**
 * Asks the proactor event loop to end (if a proactor is set) and then
 * waits on this task via wait().
 *
 * @return always 0; a failing wait() is only logged.
 */
int ProactorTask::stop () 
{ 
	const bool has_proactor = (this->proactor_ != 0); 
	if (has_proactor) 
	{ 
		ACE_DEBUG ((LM_DEBUG, 
			ACE_TEXT ("(%t) Calling End Proactor event loop\n"))); 

		ACE_Proactor::end_event_loop (); 
	} 

	// wait() is called whether or not a proactor was present.
	const int wait_status = this->wait (); 
	if (wait_status == -1) 
	{ 
		ACE_ERROR ((LM_ERROR, 
			ACE_TEXT ("%p.\n"), 
			ACE_TEXT ("unable to stop thread pool"))); 
	} 

	return 0; 
} 
 
/**
 * Builds the proactor used by this task and installs it as the
 * ACE_Proactor singleton. Runs with lock_ held and asserts that no
 * proactor has been created yet.
 *
 * The implementation object is selected at compile time:
 *  - ACE_WIN32 (without ACE_HAS_WINCE): an ACE_WIN32_Proactor is always
 *    used; both arguments are ignored.
 *  - ACE_HAS_AIO_CALLS: type_proactor picks one of the POSIX
 *    implementations that is compiled in; any other value leaves the
 *    implementation pointer at 0.
 *
 * @param type_proactor  requested implementation (POSIX builds only)
 * @param max_op         passed to the POSIX implementation constructors
 * @return 0 on success, -1 if the lock guard or any allocation macro
 *         returns early.
 */
int ProactorTask::create_proactor (ProactorType type_proactor, size_t max_op) 
{ 
	ACE_GUARD_RETURN (ACE_SYNCH_RECURSIVE_MUTEX, guard, this->lock_, -1); 

	ACE_ASSERT (this->proactor_ == 0); 

#if defined (ACE_WIN32) && !defined (ACE_HAS_WINCE) 

	// Only one implementation exists here, so the selectors are unused.
	ACE_UNUSED_ARG (type_proactor); 
	ACE_UNUSED_ARG (max_op); 

	ACE_WIN32_Proactor *impl = 0; 
	ACE_NEW_RETURN (impl, ACE_WIN32_Proactor, -1); 

	ACE_DEBUG ((LM_DEBUG, 
		ACE_TEXT("(%t) Create Proactor Type = WIN32\n"))); 

#elif defined (ACE_HAS_AIO_CALLS) 

	ACE_POSIX_Proactor *impl = 0; 

	switch (type_proactor) 
	{ 
	case AIOCB: 
		ACE_NEW_RETURN (impl, ACE_POSIX_AIOCB_Proactor (max_op), -1); 
		ACE_DEBUG ((LM_DEBUG, 
			ACE_TEXT ("(%t) Create Proactor Type = AIOCB\n"))); 
		break; 

#if defined(ACE_HAS_POSIX_REALTIME_SIGNALS) 
	case SIG: 
		ACE_NEW_RETURN (impl, ACE_POSIX_SIG_Proactor (max_op), -1); 
		ACE_DEBUG ((LM_DEBUG, 
			ACE_TEXT ("(%t) Create Proactor Type = SIG\n"))); 
		break; 
#endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */ 

#  if defined (sun) 
	case SUN: 
		ACE_NEW_RETURN (impl, ACE_SUN_Proactor (max_op), -1); 
		ACE_DEBUG ((LM_DEBUG, 
			ACE_TEXT("(%t) Create Proactor Type = SUN\n"))); 
		break; 
#  endif /* sun */ 

#  if defined (__sgi) 
	case CB: 
		ACE_NEW_RETURN (impl, ACE_POSIX_CB_Proactor (max_op), -1); 
		ACE_DEBUG ((LM_DEBUG, 
			ACE_TEXT ("(%t) Create Proactor Type = CB\n"))); 
		break; 
#  endif /* __sgi */ 

	default: 
		// No explicit implementation: impl stays 0 and is handed to
		// ACE_Proactor unchanged below.
		ACE_DEBUG ((LM_DEBUG, 
			ACE_TEXT ("(%t) Create Proactor Type = DEFAULT\n"))); 
		break; 
	} 

#endif /* ACE_WIN32 && !ACE_HAS_WINCE / ACE_HAS_AIO_CALLS */ 

	// The second argument is always 1 ("delete implementation"), even
	// when impl is 0, rather than being derived from impl != 0.
	ACE_NEW_RETURN (this->proactor_, ACE_Proactor (impl, 1), -1); 

	// Install as the singleton; it is deleted in close_singleton().
	ACE_Proactor::instance (this->proactor_, 1); 
	return 0; 
} 
 
 
/**
 * Creates the proactor, activates num_threads threads on this task and
 * then acquires sem_ once per requested thread (svc() releases it once
 * per thread when it is ready).
 *
 * @param num_threads    number of threads to activate
 * @param type_proactor  forwarded to create_proactor()
 * @param max_op         forwarded to create_proactor()
 * @return 0 on success, -1 if proactor creation or activation fails.
 */
int ProactorTask::start (int num_threads, ProactorType type_proactor, size_t max_op) 
{ 
	const int create_status = this->create_proactor (type_proactor, max_op); 
	if (create_status == -1) 
	{ 
		ACE_ERROR_RETURN ((LM_ERROR, 
			ACE_TEXT ("%p.\n"), 
			ACE_TEXT ("unable to create proactor")), 
			-1); 
	} 

	const int activate_status = this->activate (THR_NEW_LWP, num_threads); 
	if (activate_status == -1) 
	{ 
		ACE_ERROR_RETURN ((LM_ERROR, 
			ACE_TEXT ("%p.\n"), 
			ACE_TEXT ("unable to activate thread pool")), 
			-1); 
	} 

	// One acquire per requested thread.
	int pending = num_threads; 
	while (pending > 0) 
	{ 
		sem_.acquire (); 
		--pending; 
	} 

	return 0; 
} 
 
 
/**
 * Thread entry point: blocks the ACE_SIGRTMIN..ACE_SIGRTMAX signal range
 * via disable_signal(), releases sem_ once to report readiness, and then
 * runs the proactor event loop until it returns.
 *
 * @return always 0.
 */
int ProactorTask::svc (void) 
{ 
	ACE_DEBUG ((LM_DEBUG, 
		ACE_TEXT ("(%t) ProactorTask started\n"))); 

	// Signals are masked before readiness is announced.
	disable_signal (ACE_SIGRTMIN, ACE_SIGRTMAX); 

	// Tell start() that this thread is up.
	sem_.release (1); 

	ACE_Proactor::run_event_loop (); 

	ACE_DEBUG ((LM_DEBUG, 
		ACE_TEXT ("(%t) ProactorTask finished\n"))); 
	return 0; 
} 