www.pudn.com > mysee.zip > SPClient.cpp


/* 
*  Openmysee 
* 
*  This program is free software; you can redistribute it and/or modify 
*  it under the terms of the GNU General Public License as published by 
*  the Free Software Foundation; either version 2 of the License, or 
*  (at your option) any later version. 
* 
*  This program is distributed in the hope that it will be useful, 
*  but WITHOUT ANY WARRANTY; without even the implied warranty of 
*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
*  GNU General Public License for more details. 
* 
*  You should have received a copy of the GNU General Public License 
*  along with this program; if not, write to the Free Software 
*  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
* 
*/ 
#include "StdAfx.h" 
#include "SPClient.h" 
#include "CaptureServer.h" 
#include "MD5.h" 
 
SPClient::SPClient(CaptureServer* cServer, NormalAddress address, BufferMgr* bufferMgr, LogMgr* log) : m_freeList(8, 1) { 
	cs = cServer; 
	m_bufferMgr = bufferMgr; 
	logFile = log; 
	addr = address; 
	isRunning = TRUE; 
	isLogin = FALSE; 
	m_socket = INVALID_SOCKET; 
	sendPointer = NULL; 
	recvPointer = NULL; 
	recvOff = 0; 
	errStr[0] = 0; 
 
	lastSentBlockID = 0; 
	memset(&header, 0, sizeof(header)); 
	readData = 0; 
 	 
	DWORD threadID; 
	hThread = CreateThread( 
		NULL, 0, (LPTHREAD_START_ROUTINE)(SPClient::RunReceiver), this, 0, &threadID); 
} 
 
SPClient::~SPClient() { 
	isRunning = FALSE; 
	Disconnect(); 
	if(hThread) { 
		// 如果一段时间内不能正常停止,就强行终止线程 
		DWORD ret = WaitForSingleObject(hThread, 5000); 
		if(ret == WAIT_TIMEOUT) { 
			TerminateThread(hThread, 0); 
		} 
		CloseHandle(hThread); 
		hThread = NULL; 
	} 
} 
 
BOOL SPClient::SendRegister() { 
	UINT8 nameSize = cs->cfgData.chnlStr.size(); 
	if(nameSize >= 0xff) 
		return FALSE; 
 
	TCPPacket* packet; 
	if(!SendBegin(packet, CS2SP_REGISTER)) 
		return FALSE; 
 
	//2.write channel name 
	CopyMoveDes(sendPointer, &nameSize, sizeof(nameSize)); 
	CopyMoveDes(sendPointer, cs->cfgData.chnlStr.data(), nameSize); 
 
	//3.write userinfo. 
	CopyMoveDes(sendPointer, &cs->cfgData.userID, sizeof(UINT)); 
	CopyMoveDes(sendPointer, cs->cfgData.password.data(), MD5_LEN); 
 
	//4. ratio ,maxblocksize 
	UINT maxblockSize = BLOCK_SIZE; 
	CopyMoveDes(sendPointer, &maxblockSize, sizeof(maxblockSize)); 
 
	float ratio = cs->GetSpeedInKBPS(); 
	CopyMoveDes(sendPointer, &ratio, sizeof(ratio)); 
	 
	//5. write video & audio 
 	TVMEDIATYPESECTION videoTV, audioTV; 
	PBYTE videoData = NULL; 
	PBYTE audioData = NULL; 
	do{ 
		delete [] videoData; 
		delete [] audioData; 
		Sleep(100); 
		if(!isRunning) { 
			m_freeList.Release(packet); 
			return FALSE; 
		} 
	} 
	while(!cs->GetFormatData(videoTV, videoData, FALSE) || !cs->GetFormatData(audioTV, audioData, TRUE) ); 
 
	// 检查媒体类型数据是否正确,关键是是否为空 
	if(videoTV.cbFormat <= 0 && audioTV.cbFormat <= 0) { 
		MessageBox(cs->parentWindow, "媒体数据为空!请重新启动采集端。", "错误", MB_OK|MB_ICONSTOP); 
		return FALSE; 
	} 
		 
	USHORT channelinfoLen = 2*sizeof(TVMEDIATYPESECTION)+videoTV.cbFormat + audioTV.cbFormat; 
	CopyMoveDes(sendPointer, &channelinfoLen, sizeof(channelinfoLen)); 
 
	CopyMoveDes(sendPointer, &videoTV, sizeof(TVMEDIATYPESECTION)); 
	if(videoData) 
		CopyMoveDes(sendPointer, videoData, videoTV.cbFormat); 
 
	CopyMoveDes(sendPointer, &audioTV, sizeof(TVMEDIATYPESECTION)); 
	if(audioData) 
		CopyMoveDes(sendPointer, audioData, audioTV.cbFormat); 
 
	SendEnd(packet); 
	return TRUE; 
} 
 
