// www.pudn.com > ACEProactor.rar > receiver.cpp, change:2006-01-23,size:12809b


// Receiver.cpp: implementation of the Receiver class. 
// 
////////////////////////////////////////////////////////////////////// 
#include "ace\Global_Macros.h" 
#include "Receiver.h" 
 
////////////////////////////////////////////////////////////////////// 
// Construction/Destruction 
////////////////////////////////////////////////////////////////////// 
// Verbosity of the completion handlers in this file: above 0 they log a
// one-line summary per successful operation, above 1 they dump every field
// of the completion result instead.
int loglevel = 1;

// Non-zero: handle_read_stream() starts the next read right after it has
// issued the echo write.  Zero: the next read is started only from
// handle_write_stream(), once the echo write has completed.
int duplex = 0;
// *************************************************** 
// Creates a receiver with no socket attached (invalid handle), no
// outstanding I/O, the cancel flag cleared and all statistics at zero.
//
// acceptor - acceptor to notify via on_new_receiver(); may be 0, in which
//            case nobody is notified.
// index    - identifier printed in this receiver's log messages.
Receiver::Receiver (Acceptor * acceptor, int index)
  : acceptor_ (acceptor),
    index_    (index),
    handle_   (ACE_INVALID_HANDLE),
    io_count_ (0),
    flg_cancel_(0),
    total_snd_(0),
    total_rcv_(0),
    total_w_  (0),
    total_r_  (0)
{
  // Let the acceptor (if any) know about the new receiver.
  if (acceptor_)
    acceptor_->on_new_receiver (*this);
}
 
// Logs the final statistics, warns about suspicious state (I/O still
// outstanding, badly unbalanced byte counts), tells the acceptor that this
// receiver is going away and closes the socket if one is attached.
Receiver::~Receiver (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Receiver %d dtor; %d sends (%d bytes); ")
              ACE_TEXT ("%d recvs (%d bytes)\n"),
              index_,
              total_w_, total_snd_,
              total_r_, total_rcv_));

  if (io_count_ != 0)
    ACE_ERROR ((LM_WARNING,
                ACE_TEXT ("(%t) Receiver %d deleted with ")
                ACE_TEXT ("%d I/O outstanding\n"),
                index_,
                io_count_));

  // Data is bounced back and forth between Senders and Receivers, so both
  // directions should carry roughly the same number of bytes.  Flag the
  // totals when the smaller one is zero or the larger one is more than
  // twice the smaller one (integer division).  The byte counts are
  // unsigned, hence the explicit comparison instead of a subtraction.
  const bool sent_more = total_snd_ > total_rcv_;
  const bool lopsided =
    sent_more
      ? (total_rcv_ == 0 || total_snd_ / total_rcv_ > 2)
      : (total_snd_ == 0 || total_rcv_ / total_snd_ > 2);

  if (lopsided)
    ACE_DEBUG ((LM_WARNING,
                ACE_TEXT ("(%t) Above byte counts look odd; need review\n")));

  // Unregister from the acceptor before the socket is torn down.
  if (acceptor_ != 0)
    acceptor_->on_delete_receiver (*this);

  if (handle_ != ACE_INVALID_HANDLE)
    ACE_OS::closesocket (handle_);

  // Leave recognisable values behind in the dead object.
  handle_ = ACE_INVALID_HANDLE;
  index_ = -1;
}
 
void 
Receiver::cancel () 
{ 
  ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_); 
 
  this->flg_cancel_ = 1; 
  this->ws_.cancel (); 
  this->rs_.cancel (); 
  return; 
} 
 
 
void 
Receiver::addresses (const ACE_INET_Addr& peer, const ACE_INET_Addr&) 
{ 
  ACE_TCHAR str[256]; 
  if (0 == peer.addr_to_string (str, sizeof (str) / sizeof (ACE_TCHAR))) 
    ACE_DEBUG ((LM_DEBUG, 
                ACE_TEXT ("(%t) Receiver %d connection from %s\n"), 
                this->index_, 
                str)); 
  else 
    ACE_ERROR ((LM_ERROR, ACE_TEXT ("(%t) Receiver %d %p\n"), 
                this->index_, 
                ACE_TEXT ("addr_to_string"))); 
  return; 
} 
 
 
// Attaches the receiver to a freshly accepted connection and starts the
// first asynchronous read.
//
// handle - socket of the accepted connection; stored in handle_ and closed
//          by the destructor.
// The message block argument is ignored.
//
// If no I/O could be started (one of the stream open() calls failed, or the
// first read could not be initiated) io_count_ is still 0 and the receiver
// deletes itself.  If lock_ cannot be acquired ACE_GUARD returns
// immediately and nothing is done.
void
Receiver::open (ACE_HANDLE handle, ACE_Message_Block &)
{
  {
    ACE_GUARD (ACE_SYNCH_MUTEX, monitor, this->lock_);

    this->handle_ = handle;

    // Don't buffer serial sends.  A failure here is only logged; the
    // connection is still used.
    int nodelay = 1;
    ACE_SOCK_Stream option_setter (handle);
    if (-1 == option_setter.set_option (ACE_IPPROTO_TCP,
                                        TCP_NODELAY,
                                        &nodelay,
                                        sizeof (nodelay)))
      // ACE_TEXT keeps this call consistent with every other log statement
      // in this file and valid when ACE_TCHAR is a wide character type.
      ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"), ACE_TEXT ("set_option")));

    // Both streams must be opened before any I/O can be initiated; the
    // first read is only started when both succeeded.
    if (this->ws_.open (*this, this->handle_) == -1)
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Receiver::ACE_Asynch_Write_Stream::open")));
    else if (this->rs_.open (*this, this->handle_) == -1)
      ACE_ERROR ((LM_ERROR,
                  ACE_TEXT ("(%t) %p\n"),
                  ACE_TEXT ("Receiver::ACE_Asynch_Read_Stream::open")));
    else
      this->initiate_read_stream ();

    // An operation is pending: its completion handler decides when the
    // receiver goes away.
    if (this->io_count_ > 0)
      return;
  }
  // Nothing is pending and the guard has been released: self-destruct.
  delete this;
}
 
