// www.pudn.com > ChatRoom.zip > IOCPServer.cpp, change:2011-07-28,size:6039b


#include "StdAfx.h" 
#include "IOCPServer.h" 
#include <MSWSock.h> 
 
// Builds an idle server object: nothing is opened here, every member is
// only put into its "not started" state.
CIOCPServer::CIOCPServer()
{
	// No completion port and no listening socket exist yet.
	m_hCompletionPort = NULL;
	m_socketListen = NULL;

	// No worker threads have been counted.
	m_nThreadNum = 0;

	// No error has been recorded.
	m_strError = _T("");
}
 
// Destructor. Intentionally performs no work: it does not close the
// listening socket or the completion port.
CIOCPServer::~CIOCPServer()
{
}
 
// Creates a per-socket record for hSocket and associates the socket with
// the server's completion port, using the record's address as the
// completion key.
//
// hSocket       - socket to associate; NULL and INVALID_SOCKET are rejected.
// bListenSocket - TRUE stores hSocket in m_socketListen of the new record,
//                 FALSE stores it in m_socketClient.
//
// Returns the new record (also appended to m_lstSocketData) on success.
// Returns NULL on failure; in that case nothing is added to the list and
// hSocket is left open for the caller to dispose of.
CPerSocketData* CIOCPServer::AssignSocketToCompletionPort(SOCKET hSocket, BOOL bListenSocket)
{
	if(NULL == hSocket || INVALID_SOCKET == hSocket)
	{
		return NULL;
	}

	// With a NULL existing port CreateIoCompletionPort would create a brand
	// new port instead of associating with ours, so refuse to continue.
	if(NULL == m_hCompletionPort)
	{
		return NULL;
	}

	CPerSocketData* pSocketData = new CPerSocketData;
	if(NULL == pSocketData)
	{
		return NULL;
	}

	if(bListenSocket)
	{
		pSocketData->m_socketListen = hSocket;
	}
	else
	{
		pSocketData->m_socketClient = hSocket;
	}

	// Associate first and publish the record only when that worked, so a
	// failed association never leaves a dead record in the list.
	if(NULL == CreateIoCompletionPort((HANDLE)hSocket, m_hCompletionPort, (ULONG_PTR)pSocketData, 0))
	{
		delete pSocketData;
		return NULL;
	}

	m_lstSocketData.AddTail(pSocketData);

	return pSocketData;
}
 
// Starts Winsock and insists on version 2.2.
//
// Returns TRUE when Winsock 2.2 is loaded (the caller then owes one
// WSACleanup). Returns FALSE otherwise, with no Winsock reference held.
BOOL CIOCPServer::InitWinSock()
{
	WSADATA wsaData;
	const int nResult = WSAStartup(MAKEWORD(2,2), &wsaData);

	if(nResult != NO_ERROR)
	{
		// WSAStartup failed, so no reference was taken. WSACleanup must only
		// be paired with a successful WSAStartup; calling it here could
		// release a reference owned by other code in the process.
		return FALSE;
	}

	if(LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 )
	{
		// Startup succeeded but negotiated another version: give it back.
		WSACleanup( );
		return FALSE;
	}

	return TRUE;
}
 
void CIOCPServer::CloseSocket() 
{ 
	if(m_socketListen) 
	{ 
		closesocket(m_socketListen); 
	} 
 
	if(m_hCompletionPort) 
	{ 
		CloseHandle(m_hCompletionPort); 
	} 
 
	WSACleanup(); 
} 
 
// Returns a copy of the most recently recorded error text
// (an empty string when nothing has been recorded).
CString CIOCPServer::GetLastErrorMsg()
{
	CString strMessage(m_strError);
	return strMessage;
}
 
