www.pudn.com > UDP-based_Reliable_Data_Transfer_Library.zip > udt.cc


//
// Author: Yunhong Gu, gu@lac.uic.edu
//
// Description: 
//
// Assumption: This code does NOT process sequence number wrap, which will overflow after 2^31 packets.
//             But I assume that you won't run NS for that long time :)
//
// Last Update: 03/20/2006
//

#include 
#include 
#include "ip.h"
#include "udt.h"

int hdr_udt::off_udt_;

static class UDTHeaderClass : public PacketHeaderClass 
{
public:
   UDTHeaderClass() : PacketHeaderClass("PacketHeader/UDT", sizeof(hdr_udt)) 
   {
      bind_offset(&hdr_udt::off_udt_);
   }
} class_udthdr;

static class UdtClass : public TclClass 
{
public:
   UdtClass() : TclClass("Agent/UDT") {}
   TclObject* create(int, const char*const*) 
   {
      return (new UdtAgent());
   }
} class_udt;


UdtAgent::UdtAgent(): Agent(PT_UDT),
syn_timer_(this),
ack_timer_(this),
nak_timer_(this),
exp_timer_(this),
snd_timer_(this),
syn_interval_(0.01),
mtu_(1500),
max_flow_window_(100000)
{
   bind("mtu_", &mtu_);
   bind("max_flow_window_", &max_flow_window_);

   snd_loss_list_ = new SndLossList(max_flow_window_, 1 << 29, 1 << 30);
   rcv_loss_list_ = new RcvLossList(max_flow_window_, 1 << 29, 1 << 30);

   flow_window_size_ = 2;
   snd_interval_ = 0.000001;

   ack_interval_ = syn_interval_;
   nak_interval_ = syn_interval_;
   exp_interval_ = 1.01;
   
   nak_count_ = 0;
   dec_count_ = 0;
   snd_last_ack_ = 0;
   local_send_ = 0;
   local_loss_ = 0;
   local_ack_ = 0;
   snd_curr_seqno_ = -1;
   curr_max_seqno_ = 0;
   avg_nak_num_ = 2;
   dec_random_ = 2;

   loss_rate_limit_ = 0.01;
   loss_rate_ = 0;

   rtt_ = 1;
   
   rcv_interval_ = snd_interval_;
   rcv_last_ack_ = 0;
   rcv_last_ack_time_ = Scheduler::instance().clock();
   rcv_last_ack2_ = 0;
   ack_seqno_ = -1;
   rcv_curr_seqno_ = -1;
   local_recv_ = 0;
   last_dec_seq_ = -1;
   last_delay_time_ = Scheduler::instance().clock();
   last_dec_int_ = 1.0;

   slow_start_ = true;
   freeze_ = false;

   ack_timer_.resched(ack_interval_);
   nak_timer_.resched(nak_interval_);
}

UdtAgent::~UdtAgent()
{
}

int UdtAgent::command(int argc, const char*const* argv)
{
   return Agent::command(argc, argv);
}

