// www.pudn.com > net_oss.rar > thread_pool_server.cc


#include 
#include 
#include 
#include 
#include "thread_pool_server.h"
using namespace std;

/**
 * Constructs the server by handing both arguments straight to the
 * thread_pool base-class constructor; no further initialisation is done here.
 *
 * @param i    forwarded unchanged as the first thread_pool argument.
 * @param key  forwarded unchanged as the second thread_pool argument.
 */
thread_pool_server::thread_pool_server(int i, key_t key)
        : thread_pool(i, key)
{
}
/**
 * Worker-thread entry point: receives records from a System V message queue
 * and appends one tab-separated line per record to "result.dat".
 *
 * The thread detaches itself, so nobody has to join it.
 *
 * @param v  pointer to an int holding the id of the queue to receive from
 *           (start() passes the address of the server's mqid member).
 * @return   NULL, and only after msgrcv() failed with an error other than
 *           EINTR; otherwise the receive loop never ends.
 */
void* processing(void* v)
{
        #ifdef __DEBUG
        std::cout << " # " << pthread_self() << " is serving you. " << std::endl;
        #endif
        thread_pool_server::_mesg msg;
        thread_pool_server::info inf;

        // msgrcv()'s size argument counts only the bytes that follow the
        // leading `long` message type, not the whole structure. Passing
        // sizeof(msg) would let the kernel write sizeof(long) bytes past msg.
        const size_t payload_size = sizeof(msg) - sizeof(long);

        std::ofstream fout("result.dat", std::ios::app);
        pthread_detach(pthread_self());
        while(1)
        {
                memset(msg.inf, 0x00, sizeof(inf));

                // MSG_NOERROR truncates an oversized message instead of failing
                // with E2BIG and leaving it stuck at the head of the queue.
                if(msgrcv(*(static_cast<int*>(v)), &msg, payload_size, 0, MSG_NOERROR) < 0)
                {
                        // Capture errno before any stream I/O can overwrite it.
                        const int err = errno;
                        #ifdef __DEBUG
                        std::cout << strerror(err) << std::endl;
                        #endif
                        // A signal merely interrupted the wait: try again.
                        if(err == EINTR) continue;
                        // Any other failure (queue removed, invalid id, ...) will
                        // not go away; stop instead of logging empty records forever.
                        break;
                }

                memcpy(&inf, msg.inf, sizeof(inf));
                // endl flushes each record immediately; every worker thread has
                // its own stream appending to the same file.
                fout << inf.user_name << "\t" << inf.start_time << "\t" \
                << "\t" << inf.IP << "\t" << inf.duration << std::endl;
        }
        return NULL;
}


/**
 * SIGCHLD handler: reaps every child process that has already terminated,
 * then re-installs itself for the same signal.
 *
 * errno is saved on entry and restored on exit. waitpid() overwrites it
 * (with ECHILD once nothing is left to reap), and this handler can run in the
 * middle of code that is about to inspect errno, such as the EINTR check
 * after accept().
 *
 * @param s  number of the signal being handled.
 */
void child_wait(int s)
{
	const int saved_errno = errno;

	// WNOHANG keeps the handler from blocking; loop because more than one
	// child may have exited by the time the handler runs.
	while(waitpid(-1, NULL, WNOHANG) > 0)
	{
	}

	// Re-register in case this platform's signal() resets the disposition
	// to the default when the handler is invoked.
	signal(s, child_wait);

	errno = saved_errno;
}

/**
 * Starts the server: spawns `total` worker threads running processing(),
 * each handed the address of the message-queue id, then enters the listener.
 *
 * The process exits with status -1 if any worker thread cannot be created.
 *
 * @param port  TCP port passed on to _start_listener().
 */
void thread_pool_server::start(int port)
{
    //create threads
    for(int i = 0; i < total; i++)
    {
            pthread_t tid;
            // pthread_create() reports failure by returning a positive error
            // number; it never returns a negative value, so compare against 0.
            if(pthread_create(&tid, NULL, processing, &mqid) != 0) exit(-1);
    }
    _start_listener(port);
}

