// www.pudn.com > fifo.rar > fifo.c


//fifo.c

#include "fifo.h"

//**************************************************************
//datadispatch module
//	1. dispatch one data source to multiple outputs
//	2. input packet size and output packet size can be variable
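//	3. if a [data cache] is full, the input function does not put the data into that [data cache]; it records the error by counting the bytes as discarded (the count is logged when the modules are unregistered).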
//
//	                   |------>[data cache]======>
//	[data source]----->|
//	                   |------>[data cache]======>
//	                   |
//	                   |------>[data cache]======>
//	                   |
//
//	--------> input		======>get from cache
//
//
//Usage:
//	1.Call dispInit() to create the dispatcher handle.
//	2.Call regOutputModule() once for each output to register. This function is not thread safe.
//	3.The input thread calls datadispInput() to input data, and each output thread calls getdatadisp() to get data from its cache.
//	4.After the input and output threads have stopped, call disregAllOutputModules() to unregister all output modules (this also frees the handle).



//create a dispatcher handle with no output module registered
//return the new handle, or a null handle if the allocation failed
//the handle is released by disregAllOutputModules()
DISP_HANDLE dispInit()
{
	DISP_OUTPUT* handle = (DISP_OUTPUT*)malloc(sizeof(DISP_OUTPUT));

	//only initialize the handle when the allocation succeeded;
	//on failure the null pointer is handed back to the caller untouched
	if (handle)
		handle->dispindex = 0;
	return handle;
}

//register an output module
//	h:        handle returned by dispInit()
//	fifoSize: size in bytes of the cache buffer allocated for this output
//	          (datadispInput() keeps one byte free to tell empty from full)
//return output module ID (>=0), error: <0
//	-1: invalid handle, all MAX_OUTPUT_MODULE slots are in use, or the mutex could not be initialized
//	-2: the cache buffer could not be allocated
//This function is not thread safe.
int regOutputModule(DISP_HANDLE h,DWORD fifoSize)
{
	DISP_OUTPUT* handle = (DISP_OUTPUT*)h;
	DISP_OUPUT_CORE * pCore;

	if (!handle)
		return -1;

	//check the slot index before touching dispcore[]
	assert(handle->dispindex<MAX_OUTPUT_MODULE);
	if (handle->dispindex>=MAX_OUTPUT_MODULE)
		return -1;//full
	pCore = &(handle->dispcore[handle->dispindex]);

	pCore->buf = (char *)malloc(fifoSize);
	assert(pCore->buf);
	if (!(pCore->buf))
		return -2;
	pCore->bufsize = fifoSize;
	//empty fifo: read and write positions both start at the beginning of the buffer
	pCore->pin = pCore->buf;
	pCore->pout = pCore->buf;
	pCore->discardBytes = 0;
	pCore->inputBytes= 0;
	pCore->outputBytes= 0;

	if(pthread_mutex_init(&(pCore->mut),NULL)!=0)
	{
		mylog("init mutex in regoutModule error! \n");
		//the slot is not registered, so nobody else would ever free its buffer
		free(pCore->buf);
		pCore->buf = 0;
		return -1;
	}//add by Eric

	handle->dispindex++;
	return (handle->dispindex-1);
}

//free all modules that were registered by regOutputModule(), then free the handle itself
//for each module the byte counters and the number of bytes still queued are logged
//	h: handle returned by dispInit(); it must not be used after this call
//return 0 on success, -1 if the handle is null
//call this only after the input and output threads have stopped
int disregAllOutputModules(DISP_HANDLE h)
{
	DISP_OUTPUT* handle = (DISP_OUTPUT*)h;
	DISP_OUPUT_CORE * pCore;
	int i;
	int size;

	if (!handle)
		return -1;

	for (i=0; i<handle->dispindex; i++)
	{
		pCore = &(handle->dispcore[i]);
		//get valid data size; a negative difference means the write position has wrapped
		size=pCore->pin - pCore->pout;
		if (size<0)
			size+=pCore->bufsize;
		mylog("output module #%d, input bytes[%u],output bytes[%u], discard bytes[%u], left bytes[%i]\n",i,pCore->inputBytes,pCore->outputBytes,pCore->discardBytes,size);
		assert(pCore->buf);
		free(pCore->buf);
		pCore->buf = 0;
		pthread_mutex_destroy(&pCore->mut);//add by Eric
	}
	handle->dispindex = 0;
	free(handle);
	return 0;
}

//dispatch data to each output module
//	h:       handle returned by dispInit()
//	data:    bytes to copy into the cache of every registered output
//	datalen: number of bytes in data
//A packet is either stored completely in a cache or not at all: when a cache does not
//have room for the whole packet, the packet is skipped for that output and datalen is
//added to its discard counter.
//return
//	 0: data stored in every cache (or datalen is 0)
//	-1: a mutex could not be locked; outputs handled before the failure already got the data
//	-2: invalid argument (null handle, null data, or negative datalen)
//	-3: at least one cache was full and did not receive the data
int datadispInput(DISP_HANDLE h, const void *data, int datalen)
{
	DISP_OUTPUT* handle = (DISP_OUTPUT*)h;
	DISP_OUPUT_CORE * pCore;
	int size;
	int i;
	int rightsize;
	int bFullNotInput=false;

	//a negative length would be turned into a huge size by memcpy
	if (!handle || datalen<0 || (!data && datalen>0))
		return -2;
	if (datalen==0)
		return 0;	//nothing to dispatch

	for (i=0; i< handle->dispindex; i++)
	{
		pCore = &(handle->dispcore[i]);
		if(pthread_mutex_lock(&pCore->mut) !=0)
		{
		   mylog(" cj input data error !lock mutex error!\n");
		   return -1;
		}//add by Eric

		//get valid data size; a negative difference means the write position has wrapped
		size=pCore->pin - pCore->pout;
		if (size<0)
			size+=pCore->bufsize;

		if (datalen+size > pCore->bufsize-1)
		{//1: to differ empty and full
			pCore->discardBytes+=datalen;
			//remember the failure; it must not be cleared by a later output that has room
			bFullNotInput=true;
			pthread_mutex_unlock(&pCore->mut);
			continue;
		}

		//copy data; rightsize is the room between the write position and the end of the buffer
		rightsize=pCore->buf + pCore->bufsize  - pCore->pin;
		if (rightsize>=datalen)
		{
			memcpy(pCore->pin,data,datalen);
		}
		else
		{
			//the packet wraps: fill up to the end, then continue at the start of the buffer
			memcpy(pCore->pin,data,rightsize);
			memcpy(pCore->buf,(const char *)data+rightsize,datalen-rightsize);
		}

		//set inputBytes
		pCore->inputBytes+= datalen;

		//move pin; the wrap is computed from rightsize so that no pointer
		//beyond the end of the buffer is ever formed
		if (datalen>=rightsize)
			pCore->pin = pCore->buf + (datalen-rightsize);
		else
			pCore->pin+=datalen;
		pthread_mutex_unlock(&pCore->mut);	//add by Eric
	}
	if(bFullNotInput)
		return -3;  //indicating full not input;
	return 0;
}

//get the number of bytes currently queued in the cache of one output module
//	h:  handle returned by dispInit()
//	id: output module ID returned by regOutputModule()
//return the queued byte count (>=0), or -1 if the handle is null, the ID is not
//registered, or the mutex could not be locked
int getdatadispsize(DISP_HANDLE h,int id)
{
	DISP_OUTPUT* handle = (DISP_OUTPUT*)h;
	DISP_OUPUT_CORE * pCore;
	int size=0;//valid data size

	if (!handle)
		return -1;
	//validate the ID before it is used to index dispcore[]
	if (id<0 || id>=handle->dispindex)
	{
		mylog("Eric:: invalde index=%d",id);
		return -1;
	}
	pCore = &(handle->dispcore[id]);

	if(pthread_mutex_lock(&pCore->mut)!=0)
	{
		mylog("cj get datadisp size error lock mutex error!\n");
		return -1;
	}

	//a negative difference means the write position has wrapped
	size=pCore->pin - pCore->pout;
	if (size<0)
		size+=pCore->bufsize;
	pthread_mutex_unlock(&pCore->mut);	//add by Eric

	return size;
}

//get data from the fifo of a registered output module (modified by caojie)
//	h:          handle returned by dispInit()
//	id:         output module ID returned by regOutputModule()
//	buf:        destination buffer, at least *pOutputlen bytes
//	pOutputlen: in: number of bytes wanted; out: when the fifo holds fewer bytes and
//	            bForce is true, the number of bytes actually copied
//	bForce:     what to do when the fifo holds fewer bytes than wanted:
//	            true  - read all that is left
//	            false - read nothing
//return
//	 0: *pOutputlen bytes were copied to buf
//	-1: not enough data in the fifo (see bForce), or the mutex could not be locked
//	-2: invalid argument (null pointer, negative length, or unregistered ID)
int getdatadisp(DISP_HANDLE h, int id,void *buf, int* pOutputlen,bool bForce)
{
	DISP_OUTPUT* handle = (DISP_OUTPUT*)h;
	DISP_OUPUT_CORE * pCore;
	int size;//valid data size
	int rightsize;
	int outputlen;
	int bNotEnough=false;

	//a negative length would be turned into a huge size by memcpy
	if (!handle || !buf || !pOutputlen || *pOutputlen<0)
		return -2;
	outputlen=*pOutputlen;

	//validate the ID before it is used to index dispcore[]
	assert((id>=0) && (id < handle->dispindex));
	if (id<0 || id>=handle->dispindex){
		mylog("getdata disp error!!index %d\n",id);
		return -2;
	}
	pCore = &(handle->dispcore[id]);

	if(pthread_mutex_lock(&pCore->mut) !=0)
	{
		mylog("cj lock mutex error! getdatadisp \n");
		return -1;//modified by caojie
	}

	//a negative difference means the write position has wrapped
	size=pCore->pin - pCore->pout;
	if (size<0)
		size+=pCore->bufsize;
	if (size<outputlen)
	{
		if (!bForce)
		{
			pthread_mutex_unlock(&pCore->mut);	//add by caojie
			return -1;
		}
		*pOutputlen=size;  //read all the left
		outputlen=size;
		bNotEnough=true;
	}

	//copy data; rightsize is the number of bytes between the read position and the end of the buffer
	rightsize=pCore->buf + pCore->bufsize  - pCore->pout;
	if (rightsize>=outputlen)
	{
		memcpy(buf,pCore->pout,outputlen);
	}
	else
	{
		//the data wraps: take the tail of the buffer, then continue at its start
		memcpy(buf,pCore->pout,rightsize);
		memcpy((char *)buf+rightsize,pCore->buf,outputlen-rightsize);
	}

	//move pout; the wrap is computed from rightsize so that no pointer
	//beyond the end of the buffer is ever formed
	if (outputlen>=rightsize)
		pCore->pout = pCore->buf + (outputlen-rightsize);
	else
		pCore->pout+=outputlen;
	pCore->outputBytes+=outputlen;
	pthread_mutex_unlock(&pCore->mut);	//add by Eric

	if(bNotEnough)
		return -1;
	return 0;
}
// end of data dispatch module ===============================================
//

//**************************************************************
//fifo module
//The module is used for many inputs and one output
//The output packet size is the same as the input packet size
//

//
// end  of command fifo module=====================================
//