// Source: www.pudn.com > LF_ThreadPool.rar > LF_ThreadPool.cpp, change:2005-09-22, size:7434b
// LF_ThreadPool.cpp,v 1.5 2005/02/15 19:34:11 olli Exp
#include "ace/config-lite.h"
#if defined (ACE_HAS_THREADS)
#include "ace/OS_NS_string.h"
#include "ace/OS_NS_sys_time.h"
#include "ace/Task.h"
#include "ace/Containers.h"
#include "ace/Synch.h"
// Listing 4 code/ch16
class Follower
{
public:
Follower (ACE_Thread_Mutex &leader_lock)
: cond_(leader_lock)
{
owner_ = ACE_Thread::self ();
}
int wait (void)
{
ACE_DEBUG ((LM_ERROR,ACE_TEXT ("(%d) begin wait\n"),this->owner ()));
return this->cond_.wait ();
}
int signal (void)
{
ACE_DEBUG ((LM_ERROR,ACE_TEXT ("(%d) end wait\n"),this->owner ()));
return this->cond_.signal ();
}
ACE_thread_t owner (void)
{
return this->owner_;
}
private:
ACE_Condition<ACE_Thread_Mutex> cond_;
ACE_thread_t owner_;
};
// Listing 4
// Listing 1 code/ch16
/**
 * Leader/Followers thread pool layered on ACE_Task<ACE_MT_SYNCH>.
 *
 * One thread at a time is recorded as the leader in current_leader_;
 * threads that are not the leader park themselves as Follower objects
 * in followers_ until they are elected.
 */
class LF_ThreadPool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  /// Start with the shutdown flag clear and no leader recorded.
  LF_ThreadPool ()
    : shutdown_ (0),
      current_leader_ (0)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::TP"));
  }

  /// Thread entry point; loops until done() reports shutdown.
  virtual int svc ();

  /// Raise the shutdown flag that svc() polls through done().
  /// NOTE(review): the flag is a plain int written here and read in
  /// done() with no lock visible in this file -- confirm this is
  /// acceptable on the target platforms.
  void shut_down ()
  {
    this->shutdown_ = 1;
  }

private:
  /// Block until the calling thread is the leader (become_leader).
  int check_leader ();

  /// Create a Follower for the calling thread and queue it.
  Follower *make_follower ();

  /// Hand leadership to the next queued follower, if there is one.
  int elect_new_leader ();

  /// Non-zero when some thread is currently recorded as leader.
  int leader_active ()
  {
    return this->current_leader_ != 0;
  }

  /// Record @a leader as the current leader (0 means "no leader").
  void leader_active (ACE_thread_t leader)
  {
    ACE_TRACE (ACE_TEXT ("LF_ThreadPool::leader_active"));
    this->current_leader_ = leader;
  }

  /// True when the calling thread is the recorded leader.
  bool isLeader ()
  {
    const ACE_thread_t self = ACE_Thread::self ();
    return self == this->current_leader_;
  }

  /// Handle one dequeued message block.
  void process_message (ACE_Message_Block *mb);

  /// Non-zero once shut_down() has been called.
  int done ()
  {
    return this->shutdown_ == 1;
  }

  /// Set to 1 by shut_down().
  int shutdown_;

  /// Thread id of the current leader, or 0 when there is none.
  ACE_thread_t current_leader_;

  /// Held while current_leader_ is examined or changed; also the
  /// mutex every Follower's condition variable is built on.
  ACE_Thread_Mutex leader_lock_;

  /// Followers waiting to be elected, oldest first.
  ACE_Unbounded_Queue<Follower*> followers_;

  /// Taken around insertions into and removals from followers_.
  ACE_Thread_Mutex followers_lock_;

  /// Timeout value used by svc() when fetching a message.
  static long LONG_TIME;
};
// Listing 1
// Listing 2 code/ch16
/**
 * Worker loop run by every pool thread.
 *
 * Until shutdown is requested, each pass: waits to become the leader,
 * tries to fetch one message with a LONG_TIME timeout, hands
 * leadership to the next follower, and then (if a message was
 * obtained) processes it.
 *
 * @return always 0.
 */
