www.pudn.com > 使用IP网络聊天的VOIP源码.zip > threads.cpp


/* 
   Talker - A small program which utilizes the Layer-3 codec (ACM) in windows for voice-over-IP 
   Copyright (C) 1999 Dino Klein 
 
   This program is free software; you can redistribute it and/or modify it under the terms of 
   the GNU General Public License as published by the Free Software Foundation; either 
   version 2 of the License, or (at your option) any later version.  
 
   This program is distributed in the hope that it will be useful, but WITHOUT ANY 
   WARRANTY; without even the implied warranty of MERCHANTABILITY or 
   FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 
   more details.  
 
   You should have received a copy of the GNU General Public License along with this 
   program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, 
   MA 02139, USA. 
 
   email: dinoklein@hotmail.com 
*/ 
 
 
#define  WIN32_LEAN_AND_MEAN 
#include  
#include  
#include  
#include  
#include  
#include  
#include "defines.h" 
#include  
 
 
 
extern bool        stereo; 
extern int         queue_size; 
extern sound_sec   sqq[MAX_QUEUE]; 
extern int         index_take, index_put; 
 
 
 
 
extern bool              dbl; 
extern bool              blocks_got[16]; 
extern unsigned char     total_blocks; 
extern char              output_buffer[4096]; 
extern CRITICAL_SECTION  cs; 
extern SOCKET            ssin, sout; 
extern unsigned short    next_send_id, next_receive_id; 
extern HACMSTREAM        has, hasd; 
extern HANDLE            hPlay, hRec[4], hTerminate, hData; 
extern ACMSTREAMHEADER   ahc, ahd; 
extern char              tempbuf [4096]; 
extern HWND              talker_dlg; 
extern char              dst_addr_txt [16]; 
extern char              listen_mode; 
extern unsigned long     dst_addr; 
extern int               input_buffer_size, lock_size, segments, output_buf_size; 
extern WAVEFORMATEX      wfx; 
 
 
 
DWORD ticks; 
 
 
bool zero_output_buffer (void); 
 
extern LPDIRECTSOUNDCAPTUREBUFFER  lpdscb; 
extern LPDIRECTSOUNDBUFFER         lpdsb; 
 
 
 
 
#define MAX_SOUND_DATA 541   // 540 
 
 
 
 
DWORD WINAPI sender (LPVOID lpv) 
{ 
   DWORD         r; 
   HANDLE        things[11]; 
   SOUND_PACKET  out_sp; 
   DWORD         cb1, cb2; 
   LPVOID        pv1, pv2; 
   int           lock_from, to_lock; 
 
 
   lock_from = 0; 
   SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); 
   things[0] = hTerminate; 
   for (int i=0; i<11; i++) things[i+1] = hRec[i]; 
   for (;;) 
   { 
      r = WaitForMultipleObjects (11, things, 0, INFINITE); 
      if (r == WAIT_OBJECT_0) break; 
 
 
      r -= WAIT_OBJECT_0; 
      lock_from = (r-1) * lock_size; 
      if (r==segments) to_lock = input_buffer_size-lock_from; 
      else to_lock = lock_size; 
 
      if (lpdscb->Lock(lock_from, lock_size, &pv1, &cb1, &pv2, &cb2, 0)!=DS_OK) continue; 
 
      ahc.cbSrcLength = cb1; 
      memcpy(ahc.pbSrc, pv1, cb1); 
      lpdscb->Unlock(pv1, cb1, pv2, cb2); 
 
      DWORD d = GetTickCount(); 
      r = acmStreamConvert (has, &ahc, ACM_STREAMCONVERTF_START|ACM_STREAMCONVERTF_BLOCKALIGN); 
      d = GetTickCount() - d; 
      PostMessage(talker_dlg, ST_SIZE, (WPARAM) d, 0); 
      if (!r) 
      { 
         int  index = 0, tocopy, t; 
 
 
 
         out_sp.sph.uid = 0x5A5A; 
         out_sp.sph.ordinal_id = next_send_id; 
 
         out_sp.sph.extra = (ahc.cbDstLengthUsed/MAX_SOUND_DATA)<<4; 
 
 
 
         for (t=0;;t++) 
         { 
            tocopy = ahc.cbDstLengthUsed-index; 
            if (tocopy>MAX_SOUND_DATA) tocopy = MAX_SOUND_DATA; 
            out_sp.sph.extra = (out_sp.sph.extra&0xF0) | t; 
            memcpy(out_sp.data, &ahc.pbDst[index], tocopy); 
            out_sp.sph.length = tocopy; 
            send (sout, (char *) &out_sp, sizeof (SOUND_PACKET_HEADER) + out_sp.sph.length, 0); 
            if (dbl) send(sout, (char *) &out_sp, sizeof (SOUND_PACKET_HEADER) + out_sp.sph.length, 0); 
 
            index += tocopy; 
            if (t==(ahc.cbDstLengthUsed/MAX_SOUND_DATA)) break; 
         } 
 
 
         PostMessage(talker_dlg, ST_ERR, (WPARAM) ahc.cbDstLengthUsed, t+1); 
 
         next_send_id++; 
      } 
   } 
 
 
   ExitThread (0); 
   return (0); 
} 
 
 
 
 
 