void UdtAgent::recv(Packet *pkt, Handler*)
{
   hdr_udt* udth = hdr_udt::access(pkt);

   double r;

   if (1 == udth->flag())
   {
      switch (udth->type())
      {
      case 2:
         sendCtrl(6, udth->ackseq());

         if (udth->ack() > snd_last_ack_)
         {
            snd_last_ack_ = udth->ack();
            snd_loss_list_->remove((int)snd_last_ack_);
         }
         else
            break;

         snd_timer_.resched(0);

         if (rtt_ == syn_interval_)
            rtt_ = udth->rtt() / 1000000.0;
         else
            rtt_ = rtt_ * 0.875 + udth->rtt() / 1000000.0 * 0.125;

         if (slow_start_)
            flow_window_size_ = snd_last_ack_;
         else if (udth->lrecv() > 0)
            flow_window_size_ = int(ceil(flow_window_size_ * 0.875 + udth->lrecv() * (rtt_ + syn_interval_) * 0.125));

         if (flow_window_size_ > max_flow_window_)
         {
            slow_start_ = false;

            flow_window_size_ = max_flow_window_;
         }

         bandwidth_ = int(bandwidth_ * 0.875 + udth->bandwidth() * 0.125);

         exp_timer_.resched(exp_interval_);

         rateControl();

         if (snd_interval_ > rtt_)
         {
            snd_interval_ = rtt_;
            snd_timer_.resched(0);
         }

         break;

      case 3:
         slow_start_ = false;

         last_dec_int_ = snd_interval_;

         if ((udth->loss()[0] & 0x7FFFFFFF) > last_dec_seq_)
         {
            freeze_ = true;
            snd_interval_ = snd_interval_ * 1.125;

            avg_nak_num_ = 1 + int(ceil(double(avg_nak_num_) * 0.875 + double(nak_count_) * 0.125));

            dec_random_ = int(rand() * double(avg_nak_num_) / (RAND_MAX + 1.0)) + int(ceil(avg_nak_num_/5.0));

            nak_count_ = 1;

            last_dec_seq_ = snd_curr_seqno_;
         }
         else if (0 == (++ nak_count_ % dec_random_))
         {
            snd_interval_ = snd_interval_ * 1.125;

            last_dec_seq_ = snd_curr_seqno_;
         }

         if (snd_interval_ > rtt_)
            snd_interval_ = rtt_;

         local_loss_ ++;

         for (int i = 0, n = udth->losslen(); i < n; ++ i)
         {
            if ((udth->loss()[i] & 0x80000000) && ((udth->loss()[i] & 0x7FFFFFFF) >= snd_last_ack_))
            {
               snd_loss_list_->insert(udth->loss()[i] & 0x7FFFFFFF, udth->loss()[i + 1]);
               ++ i;
            }
            else if (udth->loss()[i] >= snd_last_ack_)
            {

               snd_loss_list_->insert(udth->loss()[i], udth->loss()[i]);
            }
         }

         exp_timer_.resched(exp_interval_);

         snd_timer_.resched(0);

         break;
     
      case 4:
/*
         if (slow_start_)
            slow_start_ = false;

         last_dec_int_ = snd_interval_;

         snd_interval_ = snd_interval_ * 1.125;

         last_dec_seq_ = snd_curr_seqno_;
         nak_count_ = -16;
         dec_count_ = 1;
*/
         break;

      case 6:
      {
         int ack;
         double rtt = ack_window_.acknowledge(udth->ackseq(), ack);

         if (rtt > 0)
         {
            time_window_.ack2arrival(rtt);

//            if ((time_window_.getdelaytrend()) && (Scheduler::instance().clock() - last_delay_time_ > 2 * rtt_))
//               sendCtrl(4);

            if (rtt_ == syn_interval_)
               rtt_ = rtt;
            else
               rtt_ = rtt_ * 0.875 + rtt * 0.125;

            nak_interval_ = rtt_;
            if (nak_interval_ < syn_interval_)
               nak_interval_ = syn_interval_;

            if (rcv_last_ack2_ < ack)
               rcv_last_ack2_ = ack;
         }

         break;
      }

      default:
         break;
      }

      Packet::free(pkt);
      return;
   }
   

   time_window_.pktarrival();

   if (0 == udth->seqno() % 16)
      time_window_.probe1arrival();
   else if (1 == udth->seqno() % 16)
      time_window_.probe2arrival();

   local_recv_ ++;

   int offset = udth->seqno() - rcv_last_ack_;
 
   if (offset < 0)
   {
      Packet::free(pkt);
      return;
   }

   if (udth->seqno() > rcv_curr_seqno_ + 1)
   {
      int c;

      if (rcv_curr_seqno_ + 1 == udth->seqno() - 1)
         c = 1;
      else
         c = 2;

      int* loss = new int[c];

      if (c == 2)
      {
         loss[0] = (rcv_curr_seqno_ + 1) | 0x80000000;
         loss[1] = udth->seqno() - 1;
      }
      else
         loss[0] = rcv_curr_seqno_ + 1;

      sendCtrl(3, c, loss);

      delete [] loss;
   }

   if (udth->seqno() > rcv_curr_seqno_)
   {
      rcv_curr_seqno_ = udth->seqno();
   }
   else
   {
      rcv_loss_list_->remove(udth->seqno());
   }

   Packet::free(pkt);
   return;
}

void UdtAgent::sendmsg(int nbytes, const char* /*flags*/)
{
   if (curr_max_seqno_ == snd_curr_seqno_ + 1)
      exp_timer_.resched(exp_interval_);

   curr_max_seqno_ += nbytes/1468;

   snd_timer_.resched(0);
}

