www.pudn.com > ChatUseIOCP.rar > DtIpSocket.cpp


#include "StdAfx.h" 
#include "dtipsocket.h" 
 
Datatal::DtIpSocket::DtIpSocket(void) 
{ 
	m_sdClient = INVALID_SOCKET; 
	m_nRemotePort = NULL; 
	strcpy(m_szRemoteHost, "127.0.0.1"); 
 
	//Create events 
	m_hNewDataEvent = CreateEvent(NULL, TRUE, FALSE, NULL); 
 
	m_bDisableNagle = false; 
	m_bDisableRecvBuf = false; 
	m_bDisableSendBuf = false; 
 
} 
 
Datatal::DtIpSocket::~DtIpSocket(void) 
{ 
	WSACleanup(); 
} 
 
void Datatal::DtIpSocket::Connect() 
{ 
	if (m_szRemoteHost[0] == NULL) 
		throw DtSocketException(ERROR_INVALID_DATA, "Connect failed", "Remotehost was not specified"); 
 
	if (m_nRemotePort == NULL) 
		throw DtSocketException(ERROR_INVALID_DATA, "Connect failed", "Remove port was not specified"); 
 
	DisableNagle(); 
 
	try 
	{ 
		Connect(m_szRemoteHost, m_nRemotePort); 
	} 
	catch (DtSocketException& ex) 
	{ 
		WriteLog(LP_HIGH, "Connect", "Connect failed: %s", ex.ToString()); 
		Disconnect(); 
	} 
	 
} 
 
void Datatal::DtIpSocket::DisableNagle() 
{ 
	m_bDisableNagle = true; 
} 
 
/// Disable winsocks internal sendbuffer and use own buffering only. 
void Datatal::DtIpSocket::DisableSendBuffer() 
{ 
	m_bDisableSendBuf = true; 
} 
 
/// Disable winsocks internal recvbuffer and use own buffering only. 
void Datatal::DtIpSocket::DisableRecvBuffer() 
{ 
	m_bDisableRecvBuf = true; 
} 
 
 
void Datatal::DtIpSocket::SetRemoteHost(const char* szHostName) 
{ 
	if (strlen(szHostName) > 128) 
		return; 
 
	strcpy(m_szRemoteHost, szHostName); 
} 
 
void Datatal::DtIpSocket::SetRemotePort(int nRemotePort) 
{ 
	m_nRemotePort = nRemotePort; 
} 
 