// Posts one asynchronous accept on the listening socket held by
// pSocketData.
//
// A new overlapped socket is created and stored in
// pSocketData->m_socketClient, and a TYPE_ACCEPT overlapped block is
// appended to m_lstOverLapEx before AcceptEx is issued.
//
// Returns TRUE when the accept completed immediately or is pending.
// Returns FALSE when pSocketData is NULL or the request could not be
// posted; in that case the socket and the overlapped block created here
// are released again and m_socketClient is left as INVALID_SOCKET.
BOOL CIOCPServer::PostAccept(CPerSocketData* pSocketData)
{
	if(NULL == pSocketData)
	{
		return FALSE;
	}

	SOCKET hAccept = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	pSocketData->m_socketClient = hAccept;
	if(INVALID_SOCKET == hAccept)
	{
		return FALSE;
	}

	COverlappedEx* pOverLapEx = new COverlappedEx(TYPE_ACCEPT);
	if(NULL == pOverLapEx)
	{
		closesocket(hAccept);
		pSocketData->m_socketClient = INVALID_SOCKET;
		return FALSE;
	}

	m_lstOverLapEx.AddTail(pOverLapEx);

	// No payload is read with the accept (receive length 0); the buffer only
	// receives the local and remote addresses.
	// NOTE(review): assumes m_szBuf holds at least 2*(sizeof(sockaddr_in)+16)
	// bytes - confirm against the COverlappedEx declaration.
	DWORD dwBytesReceived = 0;
	BOOL bRet = AcceptEx(pSocketData->m_socketListen, hAccept, pOverLapEx->m_szBuf, 0,
		sizeof(sockaddr_in)+16, sizeof(sockaddr_in)+16, &dwBytesReceived, &(pOverLapEx->m_overLapped));

	// ERROR_IO_PENDING just means the accept was queued successfully.
	if(!bRet && WSAGetLastError() != ERROR_IO_PENDING)
	{
		// Nothing was queued, so no completion will ever arrive for this
		// block: undo everything that was set up above.
		POSITION pos = m_lstOverLapEx.Find(pOverLapEx);
		if(NULL != pos)
		{
			m_lstOverLapEx.RemoveAt(pos);
		}
		// Same release sequence the worker thread uses for a finished block.
		WSACloseEvent(pOverLapEx->m_overLapped.hEvent);
		delete pOverLapEx;

		closesocket(hAccept);
		pSocketData->m_socketClient = INVALID_SOCKET;
		return FALSE;
	}

	return TRUE;
}
 
// Brings the server up on nListenPort: loads Winsock, creates, binds and
// listens on an overlapped TCP socket, creates the completion port,
// associates the listening socket with it, posts the first accept and
// starts one worker thread per processor.
//
// Returns TRUE when the server is running with at least one worker thread.
// Returns FALSE on any failure; the reason is stored for GetLastErrorMsg()
// and CloseSocket() has been called to release what was opened.
BOOL CIOCPServer::StartServer(UINT nListenPort)
{
	SYSTEM_INFO sysInfo = {0};

	// 1. Load the Winsock library
	if(!InitWinSock())
	{
		m_strError = _T("加载Winsock库失败!");
		return FALSE;
	}

	// 2. Create an overlapped (asynchronous) socket
	m_socketListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
	if(INVALID_SOCKET == m_socketListen)
	{
		m_strError = _T("创建监听SOCKET失败!");
		// INVALID_SOCKET is non-zero; store the "no socket" value so that
		// CloseSocket() does not try to close it.
		m_socketListen = NULL;
		CloseSocket();
		return FALSE;
	}

	// 3. Bind
	sockaddr_in sockAddrServer;
	memset(&sockAddrServer, 0, sizeof(sockAddrServer));	// also clears sin_zero
	sockAddrServer.sin_family = AF_INET;
	sockAddrServer.sin_addr.s_addr = INADDR_ANY;
	sockAddrServer.sin_port = htons((u_short)nListenPort);

	if(SOCKET_ERROR == bind(m_socketListen, (SOCKADDR*)&sockAddrServer, sizeof(sockaddr_in)))
	{
		m_strError = _T("绑定失败!");
		CloseSocket();
		return FALSE;
	}

	// 4. Listen
	if(SOCKET_ERROR == listen(m_socketListen, SOMAXCONN))
	{
		m_strError = _T("监听失败!");
		CloseSocket();
		return FALSE;
	}

	// 5. Attach to a completion port
	// 5.1 Create the completion port
	m_hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
	if(NULL == m_hCompletionPort)
	{
		m_strError = _T("创建完成端口失败!");
		CloseSocket();
		return FALSE;
	}

	// 5.2 Associate the listening socket with the completion port
	CPerSocketData* pSocketData = AssignSocketToCompletionPort(m_socketListen, TRUE);
	if(NULL == pSocketData)
	{
		m_strError = _T("将监听SOCKET绑定到完成端口上失败!");
		CloseSocket();

		return FALSE;
	}

	// 5.3 Post the first accept request; without it no client can connect
	if(!PostAccept(pSocketData))
	{
		m_strError = _T("投递Accept请求失败!");
		CloseSocket();
		return FALSE;
	}

	// 5.4 Start one worker thread per processor
	GetSystemInfo(&sysInfo);
	int nStarted = 0;
	for(DWORD i = 0; i < sysInfo.dwNumberOfProcessors; i++)
	{
		HANDLE hThread = CreateThread(NULL, 0, ThreadTalking, this, 0, NULL);
		if(NULL != hThread)
		{
			// The handle is never used again; closing it does not stop the
			// thread, it only avoids leaking the handle.
			CloseHandle(hThread);
			++nStarted;
		}
	}
	m_nThreadNum = nStarted;

	if(0 == nStarted)
	{
		// Nobody would ever service the completion port.
		m_strError = _T("创建工作线程失败!");
		CloseSocket();
		return FALSE;
	}

	return TRUE;
}
 