int 
Receiver::initiate_read_stream (void) 
{ 
  if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE) 
    return -1; 
 
  ACE_Message_Block *mb = 0; 
  ACE_NEW_RETURN (mb, 
                  ACE_Message_Block (1024), //BUFSIZ + 1), 
                  -1); 
 
  // Inititiate read 
  if (this->rs_.read (*mb, mb->size () - 1) == -1) 
    { 
      mb->release (); 
#if defined (ACE_WIN32) 
      // On peer close, ReadFile will yield ERROR_NETNAME_DELETED; won't get 
      // a 0-byte read as we would if underlying calls used WSARecv. 
      if (ACE_OS::last_error () == ERROR_NETNAME_DELETED) 
        ACE_ERROR_RETURN ((LM_DEBUG, 
                           ACE_TEXT ("(%t) Receiver %d, peer closed\n"), 
                           this->index_), 
                          -1); 
#endif /* ACE_WIN32 */ 
      ACE_ERROR_RETURN ((LM_ERROR, 
                         ACE_TEXT ("(%t) Receiver %d, %p\n"), 
                         this->index_, 
                         ACE_TEXT ("read")), 
                        -1); 
    } 
 
  this->io_count_++; 
  this->total_r_++; 
  return 0; 
} 
 
// Starts one asynchronous write of the first nbytes of mb.
//
// mb     - block to send; on every failure path it is released here, so the
//          caller must not touch it after a -1 return.  On success it is
//          released by handle_write_stream().
// nbytes - number of bytes to write; must be greater than zero.
//
// Returns 0 when the write was queued (io_count_ and total_w_ are then
// incremented), -1 when the receiver is cancelled, has no valid handle,
// nbytes is zero, or the write could not be started.
int
Receiver::initiate_write_stream (ACE_Message_Block &mb, size_t nbytes)
{
  if (this->flg_cancel_ != 0 || this->handle_ == ACE_INVALID_HANDLE)
    {
      mb.release ();
      return -1;
    }

  if (nbytes == 0)
    {
      mb.release ();
      // nbytes is unsigned, so zero is the only invalid value; say so and
      // terminate the line so the next log entry does not run onto it.
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%t) Receiver::ACE_Asynch_Write_Stream::write nbytes == 0\n")),
                       -1);
    }

  if (this->ws_.write (mb, nbytes) == -1)
    {
      // The write was not queued, so no completion will arrive for mb.
      mb.release ();
#if defined (ACE_WIN32)
      // On peer close, WriteFile will yield ERROR_NETNAME_DELETED.
      if (ACE_OS::last_error () == ERROR_NETNAME_DELETED)
        ACE_ERROR_RETURN ((LM_DEBUG,
                           ACE_TEXT ("(%t) Receiver %d, peer gone\n"),
                           this->index_),
                          -1);
#endif /* ACE_WIN32 */
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT ("(%t) Receiver %d, %p\n"),
                        this->index_,
                        ACE_TEXT ("write")),
                       -1);
    }

  this->io_count_++;
  this->total_w_++;
  return 0;
}
 
