// Source: www.pudn.com > LF_ThreadPool.rar > LF_ThreadPool.cpp, change:2005-09-22,size:7434b


// LF_ThreadPool.cpp,v 1.5 2005/02/15 19:34:11 olli Exp 
 
#include "ace/config-lite.h" 
#if defined (ACE_HAS_THREADS) 
 
#include "ace/OS_NS_string.h" 
#include "ace/OS_NS_sys_time.h" 
#include "ace/Task.h" 
#include "ace/Containers.h" 
#include "ace/Synch.h" 
 
// Listing 4 code/ch16 
/**
 * Bookkeeping object for one thread that is waiting to become leader.
 *
 * It remembers the id of the thread that constructed it and owns a
 * condition variable tied to the mutex passed to the constructor.
 */
class Follower
{
public:
  /// Ties the condition variable to @a leader_lock and records the
  /// constructing thread as the owner.
  Follower (ACE_Thread_Mutex &leader_lock)
    : cond_ (leader_lock),
      owner_ (ACE_Thread::self ())
  {
  }

  /// Logs the owner id and then blocks on the condition variable.
  /// @return the result of the underlying condition wait.
  int wait (void)
  {
    ACE_DEBUG ((LM_ERROR,ACE_TEXT ("(%d) begin wait\n"),this->owner ()));
    const int wait_result = this->cond_.wait ();
    return wait_result;
  }

  /// Logs the owner id and then signals the condition variable.
  /// @return the result of the underlying condition signal.
  int signal (void)
  {
    ACE_DEBUG ((LM_ERROR,ACE_TEXT ("(%d) end wait\n"),this->owner ()));
    const int signal_result = this->cond_.signal ();
    return signal_result;
  }

  /// Id of the thread that created this follower.
  ACE_thread_t owner (void)
  {
    return this->owner_;
  }

private:
  /// Condition variable the owning thread sleeps on.
  ACE_Condition<ACE_Thread_Mutex> cond_;

  /// Thread id captured in the constructor.
  ACE_thread_t owner_;
};
 
// Listing 4 
// Listing 1 code/ch16 
/**
 * Leader/Followers thread pool built on ACE_Task.
 *
 * Every pool thread runs svc(). The id of the thread currently acting as
 * leader is kept in current_leader_ (0 means "no leader"). Threads that
 * find another leader in place park themselves as Follower objects in
 * followers_ until elect_new_leader() hands leadership to them.
 */
class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  /// Starts with no shutdown request and no leader.
  LF_ThreadPool () : shutdown_(0), current_leader_(0)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));
  }

  /// Thread entry point: become leader, fetch one message, hand over
  /// leadership, process the message, repeat until done().
  virtual int svc (void);

  /// Requests that all pool threads leave svc() the next time they test
  /// done().  May be called from any thread.
  void shut_down (void)
  {
    // The flag is written here and read by the pool threads in done(),
    // so both accesses go through shutdown_lock_.
    ACE_Guard<ACE_Thread_Mutex> shutdown_mon (this->shutdown_lock_);
    shutdown_ = 1;
  }

private:
  /// Blocks until the calling thread is the leader (the role called
  /// "become_leader" in the original comment).
  /// @return 0 on success, -1 on failure.
  int check_leader (void);

  /// Creates a Follower for the calling thread and appends it to
  /// followers_.  @return the new follower, or 0 on failure.
  Follower *make_follower (void);

  /// Passes leadership to the follower at the head of followers_, or
  /// marks the pool as leaderless if there is none.
  int elect_new_leader (void);

  /// @return non-zero if some thread is currently recorded as leader.
  int leader_active (void)
  {
	  return this->current_leader_ != 0;
  }

  /// Records @a leader as the current leader (0 clears the leader).
  void leader_active (ACE_thread_t leader)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    this->current_leader_ = leader;
  }

  /// @return true if the calling thread is the recorded leader.
  bool isLeader()
  {
	  return this->current_leader_ == ACE_Thread::self();
  }

  /// Handles one dequeued message and releases it.
  void process_message (ACE_Message_Block *mb);

  /// @return non-zero once shut_down() has been called.
  int done (void)
  {
    ACE_Guard<ACE_Thread_Mutex> shutdown_mon (this->shutdown_lock_);
    return (shutdown_ == 1);
  }