/**********发送消息的函数**********/ 
BOOL SPClient::SendBlock() { 
	if(m_sendList.size() > 10) 
		return TRUE; 
	TCPPacket* packet; 
	if(!SendBegin(packet, CS2SP_BLOCK)) 
		return FALSE; 
     
	//2.write ID. 
	CopyMoveDes(sendPointer, &lastSentBlockID, sizeof(lastSentBlockID)); 
 
	//4.write data 
	UINT size = 0; 
	if(!m_bufferMgr->GetBlock(lastSentBlockID, reinterpret_cast(sendPointer+sizeof(size)), size)) { 
		// 一定要释放这个packet 
		m_freeList.Release(packet); 
		return TRUE; 
	} 
	char* hashcode = ""; 
#ifdef DEBUG 
	MD5 md5(reinterpret_cast(sendPointer+sizeof(size)), size); 
	hashcode = md5.hex_digest(); 
#endif 
 
	// verify block 
	char* startPos = sendPointer+sizeof(size); 
	char* tempPos = startPos + 8; 
 
	if(lastSentBlockID == 0) { 
		if(*((UINT*)startPos + 1) != UINT_MAX) 
			tempPos = startPos + *((UINT*)startPos + 1); 
		else 
			tempPos = startPos + size; 
	} 
 
	while(header.size < 1638400) { 
		//写header信息 
		if((UINT)readData < sizeof(header)) { 
			if(startPos+size - tempPos < sizeof(header)-readData){ 
				memcpy((char*)&header + readData, tempPos, startPos+size - tempPos); 
				readData += startPos+size - tempPos; 
				break; 
			} 
			else { 
				memcpy((char*)&header + readData, tempPos, sizeof(header)-readData); 
				tempPos += sizeof(header)-readData;//头的位置tempPos 
				readData = sizeof(header); 
 
				//assert(header.size < 1638400); 
				if(header.size >= 1638400) { 
					logFile->StatusOut("Header size %d, blockID %d, offset %d",  
						header.size, lastSentBlockID, readData); 
					break; 
				} 
			} 
		} 
		if(readData >= sizeof(header)) { 
			if(startPos+size - tempPos < header.size - readData) { 
				readData += startPos+size - tempPos; 
				break; 
			} 
			else { 
				tempPos += header.size - readData; 
				readData = 0; 
			} 
		} 
	} 
	 
    //3.write data size  
	CopyMoveDes(sendPointer, &size, sizeof(size)); 
 
	int firstKeySampleOffset; 
	memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset)); 
	LONGLONG keySample; 
	memcpy(&keySample, packet->buf+sizeof(UINT)*3+sizeof(char)+firstKeySampleOffset, sizeof(keySample)); 
	memcpy(&firstKeySampleOffset, packet->buf+sizeof(UINT)*3+sizeof(char), sizeof(firstKeySampleOffset)); 
	if(firstKeySampleOffset == 0) 
		keySample = 0; 
	char temp[64]; 
	_i64toa(keySample, temp, 10); 
	logFile->StatusOut("Queue Block. ID:%d/%d, offset: %d, keysample: %s, hash %s",  
		lastSentBlockID, m_bufferMgr->GetMaxBlockID(), firstKeySampleOffset, temp, hashcode); 
