www.pudn.com > ChatRoom.zip > IOCPServerPackage.cpp, change:2011-08-01,size:7225b


#include "StdAfx.h" 
#include "IOCPServerPackage.h" 
 
 
CIOCPServerPackage::CIOCPServerPackage(CWnd* pParent) 
{ 
	m_pParent = pParent; 
 
	m_bInitWinSockFlag = FALSE; 
 
	m_nThreadNum = 0; 
 
	m_hCompletePort = NULL; 
 
	m_socketListen = NULL; 
 
	m_strErrorMsg = _T(""); 
} 
 
CIOCPServerPackage::~CIOCPServerPackage(void) 
{ 
	CloseSocket(); 
} 
 
void CIOCPServerPackage::CloseSocket() 
{ 
	while(m_nThreadNum > 0) 
	{ 
		PostQueuedCompletionStatus(m_hCompletePort, 0, NULL, NULL); 
		Sleep(100); 
	} 
 
	CSocketOverlap* pSocketOlp = NULL; 
	POSITION pos = m_lstSocketOlp.GetHeadPosition(); 
	while(pos) 
	{ 
		pSocketOlp = m_lstSocketOlp.GetNext(pos); 
		if(pSocketOlp->m_socket) 
		{ 
			closesocket(pSocketOlp->m_socket); 
			pSocketOlp->m_socket = NULL; 
		} 
 
		if(pSocketOlp->m_overLapped.hEvent) 
		{ 
			WSACloseEvent(pSocketOlp->m_overLapped.hEvent); 
			pSocketOlp->m_overLapped.hEvent = NULL; 
		} 
 
		delete pSocketOlp; 
	} 
	m_lstSocketOlp.RemoveAll(); 
 
	while(m_lstSocketOlp.GetHeadPosition()) 
	{ 
		delete m_lstSocketOlp.RemoveHead(); 
	} 
 
	if(m_hCompletePort) 
	{ 
		CloseHandle(m_hCompletePort); 
		m_hCompletePort = NULL; 
	} 
 
	if(m_socketListen) 
	{ 
		closesocket(m_socketListen); 
		m_socketListen = NULL; 
	} 
 
	if(m_bInitWinSockFlag) 
	{ 
		WSACleanup(); 
	} 
} 
BOOL CIOCPServerPackage::InitWinSock() 
{ 
	if(!m_bInitWinSockFlag) 
	{ 
		WORD wRequestVersion; 
		WSADATA wsaData; 
		int nRet; 
 
		wRequestVersion = 2<<8 | 2; 
 
		nRet = WSAStartup(wRequestVersion, &wsaData); 
 
		if(nRet != 0) 
		{ 
			m_strErrorMsg = _T("加载库失败!"); 
			return FALSE; 
		} 
 
		if((wsaData.wVersion>>8) != 2 || (wsaData.wVersion &0xff) != 2) 
		{ 
			m_strErrorMsg = _T("库的版本不对!"); 
			WSACleanup(); 
			return FALSE; 
		} 
 
		m_bInitWinSockFlag = TRUE; 
	} 
 
	return TRUE; 
} 
 
 
 
 
BOOL CIOCPServerPackage::PostAccept() 
{ 
	DWORD dwDataSize = 0; 
 
	CSocketOverlap* pItem = new CSocketOverlap; 
	m_lstSocketOlp.AddTail(pItem); 
 
	pItem->m_nIoType = TYPE_ACCEPT; 
	pItem->m_socket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); 
 
	BOOL bRet = AcceptEx(m_socketListen, pItem->m_socket, pItem->m_szBuf, NULL, sizeof(sockaddr_in)+16, sizeof(sockaddr_in)+16, &dwDataSize, &pItem->m_overLapped); 
	if(!bRet) 
	{ 
		if(GetLastError() != ERROR_IO_PENDING) 
		{ 
			return FALSE; 
		} 
	} 
 
	// 
	CreateIoCompletionPort((HANDLE)pItem->m_socket, m_hCompletePort, (ULONG_PTR)&pItem->m_socket, 0); 
 
	return TRUE; 
} 
 
BOOL CIOCPServerPackage::PostRecv(CSocketOverlap* pItem) 
{ 
	memset(pItem->m_szBuf, 0, sizeof(pItem->m_szBuf)); 
	DWORD dwRecvNum = 0; 
	DWORD dwFlag = 0; 
	WSABUF wsaBuf; 
	wsaBuf.buf = pItem->m_szBuf; 
	wsaBuf.len = MAX_BUF_SIZE; 
	pItem->m_nIoType = TYPE_READ; 
	int nRet = WSARecv(pItem->m_socket, &wsaBuf, 1, &dwRecvNum, &dwFlag, &pItem->m_overLapped, FALSE); 
	if(nRet != 0) 
	{ 
		if(GetLastError() != ERROR_IO_PENDING) 
		{ 
			return FALSE; 
		} 
	} 
	return TRUE; 
} 
 
