// www.pudn.com > NetPaw.rar > async.cpp
/*
Async.cpp: implementation of the CEventSource and CEventCallback classes
Written by Robert Simpson (robert@blackcastlesoft.com)
Created 5/11/2004
Version 1.01 -- Last Modified 6/10/2004
1.01
- Fixed a bug in the async callback wherein I send the m_Param variables
rather than my local copies to the callback. Brainfart!
- Made the class compile cleanly at level 4
- Fixed a problem in CEventSource when multiple event sources are
present. If an event source with no events outstanding were to be
destroyed in a thread in the pool while other event sources were pending
operations, it would artificially reduce its refcount and assert an
error. Had to make sure that not only was it being destroyed by a
thread in the pool, but also that it was THIS particular event source
that had a pending overlapped operation on it.
1.0
Initial release
*/
//////////////////////////////////////////////////////////////////////
#include "StdAfx.h"
#include "async.h"
//////////////////////////////////////////////////////////////////////
// CEventSource implementation
//////////////////////////////////////////////////////////////////////
// Constructs an event source with no pending overlapped operations and no
// WaitForPending() call in progress.
CEventSource::CEventSource()
{
    // Only non-NULL while WaitForPending() is blocking on it.
    m_hEventFin = NULL;
    // Number of overlapped operations currently outstanding on this source.
    m_nRefCount = 0;
}
// Destroys the event source.
//
// When destruction happens on a pool thread that is currently dispatching a
// callback for this very source, the reference held on behalf of that
// callback is dropped here, and the thread's TLS slot is cleared so that
// CEventCallback::OnAsyncEvent() does not release it a second time.
CEventSource::~CEventSource()
{
    const BOOL bInOwnCallback = IsThreadInPool(this);
    const DWORD dwTlsIndex = GetTls();

    if (bInOwnCallback)
    {
        // A NULL slot no longer matches this source, which is the test
        // OnAsyncEvent() performs before it calls ReleaseRef().
        TlsSetValue(dwTlsIndex, NULL);
        ReleaseRef();
    }

    // No overlapped operation may still be outstanding at this point.
    _ASSERTE(m_nRefCount == 0);
}
// Waits for all pending overlapped operations on this source to complete.
//
// The wait works by temporarily biasing the refcount downwards: once the
// last outstanding operation calls ReleaseRef(), the count drops below zero
// and ReleaseRef() signals m_hEventFin.
//
// Parameters:
//   dwTimeout - timeout in milliseconds, forwarded unchanged to
//               WaitForSingleObject().
//
// Returns TRUE if the wait was satisfied (all pending operations finished).
// Returns FALSE on timeout, on wait failure, or if the temporary event could
// not be created. The last-error value observed right after the wait (or the
// one left by CreateEvent() on creation failure) is preserved for the caller.
BOOL CEventSource::WaitForPending(DWORD dwTimeout)
{
    const BOOL bInPool = IsThreadInPool(this);

    // Create a temporary auto-reset event to wait on. If that fails there is
    // nothing ReleaseRef() could signal, so bail out before touching the
    // refcount; the last error is still the one set by CreateEvent().
    HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if (hEvent == NULL)
        return FALSE;
    m_hEventFin = hEvent;

    // Decrease the refcount manually, which should cause the refcount
    // to reach -1 once all operations have been completed.
    ReleaseRef();

    // Release again if the current thread is in the thread pool,
    // indicating we're processing an IOCP at the moment which won't be
    // released until sometime after we return from this function.
    if (bInPool)
        ReleaseRef();

    // Wait for the signal that all I/O operations have completed.
    const DWORD dwRet = WaitForSingleObject(hEvent, dwTimeout);
    const DWORD dwErr = GetLastError();

    // Restore the refcount to its normal state BEFORE retiring the event.
    // After a timeout there are still operations in flight; while the count
    // is biased, any of them may drive it below zero and signal the event,
    // so the handle must stay valid until the bias has been removed.
    // NOTE(review): a completion that already observed a negative count can
    // still race with the close below; this ordering narrows that window but
    // does not remove it.
    AddRef();
    if (bInPool)
        AddRef();

    m_hEventFin = NULL;
    CloseHandle(hEvent);

    // CloseHandle() may have disturbed the last-error value; put back the
    // one captured right after the wait.
    SetLastError(dwErr);
    return (dwRet == WAIT_OBJECT_0);
}
// Registers one more pending overlapped operation on this source.
// CEventCallback::Acquire() calls this before an overlapped I/O operation
// is issued. Returns the value produced by InterlockedIncrement().
LONG CEventSource::AddRef()
{
    const LONG nPending = InterlockedIncrement(&m_nRefCount);
    return nPending;
}
// Called by CEventCallback::OnAsyncEvent(), and should only
// be called manually if an overlapped I/O function call failed.
void CEventSource::ReleaseRef()
{
if (InterlockedDecrement(&m_nRefCount) < 0)
{
// If the refcount falls below zero, it means someone is waiting for
// all pending ops to complete, so signal it.
_ASSERTE(m_hEventFin != NULL);
if (m_hEventFin != NULL) SetEvent(m_hEventFin);
}
}
// Tells whether the calling thread is an IOCP pool thread.
//
// Parameters:
//   p - NULL for the generic check (any pool thread qualifies); otherwise
//       the pool thread's TLS slot must also hold this source, i.e. the
//       thread is currently dispatching a callback for it.
//
// Returns TRUE when the check passes, FALSE otherwise.
BOOL CEventSource::IsThreadInPool(CEventSource *p)
{
    // Not a pooled thread at all: nothing further to check.
    if (!GetPool()->IsThreadInPool())
        return FALSE;

    // Generic query: being in the pool is sufficient.
    if (p == NULL)
        return TRUE;

    // Specific query: the pending operation must belong to this source.
    return (TlsGetValue(GetTls()) == (LPVOID)p) ? TRUE : FALSE;
}
// Returns the shared thread pool, initializing it on the first request
// (detected by its queue handle still being NULL).
// NOTE(review): the result of Initialize() is not examined, and nothing in
// this function serializes concurrent first calls -- confirm that callers
// tolerate both.
CEventCallback::CBTHREADPOOL *CEventSource::GetPool(void)
{
    static CEventCallback::CBTHREADPOOL s_pool;

    if (s_pool.GetQueueHandle() != NULL)
        return &s_pool;

    s_pool.Initialize();
    return &s_pool;
}
// Returns the thread-local-storage index used by CEventCallback, allocating
// it with TlsAlloc() the first time it is requested. If the allocation
// fails the stored value stays TLS_OUT_OF_INDEXES and the next call retries.
DWORD CEventSource::GetTls()
{
    static DWORD s_dwTlsIndex = TLS_OUT_OF_INDEXES;

    if (s_dwTlsIndex != TLS_OUT_OF_INDEXES)
        return s_dwTlsIndex;

    s_dwTlsIndex = TlsAlloc();
    return s_dwTlsIndex;
}
// Associates hSource with the thread pool's I/O completion port.
// Returns TRUE if CreateIoCompletionPort() returned a non-NULL handle.
BOOL CEventSource::Attach(HANDLE hSource)
{
    const HANDLE hPort =
        CreateIoCompletionPort(hSource, GetPool()->GetQueueHandle(), 0, 0);
    return (hPort != NULL);
}
//////////////////////////////////////////////////////////////////////
// CEventCallback implementation
//////////////////////////////////////////////////////////////////////
// Builds an idle (unacquired) callback bound to an event source.
//
// Parameters:
//   pEventSrc - event source this callback reports to; must not be NULL.
//   fncb      - function invoked by OnAsyncEvent().
CEventCallback::CEventCallback(CEventSource *pEventSrc, ASYNCCALLBACK fncb)
{
    // Start from a clean OVERLAPPED header.
    ZeroMemory((LPOVERLAPPED)this, sizeof(OVERLAPPED));

    // Class is unusable if no source is specified!!
    _ASSERTE(pEventSrc != NULL);

    // -1 marks the callback as not acquired; Acquire() raises it to 0.
    m_nRefCount = -1;
    m_fncb = fncb;
    m_pSource = pEventSrc;

    // User parameters start out empty.
    m_Param4 = NULL;
    m_Param3 = NULL;
    m_Param2 = NULL;
    m_Param1 = NULL;
}
// Destroys the callback, which must be idle at this point.
CEventCallback::~CEventCallback()
{
    // -1 is the unacquired value set by the constructor; anything else means
    // the callback is still acquired for an I/O request.
    _ASSERTE(m_nRefCount == -1);
}
// Marks this callback as no longer part of an active I/O request; used by
// OnAsyncEvent().
//
// Parameters:
//   bRelease - when TRUE, the event source's pending-operation reference is
//              released as well.
//
// Does nothing if the callback has no event source.
void CEventCallback::UnAcquire(BOOL bRelease) throw()
{
    if (m_pSource == NULL)
        return;

    // Must currently be acquired exactly once.
    _ASSERTE(m_nRefCount == 0);
    InterlockedDecrement(&m_nRefCount);

    if (!bRelease)
        return;
    m_pSource->ReleaseRef();
}
// Called by CAsyncWorker to trigger a callback event
void CEventCallback::OnAsyncEvent() throw()
{
_ASSERTE(m_nRefCount != -1); // We were used without being acquired!
// Make a copy of the data here, because when we self-release
// another thread could try and use us before we've made our
// callback. Conversely, someone could destroy us during our
// notification event, thus invalidating the event source before we
// have a chance to release it.
LPVOID Param1 = m_Param1;
LPVOID Param2 = m_Param2;
LPVOID Param3 = m_Param3;
LPVOID Param4 = m_Param4;
CEventSource *pSrc = m_pSource;
DWORD dwTls = CEventSource::GetTls();
// We must release before we make the callback, in case the callback wants to re-use us
// Pass FALSE so as not to release the overlapped operation counter -- we do that after
// the function call instead.
UnAcquire(FALSE);
// Set a flag in this thread indicating we're in a callback
// also used in conjunction with CEventSource's destructor
// If after we've made our callback, the value still remains a 1, then
// we will call ReleaseRef() to release the event.
TlsSetValue(dwTls, (LPVOID)pSrc);
// Make the call
m_fncb(pSrc, Param1, Param2, Param3, Param4, this);
// Signal the event source that we're done processing an IO request
// This must be done after the call is made because the event source may
// be waiting on us to release this IO request so it can terminate.
// If we release prematurely, another thread could destroy the source
// while the above function was executing!
if (TlsGetValue(dwTls) == (LPVOID)pSrc)
pSrc->ReleaseRef();
}
// Claims this callback for a new overlapped I/O request.
//
// Returns TRUE on success; the OVERLAPPED header has then been zeroed and
// the event source's pending-operation counter incremented.
// Returns FALSE if the callback has no event source, or if it is already
// part of a pending request (last error is then set to WSAEALREADY).
BOOL CEventCallback::Acquire(void)
{
    if (m_pSource == NULL)
        return FALSE;

    const LONG nHolders = InterlockedIncrement(&m_nRefCount);
    if (nHolders > 0)
    {
        // Somebody else holds the callback: undo our increment and report it.
        InterlockedDecrement(&m_nRefCount);
        SetLastError(WSAEALREADY);
        return FALSE;
    }

    // Fresh request: reset the overlapped I/O data.
    ZeroMemory((LPOVERLAPPED)this, sizeof(OVERLAPPED));

    // Account for the pending overlapped I/O operation on the source.
    m_pSource->AddRef();
    return TRUE;
}
// Release the callback and release the pending I/O operation
void CEventCallback::UnAcquire(void)
{
UnAcquire(TRUE);
}
// Queues this callback on the IOCP thread pool.
// Returns FALSE if the callback has no event source; otherwise returns the
// result of PostQueuedCompletionStatus().
BOOL CEventCallback::Post()
{
    if (m_pSource == NULL)
        return FALSE;

    _ASSERTE(m_nRefCount != -1); // Don't use us unless we've been acquired first!

    return PostQueuedCompletionStatus(
        CEventSource::GetPool()->GetQueueHandle(), 0, 0, this);
}
// Static alternative to Post(): queues an arbitrary OVERLAPPED structure on
// the IOCP thread pool. Returns the result of PostQueuedCompletionStatus().
BOOL CEventCallback::Post(LPOVERLAPPED pOverlapped)
{
    const HANDLE hQueue = CEventSource::GetPool()->GetQueueHandle();
    const BOOL bPosted = PostQueuedCompletionStatus(hQueue, 0, 0, pOverlapped);
    return bPosted;
}