/* www.pudn.com > streamrtp.rar > sampread.c */


/*-------------------------------------------------------------------------
 * sampread.c - sampread
 *-------------------------------------------------------------------------
 */

#include 
#include 
#include 
#include 
#include 
#include 
#include 

#define DEBUG(x) x
#include 

/*------------------------------------------------------------------------
 * sampread - read from a sample-oriented stream beginning at "now".
 *
 * Arguments:
 *   pssn, psstm  Session and stream to read from.
 *   time         Out: local time of the first sample in the data. It is
 *                  zeroed on entry; on some errors it gives the time of
 *                  the next available data. Its initial value has no
 *                  meaning.
 *   ts           Out: media timestamp of the beginning of the data. It
 *                  is zeroed on entry and has meaning only on success.
 *   buf, buflen  Destination buffer and its length in *bytes*.
 *   shift        Out: bit offset, within the first byte of buf, of the
 *                  beginning of the first sample. Its initial value has
 *                  no meaning. Shift is necessary for encodings that use
 *                  < 8 bits / sample.
 *   mark         Out: set from the marker bit in the RTP header of the
 *                  first packet read.
 *
 * Notes: If SYN_READ_FLUSH is set in psstm->sstm_readflags, reading
 *          begins at `now'; otherwise at psstm->sstm_lastread + 1.
 *        Here "sample" means 1 sample * number of channels, i.e.
 *          (samplesize * channels) bits.
 *        Data from one packet is copied starting at the byte that holds
 *          its first wanted sample, so it occupies
 *          ceil((shift + samples * samplesize * channels) / 8) bytes,
 *          where shift is non-zero only for the first packet. With
 *          shift == 0 and a whole number of bytes this is
 *          (samples * samplesize * channels) / 8.
 *        Consecutive packets are appended only while their timestamps
 *          are contiguous with the data already copied; a packet that
 *          is only partly consumed, or that follows a gap, is handed
 *          back with rtpqinsert for a later call.
 *
 * Returns: On success, the number of samples copied to buf (never
 *          negative). A negative value returned by synreadnextpacket or
 *          synreadwait is passed through unchanged.
 *------------------------------------------------------------------------
 */
int
sampread(struct synsession *pssn, struct synstream *psstm, struct timespec *time, mediatime_t *ts, char *buf, int buflen, int *shift, bool *mark)
{

	struct sampparam	*pparam;
	struct rtp		*prtp;
	struct rtpln		*pln;
	struct timespec	now, tmptime;
	mediatime_t		endtime, mnow, readfrom;
	int			samples, samplestocopy, startdelta, samplescopied = 0;
	int			bufsamples, bytescopied, rv;
	int			framebits, leadbits;
	bool			flush;

	pparam = (struct sampparam *) psstm->sstm_parameters;
	flush = (psstm->sstm_readflags & SYN_READ_FLUSH) != 0;

	/*
	 * Bits occupied by one sample across all channels.
	 * NOTE(review): assumed to be non-zero; it is used as a divisor.
	 */
	framebits = pparam->sp_samplesz * pparam->sp_channels;

	bzero(time, sizeof(struct timespec));
	*ts = 0;

	while (TRUE) {

		/*
		 * Get the next packet.
		 */
		if ((rv = synreadnextpacket(pssn, psstm, &pln)) < 0)
			return rv;

		prtp = &pln->rln_rtp;

		/*
		 * Convert local time to corresponding media time.
		 */
		clock_gettime(CLOCK_REALTIME, &now);
		mnow = timeflatten(timesub(now, psstm->sstm_clkx), psstm->sstm_clkrt) + psstm->sstm_clky;
		samples = ((pln->rln_len - RTP_HEADER_LEN(prtp)) * 8) / framebits;
		endtime = prtp->rtp_time + samples - 1;

		/*
		 * Check if all data in packet is old.
		 */
		readfrom = (flush ? mnow : psstm->sstm_lastread + 1);
		if (tslt(endtime, readfrom)) {
			bufpoolfreebuf(pln);
			continue;
		}

		/*
		 * Determine where current data begins within packet.
		 */
		readfrom = tsmax(prtp->rtp_time, readfrom);

		/*
		 * Check if data is too new.
		 * NOTE(review): pln is not released here on either early
		 * exit; presumably synreadwait disposes of it -- confirm.
		 */
		if ((rv = synreadwait(pssn, psstm, pln, readfrom, now, mnow, time)) < 0)
			return rv;
		else if (rv == SYN_CONTINUE)
			continue;

		/*
		 * Compute offset from first data to start reading
		 * (i.e., where current data begins)
		 */
		startdelta = (tsgt(readfrom, prtp->rtp_time) ? readfrom - prtp->rtp_time : 0);
		*shift = (startdelta * framebits) % 8;
		tmptime = timeunflatten(readfrom - psstm->sstm_clky, psstm->sstm_clkrt);
		tmptime = timeadd(tmptime, psstm->sstm_clkx);
		*time = tmptime;
		*ts = readfrom;
		*mark = prtp->rtp_mark;

		/*
		 * Only the first packet can start in the middle of a byte;
		 * every following packet is read from its first sample.
		 */
		leadbits = *shift;

		while (TRUE) {
			/*
			 * The leading bits of the first byte precede the
			 * first sample, so they reduce the room in buf.
			 */
			bufsamples = ((buflen * 8) - leadbits) / framebits;
			samplestocopy = min(samples - startdelta, bufsamples);

			/*
			 * Never let a too-small buffer or a malformed
			 * (short) packet produce a negative count: that
			 * would give a negative copy length and move
			 * sstm_lastread backwards.
			 */
			if (samplestocopy < 0)
				samplestocopy = 0;

			/*
			 * The copy starts at the byte holding the first
			 * sample, so the span to copy is the leading bits
			 * plus the samples, rounded up to whole bytes.
			 * Leaving out the leading bits would cut off the
			 * tail of the last sample for sample widths that
			 * do not divide 8.
			 */
			bytescopied = 0;
			if (samplestocopy > 0)
				bytescopied = align(leadbits + samplestocopy * framebits, 8) / 8;

			bcopy(RTP_DATA(prtp) + ((startdelta * framebits) / 8), buf, bytescopied);
			psstm->sstm_lastread = readfrom + samplestocopy - 1;
			samplescopied += samplestocopy;

			/*
			 * Packet only partly consumed: hand it back so the
			 * next read can pick up the rest.
			 */
			if (!tseq(psstm->sstm_lastread, endtime)) {
				rtpqinsert(pssn->ssn_session, psstm->sstm_ssrc, pln);
				return samplescopied;
			}

			/*
			 * We've consumed the entire packet.
			 */
			bufpoolfreebuf(pln);
			buflen -= bytescopied;
			buf += bytescopied;
			leadbits = 0;

			if (buflen == 0)
				return samplescopied;

			pln = rtpqextracthead(pssn->ssn_session, psstm->sstm_ssrc);
			if (pln == NULL)
				return samplescopied;

			prtp = &pln->rln_rtp;

			/*
			 * Check for gap between packets.
			 */
			if (!tseq(prtp->rtp_time, psstm->sstm_lastread + 1)) {
				rtpqinsert(pssn->ssn_session, psstm->sstm_ssrc, pln);
				return samplescopied;
			}

			/*
			 * Continue with next packet.
			 */
			samples = ((pln->rln_len - RTP_HEADER_LEN(prtp)) * 8) / framebits;
			endtime = prtp->rtp_time + samples - 1;
			readfrom = prtp->rtp_time;
			startdelta = 0;
		}
	}
}