int
LF_ThreadPool::svc (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::svc"));

  while (!done ())
    {
      // Block until this thread is the leader.  A non-zero result means
      // leadership was NOT obtained, so the thread must not go on to
      // act as the leader; leave the loop instead.
      if (check_leader () != 0)
        break;

      ACE_Message_Block *mb = 0;

      // getq() is given a deadline: LONG_TIME added to the current time.
      ACE_Time_Value tv (LONG_TIME);
      tv += ACE_OS::gettimeofday ();

      // Get a message, elect new leader, then process message.
      // A negative result from getq() means no message was obtained.
      const bool got_message = (this->getq (mb, &tv) < 0) ? false : true;

      // Leadership is handed over in both cases.  The result is not
      // acted on: the early exit that used to depend on it was already
      // disabled in this example.
      elect_new_leader ();

      if (got_message)
        process_message (mb);
    }

  ACE_DEBUG( (LM_ERROR,"(%t) stopping... \n") );
  return 0;
}
// Listing 2
// Listing 3 code/ch16
/**
 * Block the calling thread until it is the leader.
 *
 * With leader_lock_ held:
 *  - if no leader is recorded, the caller records itself as leader;
 *  - otherwise, if the caller is not already the leader, it queues a
 *    Follower for itself and waits on it until isLeader() is true.
 *
 * @return 0 once the caller is the leader; -1 if leader_lock_ could
 *         not be acquired or no Follower could be created.
 */
int
LF_ThreadPool::check_leader (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::check_leader"));
  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);

  if (leader_active () == 0)
    {
      // The pool has no leader yet: mark this thread as the active
      // leader and pass the check straight away.
      leader_active (ACE_Thread::self ());
    }
  else if (!isLeader ())
    {
      // Some other thread is the leader, so wait to be elected.
      Follower *fw = make_follower ();

      // make_follower() returns 0 on failure; waiting on a null
      // follower would dereference a null pointer.
      if (fw == 0)
        return -1;

      // Wait until told to lead.  leader_lock_ is released on entry to
      // wait() and re-acquired before wait() returns; the loop
      // re-checks the condition after every wake-up.
      while (!isLeader ())
        fw->wait ();

      // The follower was created by this thread for this wait only.
      delete fw;
    }

  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Becoming the leader\n")));
  return 0;
}
/**
 * Create a Follower for the calling thread and append it to the
 * followers_ queue, under followers_lock_.
 *
 * @return the new Follower, or 0 if the lock could not be taken, the
 *         allocation failed, or the follower could not be queued.
 */
Follower*
LF_ThreadPool::make_follower (void)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::make_follower"));
  ACE_GUARD_RETURN
    (ACE_Thread_Mutex, follower_mon, this->followers_lock_, 0);

  Follower *fw = 0;
  ACE_NEW_RETURN (fw, Follower (this->leader_lock_), 0);

  // A follower that is not in the queue can never be dequeued by
  // elect_new_leader(), so a thread waiting on it would never be
  // elected.  Treat a failed enqueue like a failed allocation.
  if (this->followers_.enqueue_tail (fw) != 0)
    {
      delete fw;
      return 0;
    }

  //ACE_DEBUG( (LM_ERROR,"(%t) after make_follower: current followers :(%d)\n",followers_.size()) );
  return fw;
}
// Listing 3
// Listing 5 code/ch16
int
LF_ThreadPool::elect_new_leader (void)
{
ACE_TRACE (ACE_TEXT ("LF_ThreadPool::elect_new_leader"));
ACE_GUARD_RETURN
(ACE_Thread_Mutex, leader_mon, this->leader_lock_, -1);
// leader_active (0); //never used again
// Wake up a follower
if (!followers_.is_empty ())
{
ACE_GUARD_RETURN (ACE_Thread_Mutex,
follower_mon,
this->followers_lock_,
-1);
// Get the old follower.
Follower *fw;
if (this->followers_.dequeue_head (fw) != 0){
return -1;
}
//ACE_DEBUG( (LM_ERROR,"(%t)elect_new_leader 后:current followers :(%d)\n",followers_.size()) );
ACE_DEBUG ((LM_ERROR,
ACE_TEXT ("(%t) Resigning and Electing %d\n"),
fw->owner ()));
//-----become leader right now!
//-----maybe,do a atom action will be better? i don't know.
leader_active (fw->owner());
return( (0 == fw->signal ()) ? 0 : -1);
}
else
{
ACE_DEBUG
((LM_ERROR, ACE_TEXT ("(%t) Oops no followers left\n")));
//标志为“没有领导者”,这样任何先返回的线程都可成为领导者
leader_active(0);
return -1;
}
}
// Listing 5
/**
 * Handle one message: read the int stored at the block's read
 * pointer, release the block, then sleep for 1000 ms between a
 * "Started" and a "Finished" log line.
 *
 * @param mb message block to consume; it is released here.
 */
