www.pudn.com > NetPaw.rar > async.cpp


/* 
 Async.cpp: implementation of the CEventSource and CEventCallback classes 
 
 Written by Robert Simpson (robert@blackcastlesoft.com) 
 Created 5/11/2004 
 Version 1.01 -- Last Modified 6/10/2004 
 
 1.01 
   - Fixed a bug in the async callback wherein I send the m_Param variables 
     rather than my local copies to the callback.  Brainfart! 
   - Made the class compile cleanly at level 4 
   - Fixed a problem in CEventSource when multiple event sources are 
     present.  If an event source with no events outstanding were to be 
     destroyed in a thread in the pool while other event sources were pending 
     operations, it would artificially reduce its refcount and assert an 
     error.  Had to make sure that not only was it being destroyed by a 
     thread in the pool, but it that it was THIS particular eventsource 
     that had a pending overlapped operation on it. 
 
 1.0 
 Initial release 
*/ 
////////////////////////////////////////////////////////////////////// 
 
#include "StdAfx.h" 
#include "async.h" 
 
////////////////////////////////////////////////////////////////////// 
// CEventSource implementation 
////////////////////////////////////////////////////////////////////// 
CEventSource::CEventSource() 
{ 
	m_nRefCount = 0; 
	m_hEventFin = NULL; 
} 
 
CEventSource::~CEventSource() 
{ 
	BOOL  bInPool = IsThreadInPool(this); 
	DWORD dwTls   = GetTls(); 
 
	// Set the Tls value to 0 which will cause the CEventCallback not 
	// to call ReleaseRef() since we're doing it now. 
	if (bInPool) 
	{ 
		TlsSetValue(dwTls, (LPVOID)(DWORD)0); 
		ReleaseRef(); 
	} 
 
	_ASSERTE(m_nRefCount == 0); 
} 
 
// Waits for all pending overlapped operations to complete 
BOOL CEventSource::WaitForPending(DWORD dwTimeout) 
{ 
	BOOL bInPool = IsThreadInPool(this); 
 
	// Create a temporary handle to wait on 
	m_hEventFin = CreateEvent(NULL, FALSE, FALSE, NULL); 
 
	// Decrease the refcount manually, which should cause the refcount 
	// to reach -1 once all operations have been completed. 
	ReleaseRef(); 
 
	// Release again if the current thread is in the thread pool, 
	// indicating we're processing an IOCP at the moment which won't be released 
	// until sometime after we return from this function. 
	if (bInPool) 
		ReleaseRef(); 
 
	// Wait for the signal that all I/O operations have completed 
	DWORD dwRet = WaitForSingleObject(m_hEventFin, dwTimeout); 
	DWORD dwErr = GetLastError(); 
 
	CloseHandle(m_hEventFin); 
 
	// Restore the refcount to its normal state 
	AddRef(); 
 
	// Restore the refcount if we're in the threadpool 
	if (bInPool) 
		AddRef(); 
 
	m_hEventFin = NULL; 
 
	SetLastError(dwErr); 
 
	return (dwRet == WAIT_OBJECT_0); 
} 
 
// Called by CEventCallback::Acquire prior to executing an overlapped I/O operation 
LONG CEventSource::AddRef() 
{ 
	return InterlockedIncrement(&m_nRefCount); 
} 
 
// Called by CEventCallback::OnAsyncEvent(), and should only 
// be called manually if an overlapped I/O function call failed. 
void CEventSource::ReleaseRef() 
{ 
	if (InterlockedDecrement(&m_nRefCount) < 0) 
	{ 
		// If the refcount falls below zero, it means someone is waiting for 
		// all pending ops to complete, so signal it. 
		_ASSERTE(m_hEventFin != NULL); 
		if (m_hEventFin != NULL) SetEvent(m_hEventFin); 
	} 
} 
 
// Determines if the current thread is an IOCP thread 
BOOL CEventSource::IsThreadInPool(CEventSource *p) 
{ 
	if (GetPool()->IsThreadInPool()) 
	{ 
		// Generic check to see if the thread is a pooled thread 
		if (p == NULL) 
			return TRUE; 
		else 
		{ 
			// Further check to see if the pending operation is taking 
			// place on a particular event source 
			if (TlsGetValue(GetTls()) == (LPVOID)p) 
				return TRUE; 
		} 
	} 
 
	return FALSE; 
} 
 
// Retrieve the threadpool, it will be initialized on the first request for it 
CEventCallback::CBTHREADPOOL *CEventSource::GetPool(void) 
{ 
	static CEventCallback::CBTHREADPOOL pool; 
 
	if( pool.GetQueueHandle() == NULL ) 
		pool.Initialize(); 
 
	return &pool; 
} 
 
// Retrieves the thread local storage index for CEventCallback 
DWORD CEventSource::GetTls() 
{ 
	static DWORD dwTls = TLS_OUT_OF_INDEXES; 
 
	if (dwTls == TLS_OUT_OF_INDEXES) 
		dwTls = TlsAlloc(); 
 
	return dwTls; 
} 
 