void UdtAgent::sendCtrl(int pkttype, int lparam, int* rparam)
{
   Packet* p;
   hdr_udt* udth;
   hdr_cmn* ch;

   int ack;

   switch (pkttype)
   {
   case 2:
      if (rcv_loss_list_->getLossLength() == 0)
         ack = rcv_curr_seqno_ + 1;
      else
         ack = rcv_loss_list_->getFirstLostSeq();

      if (ack > rcv_last_ack_)
      {
         rcv_last_ack_ = ack;
      }
      else if (Scheduler::instance().clock() - rcv_last_ack_time_ <= 2 * rtt_)
      {
         ack_timer_.resched(ack_interval_);

         break;
      }

      if (rcv_last_ack_ > rcv_last_ack2_)
      {
         p = allocpkt(40);
         udth = hdr_udt::access(p);

         udth->flag() = 1;
         udth->type() = 2;
         udth->lrecv() = time_window_.getpktspeed();
         udth->bandwidth() = time_window_.getbandwidth();
         udth->rtt() = int(rtt_ * 1000000.0);

         ack_seqno_ ++;
         udth->ackseq() = ack_seqno_;
         udth->ack() = rcv_last_ack_;

         ch = hdr_cmn::access(p);
         ch->size() = 40;
         Agent::send(p, 0);

         ack_window_.store(ack_seqno_, rcv_last_ack_);

         rcv_last_ack_time_ = Scheduler::instance().clock();
      }

      ack_timer_.resched(ack_interval_);

      break;

   case 3:
      if (rparam != NULL)
      {
         p = allocpkt(32 + lparam * 4);
         udth = hdr_udt::access(p);

         udth->flag() = 1;
         udth->type() = 3;
         udth->losslen() = lparam;
         memcpy(udth->loss(), rparam, lparam * 4);

         ch = hdr_cmn::access(p);
         ch->size() = 32 + lparam * 4;
         Agent::send(p, 0);
      }
      else if (rcv_loss_list_->getLossLength() > 0)
      {
         int losslen;
         int* loss = new int[MAX_LOSS_LEN];
         rcv_loss_list_->getLossArray(loss, &losslen, MAX_LOSS_LEN, rtt_);
 
         if (losslen > 0)
         {
            p = allocpkt(32 + losslen);
            udth = hdr_udt::access(p);

            udth->flag() = 1;
            udth->type() = 3;
            udth->losslen() = losslen;
            memcpy(udth->loss(), loss, MAX_LOSS_LEN);

            ch = hdr_cmn::access(p);
            ch->size() = 32 + losslen;
            Agent::send(p, 0);
         }

         delete [] loss;
      }

      nak_timer_.resched(nak_interval_);

      break;

   case 4:
      p = allocpkt(32);
      udth = hdr_udt::access(p);

      udth->flag() = 1;
      udth->type() = 4;

      ch = hdr_cmn::access(p);
      ch->size() = 32;
      Agent::send(p, 0);

      last_delay_time_ = Scheduler::instance().clock();

      break;

   case 6:
      p = allocpkt(32);
      udth = hdr_udt::access(p);

      udth->flag() = 1;
      udth->type() = 6;
      udth->ackseq() = lparam;

      ch = hdr_cmn::access(p);
      ch->size() = 32;
      Agent::send(p, 0);

      break;
   }
}

void UdtAgent::sendData()
{
   bool probe = false;

   if (snd_last_ack_ == curr_max_seqno_)
      snd_timer_.resched(snd_interval_);

   int nextseqno;

   if (snd_loss_list_->getLossLength() > 0)
   {
      nextseqno = snd_loss_list_->getLostSeq();
   }
   else if (snd_curr_seqno_ - snd_last_ack_ < flow_window_size_)
   {
      nextseqno = ++ snd_curr_seqno_;
      if (0 == nextseqno % 16)
         probe = true;
   }
   else
   {
/*
      if (freeze_)
      {
         snd_timer_.resched(syn_interval_ + snd_interval_);
         freeze_ = false;
      }
      else
         snd_timer_.resched(snd_interval_);
*/
      return;
   }

   Packet* p;

   p = allocpkt(mtu_);
   hdr_udt* udth = hdr_udt::access(p);
   udth->flag() = 0;
   udth->seqno() = nextseqno;

   hdr_cmn* ch = hdr_cmn::access(p);
   ch->size() = mtu_;
   Agent::send(p, 0);

   local_send_ ++;

   if (probe)
   {
      snd_timer_.resched(0);
      return;
   }

   if (freeze_)
   {
      snd_timer_.resched(syn_interval_ + snd_interval_);
      freeze_ = false;
   }
   else
      snd_timer_.resched(snd_interval_);
}

void UdtAgent::rateControl()
{
   if (slow_start_)
      return;

   double inc = 0.0;

   if (bandwidth_ < 1.0 / snd_interval_)
      inc = 1.0/mtu_;
   else
   {
      inc = pow(10, ceil(log10((bandwidth_ - 1.0 / snd_interval_) * mtu_ * 8))) * 0.0000015 / mtu_;

      if (inc < 1.0/mtu_)
         inc = 1.0/mtu_;
   }

   snd_interval_ = (snd_interval_ * syn_interval_) / (snd_interval_ * inc + syn_interval_);

   if (snd_interval_ < 0.000001)
      snd_interval_ = 0.000001;
}

void UdtAgent::timeOut()
{
   if (snd_curr_seqno_ >= snd_last_ack_)
   {
      snd_loss_list_->insert(int(snd_last_ack_), int(snd_curr_seqno_));
   }

   exp_interval_ = 1.0; //rtt_ + syn_interval_;

   exp_timer_.resched(exp_interval_);

   snd_timer_.resched(0);
}

/////////////////////////////////////////////////////////////////
void SndTimer::expire(Event*)
{
   a_->sendData();
}

