// www.pudn.com > CTP.zip > CTPNet.h


// CCTPReceivedData - Buffer for storing received data and information about it
// CCTPErrorInfo - Buffer for storing error information
// CCTPNet - Class, which implements CTP
// Declaration file
//
// (c) Lev Naumov, CAMEL Laboratory
// E-mail: camellab@mail.ru
// For more information see http://camel.ifmo.ru or
// http://www.codeproject.com/internet/ctp.asp
/////////////////////////////////////////////////////////////////////////////

// Buffer which stores one received message together with information about it.
// NOTE(review): the destructor releases pBuf with delete[] and no copy
// constructor or assignment operator is declared here, so copying an instance
// by value would leave two objects releasing the same buffer. Hand instances
// around by pointer or reference.
struct CCTPReceivedData {
// Constructor. Parameters:
// +) command - command;
// +) size - amount of data to be stored;
// +) from - IP address of the host that sent this data;
// +) buf - points to the buffer which stores the received data. If NULL then
// data copying is skipped (only the allocation is performed)
CCTPReceivedData(unsigned __int16 command, unsigned __int64 size, unsigned long from, char* buf);
// Destructor. Releases the buffer pointed to by pBuf
virtual ~CCTPReceivedData() {delete[] pBuf;};

unsigned __int16 command; // Command
unsigned __int64 size; // Message size (48 bit)
IPAddr from; // Host that sent this data
char* pBuf; // Data (released with delete[] by the destructor)
};

// Buffer which stores information about a single error
struct CCTPErrorInfo {
// Constructor. Parameters:
// +) type - error type. When it occurs:
// +) 0 - on socket creation;
// +) 1 - on socket binding;
// +) 2 - on data sending;
// +) 3 - on data receiving;
// +) 4 - if sent data was not confirmed for too long.
// +) command - command;
// +) code - WinSock error code;
// +) addr - address of the host which caused the error (it can not always be
// determined; if it can not, it just equals localhost)
CCTPErrorInfo(unsigned char type,unsigned __int16 command,int code,IPAddr addr);

// Puts a time stamp into string s and returns it.
// NOTE(review): s presumably has to provide at least as much room as the
// timestamp member below (22 chars) - confirm against the implementation
static char* GetTimeStamp(char* s);

unsigned char type; // Error type (see the constructor for the values)
unsigned __int16 command; // Command
int code; // WinSock error code
IPAddr addr; // Address of the host which caused the error
char timestamp[22]; // Time stamp of the moment when the error occurred
};

class CCTPNet: public NetSender
{
public:
// Data structures

// Packet header
#pragma pack(push)
#pragma pack(1)
struct Header {
// Constructor
Header() {size=0;command=0;number=0;amount=0;id=0;messize=0;options=0;}

void ToStream(ostream&amt; out);

unsigned __int16 size ; // Packet size (16)
unsigned __int16 command ; // Command (16)
unsigned __int32 number ; // Packet number (from zero to amount-1) (32)
unsigned __int32 amount ; // Amount of packets in the command (32)
unsigned __int32 id ; // Packet id (32)
unsigned __int64 messize ; // Message size (64, but 48 are used)
unsigned __int8 options ; // Options (8)
};
#pragma pack(pop)

// Option bits. Each enumerator occupies its own bit, so the values can be
// combined with bitwise OR in the options field of a packet header
enum Options {
    // Bit 0: delete the sent command from the storage after an error was
    // generated
    DelAfterError = 1 << 0,

    // Bit 1: do not resend this packet even if it was not confirmed
    NoResend = 1 << 1,

    // Bit 2: confirmation of this packet's command confirms all packets with
    // the same command which were sent to the same recipient.
    // NB: It is not recommended to use this option with multipacket
    // messages, to protect them from integrity corruption
    UniqueCommand = 1 << 2,

    // Bit 3: broadcast this message (a message with this option will be
    // confirmed from an arbitrary recipient)
    Broadcast = 1 << 3,