/* 
   need to keep track of the pointer and zero out the 
   buffer only once when needed, and not waste cycles 
   as it is done now 
*/ 
 
 
DWORD WINAPI actual_player (LPVOID lpv) 
{ 
   DWORD    write_offset; 
   bool     buffer_blank; 
   HANDLE   *stuff;      //[0] = terminate event; [1] = data; [2] = pipe 
   DWORD    cb1, cb2; 
   LPVOID   pv1, pv2; 
   HRESULT  hr; 
   DWORD    pplay, pwrite, bytes_in_pipe, bytes_wrote_to_buf, free_for_lock; 
   DWORD    bytes_read; 
   char     dump[8192]; 
 
   //FILE *fp = fopen("c:\\check.dat", "rb"); 
 
   stuff = (HANDLE*)lpv; 
   buffer_blank = true; 
   SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); 
   for (;;) 
   { 
      PeekNamedPipe(stuff[2], 0, 0, 0, &bytes_in_pipe, 0); 
      if (bytes_in_pipe) bytes_in_pipe -= bytes_in_pipe%wfx.nBlockAlign; 
      hr = lpdsb->GetCurrentPosition(&pplay, &pwrite); 
      if (hr==DS_OK) 
      { 
         if (buffer_blank) 
         { 
            lpdsb->SetCurrentPosition(0); 
            zero_output_buffer(); 
            write_offset = 0; 
            free_for_lock = output_buf_size-output_buf_size%wfx.nBlockAlign; 
         } 
         else 
         { 
            // check if write_offset is in a valid place, in case of computer slowness 
 
            if (pwrite>pplay) 
            if ((write_offset>=pplay)&&(write_offset=pplay) write_offset = pwrite; 
 
            // in case windows does not copies in n*BlockAlign steps, which 
            // I kinda tested and it seems like it doesn't always do that, so, what the heck 
            if (write_offset && (write_offset%wfx.nBlockAlign)) 
            { 
               write_offset = write_offset+(wfx.nBlockAlign-write_offset%wfx.nBlockAlign); 
               if (write_offset>=output_buf_size) write_offset -= output_buf_size; 
            } 
 
 
            // calc how many bytes we can write into the buffer 
            if (write_offset>pplay) free_for_lock = output_buf_size-write_offset+pplay; 
            else free_for_lock = pplay-write_offset; 
            free_for_lock -= free_for_lock%wfx.nBlockAlign; 
         } 
 
         hr = lpdsb->Lock(write_offset, free_for_lock, &pv1, &cb1, &pv2, &cb2, 0); 
         if (hr==DS_OK) 
         { 
            bytes_wrote_to_buf = 0; 
            if (bytes_in_pipe) 
            { 
               ReadFile(stuff[2], pv1, (bytes_in_pipe>cb1)?cb1:bytes_in_pipe, &bytes_read, 0); 
         //fwrite(pv1, bytes_read, 1, fp); 
               bytes_in_pipe -= bytes_read; 
               bytes_wrote_to_buf += bytes_read; 
               if (bytes_in_pipe && pv2) 
               { 
                  ReadFile(stuff[2], pv2, (bytes_in_pipe>cb2)?cb2:bytes_in_pipe, &bytes_read, 0); 
         //fwrite(pv2, bytes_read, 1, fp);          
                  bytes_wrote_to_buf += bytes_read; 
               } 
            } 
            if (bytes_wrote_to_bufUnlock(pv1, cb1, pv2, cb2); 
            write_offset += bytes_wrote_to_buf; 
            if (write_offset>=output_buf_size) write_offset -= output_buf_size; 
            if (buffer_blank) 
            { 
               buffer_blank = false; 
               lpdsb->Play(0, 0, DSBPLAY_LOOPING); 
            } 
         } 
      } 
      if (hr==DSERR_BUFFERLOST) 
      { 
         buffer_blank = true; 
         // attempting to restore 
         hr = lpdsb->Restore(); 
         if (hr==DS_OK) continue; 
      } 
 
      // in case there was a failure on the way, dump 0.2sec of the wavedata 
      // FIX THIS TO READ BLOCK ALIGNED DATA if CB1 is greater than bytes_in_pipe 
      if ((hr!=DS_OK) && bytes_in_pipe) 
      { 
         cb1 = 11025/5*wfx.nBlockAlign; 
         ReadFile(stuff[2], dump, (cb1>bytes_in_pipe)?bytes_in_pipe:cb1, &bytes_read, 0); 
      } 
 
      // wait for data or termination until timeout 
      // NEED TO ADJUT TIMEOUT ACCORDING TO DS BUFFER SIZE (although, most buffs should be bigger than .25sec) 
      if (WaitForMultipleObjects(2, stuff, 0, 250)==WAIT_OBJECT_0) break; 
   } 
 
   lpdsb->Stop(); 
 
   //fclose(fp); 
   ExitThread(0); 
   return 0; 
} 
 
 
 
 
DWORD WINAPI player (LPVOID lpv) // should be renamed to decompressor 
{ 
   DWORD   r; 
   HANDLE  things[2]; 
   DWORD   child_id; 
   HANDLE  terminate_child, child; 
   HANDLE  write_pipe, read_pipe; 
   HANDLE  for_child[3]; 
 
    
   // just in case init went bad... 
   if (WaitForSingleObject(hTerminate,0)==WAIT_OBJECT_0) ExitThread(0); 
 
   for_child[1] = CreateEvent(0, 0, 0, 0); 
   if (!for_child[1]) ExitThread(0); 
   if (CreatePipe(&read_pipe, &write_pipe, 0, 128*1024)) 
   { 
      terminate_child = CreateEvent(0, 0, 0, 0); 
      if (terminate_child) 
      { 
         for_child[0] = terminate_child; 
         for_child[2] = read_pipe; 
         child = CreateThread(0, 0, actual_player, (LPVOID)for_child, 0, &child_id); 
         if (child) 
         { 
 
            SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); 
            things [0] = hTerminate; 
            things [1] = hData; 
            for (;;) 
            { 
               r = WaitForMultipleObjects (2, things, 0, INFINITE); 
               if (r==WAIT_OBJECT_0) break; 
 
               EnterCriticalSection (&cs); 
               queue_size--; 
               if (++index_take==MAX_QUEUE) index_take=0; 
               memcpy(ahd.pbSrc, sqq[index_take].data, sqq[index_take].size); 
               ahd.cbSrcLength = sqq[index_take].size; 
               LeaveCriticalSection (&cs); 
 
               if (acmStreamConvert(hasd, &ahd, ACM_STREAMCONVERTF_BLOCKALIGN)==0) 
               { 
                  WriteFile(write_pipe, ahd.pbDst, ahd.cbDstLengthUsed, &r, 0); 
                  SetEvent(for_child[1]); 
               } 
            } 
            SetEvent(terminate_child); 
            WaitForSingleObject(child, INFINITE); 
         } 
      } 
   } 
 
   ExitThread (0); 
   return (0); 
} 
 
 
 
 
 
 
DWORD WINAPI receiver (LPVOID lpv) 
{ 
   SOUND_PACKET  in_sp; 
   int           r; 
   fd_set        fs; 
   timeval       tv; 
 
 
   char  mybuf[4096]; 
   int   total_size; 
 
 
 
 
   total_size = 0; 
   SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); 
   while (WaitForSingleObject (hTerminate, 0) != WAIT_OBJECT_0) 
   { 
      FD_ZERO(&fs); 
      FD_SET(ssin, &fs); 
      tv.tv_sec = 1; 
      tv.tv_usec = 0; 
      if (select(0, &fs, 0, 0, &tv)<=0) continue; 
 
      if (listen_mode) 
      { 
         SOCKADDR_IN  si; 
         int          l; 
 
 
         l = sizeof (SOCKADDR_IN); 
         r = recvfrom (ssin, (char *) &in_sp, sizeof (SOUND_PACKET), 0, (SOCKADDR *) &si, &l); 
         if (r < 5120 && r >= sizeof(SOUND_PACKET_HEADER)) 
         if (in_sp.sph.uid == 0x5A5A) 
         if (in_sp.sph.length==(r-sizeof(SOUND_PACKET_HEADER))) 
         { 
            SOCKADDR_IN  ssi; 
 
 
            dst_addr = si.sin_addr.S_un.S_addr; 
            ssi.sin_family = AF_INET; 
            ssi.sin_port = htons(IN_PORT); 
            ssi.sin_addr = si.sin_addr; 
            for (int i=0;i<8; i++) ssi.sin_zero[i] = 0; 
 
            connect (ssin, (SOCKADDR *) &si, sizeof (SOCKADDR_IN)); 
            connect (sout, (SOCKADDR *) &ssi, sizeof (SOCKADDR_IN)); 
            sprintf (dst_addr_txt, "%u.%u.%u.%u", si.sin_addr.S_un.S_un_b.s_b1, si.sin_addr.S_un.S_un_b.s_b2, si.sin_addr.S_un.S_un_b.s_b3, si.sin_addr.S_un.S_un_b.s_b4); 
            PostMessage (talker_dlg, RT_CONNECTED, 0, 0); 
            listen_mode = 0; 
            lpdscb->Start(DSCBSTART_LOOPING); 
            goto DoIt; 
         } 
      } 
      else 
      { 
         r = recv (ssin, (char *) &in_sp, sizeof (SOUND_PACKET), 0); 
         if (r < 5120 && r >= sizeof(SOUND_PACKET_HEADER)) 
         if (in_sp.sph.uid == 0x5A5A) 
         if (in_sp.sph.ordinal_id >= next_receive_id) 
         if (in_sp.sph.length==(r-sizeof(SOUND_PACKET_HEADER))) 
         { 
DoIt: 
 
            //if (rand() % 2) continue;   // debug thing 
            //if (in_sp.sph.extra&0x0F==1) continue;  // debug thing 
 
            if (in_sp.sph.ordinal_id>next_receive_id) 
            { 
 
            // the next block allows to play partial seconds of sound 
               if (total_size) 
               { 
 
 
                     sound_sec  scp; 
                     for (int m=0,i=0; i<=total_blocks; i++) 
                     { 
                        if (blocks_got[i]) 
                        { 
                           memcpy(&scp.data[m], &mybuf[i*MAX_SOUND_DATA], (i==total_blocks)?(total_size-i*MAX_SOUND_DATA):MAX_SOUND_DATA); 
                           m += (i==total_blocks)?(total_size-i*MAX_SOUND_DATA):MAX_SOUND_DATA; 
                        } 
                        scp.size = m; 
                     } 
 
                     EnterCriticalSection (&cs); 
                     if (index_put!=index_take) 
                     { 
                        ReleaseSemaphore(hData, 1, 0); 
                        memcpy(&sqq[index_put], &scp, sizeof(scp)); 
                        if (++index_put == MAX_QUEUE) index_put = 0; 
                        queue_size++; 
                        PostMessage(talker_dlg, ST_QUE, (WPARAM) queue_size, 0); 
                     } 
                     LeaveCriticalSection (&cs); 
 
 
                     int flg; 
                      
                     flg = 0; 
                     for (int a=0; a<=total_blocks; a++) if (blocks_got[a]) flg++; 
                     PostMessage (talker_dlg, ST_SEQ, (WPARAM) next_receive_id, (LPARAM)flg|((total_blocks+1)<<16)); 
               } 
 
 
 
               next_receive_id = in_sp.sph.ordinal_id; 
               for (int y=0; y<16; y++) blocks_got[y] = false; 
               total_size = 0; 
            } 
 
 
            if (blocks_got[in_sp.sph.extra&0x0F]) continue; 
 
            total_blocks = in_sp.sph.extra >> 4; // for now; the code is messy anyway, so who cares 
            if ((in_sp.sph.extra&0x0F)==(in_sp.sph.extra>>4)) total_size = total_blocks*MAX_SOUND_DATA+in_sp.sph.length; 
            else if (total_size==0) total_size = total_blocks*MAX_SOUND_DATA; 
 
 
 
            memcpy(&mybuf[(in_sp.sph.extra & 0x0F)*MAX_SOUND_DATA], in_sp.data, in_sp.sph.length); 
 
            blocks_got[in_sp.sph.extra&0x0F] = true; 
            for (int r=0; r<=total_blocks;r++) 
            { 
               if (blocks_got[r]==false) break; 
               else if (r==total_blocks) 
               { 
 
                     EnterCriticalSection (&cs); 
                     if (index_put!=index_take) 
                     { 
                        ReleaseSemaphore(hData, 1, 0); 
                        sqq[index_put].size = total_size; 
                        memcpy(&sqq[index_put].data[0], mybuf, total_size); 
                        queue_size++; 
                        PostMessage(talker_dlg, ST_QUE, (WPARAM) queue_size, 0); 
                        if (++index_put == MAX_QUEUE) index_put = 0; 
                     } 
                     LeaveCriticalSection (&cs); 
 
                     PostMessage (talker_dlg, ST_SEQ, (WPARAM) in_sp.sph.ordinal_id, ((total_blocks+1)<<16)|(total_blocks+1)); 
                     // reset buf for next sec 
                     for (int y=0; y<16; y++) blocks_got[y] = false; 
                     next_receive_id++; 
                     total_size=0; 
 
               } 
            } 
         } 
         else continue; 
      } 
   } 
 
   ExitThread (0); 
   return (0); 
}