#ifdef DEBUG 
	delete [] hashcode; 
#endif 
 
	sendPointer += size; 
 
	SendEnd(packet); 
 
	lastSentBlockID++; 
	return TRUE;	 
} 
 
BOOL SPClient::SendUpdate(){ 
	TCPPacket* packet; 
	if(!SendBegin(packet, CS2SP_UPDATE)) 
		return FALSE; 
 
	float ratio = cs->GetSpeedInKBPS(); 
	CopyMoveDes(sendPointer, &ratio, sizeof(ratio)); 
	 
	SendEnd(packet); 
	return TRUE; 
} 
 
BOOL SPClient::SendMediaType() { 
    MediaData mediaData; 
    if(!m_bufferMgr->GetMediaData(lastSentBlockID, mediaData)) 
        return FALSE; 
 
	TCPPacket* packet; 
	if(!SendBegin(packet, CS2SP_MEDIA_TYPE)) 
		return FALSE; 
 
    UINT size = mediaData.audioType.cbFormat + mediaData.videoType.cbFormat +  
        sizeof(mediaData.audioType) + sizeof(mediaData.videoType); 
    CopyMoveDes(sendPointer, &size, sizeof(UINT)); 
 
    CopyMoveDes(sendPointer, &mediaData.videoType, sizeof(mediaData.videoType)); 
    CopyMoveDes(sendPointer, mediaData.videoData, mediaData.videoType.cbFormat); 
    CopyMoveDes(sendPointer, &mediaData.audioType, sizeof(mediaData.audioType)); 
    CopyMoveDes(sendPointer, mediaData.audioData, mediaData.audioType.cbFormat); 
 
	logFile->StatusOut("Queue Block media data. ID:%d/%d", lastSentBlockID, m_bufferMgr->GetMaxBlockID()); 
	 
	SendEnd(packet); 
	return TRUE; 
} 
 