private:
  /// Set to 1 by shut_down(); guarded by shutdown_lock_.
  int shutdown_;

  /// Id of the current leader thread, or 0 when there is none.
  ACE_thread_t current_leader_;

  /// Held while reading or changing current_leader_; also the mutex
  /// every Follower's condition variable is bound to.
  ACE_Thread_Mutex leader_lock_;

  /// Followers waiting for leadership, oldest first.
  ACE_Unbounded_Queue<Follower*> followers_;

  /// Held while followers_ is modified.
  ACE_Thread_Mutex followers_lock_;

  /// Serializes access to shutdown_ between shut_down() and done().
  ACE_Thread_Mutex shutdown_lock_;

  /// Timeout, in seconds, the leader waits for a message in svc().
  static long LONG_TIME;
};
// Listing 1 
// Listing 2 code/ch16 
int 
LF_ThreadPool::svc (void) 
{ 
	ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc")); 
	while (!done()) 
	{ 
		check_leader ();  // Block until this thread is the leader. 
 
		ACE_Message_Block *mb = 0; 
		ACE_Time_Value tv (LONG_TIME); 
		tv += ACE_OS::gettimeofday (); 
 
		//Get a message, elect new leader, then process message. 
		if (this->getq (mb, &tv)  0) 
		{ 
			if (elect_new_leader () == 0){ 
				//ACE_DEBUG( (LM_ERROR,"(%t) warning: elect_new_leader () == 0\n") ); 
				//break; 
			} 
		} 
		else{ 
			elect_new_leader (); 
			process_message (mb); 
		} 
	} 
	ACE_DEBUG( (LM_ERROR,"(%t) stopping... \n") ); 
 
	return 0; 
} 
// Listing 2 
// Listing 3 code/ch16 
/**
 * Blocks the calling thread until it is the pool's leader.
 *
 * - If no leader is recorded, the caller becomes leader immediately.
 * - If another thread is leader, the caller registers a Follower and
 *   sleeps on it until elect_new_leader() records the caller as leader.
 * - If the caller is already recorded as leader, it simply proceeds.
 *
 * @return 0 once the caller is leader; -1 if leader_lock_ could not be
 *         acquired or no Follower could be created.
 */
int
LF_ThreadPool::check_leader (void)
{
	ACE_TRACE (ACE_TEXT ("LF_ThreadPool::check_leader"));

	ACE_GUARD_RETURN
	(ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);

	if (leader_active () == 0)
	{
		// Nobody leads yet (e.g. the pool just started): mark this
		// thread as the active leader and pass the check.
		leader_active (ACE_Thread::self ());
	}
	else if (!isLeader ())
	{
		// Some other thread leads: queue up as a follower.
		Follower *fw = make_follower ();
		if (fw == 0)
		{
			// make_follower() returns 0 on allocation or lock failure;
			// there is nothing to wait on in that case.
			return -1;
		}

		// Wait until told to lead.  The condition variable is bound to
		// leader_lock_, so the lock is released while waiting and
		// re-acquired before wait() returns; the loop re-tests the
		// leader id after every wake-up.
		while (!isLeader ())
			fw->wait ();

		// elect_new_leader() already removed fw from the queue before
		// signalling it, so this thread is the only one left using it.
		delete fw;
	}

	ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n")));

	return 0;
}
 
/**
 * Creates a Follower for the calling thread, bound to leader_lock_, and
 * appends it to the followers_ queue under followers_lock_.
 *
 * @return the queued follower, or 0 if followers_lock_ could not be
 *         acquired, the allocation failed, or the follower could not be
 *         queued.  A follower that is not queued can never be elected,
 *         so it is destroyed rather than returned.
 */
Follower*
LF_ThreadPool::make_follower (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));

  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);

  Follower *fw = 0;
  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);

  if (this->followers_.enqueue_tail (fw) != 0)
    {
      // Queueing failed: returning fw would leave its owner waiting for
      // a signal that elect_new_leader() could never send.
      delete fw;
      return 0;
    }

  return fw;
}
 
// Listing 3 
// Listing 5 code/ch16 
int 
LF_ThreadPool::elect_new_leader (void) 
{ 
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader")); 
 
  ACE_GUARD_RETURN 
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1); 
//  leader_active (0);              //never used again 
 