void SynTimer::expire(Event*)
{
   a_->rateControl();
}

void AckTimer::expire(Event*)
{
   a_->sendCtrl(2);
}

void NakTimer::expire(Event*)
{
   a_->sendCtrl(3);
}

void ExpTimer::expire(Event*)
{
   a_->timeOut();
}

////////////////////////////////////////////////////////////////////

// Definition of >, <, >=, and <= with sequence number wrap

inline const bool LossList::greaterthan(const int& seqno1, const int& seqno2) const
{
   if ((seqno1 > seqno2) && (seqno1 - seqno2 < seq_no_th_))
      return true;
 
   if (seqno1 < seqno2 - seq_no_th_)
      return true;

   return false;
}

inline const bool LossList::lessthan(const int& seqno1, const int& seqno2) const
{
   return greaterthan(seqno2, seqno1);
}

inline const bool LossList::notlessthan(const int& seqno1, const int& seqno2) const
{
   if (seqno1 == seqno2)
      return true;

   return greaterthan(seqno1, seqno2);
}

inline const bool LossList::notgreaterthan(const int& seqno1, const int& seqno2) const
{
   if (seqno1 == seqno2)
      return true;

   return lessthan(seqno1, seqno2);
}

// return the distance between two sequence numbers, parameters are pre-checked
inline const int LossList::getLength(const int& seqno1, const int& seqno2) const
{
   if (seqno2 >= seqno1)
      return seqno2 - seqno1 + 1;
   else if (seqno2 < seqno1 - seq_no_th_)
      return seqno2 - seqno1 + max_seq_no_ + 1;
   else
      return 0;
}

//Definition of ++, and -- with sequence number wrap

inline const int LossList::incSeqNo(const int& seqno) const
{
   return (seqno + 1) % max_seq_no_;
}

inline const int LossList::decSeqNo(const int& seqno) const
{
   return (seqno - 1 + max_seq_no_) % max_seq_no_;
}


SndLossList::SndLossList(const int& size, const int& th, const int& max):
size_(size)
{
   seq_no_th_ = th;
   max_seq_no_ = max;

   data1_ = new int [size_];
   data2_ = new int [size_];
   next_ = new int [size_];

   // -1 means there is no data in the node
   for (int i = 0; i < size; ++ i)
   {
      data1_[i] = -1;
      data2_[i] = -1;
   }

   length_ = 0;
   head_ = -1;
   last_insert_pos_ = -1;
}

SndLossList::~SndLossList()
{
   delete [] data1_;
   delete [] data2_;
   delete [] next_;
}

int SndLossList::insert(const int& seqno1, const int& seqno2)
{
   if (0 == length_)
   {
      // insert data into an empty list

      head_ = 0;
      data1_[head_] = seqno1;
      if (seqno2 != seqno1)
         data2_[head_] = seqno2;

      next_[head_] = -1;
      last_insert_pos_ = head_;

      length_ += getLength(seqno1, seqno2);

      return length_;
   }

   // otherwise find the position where the data can be inserted
   int origlen = length_;

   int offset = seqno1 - data1_[head_];

   if (offset < -seq_no_th_)
      offset += max_seq_no_;
   else if (offset > seq_no_th_)
      offset -= max_seq_no_;

   int loc = (head_ + offset + size_) % size_;

   if (offset < 0)
   {
      // Insert data prior to the head pointer

      data1_[loc] = seqno1;
      if (seqno2 != seqno1)
         data2_[loc] = seqno2;

      // new node becomes head
      next_[loc] = head_;
      head_ = loc;
      last_insert_pos_ = loc;

      length_ += getLength(seqno1, seqno2);
   }
   else if (offset > 0)
   {
      if (seqno1 == data1_[loc])
      {
         last_insert_pos_ = loc;

         // first seqno is equivlent, compare the second
         if (-1 == data2_[loc])
         {
            if (seqno2 != seqno1)
            {
               length_ += getLength(seqno1, seqno2) - 1;
               data2_[loc] = seqno2;
            }
         }
         else if (greaterthan(seqno2, data2_[loc]))
         {
            // new seq pair is longer than old pair, e.g., insert [3, 7] to [3, 5], becomes [3, 7]
            length_ += getLength(data2_[loc], seqno2) - 1;
            data2_[loc] = seqno2;
         }
         else
            // Do nothing if it is already there
            return 0;
      }
      else
      {
         // searching the prior node
         int i;
         if ((-1 != last_insert_pos_) && lessthan(data1_[last_insert_pos_], seqno1))
            i = last_insert_pos_;
         else
            i = head_;

         while ((-1 != next_[i]) && lessthan(data1_[next_[i]], seqno1))
            i = next_[i];

         if ((-1 == data2_[i]) || lessthan(data2_[i], seqno1))
         {
            last_insert_pos_ = loc;

            // no overlap, create new node
            data1_[loc] = seqno1;
            if (seqno2 != seqno1)
               data2_[loc] = seqno2;

            next_[loc] = next_[i];
            next_[i] = loc;

            length_ += getLength(seqno1, seqno2);
         }
         else
         {
            last_insert_pos_ = i;

            // overlap, coalesce with prior node, insert(3, 7) to [2, 5], ... becomes [2, 7]
            if (lessthan(data2_[i], seqno2))
            {
               length_ += getLength(data2_[i], seqno2) - 1;
               data2_[i] = seqno2;

               loc = i;
            }
            else
               return 0;
         }
      }
   }
   else
   {
      last_insert_pos_ = head_;

      // insert to head node
      if (seqno2 != seqno1)
      {
         if (-1 == data2_[loc])
         {
            length_ += getLength(seqno1, seqno2) - 1;
            data2_[loc] = seqno2;
         }
         else if (greaterthan(seqno2, data2_[loc]))
         {
            length_ += getLength(data2_[loc], seqno2) - 1;
            data2_[loc] = seqno2;
         }
         else 
            return 0;
      }
      else
         return 0;
   }

   // coalesce with next node. E.g., [3, 7], ..., [6, 9] becomes [3, 9] 
   while ((-1 != next_[loc]) && (-1 != data2_[loc]))
   {
      int i = next_[loc];

      if (notgreaterthan(data1_[i], incSeqNo(data2_[loc])))
      {
         // coalesce if there is overlap
         if (-1 != data2_[i])
         {
            if (greaterthan(data2_[i], data2_[loc]))
            {
               if (notlessthan(data2_[loc], data1_[i]))
                  length_ -= getLength(data1_[i], data2_[loc]);

               data2_[loc] = data2_[i];
            }
            else
               length_ -= getLength(data1_[i], data2_[i]);
         }
         else
         {
            if (data1_[i] == incSeqNo(data2_[loc]))
               data2_[loc] = data1_[i];
            else
               length_ --;
         }

         data1_[i] = -1;
         data2_[i] = -1;
         next_[loc] = next_[i];
      }
      else
         break;
   }

   return length_ - origlen;
}