    // Bit 4: marks the packet which is first in the session (in the
    // interchange with the given recipient).
    // Note: This option is used by CTP internally and does not need to be
    // set by the user
    StartSession = 1 << 4
};

// Set of options which is appropriate for ping. Only declared here; the
// value is provided by the definition outside of the class body
static const unsigned __int8 OptPing;

// Structures for messages and error information delivery
// Kind of object carried by a delivery
enum DeliveryType {
    ReceivedData = 0, // A received message
    ErrorInfo = 1     // Information about an error
};
struct Delivery {
// Constructor
Delivery(NetReceiver* target,CCTPErrorInfo* data) {this->target=target; this->data=data; this->type=DeliveryType::ErrorInfo;};
Delivery(NetReceiver* target,CCTPReceivedData* data) {this->target=target; this->data=data; this->type=DeliveryType::ReceivedData;};
Delivery() {target=NULL; data=NULL; type=(DeliveryType)NULL;}; // Only for STL compliance

NetReceiver* target; // Receiver
void* data; // Data
DeliveryType type; // Delivery type
};
typedef list<Delivery> DeliveriesList;

// Storage structure for the time settings.
// NOTE(review): the unit of the values is not stated here (presumably
// milliseconds) - confirm against the implementation
struct Times {
    // Constructor, which sets the defaults
    Times():
        uMultiplier(3),
        uDefTimeout(100),
        uSleepOnDestroy(50),
        uSleepSuspended(10),
        uSleepDelMan(10),
        uSleepNothing(20),
        uPeriodDestroy(2000),
        uPeriodAutoDest(20000),
        uPeriodCheckResend(100)
    {
    }

    // Multiplier applied to the time needed for a single transfer in order
    // to determine the timeout
    unsigned int uMultiplier;

    // Default timeout
    unsigned int uDefTimeout;

    // Sleeping time while waiting for destroying
    unsigned int uSleepOnDestroy;

    // Sleeping time when suspended
    unsigned int uSleepSuspended;

    // Sleeping time in the deliveries manager
    unsigned int uSleepDelMan;

    // Sleeping time when the server has nothing to do
    unsigned int uSleepNothing;

    // Period of waiting for destroying, after which the working threads are
    // stopped forcedly
    unsigned int uPeriodDestroy;

    // If a deliverer does nothing during this period it is destroyed
    unsigned int uPeriodAutoDest;

    // Period of checking whether some packets are to be resent
    unsigned int uPeriodCheckResend;
};

// Constructor creates the server with all necessary parameters and tunes the
// client for fast data sending:
// +) receiver - default receiver of arrived data and errors;
// +) port - number of the port to listen on;
// +) servers - amount of servers to be started;
// +) times - pointer to the time settings storage structure (if NULL then
// defaults will be used);
// +) log - points to the output stream for debug log building (if NULL then
// no output will be produced);
// +) packetdatasize - maximum size of the data to be sent in a single
// packet (if a message is bigger than that it is a "large message");
// +) maxthreads - maximum amount of deliverers threads
CCTPNet(NetReceiver* receiver,unsigned short port,unsigned short servers=1,Times* times=NULL,ostream* log=NULL,unsigned __int16 packetdatasize=65400,unsigned short maxthreads=50);

// Destructor
virtual ~CCTPNet();

// Parameter access routines

// Time settins routines
const Times&amt; GetTimes() {return m_Times;}
void SetTimes(Times&amt; times) {m_Times=times;}

// Port routines

// Returns the number of the port in use
unsigned short GetPort()
{
    return m_uPort;
}
// Switches to the port number port: both sockets are closed, the sent
// commands, sessions and large commands storages are freed, and the sockets
// are created anew. The result of CreateSockets() is not examined here
void SetPort(unsigned short port)
{
    closesocket(m_SendSocket);
    closesocket(m_RecvSocket);

    FreeSntCommands();
    FreeSessions();
    FreeLargeCommands();

    m_uPort=port;
    CreateSockets();
}

// Packet data size routines

// Returns the maximum size of the data in a single packet
unsigned __int16 GetPacketDataSize() {return m_uPacketDataSize;}
// Sets the maximum size of the data in a single packet to ps and reallocates
// the receiving buffer so that it holds ps bytes of data plus the header.
// The new buffer is allocated before the old one is released: if the
// allocation throws, m_pBuffer and m_uPacketDataSize keep their previous,
// valid values instead of m_pBuffer being left dangling.
// NOTE(review): no lock is taken here; whether server threads may be using
// m_pBuffer at the same time can not be told from this header - confirm
void SetPacketDataSize(unsigned __int16 ps)
{
    char* fresh=new char[ps+GetHeaderSize()];
    delete[] m_pBuffer;
    m_pBuffer=fresh;
    m_uPacketDataSize=ps;
}

// Maximal threads amount routines

// Stores maxthreads as the maximal amount of delivery threads
void SetMaxDeliverers(unsigned short maxthreads)
{
    m_uMaxDeliverers=maxthreads;
}
// Returns the maximal amount of delivery threads
unsigned short GetMaxDeliverers()
{
    return m_uMaxDeliverers;
}

// Info target routines

// Returns the default receiver
NetReceiver* GetDefaultReceiver()
{
    return m_DefReceiver;
}
// Makes receiver the default receiver
void SetDefaultReceiver(NetReceiver* receiver)
{
    m_DefReceiver=receiver;
}
// Sets special receiver receiver for command command and type type. If a
// receiver for the given command and delivery type already exists it will
// be replaced
void AddSpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type);
// Deletes receiver receiver from the special receivers list
void DeleteSpecialReceiver(NetReceiver* receiver);
// Returns the receiver for command command and type type
NetReceiver* GetReceiver(unsigned __int16 command, DeliveryType type);