/**
 * Accept loop of the listening process.
 *
 * Every accepted connection is served by a forked child. The child reads
 * fixed-size records of sizeof(info) bytes from the socket and posts each one
 * to the message queue with message type 1. It exits with status 0 when the
 * peer closes the connection or a read fails; a trailing incomplete record is
 * discarded.
 *
 * The parent closes its copy of the connected socket and keeps accepting.
 * If accept() fails for a reason other than EINTR, the listening socket is
 * closed and the process exits with status -1.
 *
 * @return never returns in practice; the trailing return only satisfies the
 *         declared return type.
 */
int thread_pool_server::operate()
{
	_mesg buf;
	while(1)
	{
		// accept() uses the length as an in/out argument, so reset it each time.
		socklen_t len = sizeof(sockaddr_in);
		int connected = accept(fd,(struct sockaddr*)&client,&len);
		if(connected < 0)
		{
			// Interrupted by a signal (e.g. SIGCHLD): just retry.
			if(errno == EINTR) continue;
			close(fd);
			exit(-1);
		}

		pid_t id = fork();
		if(id != 0)
		{
			// Parent, or fork() failed (-1): in both cases this process does
			// not serve the connection, so release it and keep accepting.
			close(connected);
			continue;
		}

		// Child: it only talks to its client, never accepts.
		close(fd);
		while(1)
		{
			memset(buf.inf,0x00,sizeof(info));

			// A stream socket may deliver a record in several pieces; keep
			// reading until a whole record has arrived.
			size_t got = 0;
			while(got < sizeof(info))
			{
				ssize_t n = read(connected,(char*)buf.inf + got,sizeof(info) - got);
				if(n < 0 && errno == EINTR) continue;
				// 0 means the peer closed the connection, <0 a real error.
				if(n <= 0) break;
				got += (size_t)n;
			}
			if(got < sizeof(info)) break;

			buf.mtype = 1;
			// The size passed to msgsnd() excludes the leading `long` message
			// type; sizeof(buf) would make the kernel read past the buffer.
			if(msgsnd(mqid,&buf,sizeof(buf) - sizeof(long),0)<0) ERR_REPORT();
		}

		// Connection finished: end the child here rather than falling back
		// into the accept loop with a closed listening socket.
		close(connected);
		exit(0);
	}

	return 0;
}
/**
 * Creates the listening TCP socket on all local interfaces, installs the
 * SIGCHLD handler and enters the accept loop in operate().
 *
 * Failures of socket(), bind(), listen(), signal() and operate() are passed
 * to ERR_REPORT().
 *
 * @param port  TCP port to listen on, in host byte order.
 */
void thread_pool_server::_start_listener(int port)
{
	fd = socket(AF_INET,SOCK_STREAM,0);
	if(fd < 0 ) ERR_REPORT();

	// Clear the whole address first so that no field is left indeterminate.
	memset(&serv,0x00,sizeof(serv));
	serv.sin_family = AF_INET;
	serv.sin_port = htons(port);
	serv.sin_addr.s_addr = htonl(INADDR_ANY);

	// Both options take an int, so the option length is sizeof(int), not the
	// size of a socket address. They are switched on directly; a failure is
	// deliberately not fatal, as in the original best-effort behaviour.
	int on = 1;
	(void)setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));
	(void)setsockopt(fd,SOL_SOCKET,SO_KEEPALIVE,&on,sizeof(on));

	if(bind(fd,(const struct sockaddr*)&serv,sizeof(serv))<0) ERR_REPORT();

	if(listen(fd,LISTENQ)<0) ERR_REPORT();

	// Reap the per-connection children forked by operate().
	if(signal(SIGCHLD,child_wait)==SIG_ERR) ERR_REPORT();

	if(operate()<0)
	{
		ERR_REPORT();
	}
}