// www.pudn.com > ACEProactor.rar > receiver.cpp, change:2006-01-23,size:12809b
// Receiver.cpp: implementation of the Receiver class.
//
//////////////////////////////////////////////////////////////////////
#include "ace\Global_Macros.h"
#include "Receiver.h"
//////////////////////////////////////////////////////////////////////
// Construction/Destruction
//////////////////////////////////////////////////////////////////////
// Logging verbosity read by the completion handlers in this file: values
// above 1 make them dump every field of the completion result, while a value
// above 0 makes them log a one-line byte count for error-free completions.
int loglevel=1;
// Non-zero: handle_read_stream() starts the next read as soon as the echo
// write has been initiated. Zero: handle_write_stream() starts the next read
// only after a write has completed successfully. No initializer is given, so
// as a namespace-scope int it starts out as 0.
int duplex;
// ***************************************************
// Builds a Receiver that has no socket attached yet and whose I/O and byte
// counters all start at zero, then announces itself to the owning Acceptor
// when one was supplied.
//
// acceptor  Owner to notify about creation/deletion; may be null.
// index     Identifier used in this object's log messages.
Receiver::Receiver (Acceptor * acceptor, int index)
  : acceptor_ (acceptor),
    index_ (index),
    handle_ (ACE_INVALID_HANDLE),
    io_count_ (0),
    flg_cancel_ (0),
    total_snd_ (0),
    total_rcv_ (0),
    total_w_ (0),
    total_r_ (0)
{
  // A null acceptor is tolerated; there is simply nobody to notify.
  if (acceptor_ == 0)
    return;

  acceptor_->on_new_receiver (*this);
}
// Logs the final transfer statistics, warns about suspicious state (I/O
// still outstanding, lopsided byte counts), tells the Acceptor this Receiver
// is going away and closes the socket if one is still attached.
Receiver::~Receiver (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ")
              ACE_TEXT ("%d recvs (%d bytes)\n"),
              index_,
              total_w_, total_snd_,
              total_r_, total_rcv_));

  if (io_count_ != 0)
    ACE_ERROR ((LM_WARNING,
                ACE_TEXT ("(%t) Receiver %d deleted with ")
                ACE_TEXT ("%d I/O outstanding\n"),
                index_,
                io_count_));

  // Data is echoed between Senders and Receivers, so the two byte totals
  // should be of similar size.  Flag the run when the smaller total is zero
  // or the larger one is more than twice the smaller (integer division).
  // The zero test comes first so the division never sees a zero divisor.
  bool lopsided;
  if (total_snd_ > total_rcv_)
    lopsided = (total_rcv_ == 0) || (total_snd_ / total_rcv_ > 2);
  else
    lopsided = (total_snd_ == 0) || (total_rcv_ / total_snd_ > 2);

  if (lopsided)
    ACE_DEBUG ((LM_WARNING,
                ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));

  if (acceptor_ != 0)
    acceptor_->on_delete_receiver (*this);

  if (handle_ != ACE_INVALID_HANDLE)
    ACE_OS::closesocket (handle_);

  // Leave recognisably dead values behind.
  index_ = -1;
  handle_ = ACE_INVALID_HANDLE;
}
void
Receiver::cancel ()
{
ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);
this->flg_cancel_ = 1;
this->ws_.cancel ();
this->rs_.cancel ();
return;
}
void
Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&)
{
ACE_TCHAR str[256];
if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR)))
ACE_DEBUG ((LM_DEBUG,
ACE_TEXT ("(%t) Receiver %d connection from %s\n"),
this->index_,
str));
else
ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"),
this->index_,
ACE_TEXT ("addr_to_string")));
return;
}
// Attaches this Receiver to a newly accepted connection and starts the
// first asynchronous read.
//
// handle  Socket for the connection; stored in handle_ and closed by the
//         destructor.
// (unnamed ACE_Message_Block&)  Not used.
//
// If no I/O could be started (io_count_ is still 0 after the attempt), the
// object deletes itself.  The guard lives in an inner scope so that lock_ is
// released before `delete this` runs.
void
Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
{
  {
    ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);

    this->handle_ = handle;

    // Don't buffer serial sends.  A failure here is only logged; the
    // connection is still used.
    int nodelay = 1;
    ACE_SOCK_Stream option_setter (handle);
    if (-1 == option_setter.set_option (ACE_IPPROTO_TCP,
                                        TCP_NODELAY,
                                        &nodelay,
                                        sizeof (nodelay)))
      // ACE_TEXT keeps the format and the %p argument in the character
      // width the logger expects, like every other log call in this file.
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("set_option")));

    // Both stream factories must open before the first read is issued.
    if (this->ws_.open (*this, this->handle_) == -1)
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
    else if (this->rs_.open (*this, this->handle_) == -1)
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
    else
      this->initiate_read_stream ();

    // A pending operation means a completion handler will take over the
    // lifetime decision later.
    if (this->io_count_ > 0)
      return;
  }
  delete this;
}
int
Receiver::initiate_read_stream (void)
{
if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
return -1;
ACE_Message_Block *mb = 0;
ACE_NEW_RETURN (mb,
ACE_Message_Block (1024), //BUFSIZ + 1),
-1);
// Inititiate read
if (this->rs_.read (*mb, mb->size () - 1) == -1)
{
mb->release ();
#if defined (ACE_WIN32)
// On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get
// a 0-byte read as we would if underlying calls used WSARecv.
if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
ACE_ERROR_RETURN ((LM_DEBUG,
ACE_TEXT ("(%t) Receiver %d, peer closed\n"),
this->index_),
-1);
#endif /* ACE_WIN32 */
ACE_ERROR_RETURN ((LM_ERROR,
ACE_TEXT ("(%t) Receiver %d, %p\n"),
this->index_,
ACE_TEXT ("read")),
-1);
}
this->io_count_++;
this->total_r_++;
return 0;
}
// Starts one asynchronous write of `nbytes` bytes from `mb`.
//
// mb      Block holding the data to send.  This function takes over the
//         block: it is released here on every failure path, and by
//         handle_write_stream() once a queued write completes.
// nbytes  Number of bytes to write; must be non-zero.
//
// Returns 0 when the write was queued (io_count_ and total_w_ are bumped),
// or -1 when the Receiver is cancelled, has no handle, nbytes is 0, or the
// write could not be started.
int
Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
  if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
    {
      mb.release ();
      return -1;
    }

  if (nbytes == 0)
    {
      mb.release ();
      // nbytes is unsigned, so the only invalid value is zero; say so, and
      // end the record with a newline like every other message here.
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes == 0\n")),
                       -1);
    }

  if (this->ws_.write (mb, nbytes) == -1)
    {
      // The write never started, so no completion will arrive to free mb.
      mb.release ();
#if defined (ACE_WIN32)
      // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
      if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
        ACE_ERROR_RETURN ((LM_DEBUG,
                           ACE_TEXT ("(%t) Receiver %d, peer gone\n"),
                           this->index_),
                          -1);
#endif /* ACE_WIN32 */
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%t) Receiver %d, %p\n"),
                        this->index_,
                        ACE_TEXT ("write")),
                       -1);
    }

  this->io_count_++;
  this->total_w_++;
  return 0;
}
// Completion callback for a read started by initiate_read_stream().
//
// Logs the outcome according to `loglevel`.  When data arrived without
// error it is echoed back through initiate_write_stream() and, if `duplex`
// is non-zero, another read is started straight away.  Otherwise the
// message block is released.  The finished read is then removed from
// io_count_; once no operation remains pending the object deletes itself,
// after the guard's inner scope has released lock_.
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  {
    ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->lock_ );

    ACE_Message_Block & block = result.message_block ();

    // Terminate the received bytes so they can be logged with %s; the read
    // was issued for one byte less than the block size to leave room.
    block.rd_ptr ()[result.bytes_transferred ()] = '\0';

    if (loglevel > 1)
      {
        // Verbose mode: dump every field of the completion result.
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"),
                    index_));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_to_read"),
                    result.bytes_to_read ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("handle"),
                    result.handle ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_transfered"),
                    result.bytes_transferred ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("act"),
                    result.act ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("success"),
                    result.success ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("completion_key"),
                    result.completion_key ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("error"),
                    result.error ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %s\n"),
                    ACE_TEXT ("message_block"),
                    block.rd_ptr ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("**** end of message ****************\n")));
      }
    else if (result.error () != 0)
      {
        // A cancelled operation is expected during shutdown and is only
        // worth a debug line; any other failure is a real error.
#if defined (ACE_WIN32)
        const bool was_cancelled =
          (result.error () == ERROR_OPERATION_ABORTED);
#else
        const bool was_cancelled = (result.error () == ECANCELED);
#endif /* ACE_WIN32 */
        const ACE_Log_Priority prio = was_cancelled ? LM_DEBUG : LM_ERROR;

        // Publish the completion's error code so %p can describe it.
        ACE_Log_Msg *logger = ACE_Log_Msg::instance ();
        logger->errnum (result.error ());
        logger->log (prio,
                     ACE_TEXT ("(%t) Receiver %d; %p\n"),
                     index_,
                     ACE_TEXT ("read"));
      }
    else if (loglevel > 0)
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) Receiver %d: read %d bytes\n"),
                    index_,
                    result.bytes_transferred ()));
      }

    if (result.error () == 0 && result.bytes_transferred () > 0)
      {
        total_rcv_ += result.bytes_transferred ();

        // Echo the data back.  initiate_write_stream() takes over the
        // block, so it is not released here.  In duplex mode the next read
        // is started as soon as the write has been queued.
        const int write_started =
          initiate_write_stream (block, result.bytes_transferred ());
        if (write_started == 0 && duplex != 0)
          initiate_read_stream ();
      }
    else
      block.release ();

    // This read is finished; keep the object alive only while other
    // operations are still pending.
    if (--io_count_ > 0)
      return;
  }
  delete this;
}
// Completion callback for a write started by initiate_write_stream().
//
// Logs the outcome according to `loglevel` and releases the message block.
// After a successful, non-empty write the sent bytes are added to
// total_snd_ and, if `duplex` is zero, the next read is started.  The
// finished write is then removed from io_count_; once no operation remains
// pending the object deletes itself, after the guard's inner scope has
// released lock_.
void
Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
  {
    ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->lock_);

    ACE_Message_Block & block = result.message_block ();

    if (loglevel > 1)
      {
        // Step the read pointer back over the transferred bytes so that
        // the %s dump below shows the data that was sent.
        block.rd_ptr (block.rd_ptr () - result.bytes_transferred ());

        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"),
                    index_));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_to_write"),
                    result.bytes_to_write ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("handle"),
                    result.handle ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_transfered"),
                    result.bytes_transferred ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("act"),
                    result.act ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("success"),
                    result.success ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("completion_key"),
                    result.completion_key ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("error"),
                    result.error ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("%s = %s\n"),
                    ACE_TEXT ("message_block"),
                    block.rd_ptr ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("**** end of message ****************\n")));
      }
    else if (result.error () != 0)
      {
        // A cancelled operation is expected during shutdown and is only
        // worth a debug line; any other failure is a real error.
#if defined (ACE_WIN32)
        const bool was_cancelled =
          (result.error () == ERROR_OPERATION_ABORTED);
#else
        const bool was_cancelled = (result.error () == ECANCELED);
#endif /* ACE_WIN32 */
        const ACE_Log_Priority prio = was_cancelled ? LM_DEBUG : LM_ERROR;

        // Publish the completion's error code so %p can describe it.
        ACE_Log_Msg *logger = ACE_Log_Msg::instance ();
        logger->errnum (result.error ());
        logger->log (prio,
                     ACE_TEXT ("(%t) Receiver %d; %p\n"),
                     index_,
                     ACE_TEXT ("write"));
      }
    else if (loglevel > 0)
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"),
                    index_,
                    result.bytes_transferred ()));
      }

    // The write is over either way, so the block is no longer needed.
    block.release ();

    if (result.error () == 0 && result.bytes_transferred () > 0)
      {
        total_snd_ += result.bytes_transferred ();

        // In non-duplex mode the next read waits for the echo to finish.
        if (duplex == 0)
          initiate_read_stream ();
      }

    // This write is finished; keep the object alive only while other
    // operations are still pending.
    if (--io_count_ > 0)
      return;
  }
  delete this;
}