/* Origin: www.pudn.com > 使用IP网络聊天的VOIP源码.zip ("VoIP source code for chatting over IP networks") > threads.cpp */
/* Talker - A small program which utilizes the Layer-3 codec (ACM) in windows for voice-over-IP Copyright (C) 1999 Dino Klein This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. email: dinoklein@hotmail.com */ #define WIN32_LEAN_AND_MEAN #include#include #include #include #include #include #include "defines.h" #include extern bool stereo; extern int queue_size; extern sound_sec sqq[MAX_QUEUE]; extern int index_take, index_put; extern bool dbl; extern bool blocks_got[16]; extern unsigned char total_blocks; extern char output_buffer[4096]; extern CRITICAL_SECTION cs; extern SOCKET ssin, sout; extern unsigned short next_send_id, next_receive_id; extern HACMSTREAM has, hasd; extern HANDLE hPlay, hRec[4], hTerminate, hData; extern ACMSTREAMHEADER ahc, ahd; extern char tempbuf [4096]; extern HWND talker_dlg; extern char dst_addr_txt [16]; extern char listen_mode; extern unsigned long dst_addr; extern int input_buffer_size, lock_size, segments, output_buf_size; extern WAVEFORMATEX wfx; DWORD ticks; bool zero_output_buffer (void); extern LPDIRECTSOUNDCAPTUREBUFFER lpdscb; extern LPDIRECTSOUNDBUFFER lpdsb; #define MAX_SOUND_DATA 541 // 540 DWORD WINAPI sender (LPVOID lpv) { DWORD r; HANDLE things[11]; SOUND_PACKET out_sp; DWORD cb1, cb2; LPVOID pv1, pv2; int lock_from, to_lock; lock_from = 0; SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); things[0] = hTerminate; 
for (int i=0; i<11; i++) things[i+1] = hRec[i]; for (;;) { r = WaitForMultipleObjects (11, things, 0, INFINITE); if (r == WAIT_OBJECT_0) break; r -= WAIT_OBJECT_0; lock_from = (r-1) * lock_size; if (r==segments) to_lock = input_buffer_size-lock_from; else to_lock = lock_size; if (lpdscb->Lock(lock_from, lock_size, &pv1, &cb1, &pv2, &cb2, 0)!=DS_OK) continue; ahc.cbSrcLength = cb1; memcpy(ahc.pbSrc, pv1, cb1); lpdscb->Unlock(pv1, cb1, pv2, cb2); DWORD d = GetTickCount(); r = acmStreamConvert (has, &ahc, ACM_STREAMCONVERTF_START|ACM_STREAMCONVERTF_BLOCKALIGN); d = GetTickCount() - d; PostMessage(talker_dlg, ST_SIZE, (WPARAM) d, 0); if (!r) { int index = 0, tocopy, t; out_sp.sph.uid = 0x5A5A; out_sp.sph.ordinal_id = next_send_id; out_sp.sph.extra = (ahc.cbDstLengthUsed/MAX_SOUND_DATA)<<4; for (t=0;;t++) { tocopy = ahc.cbDstLengthUsed-index; if (tocopy>MAX_SOUND_DATA) tocopy = MAX_SOUND_DATA; out_sp.sph.extra = (out_sp.sph.extra&0xF0) | t; memcpy(out_sp.data, &ahc.pbDst[index], tocopy); out_sp.sph.length = tocopy; send (sout, (char *) &out_sp, sizeof (SOUND_PACKET_HEADER) + out_sp.sph.length, 0); if (dbl) send(sout, (char *) &out_sp, sizeof (SOUND_PACKET_HEADER) + out_sp.sph.length, 0); index += tocopy; if (t==(ahc.cbDstLengthUsed/MAX_SOUND_DATA)) break; } PostMessage(talker_dlg, ST_ERR, (WPARAM) ahc.cbDstLengthUsed, t+1); next_send_id++; } } ExitThread (0); return (0); } /* need to keep track of the pointer and zero out the buffer only once when needed, and not waste cycles as it is done now */ DWORD WINAPI actual_player (LPVOID lpv) { DWORD write_offset; bool buffer_blank; HANDLE *stuff; //[0] = terminate event; [1] = data; [2] = pipe DWORD cb1, cb2; LPVOID pv1, pv2; HRESULT hr; DWORD pplay, pwrite, bytes_in_pipe, bytes_wrote_to_buf, free_for_lock; DWORD bytes_read; char dump[8192]; //FILE *fp = fopen("c:\\check.dat", "rb"); stuff = (HANDLE*)lpv; buffer_blank = true; SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST); for (;;) { PeekNamedPipe(stuff[2], 0, 
0, 0, &bytes_in_pipe, 0); if (bytes_in_pipe) bytes_in_pipe -= bytes_in_pipe%wfx.nBlockAlign; hr = lpdsb->GetCurrentPosition(&pplay, &pwrite); if (hr==DS_OK) { if (buffer_blank) { lpdsb->SetCurrentPosition(0); zero_output_buffer(); write_offset = 0; free_for_lock = output_buf_size-output_buf_size%wfx.nBlockAlign; } else { // check if write_offset is in a valid place, in case of computer slowness if (pwrite>pplay) if ((write_offset>=pplay)&&(write_offset =pplay) write_offset = pwrite; // in case windows does not copies in n*BlockAlign steps, which // I kinda tested and it seems like it doesn't always do that, so, what the heck if (write_offset && (write_offset%wfx.nBlockAlign)) { write_offset = write_offset+(wfx.nBlockAlign-write_offset%wfx.nBlockAlign); if (write_offset>=output_buf_size) write_offset -= output_buf_size; } // calc how many bytes we can write into the buffer if (write_offset>pplay) free_for_lock = output_buf_size-write_offset+pplay; else free_for_lock = pplay-write_offset; free_for_lock -= free_for_lock%wfx.nBlockAlign; } hr = lpdsb->Lock(write_offset, free_for_lock, &pv1, &cb1, &pv2, &cb2, 0); if (hr==DS_OK) { bytes_wrote_to_buf = 0; if (bytes_in_pipe) { ReadFile(stuff[2], pv1, (bytes_in_pipe>cb1)?cb1:bytes_in_pipe, &bytes_read, 0); //fwrite(pv1, bytes_read, 1, fp); bytes_in_pipe -= bytes_read; bytes_wrote_to_buf += bytes_read; if (bytes_in_pipe && pv2) { ReadFile(stuff[2], pv2, (bytes_in_pipe>cb2)?cb2:bytes_in_pipe, &bytes_read, 0); //fwrite(pv2, bytes_read, 1, fp); bytes_wrote_to_buf += bytes_read; } } if (bytes_wrote_to_buf Unlock(pv1, cb1, pv2, cb2); write_offset += bytes_wrote_to_buf; if (write_offset>=output_buf_size) write_offset -= output_buf_size; if (buffer_blank) { buffer_blank = false; lpdsb->Play(0, 0, DSBPLAY_LOOPING); } } } if (hr==DSERR_BUFFERLOST) { buffer_blank = true; // attempting to restore hr = lpdsb->Restore(); if (hr==DS_OK) continue; } // in case there was a failure on the way, dump 0.2sec of the wavedata // FIX THIS TO READ 
BLOCK ALIGNED DATA if CB1 is greater than bytes_in_pipe if ((hr!=DS_OK) && bytes_in_pipe) { cb1 = 11025/5*wfx.nBlockAlign; ReadFile(stuff[2], dump, (cb1>bytes_in_pipe)?bytes_in_pipe:cb1, &bytes_read, 0); } // wait for data or termination until timeout // NEED TO ADJUT TIMEOUT ACCORDING TO DS BUFFER SIZE (although, most buffs should be bigger than .25sec) if (WaitForMultipleObjects(2, stuff, 0, 250)==WAIT_OBJECT_0) break; } lpdsb->Stop(); //fclose(fp); ExitThread(0); return 0; } DWORD WINAPI player (LPVOID lpv) // should be renamed to decompressor { DWORD r; HANDLE things[2]; DWORD child_id; HANDLE terminate_child, child; HANDLE write_pipe, read_pipe; HANDLE for_child[3]; // just in case init went bad... if (WaitForSingleObject(hTerminate,0)==WAIT_OBJECT_0) ExitThread(0); for_child[1] = CreateEvent(0, 0, 0, 0); if (!for_child[1]) ExitThread(0); if (CreatePipe(&read_pipe, &write_pipe, 0, 128*1024)) { terminate_child = CreateEvent(0, 0, 0, 0); if (terminate_child) { for_child[0] = terminate_child; for_child[2] = read_pipe; child = CreateThread(0, 0, actual_player, (LPVOID)for_child, 0, &child_id); if (child) { SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); things [0] = hTerminate; things [1] = hData; for (;;) { r = WaitForMultipleObjects (2, things, 0, INFINITE); if (r==WAIT_OBJECT_0) break; EnterCriticalSection (&cs); queue_size--; if (++index_take==MAX_QUEUE) index_take=0; memcpy(ahd.pbSrc, sqq[index_take].data, sqq[index_take].size); ahd.cbSrcLength = sqq[index_take].size; LeaveCriticalSection (&cs); if (acmStreamConvert(hasd, &ahd, ACM_STREAMCONVERTF_BLOCKALIGN)==0) { WriteFile(write_pipe, ahd.pbDst, ahd.cbDstLengthUsed, &r, 0); SetEvent(for_child[1]); } } SetEvent(terminate_child); WaitForSingleObject(child, INFINITE); } } } ExitThread (0); return (0); } DWORD WINAPI receiver (LPVOID lpv) { SOUND_PACKET in_sp; int r; fd_set fs; timeval tv; char mybuf[4096]; int total_size; total_size = 0; SetThreadPriority(GetCurrentThread(), 
THREAD_PRIORITY_ABOVE_NORMAL); while (WaitForSingleObject (hTerminate, 0) != WAIT_OBJECT_0) { FD_ZERO(&fs); FD_SET(ssin, &fs); tv.tv_sec = 1; tv.tv_usec = 0; if (select(0, &fs, 0, 0, &tv)<=0) continue; if (listen_mode) { SOCKADDR_IN si; int l; l = sizeof (SOCKADDR_IN); r = recvfrom (ssin, (char *) &in_sp, sizeof (SOUND_PACKET), 0, (SOCKADDR *) &si, &l); if (r < 5120 && r >= sizeof(SOUND_PACKET_HEADER)) if (in_sp.sph.uid == 0x5A5A) if (in_sp.sph.length==(r-sizeof(SOUND_PACKET_HEADER))) { SOCKADDR_IN ssi; dst_addr = si.sin_addr.S_un.S_addr; ssi.sin_family = AF_INET; ssi.sin_port = htons(IN_PORT); ssi.sin_addr = si.sin_addr; for (int i=0;i<8; i++) ssi.sin_zero[i] = 0; connect (ssin, (SOCKADDR *) &si, sizeof (SOCKADDR_IN)); connect (sout, (SOCKADDR *) &ssi, sizeof (SOCKADDR_IN)); sprintf (dst_addr_txt, "%u.%u.%u.%u", si.sin_addr.S_un.S_un_b.s_b1, si.sin_addr.S_un.S_un_b.s_b2, si.sin_addr.S_un.S_un_b.s_b3, si.sin_addr.S_un.S_un_b.s_b4); PostMessage (talker_dlg, RT_CONNECTED, 0, 0); listen_mode = 0; lpdscb->Start(DSCBSTART_LOOPING); goto DoIt; } } else { r = recv (ssin, (char *) &in_sp, sizeof (SOUND_PACKET), 0); if (r < 5120 && r >= sizeof(SOUND_PACKET_HEADER)) if (in_sp.sph.uid == 0x5A5A) if (in_sp.sph.ordinal_id >= next_receive_id) if (in_sp.sph.length==(r-sizeof(SOUND_PACKET_HEADER))) { DoIt: //if (rand() % 2) continue; // debug thing //if (in_sp.sph.extra&0x0F==1) continue; // debug thing if (in_sp.sph.ordinal_id>next_receive_id) { // the next block allows to play partial seconds of sound if (total_size) { sound_sec scp; for (int m=0,i=0; i<=total_blocks; i++) { if (blocks_got[i]) { memcpy(&scp.data[m], &mybuf[i*MAX_SOUND_DATA], (i==total_blocks)?(total_size-i*MAX_SOUND_DATA):MAX_SOUND_DATA); m += (i==total_blocks)?(total_size-i*MAX_SOUND_DATA):MAX_SOUND_DATA; } scp.size = m; } EnterCriticalSection (&cs); if (index_put!=index_take) { ReleaseSemaphore(hData, 1, 0); memcpy(&sqq[index_put], &scp, sizeof(scp)); if (++index_put == MAX_QUEUE) index_put = 0; queue_size++; 
PostMessage(talker_dlg, ST_QUE, (WPARAM) queue_size, 0); } LeaveCriticalSection (&cs); int flg; flg = 0; for (int a=0; a<=total_blocks; a++) if (blocks_got[a]) flg++; PostMessage (talker_dlg, ST_SEQ, (WPARAM) next_receive_id, (LPARAM)flg|((total_blocks+1)<<16)); } next_receive_id = in_sp.sph.ordinal_id; for (int y=0; y<16; y++) blocks_got[y] = false; total_size = 0; } if (blocks_got[in_sp.sph.extra&0x0F]) continue; total_blocks = in_sp.sph.extra >> 4; // for now; the code is messy anyway, so who cares if ((in_sp.sph.extra&0x0F)==(in_sp.sph.extra>>4)) total_size = total_blocks*MAX_SOUND_DATA+in_sp.sph.length; else if (total_size==0) total_size = total_blocks*MAX_SOUND_DATA; memcpy(&mybuf[(in_sp.sph.extra & 0x0F)*MAX_SOUND_DATA], in_sp.data, in_sp.sph.length); blocks_got[in_sp.sph.extra&0x0F] = true; for (int r=0; r<=total_blocks;r++) { if (blocks_got[r]==false) break; else if (r==total_blocks) { EnterCriticalSection (&cs); if (index_put!=index_take) { ReleaseSemaphore(hData, 1, 0); sqq[index_put].size = total_size; memcpy(&sqq[index_put].data[0], mybuf, total_size); queue_size++; PostMessage(talker_dlg, ST_QUE, (WPARAM) queue_size, 0); if (++index_put == MAX_QUEUE) index_put = 0; } LeaveCriticalSection (&cs); PostMessage (talker_dlg, ST_SEQ, (WPARAM) in_sp.sph.ordinal_id, ((total_blocks+1)<<16)|(total_blocks+1)); // reset buf for next sec for (int y=0; y<16; y++) blocks_got[y] = false; next_receive_id++; total_size=0; } } } else continue; } } ExitThread (0); return (0); }