/* Source: www.pudn.com > kRtspProxyd.zip > wait4sessionprocess.c */
/* kRtspProxyd */ /**************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2, or (at your option) * any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. * ****************************************************************/ #include#include #include #include #include #include #include #include //for in_aton() #include "structure.h" #include "prototypes.h" #include "proxy.h" extern int errno; static char *Buffer[CONFIG_KRTSPROXYD_NUMCPU]; #define MAX_LINE_BUFF 256 #define MAX_CMD_BUFF 128 void make_socket_nonblocking(struct socket *skt) { skt->sk->proc |= O_NONBLOCK; //fix me??? } static char *find_transport_header(char *inp) { inp = strchr(inp, ';'); if (inp != NULL) inp++; return inp; } /* grab a full line from input put into strBuff as 0 terminated must maintain the exact eol that the input string has. return next position. 
*/ char* get_line_str( char* strBuff, const char *input, int buffSize) { const char *p = input; int sawEOLChar = 0; if( strBuff == NULL || input == NULL || buffSize <= 0){ KRTSPROXYD_OUT(KERN_INFO "error occured in get_line_str!!\n"); return NULL; } memset(strBuff, 0, buffSize); while( *p ) { if(buffSize <= 0){ KRTSPROXYD_OUT(KERN_INFO "error occured in get_line_str!!\n"); return NULL; } if ( *p == '\r' || *p == '\n' ) { // grab all eol chars sawEOLChar = 1; *strBuff = *p; strBuff++; buffSize--; } else if ( sawEOLChar ) { // we saw eol char(s) and now this is not one // that means we're done break; } else { // grab all line chars *strBuff = *p; strBuff++; buffSize--; } p++; } *strBuff = 0; // we're not going to change the contents // of input, but the caller may have the right too. return (char*)p; } int process_session(rtsp_session *s, const int CPUNR) { int i, canRead, numDelims = 0; char *pBuf, *p; char temp[RTSP_SESSION_BUF_SIZE]; char lineBuff[MAX_LINE_BUFF]; track_info *t; char cmd[MAX_CMD_BUFF], *w; int responseHeaderLen = 0; char *startBuff; int len = 0; int count = 0; EnterFunction("process_session"); /* see if we have any commands coming in */ pBuf = s->cinbuf + s->amtInClientInBuffer; canRead = RTSP_SESSION_BUF_SIZE - s->amtInClientInBuffer - 1; if((s->state == stRecvClientCommand) && (canRead>0) && !skb_queue_empty(&(s->client_skt->sk->receive_queue))){ /* First, read the data */ len = ReceiveBuffer(s->client_skt, pBuf, canRead); if(len < 0) { //error occurred!! 
KRTSPROXYD_OUT(KERN_ERR "failed in recievebuffer()\n"); switch(errno){ case EAGAIN: KRTSPROXYD_OUT(KERN_INFO "EAGAIN\n"); break; case EPIPE: //connection broke case ENOTCONN: //shutdown case ECONNRESET: KRTSPROXYD_OUT(KERN_INFO "EPIPE,ENOTCONN,ECONNRESET\n"); s->state = stClientShutdown; break; default: KRTSPROXYD_OUT(KERN_ERR "problem reading from session socket\n"); break; } } else if (len == 0){ // if readable and # of bytes is 0, then the client has shut down s->state = stClientShutdown; } else { pBuf[len] = '\0'; //terminate it!!! KRTSPROXYD_OUT(KERN_INFO "the packet is right!the command is %s--ENDING\n", pBuf); s->amtInClientInBuffer += len; } } /* see what we have to do as a result of this or previous commands */ switch (s->state) { case stIdle: break; case stRecvClientCommand: // have we read a full command yet? if (s->amtInClientInBuffer == 0 || ! has_two_crlfs(s->cinbuf)) { break; } memset(temp, 0, RTSP_SESSION_BUF_SIZE); count++; strcpy(temp, s->cinbuf); KRTSPROXYD_OUT(KERN_INFO "the CLIENT COMMAND is %s..ENDING\n", temp); pBuf = temp; if ( (p = str_sep(&pBuf, "\r\n")) != NULL ) { if (is_command(p, cmd, temp)) { if (s->server_address != NULL) kfree(s->server_address); s->server_address = kmalloc(strlen(temp) + 1, GFP_KERNEL); if(!s->server_address) { s->state = stError; return count; } strcpy(s->server_address, temp); KRTSPROXYD_OUT(KERN_INFO "s->server_address = %s\n", s->server_address); // take port off address (if any) if ((w = strchr(s->server_address, ':')) != NULL) *w++ = '\0'; // make an async request for the IP number s->tempIP = in_aton(s->server_address); //network byte order if (s->tempIP == INADDR_NONE){ //invalid address!! 
send_rtsp_error(s->client_skt, kServerNotFound); s->state = stBadServerName; } else s->state = stParseClientCommand; } } else { KRTSPROXYD_OUT(KERN_ERR "Couldn't make sense of client command\n"); s->state = stError; } break; case stParseClientCommand: // analysis the client's command KRTSPROXYD_OUT(KERN_INFO "in stParseClientCommand\n"); count++; startBuff= s->soutbuf; pBuf = s->cinbuf; s->amtInServerOutBuffer = 0; //initialize KRTSPROXYD_OUT(KERN_INFO "the CLIENT COMMAND being PARSED is %s\n", pBuf); while ((p = str_sep(&pBuf, "\r\n")) != NULL) { // Count the empty fields; three in a row is the end of the header if (*p == '\0') { if (++numDelims == 3) break; continue; } else numDelims = 0; // see if we can snarf our data out of the headers if (is_command(p, cmd, temp)) { unsigned int ip; // get server address if (s->server_address != NULL) kfree(s->server_address); s->server_address = kmalloc(strlen(temp) + 1, GFP_KERNEL); if(!s->server_address) { s->state = stError; return count; } strcpy(s->server_address, temp); // get server port (if any) if ((w = strchr(s->server_address, ':')) != NULL) { *w++ = '\0'; s->server_port = atoi(w); KRTSPROXYD_OUT(KERN_INFO "kernel server_port is %d\n", s->server_port); } // check to see if command is pointing to the same server that // we're already connected to. 
ip = in_aton(s->server_address); //network byte order if ((ip != s->server_ip) && (s->server_skt != NULL)) { sock_release(s->server_skt); atomic_dec(&gMaxPorts); s->server_skt = NULL; s->server_ip = ip; //nbo } if (ip == INADDR_NONE) { s->state = stBadServerName; return count; } s->server_ip = ip; KRTSPROXYD_OUT(KERN_INFO "%s command for server %s:%d (ip %s)\n", cmd, s->server_address, s->server_port, ntoa(s->server_ip)); s->transaction_type = cmd_to_transaction_type(cmd); if (s->transaction_type == ttSetup && has_trackID(p, &i)) { KRTSPROXYD_OUT(KERN_INFO "s->transaction_type = ttSETUP\n"); if ((len = track_id_to_idx(s, i)) == -1) { len = s->numTracks; s->trk[s->numTracks++].ID = i; } s->cur_trk = len; } } else if (s->transaction_type == ttSetup && has_client_port(p, &(s->trk[s->cur_trk].ClientRTPPort))) { t = s->trk + s->cur_trk; KRTSPROXYD_OUT(KERN_INFO "Client ports for track %d are %d-%d\n", s->cur_trk, t->ClientRTPPort, t->ClientRTPPort + 1); // make rtp/rtcp port pair for proxy=>server if (make_udp_port_pair(gProxyIP, s->server_ip, &t->RTP_S2P, &t->RTCP_S2P, CPUNR) == -1) { s->server_skt = NULL; KRTSPROXYD_OUT(KERN_ERR "Couldn't create udp port pair for proxy=>server\n"); s->state = stError; break; } KRTSPROXYD_OUT(KERN_INFO "Created ports for server to proxy on track %d are %d-%d\n", s->cur_trk, t->RTP_S2P->port, t->RTCP_S2P->port); // reconstruct the client port string sprintf(temp, "%s%d-%d", p, t->RTP_S2P->port, t->RTCP_S2P->port); p = temp; } if (0 == strn_casecmp(p, "x-dynamic-rate", 14))// don't send to server not supported. 
p = "x-dynamic-rate: 0"; // put the line in the outgoing buffer len = strlen(p); memcpy(s->soutbuf + s->amtInServerOutBuffer, p, len); s->amtInServerOutBuffer += len; s->soutbuf[s->amtInServerOutBuffer++] = '\r'; s->soutbuf[s->amtInServerOutBuffer++] = '\n'; } if (*(s->cinbuf + s->amtInServerOutBuffer) == 0) { s->soutbuf[s->amtInServerOutBuffer++] = '\r'; s->soutbuf[s->amtInServerOutBuffer++] = '\n'; } s->amtInClientInBuffer -= s->amtInServerOutBuffer; if (s->amtInClientInBuffer > 0) { memcpy(s->cinbuf, s->cinbuf + s->amtInServerOutBuffer, s->amtInClientInBuffer); s->cinbuf[s->amtInClientInBuffer] = 0;//mark KRTSPROXYD_OUT(KERN_INFO "in stParseClientCommand memcpy to s->cinbuf=%s\n", s->cinbuf); } else if (s->amtInClientInBuffer < 0) s->amtInClientInBuffer = 0; s->state = stServerTransactionSend; KRTSPROXYD_OUT(KERN_INFO "set s->state to stServerTransactionSend!\n"); if (s->soutbuf[s->amtInServerOutBuffer - 4] != '\r') { s->soutbuf[s->amtInServerOutBuffer++] = '\r'; s->soutbuf[s->amtInServerOutBuffer++] = '\n'; } KRTSPROXYD_OUT(KERN_INFO "in stParseClientCommand SEND TO SERVER=%s\n", s->soutbuf); KRTSPROXYD_OUT(KERN_INFO "s->amtInServerOutBuffer = %d\n", s->amtInServerOutBuffer); break; case stServerTransactionSend: // check to see if we've got a connection to the server open KRTSPROXYD_OUT(KERN_INFO "in stServerTransactionSend\n"); count++; if (s->server_skt == NULL) { if ((s->server_skt = new_socket_tcp()) == NULL) { KRTSPROXYD_OUT(KERN_ERR "Couldn't open a socket to connect to server\n"); s->state = stError; return count; } KRTSPROXYD_OUT(KERN_INFO "new_socket_tcp success!\n"); set_socket_reuse_address(s->server_skt); //make_socket_nonblocking(s->server_skt); KRTSPROXYD_OUT(KERN_INFO "make_socket_nonblocking() succsess!!!\n"); s->server_skt_pending_connection = 1; if ((i = connect_to_address(s->server_skt, s->server_ip, s->server_port)) != 0) { switch (errno) { case EISCONN: /* already connected */ break; case EINPROGRESS: /* connection can't be completed 
immediately */ case EAGAIN: /* connection can't be completed immediately */ case EALREADY: /* previous connection attempt hasn't been completed */ break; default: sock_release(s->server_skt); atomic_dec(&gMaxPorts); s->server_skt = NULL; KRTSPROXYD_OUT(KERN_ERR "couldn't connect to server %s",ntoa(s->server_ip)); s->state = stCantConnectToServer; return count; } } else s->server_skt_pending_connection = 0; } // check to see if we're connected (writable) and send the command if (s->amtInServerOutBuffer && (s->server_skt_pending_connection == 0 )) { s->server_skt_pending_connection = 0; if ((len = SendBuffer(s->server_skt, s->soutbuf, s->amtInServerOutBuffer)) < 0) { switch (errno) { case EPIPE: // connection broke case ENOTCONN: // shut down case ECONNRESET: s->state = stServerShutdown; break; case EAGAIN: // was busy - try again case EINTR: // got interrupted - try again break; default: KRTSPROXYD_OUT(KERN_ERR "writing to server error"); s->state = stError; return count; } } else if (len == 0) s->state = stServerShutdown; else { s->amtInServerOutBuffer -= len; if (s->amtInServerOutBuffer == 0) { s->state = stServerTransactionRecv; } } } break; case stServerTransactionRecv: // check to see if we've got a response from the server KRTSPROXYD_OUT(KERN_INFO "in stServerTransactionRecv\n"); count++; if (s->server_skt == NULL) { s->state = stServerShutdown; break; } pBuf = s->sinbuf + s->amtInServerInBuffer; canRead = RTSP_SESSION_BUF_SIZE - s->amtInServerInBuffer - 1; if (canRead > 0 && !skb_queue_empty(&(s->server_skt->sk->receive_queue))) { if ((len = ReceiveBuffer(s->server_skt, pBuf, canRead)) < 0) { switch (errno) { case EAGAIN: break; case EPIPE: // connection broke case ENOTCONN: // shut down case ECONNRESET: s->state = stServerShutdown; break; default: KRTSPROXYD_OUT(KERN_ERR "problems reading from server\n"); break; } } else { pBuf[len] = '\0'; KRTSPROXYD_OUT(KERN_INFO "read %d bytes from server:%s\n", len, pBuf); s->amtInServerInBuffer += len; } } else{ 
KRTSPROXYD_OUT(KERN_INFO "Cannot read now\n"); break; } // DMS - if there is a content-length, make sure we've gotten all that data too. if ((s->totalContentLength > 0) && (s->amtInServerInBuffer < s->totalContentLength)) { break; } if ( s->totalContentLength == 0 ) { //-rt if totalContentLength != 0,then we must have seen "has_two_crlfs" // and already parsed out the content-length header. // now we won't be able to find it again becuase str_sep // in the parsing code below has already replaced the CRLFs with \0's. // did we get complete response headers yet? responseHeaderLen = has_two_crlfs(s->sinbuf); if (responseHeaderLen == 0) { KRTSPROXYD_OUT(KERN_INFO "we have not get complete response headers!!\n"); break; } } if ( !s->haveParsedServerReplyHeaders ) { // we can only do this one time! // munge the data for the client, while snarfing what we need for (pBuf = s->sinbuf; (p = str_sep(&pBuf, "\r\n")) != NULL; ) { // Count the empty fields; three in a row is end of the header if (*p == '\0') { if (++numDelims == 3) break; continue; } else numDelims = 0; if (0 == strn_casecmp(p, "x-Accept-Dynamic-Rate", 21)) p = "x-Accept-Dynamic-Rate: 0"; // see if we can snarf any data out of the headers if (has_content_length(p, &s->contentLength)) { s->totalContentLength = s->contentLength + responseHeaderLen; } else if (has_sessionID(p, temp)) { if (!s->sessionID) { s->sessionID = kmalloc(strlen(temp) + 1, GFP_KERNEL); if(!s->sessionID) { KRTSPROXYD_OUT(KERN_ERR "cannot alloate session ID's Mem!\n"); s->state = stError; return count; } strcpy(s->sessionID, temp); } else if (str_casecmp(s->sessionID, temp) != 0) KRTSPROXYD_OUT(KERN_ERR "Bad session ID in response from server"); } else if (s->transaction_type == ttSetup && has_ports(p, &i, &len)) { t = s->trk + s->cur_trk; t->ServerRTPPort = len; KRTSPROXYD_OUT(KERN_INFO "Server ports for track %d are %d-%d \n", s->cur_trk, t->ServerRTPPort, t->ServerRTPPort + 1); // make rtp/rtcp port pair here proxy=>client if 
(make_udp_port_pair(gProxyIP, s->client_ip, &t->RTP_P2C, &t->RTCP_P2C, CPUNR) == -1) { s->server_skt = NULL; KRTSPROXYD_OUT(KERN_INFO "Couldn't create udp port pair for proxy=>client \n"); s->state = stError; break; } // set up transfer param blocks t->RTP_S2C_tpb.status = &s->die; t->RTP_S2C_tpb.send_from = t->RTP_P2C; t->RTP_S2C_tpb.send_to_ip = s->client_ip; t->RTP_S2C_tpb.send_to_port = t->ClientRTPPort; strcpy(t->RTP_S2C_tpb.socketName, "RTP Server to Client"); upon_receipt_from(t->RTP_S2P, s->server_ip, (do_routine)transfer_data, (void *)&(t->RTP_S2C_tpb)); t->RTCP_S2C_tpb.status = &s->die; t->RTCP_S2C_tpb.send_from = t->RTCP_P2C; t->RTCP_S2C_tpb.send_to_ip = s->client_ip; t->RTCP_S2C_tpb.send_to_port = t->ClientRTPPort + 1; strcpy(t->RTCP_S2C_tpb.socketName, "RTCP Server to Client"); upon_receipt_from(t->RTCP_S2P, s->server_ip, (do_routine)transfer_data, (void *)&(t->RTCP_S2C_tpb)); t->RTCP_C2S_tpb.status = &s->die; t->RTCP_C2S_tpb.send_from = t->RTCP_S2P; t->RTCP_C2S_tpb.send_to_ip = s->server_ip; t->RTCP_C2S_tpb.send_to_port = t->ServerRTPPort + 1; strcpy(t->RTCP_C2S_tpb.socketName,"RTCP Client to Server"); upon_receipt_from(t->RTCP_P2C, s->client_ip, (do_routine)transfer_data, (void *)&(t->RTCP_C2S_tpb)); KRTSPROXYD_OUT(KERN_INFO "Created ports for proxy to client on track %d are %d-%d\n", s->cur_trk, t->RTP_P2C->port, t->RTCP_P2C->port); // reconstruct the client;server string w = find_transport_header(p); if (w != NULL) *w = '\0'; sprintf(temp, "%sclient_port=%d-%d;server_port=%d-%d;source=%s", p, t->ClientRTPPort, t->ClientRTPPort+1, t->RTP_P2C->port, t->RTCP_P2C->port, ntoa(gProxyIP)); p = temp; } // put the line in the outgoing buffer len = strlen(p); if(len + s->amtInClientOutBuffer + 2 > RTSP_SESSION_BUF_SIZE) { s->state = stError; return count; } memcpy(s->coutbuf + s->amtInClientOutBuffer, p, len); s->amtInClientOutBuffer += len; s->coutbuf[s->amtInClientOutBuffer++] = '\r'; s->coutbuf[s->amtInClientOutBuffer++] = '\n'; s->responseBodyP = pBuf + 
3; } if(s->amtInClientOutBuffer +2 > RTSP_SESSION_BUF_SIZE) { KRTSPROXYD_OUT(KERN_ERR "s->amtInClientOutBuffer + 2>RTSP_SESSION_BUF_SIZE\n"); s->state = stError; return count; } s->coutbuf[s->amtInClientOutBuffer++] = '\r'; s->coutbuf[s->amtInClientOutBuffer++] = '\n'; } // the headers are done now. s->haveParsedServerReplyHeaders = 1; // DMS - if there is a content-length, make sure we've gotten all that data too. if ((s->totalContentLength > 0) && (s->amtInServerInBuffer < s->totalContentLength)) { break; } pBuf = s->responseBodyP; // munge and add the content if there is any if (s->contentLength) { if (s->transaction_type == ttDescribe) { char *nextBuffPos = pBuf; nextBuffPos = get_line_str( lineBuff, nextBuffPos, MAX_LINE_BUFF ); //for dbg if(nextBuffPos == NULL){ KRTSPROXYD_OUT(KERN_ERR "nextBuffPos == NULL\n"); s->state = stError; return count; } while ( *lineBuff ) { // c=IN IP0 ? if (has_IN_IP(lineBuff, temp)) { } // put the line in the outgoing buffer len = strlen(lineBuff); if( len + s->amtInClientOutBuffer > RTSP_SESSION_BUF_SIZE ) { KRTSPROXYD_OUT(KERN_ERR "len + s->amtInClientOutBuffer > RTSP_SESSION_BUF_SIZE \n"); s->state = stError; return count; } memcpy(s->coutbuf + s->amtInClientOutBuffer, lineBuff, len); s->amtInClientOutBuffer += len; nextBuffPos = get_line_str( lineBuff, nextBuffPos, MAX_LINE_BUFF ); if(nextBuffPos == NULL) {//for dbg s->state = stError; return count; } } } else { if( s->contentLength + s->amtInClientOutBuffer > RTSP_SESSION_BUF_SIZE ) { KRTSPROXYD_OUT(KERN_ERR "s->contentLength + s->amtInClientOutBuffer > RTSP_SESSION_BUF_SIZE\n"); s->state=stError; return count; } memcpy(s->coutbuf + s->amtInClientOutBuffer, pBuf, s->contentLength); s->amtInClientOutBuffer += s->contentLength; } } s->state = stSendClientResponse; s->amtInServerInBuffer = 0; s->totalContentLength = 0; s->haveParsedServerReplyHeaders = 0; s->contentLength = 0; break; case stSendClientResponse: // check to see that we're still connected (writable) and send the 
response KRTSPROXYD_OUT(KERN_INFO "in stSendClientResponse\n"); count++; if (s->amtInClientOutBuffer){// && isWritable(s->client_skt)) { KRTSPROXYD_OUT(KERN_INFO "the Length of the message is %d\n",s->amtInClientOutBuffer); KRTSPROXYD_OUT(KERN_INFO "and the message is %s\n", s->coutbuf); if ((len = SendBuffer(s->client_skt, s->coutbuf, s->amtInClientOutBuffer)) < 0) { switch (errno) { case EPIPE: // connection broke case ENOTCONN: // shut down case ECONNRESET: s->state = stClientShutdown; break; case EAGAIN: // was busy - try again case EINTR: // got interrupted - try again break; default: KRTSPROXYD_OUT(KERN_ERR "writing to client error\n"); s->state = stError; return count; } } else { s->amtInClientOutBuffer -= len; if (s->amtInClientOutBuffer == 0){ if (s->transaction_type == ttTeardown) s->state = stClientShutdown; else s->state = stRecvClientCommand; } } } break; case stClientShutdown: s->die = 1; KRTSPROXYD_OUT(KERN_INFO "stClientShutdown\n"); count = 0; break; case stBadServerName: KRTSPROXYD_OUT(KERN_INFO "stBadServerName\n"); send_rtsp_error(s->client_skt, kServerNotFound); s->state = stServerShutdown; count = 0; break; case stCantConnectToServer: KRTSPROXYD_OUT(KERN_INFO "stCantConnectToServer\n"); send_rtsp_error(s->client_skt, kServerNotFound); s->state = stServerShutdown; count = 0; break; case stServerShutdown: KRTSPROXYD_OUT(KERN_ERR "Server shutdown (ip %s)", ntoa(s->server_ip)); s->die = 1; count = 0; break; case stError: send_rtsp_error(s->client_skt, kUnknownError); KRTSPROXYD_OUT(KERN_ERR "error condition\n"); s->die = 1; count = 0; break; } LeaveFunction("process_session"); return count; } int Wait4SessionProcess(const int CPUNR) { struct rtsp_session *CurrentSession,**Prev; int count = 0; EnterFunction("Wait4SessionProcess"); CurrentSession = threadinfo[CPUNR].RtspSessionQueue; Prev = &(threadinfo[CPUNR].RtspSessionQueue); while (CurrentSession!=NULL) { if(CurrentSession->new_session == 1) { CurrentSession->new_session = 0; if (gUserLimit && 
(atomic_read(&gNumUsers) > gUserLimit)){ CurrentSession->die = 1; send_rtsp_error(CurrentSession->client_skt, kTooManyUsers); } } /* If the connection is lost, remove from queue */ if ((CurrentSession->client_skt->sk->state != TCP_ESTABLISHED && CurrentSession->client_skt->sk->state != TCP_CLOSE_WAIT) ||CurrentSession->die) { struct rtsp_session*Next; Next = CurrentSession->next; *Prev = CurrentSession->next; CurrentSession->next = NULL; KRTSPROXYD_OUT(KERN_INFO "cleanupsession!\n"); CleanUpSession(CurrentSession, CPUNR); CurrentSession = Next; continue; } //service session count += process_session(CurrentSession, CPUNR); Prev = &(CurrentSession->next); CurrentSession = CurrentSession->next; } LeaveFunction("Wait4SessionProcess"); return count; } void StopSessionProcess(const int CPUNR) { struct rtsp_session *CurrentSession,*Next; EnterFunction("StopSessionProcess"); CurrentSession = threadinfo[CPUNR].RtspSessionQueue; while (CurrentSession!=NULL) { Next = CurrentSession->next; CleanUpSession(CurrentSession, CPUNR); CurrentSession = Next; } threadinfo[CPUNR].RtspSessionQueue = NULL; free_page((unsigned long)Buffer[CPUNR]); Buffer[CPUNR]=NULL; EnterFunction("StopWaitingForHeaders"); }