void SndLossList::remove(const int& seqno)
{
   if (0 == length_)
      return;

   // Remove all from the head pointer to a node with a larger seq. no. or the list is empty

   int offset = seqno - data1_[head_];

   if (offset < -seq_no_th_)
      offset += max_seq_no_;
   else if (offset > seq_no_th_)
      offset -= max_seq_no_;

   int loc = (head_ + offset + size_) % size_;

   if (0 == offset)
   {
      // It is the head. Remove the head and point to the next node
      loc = (loc + 1) % size_;

      if (-1 == data2_[head_])
         loc = next_[head_];
      else
      {
         data1_[loc] = incSeqNo(seqno);
         if (greaterthan(data2_[head_], incSeqNo(seqno)))
            data2_[loc] = data2_[head_];

         data2_[head_] = -1;

         next_[loc] = next_[head_];
      }

      data1_[head_] = -1;

      if (last_insert_pos_ == head_)
         last_insert_pos_ = -1;

      head_ = loc;

      length_ --;
   }
   else if (offset > 0)
   {
      int h = head_;

      if (seqno == data1_[loc])
      {
         // target node is not empty, remove part/all of the seqno in the node.
         int temp = loc;
         loc = (loc + 1) % size_;         

         if (-1 == data2_[temp])
            head_ = next_[temp];
         else
         {
            // remove part, e.g., [3, 7] becomes [], [4, 7] after remove(3)
            data1_[loc] = incSeqNo(seqno);
            if (greaterthan(data2_[temp], incSeqNo(seqno)))
               data2_[loc] = data2_[temp];
            head_ = loc;
            next_[loc] = next_[temp];
            next_[temp] = loc;
            data2_[temp] = -1;
         }
      }
      else
      {
         // targe node is empty, check prior node
         int i = head_;
         while ((-1 != next_[i]) && lessthan(data1_[next_[i]], seqno))
            i = next_[i];

         loc = (loc + 1) % size_;

         if (-1 == data2_[i])
            head_ = next_[i];
         else if (greaterthan(data2_[i], seqno))
         {
            // remove part seqno in the prior node
            data1_[loc] = incSeqNo(seqno);
            if (greaterthan(data2_[i], incSeqNo(seqno)))
               data2_[loc] = data2_[i];

            data2_[i] = seqno;

            next_[loc] = next_[i];
            next_[i] = loc;

            head_ = loc;
         }
         else
            head_ = next_[i];
      }

      // Remove all nodes prior to the new head
      while (h != head_)
      {
         if (data2_[h] != -1)
         {
            length_ -= getLength(data1_[h], data2_[h]);
            data2_[h] = -1;
         }
         else
            length_ --;

         data1_[h] = -1;

         if (last_insert_pos_ == h)
            last_insert_pos_ = -1;

         h = next_[h];
      }
   }
}