// Worker thread routine: dequeues completion packets from the server's
// completion port and dispatches them by I/O type until told to stop.
//
// lParam - the owning CIOCPServer.
//
// The loop ends when a packet with neither a key nor an overlapped block is
// dequeued, or when the port can no longer be waited on. Always returns 1.
//
// A completed TYPE_ACCEPT / TYPE_READ / TYPE_WRITE overlapped block is
// removed from m_lstOverLapEx and deleted once it has been handled. A
// client record is closed, unlisted and deleted when its transfer failed,
// when it transferred 0 bytes, or when no new receive could be posted.
//
// NOTE(review): m_lstSocketData and m_lstOverLapEx are modified here with no
// visible locking although several of these threads run at once - confirm
// whether synchronization is provided elsewhere.
DWORD WINAPI CIOCPServer::ThreadTalking(LPVOID lParam)
{
	CIOCPServer* pThis = (CIOCPServer*)lParam;
	if(NULL == pThis)
	{
		return 1;
	}

	for(;;)
	{
		DWORD dwTrans = 0;
		CPerSocketData* pSocketData = NULL;
		COverlappedEx* pOverLapEx = NULL;

		// NOTE(review): the cast assumes m_overLapped is the first member of
		// COverlappedEx - confirm against its declaration.
		BOOL bRet = GetQueuedCompletionStatus(pThis->m_hCompletionPort, &dwTrans, (PULONG_PTR)&pSocketData,
			(LPOVERLAPPED*)&pOverLapEx, INFINITE);
		// Capture the reason immediately, before any other API call.
		const DWORD dwError = bRet ? NO_ERROR : GetLastError();

		if(NULL == pOverLapEx)
		{
			// Failure without a packet: the wait itself failed (for example
			// the port was closed); retrying would only spin.
			// Success with no key either: stop request.
			if(!bRet || NULL == pSocketData)
			{
				break;
			}
			continue;
		}

		if(NULL == pSocketData)
		{
			// A packet without a per-socket record cannot be dispatched.
			if(bRet)
			{
				WSAResetEvent(pOverLapEx->m_overLapped.hEvent);
			}
			continue;
		}

		const BOOL bAccept = (TYPE_ACCEPT == pOverLapEx->m_nIoType);
		const BOOL bTransfer = (TYPE_READ == pOverLapEx->m_nIoType || TYPE_WRITE == pOverLapEx->m_nIoType);
		if(!bAccept && !bTransfer)
		{
			// Unknown request type: leave the block alone.
			if(bRet)
			{
				WSAResetEvent(pOverLapEx->m_overLapped.hEvent);
			}
			continue;
		}

		// Client record to tear down at the end of this iteration, if any.
		CPerSocketData* pDeadClient = NULL;

		if(bTransfer && (!bRet || 0 == dwTrans))
		{
			// Client went offline: either the transfer failed or it
			// completed with 0 bytes.
			pDeadClient = pSocketData;
		}
		else if(!bRet)
		{
			// A pending accept failed: drop the socket prepared for it.
			if(INVALID_SOCKET != pSocketData->m_socketClient)
			{
				closesocket(pSocketData->m_socketClient);
				pSocketData->m_socketClient = INVALID_SOCKET;
			}
			// Keep listening unless the request was cancelled outright.
			if(ERROR_OPERATION_ABORTED != dwError)
			{
				pThis->PostAccept(pSocketData);
			}
		}
		else
		{
			// Put the asynchronous event back into the non-signaled state
			WSAResetEvent(pOverLapEx->m_overLapped.hEvent);

			switch(pOverLapEx->m_nIoType)
			{
			case TYPE_ACCEPT:
				{
					if(INVALID_SOCKET == pSocketData->m_socketClient)
					{
						break;
					}
					// NOTE(review): queues one more never-ending worker loop per
					// accepted connection - confirm this is intended.
					QueueUserWorkItem(ThreadTalking, pThis, WT_EXECUTEINLONGTHREAD);
					// Associate the client with the completion port
					CPerSocketData* pNewPerSocketData = pThis->AssignSocketToCompletionPort(pSocketData->m_socketClient, FALSE);
					if(NULL == pNewPerSocketData)
					{
						// Nobody owns the accepted socket; do not leak it.
						closesocket(pSocketData->m_socketClient);
					}
					// Post a receive request for the new client
					else if(!pThis->PostRecv(pNewPerSocketData))
					{
						// No I/O is pending for this client, so no later
						// completion could ever clean it up.
						pDeadClient = pNewPerSocketData;
					}
					// Go back to waiting for the next connection
					pThis->PostAccept(pSocketData);
					break;
				}
			case TYPE_READ:
				// TODO: display the received data

				// Post another receive request
				if(!pThis->PostRecv(pSocketData))
				{
					pDeadClient = pSocketData;
				}
				break;
			case TYPE_WRITE:
				break;

			}
		}

		if(NULL != pDeadClient)
		{
			closesocket(pDeadClient->m_socketClient);
			// NOTE(review): kept from the original; a client record's
			// m_socketListen is never assigned in this file - confirm its
			// initial value makes this call harmless.
			closesocket(pDeadClient->m_socketListen);

			POSITION posData = pThis->m_lstSocketData.Find(pDeadClient);
			if(NULL != posData)
			{
				pThis->m_lstSocketData.RemoveAt(posData);
			}
			delete pDeadClient;
		}

		// The completed request block is finished; release it last so nothing
		// above can touch freed memory.
		WSACloseEvent(pOverLapEx->m_overLapped.hEvent);
		POSITION posOverLap = pThis->m_lstOverLapEx.Find(pOverLapEx);
		if(NULL != posOverLap)
		{
			pThis->m_lstOverLapEx.RemoveAt(posOverLap);
		}
		delete pOverLapEx;
	}

	return 1;
}
// Posts one asynchronous receive on pSocketData->m_socketClient.
//
// A fresh TYPE_READ overlapped block, whose m_szBuf is the receive buffer,
// is allocated and appended to m_lstOverLapEx before the request is issued.
//
// Returns TRUE when the receive completed immediately or is pending; the
// block then stays in m_lstOverLapEx until the code handling the completion
// releases it.
// Returns FALSE when pSocketData is NULL, has no client socket, or the
// request could not be posted; the block allocated here is then removed
// from the list and deleted again.
BOOL CIOCPServer::PostRecv(CPerSocketData* pSocketData)
{
	if(NULL == pSocketData || INVALID_SOCKET == pSocketData->m_socketClient)
	{
		return FALSE;
	}

	// Every pending receive needs its own overlapped block and buffer that
	// stay alive until the completion is dequeued.
	COverlappedEx* pItem = new COverlappedEx(TYPE_READ);
	if(NULL == pItem)
	{
		return FALSE;
	}

	memset(pItem->m_szBuf, 0, MAX_BUF_SIZE);
	pItem->m_nIoType = TYPE_READ;

	m_lstOverLapEx.AddTail(pItem);

	WSABUF wsaBuf = {0};
	wsaBuf.buf = pItem->m_szBuf;
	wsaBuf.len = MAX_BUF_SIZE;

	DWORD dwByteRecv = 0;
	DWORD dwFlag = 0;

	int nRet = WSARecv(pSocketData->m_socketClient, &wsaBuf, 1, &dwByteRecv, &dwFlag, &(pItem->m_overLapped), NULL);

	// WSA_IO_PENDING just means the receive was queued successfully.
	if(nRet != NO_ERROR && WSAGetLastError() != WSA_IO_PENDING)
	{
		// Nothing was queued, so no completion will ever arrive for this
		// block: release it here.
		POSITION pos = m_lstOverLapEx.Find(pItem);
		if(NULL != pos)
		{
			m_lstOverLapEx.RemoveAt(pos);
		}
		// Same release sequence the worker thread uses for a finished block.
		WSACloseEvent(pItem->m_overLapped.hEvent);
		delete pItem;
		return FALSE;
	}

	return TRUE;
}
// Placeholder for posting an asynchronous send.
// Neither lpszText nor dwLength is used and nothing is transmitted;
// the function reports TRUE unconditionally.
BOOL CIOCPServer::PostSend(LPCTSTR lpszText, DWORD dwLength)
{
	const BOOL bPosted = TRUE;
	return bPosted;
}