// Associate a handle with the thread pool 
BOOL CEventSource::Attach(HANDLE hSource) 
{ 
	HANDLE hIOCP = GetPool()->GetQueueHandle(); 
	return (CreateIoCompletionPort(hSource, hIOCP, 0, 0) != NULL); 
} 
 
 
////////////////////////////////////////////////////////////////////// 
// CEventCallback implementation 
////////////////////////////////////////////////////////////////////// 
CEventCallback::CEventCallback(CEventSource *pEventSrc, ASYNCCALLBACK fncb) 
{ 
	ZeroMemory((LPOVERLAPPED)this, sizeof(OVERLAPPED)); 
 
	// Class is unusable if no source is specified!! 
	_ASSERTE(pEventSrc != NULL); 
 
	m_pSource   = pEventSrc; 
	m_fncb      = fncb; 
	m_nRefCount = -1; 
 
	m_Param1 = NULL; 
	m_Param2 = NULL; 
	m_Param3 = NULL; 
	m_Param4 = NULL; 
} 
 
CEventCallback::~CEventCallback() 
{ 
	_ASSERTE(m_nRefCount == -1); 
} 
 
// Called during OnAsyncEvent to signal that we're no longer being used in an active 
// I/O request.  Optionally releases the I/O event as well. 
void CEventCallback::UnAcquire(BOOL bRelease) throw() 
{ 
	if (!m_pSource) return; 
 
	_ASSERTE(m_nRefCount == 0); 
 
	InterlockedDecrement(&m_nRefCount); 
 
	if (bRelease) m_pSource->ReleaseRef(); 
} 
 
// Called by CAsyncWorker to trigger a callback event 
void CEventCallback::OnAsyncEvent() throw() 
{ 
	_ASSERTE(m_nRefCount != -1); // We were used without being acquired! 
 
	// Make a copy of the data here, because when we self-release 
	// another thread could try and use us before we've made our 
	// callback.  Conversely, someone could destroy us during our 
	// notification event, thus invalidating the event source before we 
	// have a chance to release it. 
	LPVOID Param1 = m_Param1; 
	LPVOID Param2 = m_Param2; 
	LPVOID Param3 = m_Param3; 
	LPVOID Param4 = m_Param4; 
	CEventSource *pSrc = m_pSource; 
	DWORD dwTls = CEventSource::GetTls(); 
 
	// We must release before we make the callback, in case the callback wants to re-use us 
	// Pass FALSE so as not to release the overlapped operation counter -- we do that after 
	// the function call instead. 
	UnAcquire(FALSE); 
 
	// Set a flag in this thread indicating we're in a callback 
	// also used in conjunction with CEventSource's destructor 
	// If after we've made our callback, the value still remains a 1, then 
	// we will call ReleaseRef() to release the event. 
	TlsSetValue(dwTls, (LPVOID)pSrc); 
 
	// Make the call 
	m_fncb(pSrc, Param1, Param2, Param3, Param4, this); 
 
	// Signal the event source that we're done processing an IO request 
	// This must be done after the call is made because the event source may 
	// be waiting on us to release this IO request so it can terminate. 
	// If we release prematurely, another thread could destroy the source 
	// while the above function was executing! 
	if (TlsGetValue(dwTls) == (LPVOID)pSrc) 
		pSrc->ReleaseRef(); 
} 
 
// Acquire use of this callback.  Fails if it is part of a pending I/O request already. 
// Calling this function also resets the overlapped I/O data 
BOOL CEventCallback::Acquire(void) 
{ 
	if (!m_pSource) return FALSE; 
 
	if (InterlockedIncrement(&m_nRefCount) > 0) 
	{ 
		InterlockedDecrement(&m_nRefCount); 
 
		// Set an error indicating we've been locked already 
		SetLastError(WSAEALREADY); 
		return FALSE; 
	} 
 
	ZeroMemory((LPOVERLAPPED)this, sizeof(OVERLAPPED)); 
 
	// Increment the pending overlapped I/O counter 
	m_pSource->AddRef(); 
 
	return TRUE; 
} 
 
// Release the callback and release the pending I/O operation 
void CEventCallback::UnAcquire(void) 
{ 
	UnAcquire(TRUE); 
} 
 
// Post this callback to the IOCP threadpool 
BOOL CEventCallback::Post() 
{ 
	if (!m_pSource) return FALSE; 
 
	_ASSERTE(m_nRefCount != -1); // Don't use us unless we've been acquired first! 
 
	HANDLE hIOCP = CEventSource::GetPool()->GetQueueHandle(); 
	return PostQueuedCompletionStatus(hIOCP, 0, 0, this); 
} 
 
// Static alternate post method, to post a generic overlapped struct to the IOCP thread pool 
BOOL CEventCallback::Post(LPOVERLAPPED pOverlapped) 
{ 
	HANDLE hIOCP = CEventSource::GetPool()->GetQueueHandle(); 
	return PostQueuedCompletionStatus(hIOCP, 0, 0, pOverlapped); 
}