int SndLossList::getLossLength()
{
   return length_;
}

int SndLossList::getLostSeq()
{
   if (0 == length_)
     return -1;

   if (last_insert_pos_ == head_)
      last_insert_pos_ = -1;

   // return the first loss seq. no.
   int seqno = data1_[head_];

   // head moves to the next node
   if (-1 == data2_[head_])
   {
      //[3, -1] becomes [], and head moves to next node in the list
      data1_[head_] = -1;
      head_ = next_[head_];
   }
   else
   {
      // shift to next node, e.g., [3, 7] becomes [], [4, 7]
      int loc = (head_ + 1) % size_;

      data1_[loc] = incSeqNo(seqno);
      if (greaterthan(data2_[head_], incSeqNo(seqno)))
         data2_[loc] = data2_[head_];

      data1_[head_] = -1;
      data2_[head_] = -1;

      next_[loc] = next_[head_];
      head_ = loc;
   }

   length_ --;

   return seqno;
}


//
RcvLossList::RcvLossList(const int& size, const int& th, const int& max):
size_(size)
{
   seq_no_th_ = th;
   max_seq_no_ = max;

   data1_ = new int [size_];
   data2_ = new int [size_];
   last_feedback_time_ = new double [size_];
   count_ = new int [size_];
   next_ = new int [size_];
   prior_ = new int [size_];

   // -1 means there is no data in the node
   for (int i = 0; i < size; ++ i)
   {
      data1_[i] = -1;
      data2_[i] = -1;
   }

   length_ = 0;
   head_ = -1;
   tail_ = -1;
}

RcvLossList::~RcvLossList()
{
   delete [] data1_;
   delete [] data2_;
   delete [] last_feedback_time_;
   delete [] count_;
   delete [] next_;
   delete [] prior_;
}

void RcvLossList::insert(const int& seqno1, const int& seqno2)
{
   // Data to be inserted must be larger than all those in the list
   // guaranteed by the UDT receiver

   if (0 == length_)
   {
      // insert data into an empty list
      head_ = 0;
      tail_ = 0;
      data1_[head_] = seqno1;
      if (seqno2 != seqno1)
         data2_[head_] = seqno2;

      last_feedback_time_[head_] = Scheduler::instance().clock();
      count_[head_] = 2;

      next_[head_] = -1;
      prior_[head_] = -1;
      length_ += getLength(seqno1, seqno2);

      return;
   }

   // otherwise searching for the position where the node should be

   int offset = seqno1 - data1_[head_];

   if (offset < -seq_no_th_)
      offset += max_seq_no_;

   int loc = (head_ + offset) % size_;

   if ((-1 != data2_[tail_]) && (incSeqNo(data2_[tail_]) == seqno1))
   {
      // coalesce with prior node, e.g., [2, 5], [6, 7] becomes [2, 7]
      loc = tail_;
      data2_[loc] = seqno2;
   }
   else
   {
      // create new node
      data1_[loc] = seqno1;

      if (seqno2 != seqno1)
         data2_[loc] = seqno2;

      next_[tail_] = loc;
      prior_[loc] = tail_;
      next_[loc] = -1;
      tail_ = loc;
   }

   // Initilize time stamp
   last_feedback_time_[loc] = Scheduler::instance().clock();
   count_[loc] = 2;

   length_ += getLength(seqno1, seqno2);
}