BOOL CIOCPServerPackage::SendData(char *pSendData, DWORD dwDataSize) 
{ 
	POSITION pos = m_lstSocketOlp.GetHeadPosition(); 
	CSocketOverlap *pItem = NULL; 
	WSABUF wsaBuf; 
	DWORD dwRecvNum = 0,dwFlag = 0;		//必须赋初值 
	while(pos) 
	{ 
		pItem = m_lstSocketOlp.GetNext(pos); 
		if (pItem->m_socket) 
		{ 
			wsaBuf.buf = pSendData; 
			wsaBuf.len = dwDataSize; 
			if (SOCKET_ERROR == WSASend(pItem->m_socket, &wsaBuf, 1, &dwRecvNum, dwFlag, &pItem->m_overLapped, NULL)) 
			{ 
				if (WSAGetLastError() != ERROR_IO_PENDING) 
				{ 
					int nResult = GetLastError(); 
					return FALSE; 
				} 
			} 
		} 
	} 
	return TRUE; 
} 
//发送消息 
BOOL CIOCPServerPackage::SendMsg(CString strMsg) 
{ 
	BOOL bRet = FALSE; 
	if (m_pParent) 
	{ 
		bRet = m_pParent->SendMessage(UM_SHOW_MSG, (WPARAM)strMsg.GetBuffer(),(LPARAM)(sizeof(TCHAR)*(strMsg.GetLength()+1))); 
	} 
	return bRet; 
} 
BOOL CIOCPServerPackage::StartServer(UINT nListenPort) 
{	 
	if(!InitWinSock()) 
	{ 
		m_strErrorMsg = _T("加载winsock库失败!"); 
		CloseSocket(); 
		return FALSE; 
	} 
	m_socketListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); 
	if(INVALID_SOCKET == m_socketListen) 
	{ 
		m_strErrorMsg = _T("创建监听socket失败!"); 
		CloseSocket(); 
		return FALSE; 
	} 
	sockaddr_in sockAddrServer; 
	sockAddrServer.sin_family = AF_INET; 
	sockAddrServer.sin_addr.s_addr = inet_addr("127.0.0.1"); 
	sockAddrServer.sin_port = htons(nListenPort); 
	if(SOCKET_ERROR == bind(m_socketListen, (const SOCKADDR*)&sockAddrServer, sizeof(sockAddrServer))) 
	{ 
		m_strErrorMsg = _T("bind错误!"); 
		CloseSocket(); 
		return FALSE; 
	} 
	if(SOCKET_ERROR == listen(m_socketListen, SOMAXCONN)) 
	{ 
		m_strErrorMsg = _T("listen失败!"); 
		CloseSocket(); 
		return FALSE; 
} 
 
//加入完成端口模型 
m_hCompletePort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0); 
if(INVALID_HANDLE_VALUE == m_hCompletePort) 
{ 
	m_strErrorMsg = _T("创建完成端口失败!"); 
	CloseSocket(); 
	return FALSE; 
} 
 
CreateIoCompletionPort((HANDLE)m_socketListen, m_hCompletePort, (ULONG_PTR)&m_socketListen, 0); 
PostAccept(); 
 
SYSTEM_INFO sysInfo; 
GetSystemInfo(&sysInfo); 
int nThreadNum = sysInfo.dwNumberOfProcessors*2; 
for(int i=0; i<nThreadNum; i++) 
{ 
	HANDLE hThread = NULL; 
	hThread = CreateThread(NULL, 0, ThreadWorking, this, CREATE_SUSPENDED, 0); 
	if(hThread != INVALID_HANDLE_VALUE) 
	{ 
		Sleep(20); 
		ResumeThread(hThread); 
		CloseHandle(hThread); 
	} 
} 
return TRUE; 
} 
BOOL CIOCPServerPackage::StopServer() 
{ 
	CloseSocket(); 
	return TRUE; 
} 
 
DWORD WINAPI CIOCPServerPackage::ThreadWorking(LPVOID lpVoid) 
{ 
	CIOCPServerPackage* pThis = (CIOCPServerPackage*)lpVoid; 
	DWORD dwRecvNum = 0; 
	SOCKET *pSocket = NULL; 
	CSocketOverlap* pSocketOlp = NULL; 
	BOOL bRet = FALSE; 
	CString strValue; 
 
	InterlockedIncrement(&pThis->m_nThreadNum); 
 
	while(1) 
	{ 
		bRet = GetQueuedCompletionStatus(pThis->m_hCompletePort, &dwRecvNum, (PULONG_PTR)&pSocket,(LPOVERLAPPED*)&pSocketOlp, INFINITE); 
		if(!bRet) 
		{ 
			continue; 
		} 
		int nRet = GetLastError(); 
		if(dwRecvNum <= 0 && pSocket && (TYPE_WRITE == pSocketOlp->m_nIoType || TYPE_READ == pSocketOlp->m_nIoType)) 
		{ 
			pThis->SendMsg(_T("客户端已下线!")); 
			//closesocket(pSocketOlp->m_socket); 
			WSACloseEvent(pSocketOlp->m_overLapped.hEvent); 
			pThis->m_lstSocketOlp.RemoveAt(pThis->m_lstSocketOlp.Find(pSocketOlp)); 
			delete pSocketOlp; 
		} 
 
		if(pSocket && pSocketOlp) 
		{ 
			WSAResetEvent(pSocketOlp->m_overLapped.hEvent); 
 
			switch(pSocketOlp->m_nIoType) 
			{ 
			case TYPE_ACCEPT: 
				{ 
					//TODO:得到客户端的IP地址 
					strValue.Format(_T("客户端:%d已连接"),pSocketOlp->m_socket); 
					pThis->SendMsg(strValue); 
					pThis->PostAccept(); 
					pThis->PostRecv(pSocketOlp); 
					::QueueUserWorkItem(ThreadWorking, pThis, WT_EXECUTELONGFUNCTION); 
					break; 
				} 
			case TYPE_READ: 
				strValue.Format(_T("%d说:%s"), pSocketOlp->m_socket, pSocketOlp->m_szBuf); 
				pThis->SendMsg(strValue); 
				pThis->PostRecv(pSocketOlp); 
				break; 
			case TYPE_WRITE: 
				break; 
 
			default: 
				break; 
			} 
		} 
		else if(!pSocket && !pSocketOlp) 
		{ 
			break; 
		} 
	} 
 
	InterlockedDecrement(&pThis->m_nThreadNum); 
 
	return 1; 
}