www.pudn.com > RTP通用开发库(for Linux).rar > framereadfragmented.c


/* framereadfragmented.c - framereadfragmented */

#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define DEBUG(x) x
#include 

/*------------------------------------------------------------------------
 * framereadnonfragmented - reassemble and copy out the frame of which 
 * pln.rln_rtp is part. This function waits for all fragments to arrive
 * if in blocking mode. If not blocking and any fragment is missing,
 * this function return SYN_FRAGMENT. Regardless of the value of the
 * SYN_READ_FLUSH bit of the read flags, this function will return
 * SYN_FRAGMENT if all fragments are not received within the frame's
 * useful time. The time at which the frame expires is determined by a 
 * call to frameduration on its first fragment, or by the next highest 
 * timestamp received.
 *------------------------------------------------------------------------
 */
int
framereadfragmented(struct synsession *pssn, struct synstream *psstm, struct timespec *time, mediatime_t *tsout, char *buf, int buflen, char *extraheaderbuf, int *extraheaderbuflen, bool *mark, mediatime_t ts)
{

	struct frameparam     *pparam;
	struct timespec       now, exptime, tmptime;
	struct rtpqueue       queue;
	struct rtpln          *pln;
	struct rtp	      *prtp;
	mediatime_t	      frameduration, endtime;
	bool		      block, aligned, newdata;
	int		      dataoffset, bytes, framelen, accum, rv;
	int		      bytescopied, bytestocopy;
	char		      *pframe;

	pparam = (struct frameparam *) psstm->sstm_parameters;
	block = (psstm->sstm_readflags & SYN_READ_BLOCK) != 0;

	framelen = -1;
	exptime.tv_sec = LONG_MAX;
	exptime.tv_nsec = 0;
	accum = 0;  
	rtpqinit(&queue);

	/*
	 * Accumulate fragments.
	 */
	while(framelen != accum) {
    
		if (psstm->sstm_zombie == TRUE) {
			rtpqdestroy(&queue);
			return ERROR;
		}
    
		/*
		 * Check for rebuffering.
		 */
		if (psstm->sstm_buffering == TRUE) {
			rtpqdestroy(&queue);
			return ERROR;
		}
      
		/*
		 * Check for expiration of the frame.
		 */
		clock_gettime(CLOCK_REALTIME, &now);
		if (timecmp(exptime, now) >= 0) {
			rtpqdestroy(&queue);
			return ERROR;
		}
    
		/* 
		 * Get the next packet.
		 */
		pln = rtpqextracthead(pssn->ssn_session, psstm->sstm_ssrc);
    
		newdata = FALSE;
		if (pln != NULL) {
			prtp = &pln->rln_rtp;
      
			if (tseq(prtp->rtp_time, ts))
				newdata = TRUE;	
			else if (tslt(prtp->rtp_time, ts)) {
				bufpoolfreebuf(pln);
				continue;
			}
			else {
				/*
				 * Infer expiration time of the frame being reassembled
				 */
				endtime = prtp->rtp_time;
				tmptime = timeunflatten(endtime - psstm->sstm_clky, psstm->sstm_clkrt);
				tmptime = timeadd(psstm->sstm_clkx, tmptime);
	  
				if (timecmp(tmptime, exptime) < 0)
					exptime = tmptime;
	
				rtpqinsert(pssn->ssn_session, psstm->sstm_ssrc, pln);
			}
		}

		/* 
		 * Check if no new data; if none, condition wait.
		 */
		if (newdata == FALSE) {
			if (block == TRUE) {
				rv = pthread_cond_timedwait(&psstm->sstm_readcond, &psstm->sstm_readmutex, &exptime);
				if (rv != 0 && rv != ETIMEDOUT) {
					rtpqdestroy(&queue);
					return ERROR;
				}
				continue;
			}
			else {
				/* 
				 * Put frags back into main queue.
				 */
				for (pln = queue.rq_head; pln != NULL; pln = pln->rln_next)
					rtpqinsert(pssn->ssn_session, psstm->sstm_ssrc, pln);

				queue.rq_head = queue.rq_tail = NULL;
				rtpqdestroy(&queue);

				if (exptime.tv_sec != LONG_MAX)
					*time = exptime;
				return SYN_FRAME_FRAG;
			}
		}

		/*
		 * Keep this packet.
		 */
		_rtpqinsert(&queue, pln, NULL);
    
		dataoffset = pparam->fp_dataoffset(prtp, pln->rln_len); 
		if (dataoffset == ERROR) {
			rtpqdestroy(&queue);
			return ERROR;
		}
    
		bytes = pln->rln_len - RTP_HEADER_LEN(prtp) - dataoffset;   
		accum += bytes;
    
		/* 
		 * If we havn't already seen the initial fragment, get the 
		 * details we need from it.
		 */
		if (framelen == -1) {
			aligned = pparam->fp_framealigned(prtp, pln->rln_len);
			if (aligned == (bool) ERROR) {
				rtpqdestroy(&queue);
				return ERROR;
			}

			if (aligned == TRUE) {
				pframe = RTP_DATA(prtp) + dataoffset;
				framelen = pparam->fp_framelength(pframe, bytes);
				frameduration = pparam->fp_frameduration(pframe, bytes);

				if (framelen == ERROR || frameduration == ERROR) {
					rtpqdestroy(&queue);
					return ERROR;
				}
				endtime = ts + frameduration;
				exptime = timeunflatten(endtime - psstm->sstm_clky, psstm->sstm_clkrt);
				exptime = timeadd(psstm->sstm_clkx, tmptime);
			}
		}
	} /* while(framelen != accum) */

	/*
	 * Reassemble the fragments
	 */
	bytescopied = 0;
	pln = queue.rq_head;
  
	while(pln != NULL && buflen > 0) {

		prtp = &pln->rln_rtp;
		dataoffset = pparam->fp_dataoffset(prtp, pln->rln_len);
		bytes = pln->rln_len - RTP_HEADER_LEN(prtp) - dataoffset;
		pframe = RTP_DATA(prtp) + dataoffset;
    
		/*
		 * Copy extra headers only from first fragment's packet.
		 */
		if (bytescopied == 0 && extraheaderbuf != NULL) {
			*extraheaderbuflen = min(dataoffset, *extraheaderbuflen);
			bcopy(RTP_DATA(prtp), extraheaderbuf, *extraheaderbuflen);
			*mark = prtp->rtp_mark;
		}
    
		bytestocopy = min(buflen, bytes);
		bcopy(pframe, buf, bytestocopy);

		bytescopied += bytestocopy;
		buf += bytestocopy;
		buflen -= bytestocopy;
      
		pln = pln->rln_next;
	}
  
	rtpqdestroy(&queue);
  
	/* 
	 * Set *time to local time of ts.
	 */
	tmptime = timeunflatten(ts - psstm->sstm_clky, psstm->sstm_clkrt);
	*time = timeadd(tmptime, psstm->sstm_clkx);
	*tsout = ts;
  
	psstm->sstm_lastread = ts + frameduration;
  
	return bytescopied;
}