bool RcvLossList::remove(const int& seqno)
{
   if (0 == length_)
      return false; 

   // locate the position of "seqno" in the list
   int offset = seqno - data1_[head_];

   if (offset < -seq_no_th_)
      offset += max_seq_no_;

   if (offset < 0)
      return false;

   int loc = (head_ + offset) % size_;

   if (seqno == data1_[loc])
   {
      // This is a seq. no. that starts the loss sequence

      if (-1 == data2_[loc])
      {
         // there is only 1 loss in the sequence, delete it from the node
         if (head_ == loc)
         {
            head_ = next_[head_];
            if (-1 != head_)
               prior_[head_] = -1;
         }
         else
         {
            next_[prior_[loc]] = next_[loc];
            if (-1 != next_[loc])
               prior_[next_[loc]] = prior_[loc];
            else
               tail_ = prior_[loc];
         }

         data1_[loc] = -1;
      }
      else
      {
         // there are more than 1 loss in the sequence
         // move the node to the next and update the starter as the next loss inSeqNo(seqno)

         // find next node
         int i = (loc + 1) % size_;

         // remove the "seqno" and change the starter as next seq. no.
         data1_[i] = incSeqNo(data1_[loc]);

         // process the sequence end
         if (greaterthan(data2_[loc], incSeqNo(data1_[loc])))
            data2_[i] = data2_[loc];

         // replicate the time stamp and report counter
         last_feedback_time_[i] = last_feedback_time_[loc];
         count_[i] = count_[loc];

         // remove the current node
         data1_[loc] = -1;
         data2_[loc] = -1;
 
         // update list pointer
         next_[i] = next_[loc];
         prior_[i] = prior_[loc];

         if (head_ == loc)
            head_ = i;
         else
            next_[prior_[i]] = i;

         if (tail_ == loc)
            tail_ = i;
         else
            prior_[next_[i]] = i;
      }

      length_ --;

      return true;
   }

   // There is no loss sequence in the current position
   // the "seqno" may be contained in a previous node

   // searching previous node
   int i = (loc - 1 + size_) % size_;
   while (-1 == data1_[i])
      i = (i - 1 + size_) % size_;

   // not contained in this node, return
   if ((-1 == data2_[i]) || greaterthan(seqno, data2_[i]))
       return false;

   if (seqno == data2_[i])
   {
      // it is the sequence end

      if (seqno == incSeqNo(data1_[i]))
         data2_[i] = -1;
      else
         data2_[i] = decSeqNo(seqno);
   }
   else
   {
      // split the sequence

      // construct the second sequence from incSeqNo(seqno) to the original sequence end
      // located at "loc + 1"
      loc = (loc + 1) % size_;

      data1_[loc] = incSeqNo(seqno);
      if (greaterthan(data2_[i], incSeqNo(seqno)))
         data2_[loc] = data2_[i];

      // the first (original) sequence is between the original sequence start to decSeqNo(seqno)
      if (seqno == incSeqNo(data1_[i]))
         data2_[i] = -1;
      else
         data2_[i] = decSeqNo(seqno);

      // replicate the time stamp and report counter
      last_feedback_time_[loc] = last_feedback_time_[i];
      count_[loc] = count_[i];

      // update the list pointer
      next_[loc] = next_[i];
      next_[i] = loc;
      prior_[loc] = i;

      if (tail_ == i)
         tail_ = loc;
      else
         prior_[next_[loc]] = loc;
   }

   length_ --;

   return true;
}

int RcvLossList::getLossLength() const
{
   return length_;
}

int RcvLossList::getFirstLostSeq() const
{
   if (0 == length_)
      return -1;

   return data1_[head_];
}

void RcvLossList::getLossArray(int* array, int* len, const int& limit, const double& threshold)
{
   double currtime = Scheduler::instance().clock();

   int i  = head_;

   len = 0;

   while ((*len < limit - 1) && (-1 != i))
   {
      if (currtime - last_feedback_time_[i] > count_[i] * threshold)
      {
         array[*len] = data1_[i];
         if (-1 != data2_[i])
         {
            // there are more than 1 loss in the sequence
            array[*len] |= 0x80000000;
            ++ *len;
            array[*len] = data2_[i];
         }

         ++ *len;

         // update the timestamp
         last_feedback_time_[i] = Scheduler::instance().clock();
         // update how many times this loss has been fed back, the "k" in UDT paper
         ++ count_[i];
      }

      i = next_[i];
   }
}





////////////////////////////////////////////////////////////////////////////
//
AckWindow::AckWindow():
size_(1024),
head_(0),
tail_(0)
{
   ack_seqno_ = new int[size_];
   ack_ = new int[size_];
   ts_ = new double[size_];

   ack_seqno_[0] = -1;
}

AckWindow::~AckWindow()
{
   delete [] ack_seqno_;
   delete [] ack_;
   delete [] ts_;
}

void AckWindow::store(const int& seq, const int& ack)
{
   head_ = (head_ + 1) % size_;
   ack_seqno_[head_] = seq;
   ack_[head_] = ack;
   *(ts_ + head_) = Scheduler::instance().clock();

   // overwrite the oldest ACK since it is not likely to be acknowledged
   if (head_ == tail_)
      tail_ = (tail_ + 1) % size_;
}

double AckWindow::acknowledge(const int& seq, int& ack)
{
   if (head_ >= tail_)
   {
      // Head has not exceeded the physical boundary of the window
      for (int i = tail_; i <= head_; i ++)
         // looking for indentical ACK Seq. No.
         if (seq == ack_seqno_[i])
         {
            // return the Data ACK it carried
            ack = ack_[i];

            // calculate RTT
            double rtt = Scheduler::instance().clock() - ts_[i];
            if (i == head_)
               tail_ = head_ = 0;
            else
               tail_ = (i + 1) % size_;

            return rtt;
         }

      // Bad input, the ACK node has been overwritten
      return -1;
   }

   // head has exceeded the physical window boundary, so it is behind to tail
   for (int i = tail_; i <= head_ + size_; i ++)
      // looking for indentical ACK seq. no.
      if (seq == ack_seqno_[i % size_])
      {
         // return Data ACK
         i %= size_;
         ack = ack_[i];

         // calculate RTT
         double currtime = Scheduler::instance().clock();
         double rtt = currtime - ts_[i];
         if (i == head_)
            tail_ = head_ = 0;
         else
            tail_ = (i + 1) % size_;

         return rtt;
      }

   // bad input, the ACK node has been overwritten
   return -1;
}