// Completion handler for reads started by initiate_read_stream().
//
// Logs the outcome according to loglevel, echoes any received bytes back
// through initiate_write_stream() (starting the next read at once when
// duplex is non-zero), then drops this operation from io_count_.  When no
// operation remains pending the receiver deletes itself after the lock has
// been released.  If lock_ cannot be acquired ACE_GUARD returns immediately
// and nothing is done.
void
Receiver::handle_read_stream (const ACE_Asynch_Read_Stream::Result &result)
{
  bool nothing_pending = false;
  {
    ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->lock_ );

    ACE_Message_Block & block = result.message_block ();

    // Terminate the received bytes so the buffer can be logged with %s;
    // initiate_read_stream() left one spare byte in the block for this.
    block.rd_ptr ()[result.bytes_transferred ()] = '\0';

    if (loglevel > 1)
      {
        // Full dump of the completion result.
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) **** Receiver %d: handle_read_stream() ****\n"),
                    this->index_));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_to_read"), result.bytes_to_read ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("handle"), result.handle ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_transfered"), result.bytes_transferred ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("act"), result.act ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("success"), result.success ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("completion_key"), result.completion_key ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("error"), result.error ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %s\n"),
                    ACE_TEXT ("message_block"), block.rd_ptr ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("**** end of message ****************\n")));
      }
    else if (result.error () != 0)
      {
        // A cancelled operation is expected during shutdown and is only
        // worth a debug message; every other failure is a real error.
#if defined (ACE_WIN32)
        const bool was_cancelled = (result.error () == ERROR_OPERATION_ABORTED);
#else
        const bool was_cancelled = (result.error () == ECANCELED);
#endif /* ACE_WIN32 */
        const ACE_Log_Priority prio = was_cancelled ? LM_DEBUG : LM_ERROR;

        // Make %p print the text for the operation's error code.
        ACE_Log_Msg::instance ()->errnum (result.error ());
        ACE_Log_Msg::instance ()->log (prio,
                                       ACE_TEXT ("(%t) Receiver %d; %p\n"),
                                       this->index_,
                                       ACE_TEXT ("read"));
      }
    else if (loglevel > 0)
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) Receiver %d: read %d bytes\n"),
                    this->index_,
                    result.bytes_transferred ()));
      }

    const bool got_data =
      result.error () == 0 && result.bytes_transferred () > 0;

    if (!got_data)
      {
        // Failed or empty read: nothing to echo, drop the buffer.
        block.release ();
      }
    else
      {
        this->total_rcv_ += result.bytes_transferred ();

        // Echo the data back.  The block now belongs to the write path:
        // initiate_write_stream() releases it itself when it fails.
        const bool echo_queued =
          this->initiate_write_stream (block,
                                       result.bytes_transferred ()) == 0;

        // In duplex mode the next read does not wait for the echo.
        if (echo_queued && duplex != 0)
          this->initiate_read_stream ();
      }

    // This read is finished.
    this->io_count_--;
    nothing_pending = !(this->io_count_ > 0);
  }

  // The guard is released here, so it is safe to destroy the object.
  if (nothing_pending)
    delete this;
}
 
// Completion handler for writes started by initiate_write_stream().
//
// Logs the outcome according to loglevel, releases the message block,
// accounts for the bytes sent and, when duplex is zero, starts the next
// read.  Then drops this operation from io_count_; when no operation
// remains pending the receiver deletes itself after the lock has been
// released.  If lock_ cannot be acquired ACE_GUARD returns immediately and
// nothing is done.
void
Receiver::handle_write_stream (const ACE_Asynch_Write_Stream::Result &result)
{
  bool nothing_pending = false;
  {
    ACE_GUARD (ACE_SYNCH_MUTEX, guard, this->lock_);

    ACE_Message_Block & block = result.message_block ();

    if (loglevel > 1)
      {
        // Step the read pointer back over the bytes just written so the
        // dump below can print them.
        block.rd_ptr (block.rd_ptr () - result.bytes_transferred ());

        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) **** Receiver %d: handle_write_stream() ****\n"),
                    this->index_));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_to_write"), result.bytes_to_write ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("handle"), result.handle ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("bytes_transfered"), result.bytes_transferred ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("act"), result.act ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("success"), result.success ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %@\n"),
                    ACE_TEXT ("completion_key"), result.completion_key ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %d\n"),
                    ACE_TEXT ("error"), result.error ()));
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("%s = %s\n"),
                    ACE_TEXT ("message_block"), block.rd_ptr ()));
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("**** end of message ****************\n")));
      }
    else if (result.error () != 0)
      {
        // A cancelled operation is expected during shutdown and is only
        // worth a debug message; every other failure is a real error.
#if defined (ACE_WIN32)
        const bool was_cancelled = (result.error () == ERROR_OPERATION_ABORTED);
#else
        const bool was_cancelled = (result.error () == ECANCELED);
#endif /* ACE_WIN32 */
        const ACE_Log_Priority prio = was_cancelled ? LM_DEBUG : LM_ERROR;

        // Make %p print the text for the operation's error code.
        ACE_Log_Msg::instance ()->errnum (result.error ());
        ACE_Log_Msg::instance ()->log (prio,
                                       ACE_TEXT ("(%t) Receiver %d; %p\n"),
                                       this->index_,
                                       ACE_TEXT ("write"));
      }
    else if (loglevel > 0)
      {
        ACE_DEBUG ((LM_DEBUG,
                    ACE_TEXT ("(%t) Receiver %d: wrote %d bytes ok\n"),
                    this->index_,
                    result.bytes_transferred ()));
      }

    // The echoed data is no longer needed, whatever the outcome.
    block.release ();

    const bool sent_data =
      result.error () == 0 && result.bytes_transferred () > 0;

    if (sent_data)
      {
        this->total_snd_ += result.bytes_transferred ();

        // In non-duplex mode the next read waits for the echo to finish.
        if (duplex == 0)
          this->initiate_read_stream ();
      }

    // This write is finished.
    this->io_count_--;
    nothing_pending = !(this->io_count_ > 0);
  }

  // The guard is released here, so it is safe to destroy the object.
  if (nothing_pending)
    delete this;
}