void
LF_ThreadPool::process_message (ACE_Message_Block *mb)
{
  ACE_TRACE (ACE_TEXT ("LF_ThreadPool::process_message"));

  // Copy the id out before the block is released.
  int id;
  ACE_OS::memcpy (&id, mb->rd_ptr (), sizeof (int));
  mb->release ();

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Started processing message:%d\n"),
              id));

  // Stand-in for real work.
  ACE_Time_Value work_time;
  work_time.msec (1000);
  ACE_OS::sleep (work_time);

  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Finished processing message:%d\n"),
              id));
}
// Timeout used by svc(): passed to ACE_Time_Value's single-argument
// constructor and added to the current time to form the getq() deadline.
long LF_ThreadPool::LONG_TIME = 5L;
int ACE_TMAIN (int, ACE_TCHAR *[])
{
LF_ThreadPool tp;
tp.activate (THR_NEW_LWP| THR_JOINABLE, 5);
// Wait for a few seconds...
ACE_OS::sleep (2);
ACE_Time_Value tv;
tv.msec(99);
ACE_Message_Block *mb;
//注意程序不能保证消息是被顺序处理。
for (int i = 0; i 20; i++)
{
ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
ACE_OS::sleep (tv);
// Add a new work item.
tp.putq (mb);
}
ACE_OS::sleep (10);
for (int i = 0; i 100; i++)
{
ACE_NEW_RETURN (mb, ACE_Message_Block (sizeof(int)), -1);
ACE_OS::memcpy (mb->wr_ptr (), &i, sizeof(int));
ACE_OS::sleep (tv);
// Add a new work item.
tp.putq (mb);
}
//粗暴关闭:立即关闭线程池,而不管有无剩余的消息。
//若需优雅的关闭,可在消息队列中加入某种特殊消息,当任何线程识别到该消息时,调用this->shut_down()。
tp.shut_down();
ACE_OS::sleep (10);
ACE_Thread_Manager::instance ()->wait ();
return 0;
}
// Instantiate the templates this file uses when the ACE configuration
// asks for it: explicit instantiation if
// ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION is defined, "#pragma instantiate"
// if ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA is defined, nothing otherwise.
#if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
template class ACE_Condition<ACE_Thread_Mutex>;
template class ACE_Node<Follower*>;
template class ACE_Unbounded_Queue<Follower*>;
template class ACE_Unbounded_Queue_Iterator<Follower*>;
#elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
#pragma instantiate ACE_Condition<ACE_Thread_Mutex>
#pragma instantiate ACE_Node<Follower*>
#pragma instantiate ACE_Unbounded_Queue<Follower*>
#pragma instantiate ACE_Unbounded_Queue_Iterator<Follower*>
#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
#else
#include "ace/OS_main.h"
#include "ace/OS_NS_stdio.h"
int ACE_TMAIN (int, ACE_TCHAR *[])
{
ACE_OS::puts (ACE_TEXT ("This example requires threads."));
return 0;
}
#endif /* ACE_HAS_THREADS */