//
TimeWindow::TimeWindow():
size_(16)
{
   pkt_window_ = new double[size_];
   rtt_window_ = new double[size_];
   pct_window_ = new double[size_];
   pdt_window_ = new double[size_];
   probe_window_ = new double[size_];

   pkt_window_ptr_ = 0;
   rtt_window_ptr_ = 0;
   probe_window_ptr_ = 0;

   first_round_ = true;

   for (int i = 0; i < size_; ++ i)
   {
      pkt_window_[i] = 1.0;
      rtt_window_[i] = pct_window_[i] = pdt_window_[i] = 0.0;
      probe_window_[i] = 1000.0;
   }

   last_arr_time_ = Scheduler::instance().clock();
}

TimeWindow::~TimeWindow()
{
   delete [] pkt_window_;
   delete [] rtt_window_;
   delete [] pct_window_;
   delete [] pdt_window_;
}

int TimeWindow::getbandwidth() const
{
   double temp;
   for (int i = 0; i < ((size_ >> 1) + 1); ++ i)
      for (int j = i; j < size_; ++ j)
         if (probe_window_[i] > probe_window_[j])
         {
            temp = probe_window_[i];
            probe_window_[i] = probe_window_[j];
            probe_window_[j] = temp;
         }

   if (0 == probe_window_[size_ >> 1])
      return 0;

   return int(ceil(1.0 / probe_window_[size_ >> 1]));
}

int TimeWindow::getpktspeed() const
{
   if ((first_round_) && (pkt_window_ptr_ > 0))
   {
      if ((pkt_window_ptr_ > 1) && (pkt_window_[pkt_window_ptr_ - 1] < 2 * pkt_window_[pkt_window_ptr_ - 2]))
         return (int)ceil(1.0 / pkt_window_[pkt_window_ptr_ - 1]);

      return 0;
   }

   double temp;
   for (int i = 0; i < ((size_ >> 1) + 1); ++ i)
      for (int j = i; j < size_; ++ j)
         if (pkt_window_[i] > pkt_window_[j])
         {
            temp = pkt_window_[i];
            pkt_window_[i] = pkt_window_[j];
            pkt_window_[j] = temp;
         }

   double median = pkt_window_[size_ >> 1];
   int count = 0;
   double sum = 0.0;

   for (int i = 0; i < size_; ++ i)
      if ((pkt_window_[i] < (median * 2)) && (pkt_window_[i] > (median / 2)))
      {
         ++ count;
         sum += pkt_window_[i];
      }

   if (count > (size_ >> 1))
      return (int)ceil(1.0 / (sum / count));
   else
      return 0;
}

bool TimeWindow::getdelaytrend() const
{
   double pct = 0.0;
   double pdt = 0.0;

   for (int i = 0; i < size_; ++i)
      if (i != rtt_window_ptr_)
      {
         pct += pct_window_[i];
         pdt += pdt_window_[i];
      }

   pct /= size_ - 1.0;
   if (0.0 != pdt)
      pdt = (rtt_window_[(rtt_window_ptr_ - 1 + size_) % size_] - rtt_window_[rtt_window_ptr_]) / pdt;

   return ((pct > 0.66) && (pdt > 0.45)) || ((pct > 0.54) && (pdt > 0.55));
}

void TimeWindow::pktarrival()
{
   curr_arr_time_ = Scheduler::instance().clock();

   pkt_window_[pkt_window_ptr_] = curr_arr_time_ - last_arr_time_;

   pkt_window_ptr_ = (pkt_window_ptr_ + 1) % size_;
 
   if (0 == pkt_window_ptr_)
      first_round_ = false;

   last_arr_time_ = curr_arr_time_;
}

void TimeWindow::probe1arrival()
{
   probe_time_ = Scheduler::instance().clock();
}

void TimeWindow::probe2arrival()
{
   probe_window_[probe_window_ptr_] = Scheduler::instance().clock() - probe_time_;;

   probe_window_ptr_ = (probe_window_ptr_ + 1) % size_;

   last_arr_time_ = Scheduler::instance().clock();
}

void TimeWindow::ack2arrival(const double& rtt)
{
   rtt_window_[rtt_window_ptr_] = rtt;
   pct_window_[rtt_window_ptr_] = (rtt > rtt_window_[(rtt_window_ptr_ - 1 + size_) % size_]) ? 1 : 0;
   pdt_window_[rtt_window_ptr_] = fabs(rtt - rtt_window_[(rtt_window_ptr_ - 1 + size_) % size_]);

   rtt_window_ptr_ = (rtt_window_ptr_ + 1) % size_;
}