//Init everything. Start our workerthread when we are done 
void Datatal::DtIpSocket::Connect(const char* szHostName, int nRemotePort) 
{ 
	DWORD				dwFlags;			//socket flags 
	struct sockaddr_in	SockAddr;	//used to map ipbased address 
	struct hostent FAR	*pHostEnt;	//used to map stringbased address 
	HANDLE				hConnect;	//Temporary connect event 
	HANDLE				hEvents[2];	//Create a array of the events to wait for: 
	DWORD				dwRes = 0; 
 
	if (GetState() == DTSS_CONNECTING) 
		return; 
 
	if (GetState() == DTSS_CONNECTED) 
		return; 
 
	if (GetState() == DTSS_ERROR) 
		Disconnect(); 
 
	SetState(DTSS_CONNECTING); 
 
	SetRemoteHost(szHostName); 
	SetRemotePort(nRemotePort); 
 
 
	WSADATA	Wsa; 
	if ( int i = WSAStartup(MAKEWORD(2,0), &Wsa) ) 
	{ 
		SetState(DTSS_DISCONNECTED); 
		throw DtSocketException(i, "Connect failed", "WSAStartup failed"); 
	} 
 
 
	//Create the  socket 
	dwFlags = WSA_FLAG_OVERLAPPED; 
	if (m_sdClient == INVALID_SOCKET) 
	{ 
		m_sdClient = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, NULL, WSA_FLAG_OVERLAPPED); 
		if (m_sdClient == INVALID_SOCKET) 
		{ 
			SetState(DTSS_DISCONNECTED); 
			throw DtSocketException(WSAGetLastError(), "Connect failed", "WSASocket failed."); 
		} 
	} 
 
	//Set our port 
	SockAddr.sin_family=AF_INET; 
	SockAddr.sin_port=htons((u_short)m_nRemotePort); 
	if (m_szRemoteHost[0]>='0' && m_szRemoteHost[0]<='9') 
	{ 
		//It's an address of type xxx.xxx.xxx.xxx 
		SockAddr.sin_addr.s_addr=inet_addr(m_szRemoteHost); 
		if (SockAddr.sin_addr.s_addr==INADDR_NONE) 
		{ 
			char szError[256]; 
			sprintf(szError, "Connect, inet_addr(%s) failed", m_szRemoteHost); 
 
			SetState(DTSS_DISCONNECTED); 
			throw DtSocketException(GetLastError(), "Connect failed", szError); 
		} 
	} 
	else 
	{ 
		if ((pHostEnt=gethostbyname(m_szRemoteHost))==NULL)  
		{ 
			char szError[256]; 
			sprintf(szError, "Connect, gethostbyname(%s) failed", m_szRemoteHost); 
			SetState(DTSS_DISCONNECTED); 
			throw DtSocketException(GetLastError(), "Connect failed", szError); 
		} 
		else 
			memcpy(&SockAddr.sin_addr,pHostEnt->h_addr,pHostEnt->h_length); 
	} 
 
	//Create the event for Socket Connect operations 
	hConnect = WSACreateEvent(); 
	if ( !hConnect ) 
	{ 
		SetState(DTSS_DISCONNECTED); 
		throw DtSocketException(GetLastError(), "Connect failed", "WSACreateEvent() for hConnect failed."); 
	} 
 
	//wait for connect or stop 
	hEvents[0] = m_hStopEvent; 
	hEvents[1] = hConnect; 
	 
	ResetEvent(hConnect); 
 
	if (m_bDisableNagle) 
	{ 
		INT option_value = 1; 
		if (setsockopt(m_sdClient,SOL_SOCKET, TCP_NODELAY, (char *)&option_value, sizeof(option_value)) == SOCKET_ERROR ) 
		{ 
			WriteLog(1, "Connect", "Failed to disable Nagle algoritm: %d", WSAGetLastError()); 
		} 
	} // if (m_bDisableNagle) 
 
	if (m_bDisableSendBuf) 
	{ 
		int nLen = 0; 
		dwRes = setsockopt (m_sdClient, SOL_SOCKET, SO_SNDBUF, (char*)&nLen, sizeof (nLen)); 
		if (dwRes != 0) 
			throw DtSocketException(0, "Socket buffer", "Failed to disable winsock outbuffer"); 
	} // if (m_bDisableRecvBuf) 
	 
	if (m_bDisableRecvBuf) 
	{ 
		int nLen = 0; 
		dwRes = setsockopt (m_sdClient, SOL_SOCKET, SO_RCVBUF, (char*)&nLen, sizeof (nLen)); 
		if (dwRes != 0) 
			throw DtSocketException(0, "Socket buffer", "Failed to disable winsock inbuffer"); 
	} 
 
 
	//Select which events who will Signal our Event 
	if(WSAEventSelect( m_sdClient, hConnect, FD_CONNECT|FD_CLOSE  ) == SOCKET_ERROR) 
	{ 
		SetState(DTSS_DISCONNECTED); 
		throw DtSocketException(WSAGetLastError(), "Connect failed", "WSACreateEvent() for hConnect failed."); 
	} 
 
	//Connect to Host... 
	if (WSAConnect( m_sdClient, (const struct sockaddr *)&SockAddr , sizeof(SockAddr), NULL, NULL, NULL, NULL ) == SOCKET_ERROR) 
	{ 
		DWORD dwError = (DWORD)WSAGetLastError(); 
		if (dwError == WSAEWOULDBLOCK) 
		{ 
			dwRes = WaitForMultipleObjects(2, hEvents, FALSE, INFINITE); 
			switch(dwRes) 
			{ 
			case WAIT_OBJECT_0: 
				WriteLog(Datatal::LP_LOW, "Connect", "Die event was signalled"); 
				WSACloseEvent(hConnect); 
				if (IsThreadRunning()) StopThread(); 
				return; 
 
			case WAIT_OBJECT_0 + 1: 
				WriteLog(Datatal::LP_NORMAL, "Connect", "WSAConnect() OK!"); 
				break; 
 
			default: 
				WSACloseEvent(hConnect); 
				SetState(DTSS_DISCONNECTED); 
				throw DtSocketException(GetLastError(), "Connect failed", "WaitForMultipleObjects() failed."); 
			} 
		} 
		else 
		{ 
			SetState(DTSS_ERROR); 
			throw DtSocketException(WSAGetLastError(), "Connect failed", "WSAConnect failed."); 
		} 
	} 
 
	WSANETWORKEVENTS	NetworkEvents; 
	memset(&NetworkEvents, 0, sizeof(NetworkEvents)); 
	if ( WSAEnumNetworkEvents ( m_sdClient, hConnect, &NetworkEvents) == SOCKET_ERROR) 
	{ 
		SetState(DTSS_ERROR); 
		throw DtSocketException(WSAGetLastError(), "Connect failed", "WSAEnumNetworkEvents() failed."); 
	} 
 
	if (NetworkEvents.lNetworkEvents & FD_CONNECT) 
	{ 
		if (NetworkEvents.iErrorCode[FD_CONNECT_BIT] == 0) 
		{ 
			if (!IsThreadRunning()) StartThread(); 
			SetState(DTSS_CONNECTED); 
			HandleConnect(); 
			return; 
		} 
		else 
		{ 
			SetState(DTSS_ERROR); 
			if (!IsThreadRunning() && GetReconnect()) StartThread();  // Start thread if reconnect is set. 
			throw DtSocketException(NetworkEvents.iErrorCode[FD_CONNECT_BIT], "Connect failed", "FD_CONNECT failed."); 
		} 
		 
	} 
	if (NetworkEvents.lNetworkEvents & FD_CLOSE) 
	{ 
		SetState(DTSS_DISCONNECTED); 
		throw DtSocketException(NetworkEvents.iErrorCode[FD_CLOSE_BIT], "Connect failed", "FD_CLOSE set!"); 
	} 
 
	SetState(DTSS_ERROR); 
} 
 