// Wake up a follower 
	if (!followers_.is_empty ()) 
	{ 
		ACE_GUARD_RETURN (ACE_Thread_Mutex, 
						follower_mon, 
						this->followers_lock_, 
						-1); 
		// Get the old follower. 
		Follower *fw; 
		if (this->followers_.dequeue_head (fw) != 0){ 
			return -1; 
		} 
		//ACE_DEBUG( (LM_ERROR,"(%t)elect_new_leader 后:current followers :(%d)\n",followers_.size()) ); 
 
		ACE_DEBUG ((LM_ERROR, 
					ACE_TEXT ("(%t) Resigning and Electing %d\n"), 
					fw->owner ())); 
		 
		//-----become leader right now!  
		//-----maybe,do a atom action will be better? i don't know. 
		leader_active (fw->owner()); 
 
		return( (0 == fw->signal ()) ? 0 : -1); 
	} 
	else 
	{ 
		ACE_DEBUG 
		((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n")));  
 
		//标志为“没有领导者”,这样任何先返回的线程都可成为领导者 
		leader_active(0);  
 
		return -1; 
	} 
} 
// Listing 5 
 
/**
 * Handles one work item.
 *
 * Copies an int message id out of the block's read pointer, releases the
 * block, and then sleeps for one second between a "Started" and a
 * "Finished" log line.
 *
 * @param mb message block to consume; released by this function.
 */
void
LF_ThreadPool::process_message (ACE_Message_Block *mb)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));

  // Pull the payload out before the block is released.
  int msgId;
  ACE_OS::memcpy (&msgId, mb->rd_ptr (), sizeof (msgId));
  mb->release ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Started processing message:%d\n"),
              msgId));

  // Pause for 1000 ms.
  ACE_Time_Value work_time;
  work_time.msec (1000);
  ACE_OS::sleep (work_time);

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Finished processing message:%d\n"),
              msgId));
}
 
// Number of seconds svc() adds to the current time to form the deadline
// it passes to getq() when the leader waits for a message.
long LF_ThreadPool::LONG_TIME = 5L;
 
int ACE_TMAIN (int, ACE_TCHAR *[]) 
{ 
	LF_ThreadPool tp; 
	tp.activate (THR_NEW_LWP| THR_JOINABLE, 5); 
 
	// Wait for a few seconds... 
	ACE_OS::sleep (2); 
	ACE_Time_Value tv; 
	tv.msec(99); 
 
 
	ACE_Message_Block *mb; 
 
	//注意程序不能保证消息是被顺序处理。 
	for (int i = 0; i  20; i++) 
	{ 
		ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1); 
		ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); 
		ACE_OS::sleep (tv); 
 
		// Add a new work item. 
		tp.putq (mb); 
	} 
 
	ACE_OS::sleep (10); 
 
	for (int i = 0; i  100; i++) 
	{ 
		ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1); 
		ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int)); 
		ACE_OS::sleep (tv); 
 
		// Add a new work item. 
		tp.putq (mb); 
	} 
 
	//粗暴关闭:立即关闭线程池,而不管有无剩余的消息。 
	//若需优雅的关闭,可在消息队列中加入某种特殊消息,当任何线程识别到该消息时,调用this->shut_down()。 
	tp.shut_down(); 
 
	ACE_OS::sleep (10); 
	 
	ACE_Thread_Manager::instance ()->wait (); 
 
  return 0; 
} 
 
// Instantiations of the ACE templates used in this file, for builds
// configured (via the macros tested below) to need explicit template
// instantiation or instantiation pragmas.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Condition<ACE_Thread_Mutex>;
template class ACE_Node<Follower*>;
template class ACE_Unbounded_Queue<Follower*>;
template class ACE_Unbounded_Queue_Iterator<Follower*>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
#pragma instantiate ACE_Node<Follower*>
#pragma instantiate ACE_Unbounded_Queue<Follower*>
#pragma instantiate ACE_Unbounded_Queue_Iterator<Follower*>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
 
#else 
#include "ace/OS_main.h" 
#include "ace/OS_NS_stdio.h" 
 
int ACE_TMAIN (int, ACE_TCHAR *[]) 
{ 
  ACE_OS::puts (ACE_TEXT ("This example requires threads.")); 
  return 0; 
} 
 
#endif /* ACE_HAS_THREADS */