// Suspending status routines

// Returns true if suspended and false otherwise
bool GetSuspended()
{
    return m_bSuspended;
}
// Stores suspended as the new suspending status
void SetSuspended(bool suspended)
{
    m_bSuspended=suspended;
}
// Returns true if not suspended and false otherwise
virtual bool IsWorking()
{
    return !m_bSuspended;
}

// Operations

// Returns the size of the datagram header, i.e. sizeof(Header)
static unsigned __int16 GetHeaderSize()
{
    return static_cast<unsigned __int16>(sizeof(Header));
}

// Send smart buffer sb to address to. Parameter command represents command
// id. Parameter options - sending options. If parameter storeiffail is
// true then sent command will be stored in sent packets storage even if
// sending fails and will not otherwise. Returns true if succeeded (message
// has gone) and false otherwise
virtual bool Send(SmartBuffer&amt; sb, unsigned __int16 command, IPAddr to, unsigned __int8 options=0, bool storeiffail=true);

// Send dataless packet (header only). Parameter command represents command
// id. Parameter options - sending options. If parameter storeiffail is
// true then sent command will be stored in sent packets storage even if
// sending fails and will not otherwise. Returns true if succeeded (message
// has gone) and false otherwise
bool Send(unsigned __int16 command, IPAddr to, unsigned __int8 options=0, bool storeiffail=true) {return Send(*(new SmartBuffer()),command,to,options,storeiffail);};

// Saves information about the packet received from from with header head.
// Returns true if a new message was received and false otherwise
bool SaveRcvPacket(unsigned long from,Header* head);

// Marks the packet sent to to, with the header pointed to by header, as
// confirmed. Memory will be cleared if possible
void ConfirmSntPacket(unsigned long to,Header* header);

// Sends to to the confirmation of receipt of the packet with header header
// (the header is passed by value)
void SendConfirmation(unsigned long to,Header header);

// Arranges the large packet received from from with the header pointed to by
// head. Returns true if the solid message is arranged after the last packet
bool ArrangeLargeCommand(unsigned long from,Header* head);

// Resends the packets which have not got confirmation commands
void ResendNotConfirmedData();

// Determines whether command is a confirmation or not: it is one if at
// least one bit of the m_iConfirm mask is set in it
inline bool IsConfirmation(unsigned __int16 command) {return (command&m_iConfirm)!=0;}

// Retrieving status information functions. None of them takes a lock; each
// one just reports the current size of a container (or a counter)

// Returns the amount of entries in the sent commands storage
inline unsigned int GetSntCommandsCount()
{
    return static_cast<unsigned int>(m_SntCommands.size());
}

// Returns the amount of entries in the created sessions information storage
inline unsigned int GetSessionsCount()
{
    return static_cast<unsigned int>(m_Sessions.size());
}

// Returns the amount of entries in the large messages storage
inline unsigned int GetLrgMessagesCount()
{
    return static_cast<unsigned int>(m_LargeCommands.size());
}

// Returns the current amount of deliverer threads
inline unsigned int GetDelThreadsCount()
{
    return static_cast<unsigned int>(m_pDeliverTrds.size());
}

// Returns the current amount of busy deliverer threads
inline unsigned int GetBusyDelThreadsCount()
{
    return m_uBusy;
}

// Returns the current amount of deliveries
inline unsigned int GetDelCount()
{
    return static_cast<unsigned int>(m_Deliveries.size());
}

protected:
// Frees the buffers of all sent packets
void FreeSntCommands();

// Frees the information about received packets
void FreeSessions();

// Frees the buffers of large packets
void FreeLargeCommands();

// Frees all planned deliveries
void FreeDeliveries();

// Actually sends the packet pointed to by buf to recipient to. Returns true
// if succeeded and false otherwise
bool SendPacket(char* buf, unsigned long to);

// Creates the sending and receiving sockets. Returns true if succeeded and
// false otherwise
bool CreateSockets();

// Check the options validity in the given header
void CheckupOptions(Header&amt; header);

public:
// Socket for data receiving
SOCKET m_RecvSocket;

