www.pudn.com > MyLinuxThread.rar > RecvMessThread.cpp, change:2007-05-25,size:1758b


#include "RecvMessThread.h"
#include "Log.h"

G_RecvMessThread::G_RecvMessThread(G_ThreadPool *pool) : g_threadPool(pool)
{
	counter = 0;
	epfd = epoll_create(256);

	g_data = new G_Data(this);
}

G_RecvMessThread::~G_RecvMessThread()
{
	close(epfd);
}

unsigned int G_RecvMessThread::getCounter()
{
	return counter;
}

bool G_RecvMessThread::setNonBlock(int sockfd)
{
	int opts = fcntl(sockfd , F_GETFL);
	if(-1 == opts)
	{
		debug_output("%s\n" , "fcntl F_GETFL is faild");
		return false;
	}

	opts = opts | O_NONBLOCK;
	if(fcntl(sockfd , F_SETFL , opts)  0)
	{
		debug_output("%s\n" , "fcntl F_SETFL is faild");
		return false;
	}
	return true;
}

void G_RecvMessThread::addSocket(int nSocket)
{
	struct epoll_event ev;
	bzero(&ev , sizeof(ev));

	setNonBlock(nSocket);
	ev.data.fd = nSocket;
	ev.events = EPOLLIN | EPOLLET;
	epoll_ctl(epfd , EPOLL_CTL_ADD , nSocket , &ev);
	counter++;
}

bool G_RecvMessThread::pushData(std::string pStr)
{
	return g_dataBufferQueue.push(pStr);
}

void G_RecvMessThread::Run()
{
	pause();    /// 暂停线程

	int nfds , sock;
	struct epoll_event ev;
	bool nRet;
	char line[1024];
	while(1)
	{
		nfds = epoll_wait(epfd,events,100,50);
		for(int i=0; i<nfds; i++)
		{
			if(events[i].events&EPOLLIN)
			{
				if((sock = events[i].data.fd)  0)
					continue;
				if(!(nRet=g_data->recvData(sock)))
				{
					debug_output("client is quit\n");
					ev.data.fd= sock;
					epoll_ctl(epfd , EPOLL_CTL_DEL , sock , &ev);
					close(sock);
					events[i].data.fd = -1;
					counter --;
				}
				else
				{
					std::string pBuffer;
					while(g_dataBufferQueue.size())
					{
						g_dataBufferQueue.pop(pBuffer);
						g_threadPool->recvMessage((void*)pBuffer.c_str() , sock);
					}
				}
				usleep(100);
			}
		}
	}
}