CONNECT_RESULT SPClient::Connecting() { 
	logFile->StatusOut("Connecting to %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); 
 
	CONNECT_RESULT ret = CR_ERROR; 
 
	// Create a TCP/IP socket that is bound to the server. 
	// Microsoft Knowledge Base: WSA_FLAG_OVERLAPPED Is Needed for Non-Blocking Sockets 
	// http://support.microsoft.com/default.aspx?scid=kb;EN-US;179942 
	m_socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); 
	if(m_socket == INVALID_SOCKET) { 
		logFile->StatusErr("Creating socket", WSAGetLastError()); 
		return ret; 
	} 
 
	// 不使用Nagle算法 
	BOOL bNoDelay = TRUE; 
	if(setsockopt(m_socket, SOL_SOCKET, TCP_NODELAY, (const char*)&bNoDelay, sizeof(bNoDelay)) == SOCKET_ERROR) { 
		logFile->StatusErr("Setting UDP socket as TCP_NODELAY", WSAGetLastError()); 
		return ret; 
	} 
 
	// Set this socket as a Non-blocking socket. 
	ULONG flag = 1; 
	if(ioctlsocket(m_socket, FIONBIO, &flag) == SOCKET_ERROR) { 
		logFile->StatusErr("Setting socket as non-blocking", WSAGetLastError()); 
		return ret; 
	} 
 
	// Connect to remote address 
	if(WSAConnect(m_socket, (sockaddr*)&addr, sizeof(sockaddr), NULL, NULL, NULL, NULL) == SOCKET_ERROR) { 
		if(WSAGetLastError() != WSAEWOULDBLOCK) { 
			logFile->StatusErr("Connecting socket", WSAGetLastError()); 
			return ret; 
		} 
		else { 
			ret = CR_WOULDBLOCK; 
			logFile->StatusOut("%s:%d is blocking.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); 
		} 
	} 
	else { 
		ret = CR_CONNECTED; 
		logFile->StatusOut("%s:%d is connected.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); 
	} 
	return ret; 
} 
 
void SPClient::Disconnect() { 
	isLogin = FALSE; 
	m_bufferMgr->StopSave(); 
	ClearTransferInfo(); 
	m_sendList.clear(); 
	if(m_socket != INVALID_SOCKET) { 
		::TE_CloseSocket(m_socket, FALSE); 
		logFile->StatusOut("SPClient: disconnected from %s:%d.", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); 
		m_socket = INVALID_SOCKET; 
	} 
} 
 
BOOL SPClient::BaseRecv() { 
	int ret = recv(m_socket, recvBuf+recvOff, P2P_BUF_SIZE-recvOff, 0); 
	if(ret < 0) { 
		DWORD lastError = ::WSAGetLastError(); 
		if (WSAEWOULDBLOCK != lastError) { 
			logFile->StatusErr("Receiving data on TCP", lastError); 
			return FALSE; 
		} 
		else 
			return TRUE; 
	} 
	else if(0 == ret) { 
		logFile->StatusOut("Connection has been disconnected gracefully."); 
		return FALSE; 
	} 
 
	recvOff += ret; 
	totalDownBytes += ret; 
 
	BOOL retVal = FALSE; 
 
	for(;;) { 
		// because multiple msgs can be received at once. 
		// keep call parseMsg() till !MSG_COMPLETE 
		MSG_STATE ms = ParseMsg();  
 
		switch(ms) { 
		case MSG_COMPLETE: 
			continue; 
		case MSG_UNCOMPLETE: 
			retVal = TRUE; 
			break; 
		case MSG_ERR_SIZE: 
			sprintf(errStr, "错误的消息大小!"); 
			break; 
		case MSG_ERR_TYPE:  
			sprintf(errStr, "错误的消息类型!"); 
			break; 
			break; 
		case MSG_REMOTE_ERR: 
			// 错误信息已经在errStr中了 
			break; 
		default: 
			sprintf(errStr, "未知错误类型!"); 
			break; 
		} 
		if(strlen(errStr) > 0) 
			logFile->StatusOut("错误: %s", errStr); 
		break; 
	} 
	return retVal; 
} 
 
MSG_STATE SPClient::ParseMsg() { 
	// 如果过小,则不是正常的包 
	if(recvOff < sizeof(int)+sizeof(BYTE)) 
		return MSG_UNCOMPLETE; 
 
	// 把移动指针放到数据的起始地址 
	recvPointer = recvBuf; 
 
	// 读取消息大小 
	UINT msgSize; 
	CopyMoveSrc(&msgSize, recvPointer, sizeof(msgSize)); 
 
	// 读取消息类型 
	UINT8 msgType; 
	CopyMoveSrc(&msgType, recvPointer, sizeof(msgType)); 
 
	// msgSize是否正常 
	if(msgSize > P2P_BUF_SIZE || msgSize < sizeof(int)+sizeof(BYTE)) 
		return MSG_ERR_SIZE; 
 
	// 是否包含完成的消息 
	if(recvOff < msgSize) 
		return MSG_UNCOMPLETE; 
 
	MSG_STATE ret = MSG_COMPLETE; 
	switch(msgType) { 
	case SP2CS_WELCOME: 
		ret = OnWelcome(); 
		break; 
	case SP2CS_MSG: 
		ret = OnMsg(); 
		break; 
	default: 
		ret = MSG_ERR_TYPE; 
		break; 
	} 
 
	// copy left data to start of recvBuf 
	if(recvOff >= msgSize) { 
		memcpy(recvBuf, recvBuf+msgSize, recvOff-msgSize); 
		recvOff -= msgSize; 
	} 
 
	return ret; 
} 
 
//解析注册成功消息 
MSG_STATE SPClient::OnWelcome() { 
	UINT startBlockID; 
	CopyMoveSrc(&startBlockID, recvPointer, sizeof(startBlockID)); 
 
	lastSentBlockID = startBlockID; 
	logFile->StatusOut("Start Block %d.", lastSentBlockID); 
 
	isLogin = TRUE; 
	return MSG_COMPLETE; 
} 
 
//收到SP_MSG消息 
MSG_STATE SPClient::OnMsg() { 
	// 错误代码 
	UINT16 errCode; 
	CopyMoveSrc(&errCode, recvPointer, sizeof(errCode)); 
 
	// 是否需要断开连接 
	bool shouldDisconnect; 
	CopyMoveSrc(&shouldDisconnect, recvPointer, sizeof(shouldDisconnect)); 
 
	// 根据错误代码处理 
	switch(errCode) { 
	case ERR_PROTOCOL_FORMAT: 
		sprintf(errStr, "协议错误"); 
		break; 
	case ERR_AUTHORIZATION: 
		sprintf(errStr, "验证错误"); 
		break; 
	case ERR_INTERNAL: 
		sprintf(errStr, "未知错误"); 
		break; 
	default: 
		shouldDisconnect = true; 
	} 
	if(shouldDisconnect) { 
		return MSG_REMOTE_ERR; 
	} 
 
	return MSG_COMPLETE; 
} 
 
BOOL SPClient::SendPackets() { 
	if(m_sendList.empty()) 
		return TRUE; 
 
	TCPPacket* packet = m_sendList.front(); 
	m_sendList.pop_front(); 
 
	int ret = send(m_socket, packet->buf+packet->sent, min(packet->size-packet->sent, 40000), 0); 
	if(SOCKET_ERROR == ret) { 
		DWORD lastError = ::WSAGetLastError(); 
		if (WSAEWOULDBLOCK == lastError) { 
			m_sendList.push_front(packet); 
		} 
		else { 
			m_sendList.push_front(packet); 
			logFile->StatusErr("Sending data on TCP", lastError); 
			return FALSE; 
		} 
	} 
	else { 
		totalUpBytes += ret; 
		logFile->StatusOut("Send data %d of %d", ret, packet->size-packet->sent); 
		assert(ret <= packet->size-packet->sent); 
		if(ret == packet->size-packet->sent) { 
			if(packet->size > 16384) 
				logFile->StatusOut("Sent Block %d.", *packet->GetBlockID()); 
			// 已经发送完毕,释放Buffer 
			m_freeList.Release(packet); 
		} 
		else {	// 尚未发送完毕,下次继续发送 
			packet->sent += ret; 
			m_sendList.push_front(packet); 
			logFile->StatusOut("!!!!!!!!!!!"); 
		} 
	} 
	return TRUE; 
} 
 
void SPClient::RunReceiver(SPClient* client) 
{ 
	timeval timeout;  
	DWORD lastSentUpdate = 0; 
 
	fd_set read_set, write_set; 
 
	DWORD lastManageTime=0, currTime=0; 
 
	while(client->isRunning) 
	{ 
		if(client->m_socket == INVALID_SOCKET) 
		{ 
			if(!client->m_bufferMgr->ShouldConnect()) { 
				Sleep(500); 
				continue; 
			} 
			CONNECT_RESULT ret = client->Connecting(); 
			if(ret == CR_WOULDBLOCK) { 
				// 等待8秒钟,看能否连接上 
				FD_ZERO(&write_set); 
				FD_SET(client->m_socket, &write_set); 
				timeout.tv_sec = 8; 
				timeout.tv_usec = 0; 
				int s = select(0, NULL, &write_set, NULL, &timeout); 
				if(s > 0) 
					ret = CR_CONNECTED; 
			} 
			if(ret == CR_CONNECTED) { 
				client->lastSentBlockID = 0; 
				client->readData = 0; 
				// 可以开始存储数据了 
				client->m_bufferMgr->StartSave(); 
			} 
			else { 
				TE_CloseSocket(client->m_socket, FALSE); 
				client->m_socket = INVALID_SOCKET; 
				client->logFile->StatusOut("无法连接SP,请检查网络。"); 
				//MessageBox(NULL, "无法连接SP,请检查网络。然后重新打开本程序。", "错误", MB_OK|MB_ICONINFORMATION); 
                for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) { 
                    if(!client->isRunning) 
                        break; 
				    Sleep(1000); 
                } 
				continue; 
			} 
		} 
		if(client->SendRegister()) { 
			// 开始接收发送数据 
			while(client->isRunning) { 
				// 获取当前时间 
				currTime = timeGetTime(); 
 
				timeout.tv_sec = 0; 
				timeout.tv_usec = 20000; 
				FD_ZERO(&read_set); 
				FD_SET(client->m_socket, &read_set); 
				int s = select(0, &read_set, 0, NULL, &timeout); 
				if(s > 0) { 
                    if(FD_ISSET(client->m_socket, &read_set)) { 
                        // 接收信息 
                        if(!client->BaseRecv()) { 
                            for(int i = 0; i < client->cs->cfgData.reconnectSecond; ++i) { 
                                if(!client->isRunning) 
                                    break; 
                                Sleep(1000); 
                            } 
                            break; 
						} 
					} 
				} 
 				else if(s == SOCKET_ERROR) { 
					client->logFile->StatusErr("selecting", WSAGetLastError()); 
					Sleep(1); // prevent dead loop 
				} 
				if(currTime-lastManageTime >= 10000) { 
					lastManageTime = currTime; 
					// 生成传输信息并打印 
					client->GenerateTransferInfo(TRUE); 
					client->logFile->StatusOut("Cur: (%.2f/%.2f)KB/s. Avg: (%.2f/%.2f)KB/s. Total: %.2f/%.2fMB.",  
						client->currDownSpeed/1024, client->currUpSpeed/1024,  
						client->avgDownSpeed/1024, client->avgUpSpeed/1024,  
						client->totalDownBytes/1024.f/1024.f, client->totalUpBytes/1024.f/1024.f); 
				} 
 
				if(!client->m_bufferMgr->CheckRecvingSample()) { 
					client->logFile->StatusOut("No Sample Any More!"); 
					//MessageBox(client->cs->parentWindow, "CaptureServer 20秒钟没有接收到Sample了!", "Sample中断", MB_OK|MB_ICONSTOP); 
					if(client->cs->parentWindow) 
						SendMessage(client->cs->parentWindow, WM_NOMORESAMPLE, 0, 0); 
				} 
 
				// 发送数据 
                if(client->isLogin && client->isRunning) { 
                    // 目前SP尚不支持这个协议 
                    //client->SendMediaType(); 
					client->SendBlock(); 
                } 
				if(currTime-lastSentUpdate > 10000) { 
					if(!client->SendUpdate()) 
						break; 
					lastSentUpdate = currTime; 
				} 
				if(!client->SendPackets()) 
					break; 
			} 
		} 
		client->Disconnect(); 
	} 
} 
 
BOOL SPClient::SendBegin(TCPPacket*& packet, UINT8 msgType) { 
	packet= m_freeList.Allocate(); 
	if(!packet) 
		return FALSE; 
 
	// 先留着消息大小不写,到最后再写 
	sendPointer = packet->buf+sizeof(UINT); 
	CopyMoveDes(sendPointer, &msgType, sizeof(msgType)); 
 
	return TRUE; 
} 
 
void SPClient::SendEnd(TCPPacket*& packet) { 
	// 消息的大小就是移动的指针减去初始的指针 
	packet->size = sendPointer-packet->buf; 
	packet->sent = 0; 
	memcpy(packet->buf, &packet->size, sizeof(packet->size)); 
 
	m_sendList.push_back(packet); 
} 
 
void SPClient::CopyMoveSrc(void * des, const char *& src, size_t size) { 
	assert(des && src); 
	if(!des || !src) 
		return; 
	memcpy(des, src, size); 
	src += size; 
} 
 
void SPClient::CopyMoveDes(char *& des, const void * src, size_t size) { 
	assert(des && src); 
	if(!des || !src) 
		return; 
	memcpy(des, src, size); 
	des += size; 
}