// Receiving buffer (reallocated by SetPacketDataSize() to hold the packet
// data size plus the header size)
char* m_pBuffer;

// Equals true if the server and delivery threads need to be finished
bool m_bKill;

// Maximal amount of delivery threads
unsigned short m_uMaxDeliverers;

// Output stream for log building
ostream* m_pLog;

// Deliveries (received messages and error information) storage
DeliveriesList m_Deliveries;

// Threads handles
vector<CWinThread*> m_pServerTrds; // Server threads
CWinThread* m_pDelManTrd; // Delivery manager thread
vector<CWinThread*> m_pDeliverTrds; // Deliverers threads

// Critical section for server threads access
CCriticalSection m_csServerTrds;

// Critical section for deliverers threads access
CCriticalSection m_csDeliverTrds;

// Critical section for deliveries access
CCriticalSection m_csDeliveries;

// Critical section for sent packets storage access
CCriticalSection m_csSntCommands;

// Critical section for sessions information access
// (presumably guards m_Sessions - confirm against the implementation)
CCriticalSection m_csSessions;

// Critical section for large commands storage access
CCriticalSection m_csLargeCommands;

// Critical section for network access
CCriticalSection m_csNetwork;

// Critical section for log access
CCriticalSection m_csLog;

// Stores the amount of busy deliverers threads
unsigned short m_uBusy;

// Determines is a<b or not, taking overruning in the account (0xffffffff
// is less than zero)
inline static bool Less(unsigned __int32 a,unsigned __int32 b) {if (max(a,b)-min(a,b)>0x7fffffff) return !(a<b); else return a<b;}

// Bit mask which is to be set in the command of a confirmation (tested by
// IsConfirmation()). Only declared here; the value is provided by the
// definition outside of the class body
static const unsigned __int16 m_iConfirm;

protected:
// Fills id field of header, refered by head for data, which will be sent
// to recipient addr. If recipient's address is for broadcasting, then
// option Broadcast is to be set set beforehand. This function will also
// set StartSession option, if needed
void GetNextID(Header&amt; head, IPAddr addr);

// Returns the timeout for the session with workstation addr, or for
// broadcasting if parameter bcast equals true
unsigned int GetTimeout(IPAddr addr, bool bcast);

// Sets the timeout to the value of parameter timeout for the session with
// workstation addr, or for broadcasting if parameter bcast equals true.
// The value will be set only if the current value is zero
void SetTimeout(IPAddr addr, bool bcast, unsigned int timeout);

// Type definitions

// Structure for a sent command: the smart buffer with its body plus per-packet
// sending state. Objects do not release anything on destruction; Free() has
// to be called explicitly, so copies stored in the list share CI and sbBody
struct SntCommandInfo {
    // Constructors

    // Only for STL compliance. Binds sbBody to a freshly allocated
    // SmartBuffer, which is released only by Free() and only if its
    // GetAutoDel() returns true. ipTo is zeroed so that no member is left
    // uninitialised
    SntCommandInfo():sbBody(*(new SmartBuffer())) {ipTo=0; CI=NULL; uCount=0;}
    // Creates the information for smart buffer sb sent to to at moment time:
    // one CommandInfo per packet of sb, each with both its creation time and
    // its last sending time set to time
    SntCommandInfo(SmartBuffer& sb, DWORD time, unsigned long to):sbBody(sb)
    {
        ipTo=to;
        uCount=sb.GetPacketsCount();
        CI=new CommandInfo[uCount];
        for (unsigned int i=0; i<uCount; i++) {
            CI[i].dwTime=time;
            CI[i].dwLTime=time;
        }
    }

    // Confirms receiving of the i-th packet. Returns true if this object can
    // be excluded from the sent commands list and false otherwise
    bool Confirm(unsigned int i);

    // Frees the memory controlled by this sent command information storage:
    // the CI array always, the smart buffer only if it asks for automatic
    // deletion
    inline void Free() {delete[] CI; if (sbBody.GetAutoDel()) delete &sbBody;}

    // Representation of the recipient's IP address
    unsigned long ipTo;
    // Reference to the smart buffer
    SmartBuffer& sbBody;
    // Amount of messages in this command
    unsigned __int32 uCount;

    // Structure for single command info
    struct CommandInfo {
        // Constructor
        CommandInfo() {uResend=1; dwTime=0; dwLTime=0; bConfirmed=false;}

        // Increments the period between sendings: doubles it until it
        // reaches 16384
        void IncResend() {if (uResend<16384) uResend<<=1;}
        // Dead timeout has elapsed: the period has grown to 256 or more
        // (i.e. it was doubled at least eight times from the initial 1)
        bool IsDeadTimeout() {return uResend>=256;}