bool Datatal::DtIpSocket::Disconnect(bool bStopThread) 
{ 
	Datatal::DtSocketBase::Disconnect(bStopThread); 
 
	// Not connected = dont do anything. 
	if (m_sdClient != INVALID_SOCKET) 
	{ 
		shutdown(m_sdClient, SD_BOTH); 
		closesocket( m_sdClient ); 
		m_sdClient = INVALID_SOCKET; 
	} // if (m_bConnected) 
 
	return true; 
} 
 
void Datatal::DtIpSocket::ThreadFunc(HANDLE hStopEvent) 
{ 
	//Async operations 
	OVERLAPPED			osReader;			// Read operations 
	OVERLAPPED			osWriter;			// Write operations 
	bool				bCanRun = true;		// Set to false if server should stop 
	bool				bPendingRead = false;	//Pending read operations 
	bool				bPendingWrite = false;	//Pending write operations 
	bool				bNewData = false;		// Got new data to send. 
	bool				bReadCompleted = false;	// true if a read have been completed. 
	DWORD				dwReadbytes = 0;		//Number of bytes read 
	WSABUF				wsaInBuf;			//Temporary buffer used for reading 
	WSABUF				wsaOutBuf;			//Tempbuf used for writing 
	DWORD				dwRes;				//Resultcode 
	DWORD				dwFlags;			//WSARecv/WSASend flags 
	DWORD				dwWritten;			//Number of bytes that have been written 
	HANDLE				hEvents[4] = {NULL, NULL, NULL, NULL}; 
	BOOL				bSkipWait = false; 
 
	// Create the event for Socket Read operations 
	memset(&osReader, 0, sizeof(OVERLAPPED)); 
	osReader.hEvent = WSACreateEvent(); 
	if ( !osReader.hEvent ) 
	{ 
		WriteLog(1, "misc", "WSACreateEvent() for osReader failed! Error %d.",GetLastError()); 
		return; 
	} 
 
	// Create the event for Socket Write operations 
	memset(&osWriter, 0, sizeof(OVERLAPPED)); 
	osWriter.hEvent = WSACreateEvent(); 
	if ( !osWriter.hEvent ) 
	{ 
		WriteLog(1, "misc", "WSACreateEvent() for osWriter failed! Error %d.",GetLastError()); 
		return; 
	} 
 
	//Init our buffers 
	wsaInBuf.buf = new char[_SOCKETLIB_WORK_SIZE_]; 
	wsaInBuf.len = _SOCKETLIB_WORK_SIZE_; 
	memset(wsaInBuf.buf, 0, _SOCKETLIB_WORK_SIZE_); 
	wsaOutBuf.buf = new char[_SOCKETLIB_WORK_SIZE_]; 
	memset(wsaOutBuf.buf, 0, _SOCKETLIB_WORK_SIZE_); 
	wsaOutBuf.len = 0; 
 
	hEvents[0] = hStopEvent; 
	hEvents[1] = m_hNewDataEvent; 
	hEvents[2] = osWriter.hEvent; 
	hEvents[3] = osReader.hEvent; 
 
 
	//Should only run if we can 
	while (bCanRun) 
	{ 
 
 
		// If an error occured, wait for user action. 
		if (GetState() == DTSS_ERROR || GetState() == DTSS_CONNECTING) 
		{ 
 
			WriteLog(LP_NORMAL, "Error", "In wainting loop"); 
			if (GetReconnect() && GetState() == DTSS_ERROR) 
			{ 
				WriteLog(LP_NORMAL, "Error", "Disconnecting due to error"); 
				Disconnect(); 
			} 
 
			dwRes = WaitForSingleObject(m_hStopEvent, 1000); 
			if (dwRes == WAIT_OBJECT_0) 
				bCanRun = false; 
 
			continue; 
		} 
 
		//	We've been disconnected, wait until disconnect is done and then connect. 
		if (GetState() == DTSS_DISCONNECTED) 
		{ 
			bReadCompleted	= false; 
			bPendingRead	= false; 
			bPendingWrite	= false; 
			bNewData		= false; 
 
			WriteLog(LP_NORMAL, "Error", "In waiting loop2"); 
 
			//Check if we should reconnect. 
			if (GetReconnect()) 
			{ 
				WriteLog(LP_NORMAL, "Connect", "Trying to (re)connect..."); 
				Connect(); 
			} 
 
			if (!IsConnected()) 
			{ 
				/// Let's wait 15 seconds before we try again. 
				WriteLog(3, "misc", "ThreadFunc, Waiting 15 seconds or on die event.."); 
				dwRes = WaitForSingleObject(m_hStopEvent, 15000); 
				if (dwRes == WAIT_OBJECT_0) 
					bCanRun = false; 
 
				continue; 
			} // if (!IsConnected()) 
		}	 
 
 
		if (bCanRun && IsConnected()) 
		{ 
 
			//Wait if we got some data, else init a read. 
			if (bPendingRead) 
				dwRes = WaitForMultipleObjects(4, hEvents, FALSE, INFINITE); 
			else 
				dwRes = 1000; //used to invoke a WSARecv. 
 
			DWORD dwBytesWritten = 0; 
			switch(dwRes) 
			{ 
 
				//stopEvent 
				//======================================================= 
			case WAIT_OBJECT_0: 
				bCanRun = false; 
				continue; 
				break; 
 
 
				// SendData event 
				//======================================================= 
			case WAIT_OBJECT_0 + 1: 
				WriteLog(3, "Send", "New data triggered."); 
				ResetEvent(m_hNewDataEvent); 
				bNewData = true; 
				break; 
 
 
				// Overlapped write is done. 
				//======================================================= 
			case WAIT_OBJECT_0 + 2: 
				WSAGetOverlappedResult(m_sdClient, &osWriter, &dwBytesWritten, FALSE, &dwFlags); 
				WriteLog(Datatal::LP_LOW, "Send", "Write completed, %d bytes", dwBytesWritten); 
				ResetEvent(osWriter.hEvent); 
				HandleSendComplete(); 
				bPendingWrite = false; 
 
				if (m_lOutBuffers.pFirst) 
					bNewData = true; 
 
				break; 
 
				//osreader, pending read is completed 
				//======================================================= 
			case WAIT_OBJECT_0 + 3: 
				WriteLog(3, "Read", "Read completed."); 
				ResetEvent(osReader.hEvent); 
				bReadCompleted = true;		 
				break; 
 
			case 1000: 
				break; 
 
			default: 
				WriteLog(1, "misc", "ThreadFunc, Incorrect dwRes: %d, error: %d", dwRes, GetLastError()); 
				break; 
 
			} 
			if (!bCanRun) 
				break; 
 
			// no pending read. Initiate one. 
			dwReadbytes = 0; 
			if (!bPendingRead) 
			{ 
				dwFlags = 0; 
				WriteLog(Datatal::LP_LOW, "Read", "WSARecv"); 
				memset(wsaInBuf.buf, 0, _SOCKETLIB_WORK_SIZE_); 
				dwRes = WSARecv(m_sdClient, &wsaInBuf, 1, &dwReadbytes , &dwFlags , &osReader, NULL); 
				if (dwRes == SOCKET_ERROR) 
				{ 
 
					//What Error? 
					DWORD dwError = WSAGetLastError(); 
					if (dwError != ERROR_IO_PENDING)  
					{ 
						char szLog[256]; 
						sprintf(szLog, "WSARecv() failed! Error %d.", dwError); 
						SetState(DTSS_ERROR); 
						HandleError(dwError, szLog); 
						continue; 
					} 
 
					bPendingRead = true; 
					bReadCompleted = false; 
					WriteLog(Datatal::LP_LOW, "Read", "pending read..."); 
				} //if (dwRes == SOCKET_ERROR)	 
				else 
				{ 
 
					// 0 bytes recieved = closed socket 
					if (!dwReadbytes) 
					{ 
						SetState(DTSS_ERROR); 
						HandleError(0, "0 bytes recieved, closing socket..."); 
						continue; 
					} 
 
					bPendingRead = false; 
					bReadCompleted = true; 
					WriteLog(Datatal::LP_LOW, "Read", "Completed directly %d bytes", dwReadbytes); 
				} 
			} 
 
			// No pending writes and new data to send. 
			// ======================================================================= 
			if (bReadCompleted) 
			{ 
 
				//If a read was not completed directly, dwReadBytes = 0, fetch overlapped result. 
				if (dwReadbytes == 0) 
				{ 
					dwFlags = 0; 
					WriteLog(Datatal::LP_LOW, "Read", "WSAGetOverlappedResult"); 
					if(!WSAGetOverlappedResult(m_sdClient, &osReader, &dwReadbytes, FALSE, &dwFlags)) 
					{ 
						char szLog[512]; 
						sprintf(szLog, "GetOverlappedResult() failed on Read! Bytes read is %d. Flags=%d.", dwReadbytes, dwFlags); 
						SetState(DTSS_ERROR); 
						HandleError(GetLastError(), szLog); 
						continue; 
					} 
				} 
 
				//Closed? 
				if ( !dwReadbytes ) 
				{ 
					SetState(DTSS_ERROR); 
					HandleError(WSAECONNRESET, "Socket closed by Host when waiting for Read.."); 
					continue; 
				} 
 
				//Lock our inputbuffer 
				m_CritRead.Lock(); 
				WriteLog(Datatal::LP_NORMAL, "Read", "%d bytes read", dwReadbytes); 
				HandleReceive(wsaInBuf.buf, dwReadbytes);; 
				m_CritRead.Unlock(); 
 
				bReadCompleted = false; 
				bPendingRead = false; 
			} //if (!bPendingRead && bReadCompleted) 
 
 
			// No pending writes and new data to send. 
			// ======================================================================= 
			if (!bPendingWrite && bNewData) 
			{ 
				//Lock buffer and fetch data 
				m_CritWrite.Lock(); 
				if (m_lOutBuffers.pFirst && !bPendingWrite) 
				{ 
 
					Outbuffer* pBuffer = m_lOutBuffers.pFirst; 
					if (pBuffer->nSize - m_nBufferPos <= _SOCKETLIB_WORK_SIZE_) 
					{ 
						memcpy(wsaOutBuf.buf, pBuffer->pBuffer + m_nBufferPos, pBuffer->nSize - m_nBufferPos); 
						wsaOutBuf.len = (int)pBuffer->nSize - m_nBufferPos; 
						m_lOutBuffers.RemoveFirst(); 
						WriteLog(Datatal::LP_LOW, "Send", "First/Last outbuffers: %X/%X", m_lOutBuffers.pFirst, m_lOutBuffers.pLast); 
						m_nBufferPos = 0; 
					} 
					else 
					{ 
						memcpy(wsaOutBuf.buf, pBuffer->pBuffer + m_nBufferPos, _SOCKETLIB_WORK_SIZE_); 
						m_nBufferPos += _SOCKETLIB_WORK_SIZE_; 
						wsaOutBuf.len = _SOCKETLIB_WORK_SIZE_; 
					} 
 
					dwWritten	= 0; 
					dwFlags		= 0; 
					if (WSASend(m_sdClient, &wsaOutBuf, 1, &dwWritten , dwFlags , &osWriter, NULL) == SOCKET_ERROR) 
					{ 
						//Overlapped? 
						if (GetLastError() != ERROR_IO_PENDING)  
						{ 
							m_CritWrite.Unlock(); 
							SetState(DTSS_ERROR); 
							HandleError(WSAGetLastError(), "WSASend() failed!"); 
							continue; 
						} 
 
						bPendingWrite = true; //only set pending write if we do not complete directly. 
					} 
					else 
					{ 
						// 0 bytes recieved = closed socket 
						if (!dwWritten) 
						{ 
							SetState(DTSS_ERROR); 
							HandleError(WSAECONNRESET, "0 bytes sent"); 
							m_CritWrite.Unlock(); 
							continue; 
						} 
						HandleSendComplete(); 
					} 
				}  
				m_CritWrite.Unlock(); 
 
				bNewData = false; 
			} //if (!bPendingWrite && bNewData) 
 
 
		} //valid socket 
 
	} // while 
 
	if (wsaInBuf.buf) 
		delete[] wsaInBuf.buf; 
 
	if (wsaOutBuf.buf) 
		delete[] wsaOutBuf.buf; 
 
	WSACloseEvent(osWriter.hEvent); 
	WSACloseEvent(osReader.hEvent); 
 
}