        // Period between sendings
        unsigned int uResend;
        // Creation time
        DWORD dwTime;
        // Last sending (or resending) time
        DWORD dwLTime;
        // Was confirmed or not
        bool bConfirmed;
    };

    // Array of commands' information (uCount entries)
    CommandInfo* CI;
};
typedef list<SntCommandInfo> SntCommandInfoList;

// Structures for session description
struct SessionInfo {
// Constructor
SessionInfo() {id=rand()*rand(); timeout=0; received.clear(); minwasset=false;}

// Type for received messages list
typedef list<unsigned __int32> RcvList;

unsigned __int32 id; // Next id
bool minwasset; // Was minimal id already set or it was not
unsigned int timeout; // Timeout
RcvList received; // Ids of received packets
};
typedef map<IPAddr::IPSolid,SessionInfo> SessionsInfo;

// Structure for storing the parts of a large packet. Nothing is released on
// destruction; Free() has to be called explicitly
struct LargeCommandInfo {
    // Constructors. Parameters:
    // +) command - command;
    // +) size - amount of data to be stored;
    // +) from - ip address of the host that sends this data;
    // +) id - first packet's id;
    // +) amount - amount of packets left in the message
    // A CCTPReceivedData is allocated for the message (with NULL as the
    // source buffer, so nothing is copied into it) and all amount parts are
    // marked as not received
    LargeCommandInfo(unsigned __int16 command, unsigned __int64 size, unsigned long from, unsigned __int32 id, unsigned __int32 amount)
    {
        pRD=new CCTPReceivedData(command,size,from,NULL);
        this->id=id;
        uCount=amount;
        received=new bool[uCount];
        for (unsigned int part=0; part<uCount; part++) {
            received[part]=false;
        }
    }
    // Only for STL compliance
    LargeCommandInfo()
    {
        id=0;
        uCount=0;
        received=NULL;
        pRD=NULL;
    }

    // Marks the i-th part of the command as received. Returns true if all
    // parts were received and false otherwise
    inline bool GotPart(unsigned int i);

    // Frees the memory controlled by this large command information storage:
    // only the flags array is released, the message's body (pRD) is not
    // destroyed
    inline void Free()
    {
        delete[] received;
    }

    unsigned __int32 id; // First packet's id
    unsigned __int32 uCount; // Amount of packets for the command
    bool* received; // Array of flags which show whether a part was received
    CCTPReceivedData* pRD; // Points to the received data
};
typedef list<LargeCommandInfo> LargeCommandInfoList;

// Structures for special receivers
struct SpecialReceiver {
// Constructor
SpecialReceiver(unsigned __int16 command, NetReceiver* receiver, DeliveryType type) {this->command=command; this->receiver=receiver; this->type=type;};
SpecialReceiver() {command=0;receiver=NULL;type=(DeliveryType)NULL;}; // For STL compliance

unsigned __int16 command; // Command
NetReceiver* receiver; // Receiver
DeliveryType type; // Type of delivery to be sent to the receiver
};
typedef list<SpecialReceiver> SpecialReceiversList;

// Storages and other data structures

// Sessions information storage (map keyed by IPAddr::IPSolid)
SessionsInfo m_Sessions;

// Sent commands storage
SntCommandInfoList m_SntCommands;

// Large packets storage
LargeCommandInfoList m_LargeCommands;

// Points to the receiver which will get messages and error information by
// default (if no special receiver is present)
NetReceiver* m_DefReceiver;

// Special receivers
SpecialReceiversList m_Receivers;

// Socket for data sending
SOCKET m_SendSocket;

// Local address, used for data receiving
SOCKADDR_IN m_Local;

// Port on which to work
unsigned short m_uPort;

// Size of the data in a packet (the header is not included, see
// SetPacketDataSize())
unsigned __int16 m_uPacketDataSize;

// Time settings
Times m_Times;

// Determines if this workstation is offline (suspended) or not
bool m_bSuspended;

// Returns a reference to the corresponding session information. The
// broadcasting session will be returned if parameter bcast equals true
SessionInfo& GetSessionInfo(IPAddr addr, bool bcast);
};

// Thread functions. Each of them takes an untyped pointer pNet.
// NOTE(review): pNet presumably points to the CCTPNet object which started
// the thread - confirm against the implementation

// Main server threads function
unsigned int CTPServerFunction(void* pNet);

// Deliveries manager's thread function
unsigned int CTPDelManFunction(void* pNet);

// Deliverers threads function
unsigned int CTPDeliverFunction(void* pNet);