Allow an event-driven application to efficiently demultiplex and dispatch service requests triggered by the completion of asynchronous operations.
The Proactor pattern is typically applicable in event-driven applications where a server must perform multiple operations simultaneously, such as a web server. For example, consider a file server that processes requests from multiple clients. When a user wants to download content from the file server, the user establishes a connection with the server (using sockets) and sends it the name of the required file. The file server receives the client's connection event, accepts the connection, starts reading the required file(s), and returns immediately. Once all the files have been read, the server sends their contents back to the client.

There are at least three ways to implement an asynchronous file server:
Divide the server code into two parts: long-duration operations that execute asynchronously, and completion handlers that process the results of these asynchronous operations.
For every service offered by an application, introduce asynchronous operations that initiate the processing of service requests, along with completion handlers that process the completion results. An asynchronous operation (ReadFile, WriteFile) is typically executed by the OS, which inserts a completion event containing the result into a system-provided completion event queue.
A system-provided API, GetQueuedCompletionStatus, waits on the queue and removes a completion event from it when an asynchronous operation has completed. The proactor can then demultiplex and dispatch this completion event to the application-specific completion handler.
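The flow described above can be modelled without any operating system support. The following is a minimal, portable sketch, not the Win32 implementation: `CompletionEvent`, `SimulatedCompletionQueue`, `HandleOneEvent` and `RunQueueDemo` are hypothetical names introduced only for illustration, and a plain `std::queue` stands in for the system-provided completion event queue.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an OS completion event: the result of one
// finished asynchronous operation plus the handler that should process it.
struct CompletionEvent {
    std::size_t bytesTransferred;
    std::function<void(std::size_t)> handler;  // application-specific completion handler
};

// Hypothetical stand-in for the system-provided completion event queue.
class SimulatedCompletionQueue {
public:
    // Called by the "OS" side when an asynchronous operation finishes.
    void Post(CompletionEvent ev) { m_events.push(std::move(ev)); }

    // Plays the role of the event demultiplexer: removes one event if any.
    bool TryDequeue(CompletionEvent& out) {
        if (m_events.empty()) return false;
        out = std::move(m_events.front());
        m_events.pop();
        return true;
    }
private:
    std::queue<CompletionEvent> m_events;
};

// Proactor step: dequeue one completion event and dispatch it.
// Returns true if an event was dispatched.
bool HandleOneEvent(SimulatedCompletionQueue& q) {
    CompletionEvent ev{};
    if (!q.TryDequeue(ev)) return false;
    assert(ev.handler);  // every event must carry a handler
    ev.handler(ev.bytesTransferred);
    return true;
}

// Small demo: two "operations" complete and the proactor dispatches both in order.
std::vector<std::string> RunQueueDemo() {
    std::vector<std::string> log;
    SimulatedCompletionQueue q;
    q.Post({512, [&log](std::size_t n) { log.push_back("read " + std::to_string(n)); }});
    q.Post({128, [&log](std::size_t n) { log.push_back("write " + std::to_string(n)); }});
    while (HandleOneEvent(q)) {}
    return log;
}
```

In the Win32 code later in this article, the operating system fills the queue and GetQueuedCompletionStatus takes the place of `TryDequeue`.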
Completion Handler
An ABC (abstract base class) that defines an interface for processing
results of asynchronous operations.
Concrete Completion Handler
A derived class that specializes the implementation of the interface
defined by the Completion Handler Abstract Base Class.
Handles
Handles are provided by the operating system to identify event sources. Under
Windows, the handles are of type HANDLE.
Asynchronous Operation
An asynchronous operation is an operation whose initiating call returns immediately
without blocking, while the work itself continues in the background. Results from
these asynchronous operations are typically retrieved via some callback function.
Asynchronous Operation Processor
An OS kernel that processes asynchronous operations. It also generates events to
indicate completions of the asynchronous operation.
Completion Event Queue
An OS-provided queue that buffers events indicating completion of an
asynchronous operation. In Win32 this queue corresponds to a completion port.
Asynchronous Event Demultiplexer
An OS-provided function that waits for completion events to be inserted into the
completion queue when an asynchronous operation is completed. In Win32 this
corresponds to the GetQueuedCompletionStatus API function.
Proactor
A class that calls the asynchronous event demultiplexer to dequeue a completion
event. It then demultiplexes and dispatches completion events to their
corresponding completion handlers.


Define a type to convey the results of asynchronous operations
The Adapter pattern can be applied to convert the information stored in a completion event into a form suitable for dispatching to the associated completion handler.
/* This class is used to encapsulate information used by asynchronous operations. Objects
   of this class are used to convey async results to concrete completion handlers. Inheritance
   is used to allow applications to add custom state and methods to the results of async
   operations */
#include <windows.h>

class CompletionHandler;

class AsyncResult : public OVERLAPPED
{
// Constructor/Destructor
public:
    /* Zero the OVERLAPPED base and the result fields so that no member is read uninitialized */
    AsyncResult() : m_dwBytes(0), m_bStatus(FALSE), m_dwErr(0)
    {
        Internal = 0;
        InternalHigh = 0;
        Offset = 0;
        OffsetHigh = 0;
        hEvent = NULL;
    }
    virtual ~AsyncResult() {}

// Public interface
public:
    /* Dispatch (i.e., call) the completion handler */
    virtual void Complete() = 0;

    /* Get/Set number of bytes transferred */
    DWORD GetBytesTransferred() const { return m_dwBytes; }
    void SetBytesTransferred( DWORD dwBytes ) { m_dwBytes = dwBytes; }

    /* Get/Set status of async operations */
    BOOL GetStatus() const { return m_bStatus; }
    void SetStatus( BOOL bStatus ) { m_bStatus = bStatus; }

    /* Get/Set error values if async failed or was cancelled */
    DWORD GetError() const { return m_dwErr; }
    void SetError( DWORD dwErr ) { m_dwErr = dwErr; }

// Data members
private:
    DWORD m_dwBytes;
    BOOL  m_bStatus;
    DWORD m_dwErr;
};
/* Asynchronous Completion Token (ACT) for async read */
class AsyncProcessorReadResult : public AsyncResult
{
public:
    /* Cache completion handler */
    AsyncProcessorReadResult( CompletionHandler* pCH ) : m_pCompletionHandler( pCH ) {}

public:
    /* Calls HandleEvent() on the cached completion handler */
    void Complete();

private:
    CompletionHandler* m_pCompletionHandler;
};
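/* Use the ACT to complete processing of an async read operation. The ACT delegates
   to the cached completion handler. This definition needs the full definition of
   CompletionHandler, so it must be compiled after that class has been declared. */
void AsyncProcessorReadResult::Complete()
{
    m_pCompletionHandler->HandleEvent( m_pCompletionHandler->GetHandle(), *this );
}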
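The ACT works because the operating system only ever sees a pointer to the OVERLAPPED base, and the proactor later converts that pointer back to the derived result type. The sketch below shows that round trip in portable C++. `FakeOverlapped`, `FakeAsyncResult`, `PassThroughOs` and `RoundTrip` are hypothetical stand-ins, not Win32 types or functions. Because the derived class has virtual functions, the base subobject is not guaranteed to sit at the start of the object, which is why the conversion back is sketched with `static_cast` rather than a reinterpreting cast.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the Win32 OVERLAPPED structure (a plain C struct).
struct FakeOverlapped {
    unsigned long offset;
    unsigned long offsetHigh;
};

// Hypothetical ACT: derives from the OS structure and adds application state.
struct FakeAsyncResult : public FakeOverlapped {
    explicit FakeAsyncResult(const std::string& tag) : FakeOverlapped(), m_tag(tag) {}
    virtual ~FakeAsyncResult() {}
    std::string m_tag;
};

// The "OS" only sees a FakeOverlapped*; it stores the pointer and hands it back later.
FakeOverlapped* PassThroughOs(FakeOverlapped* p) { return p; }

// Sends an ACT through the "OS" and recovers the application state on the other side.
std::string RoundTrip(const std::string& tag) {
    FakeAsyncResult* act = new FakeAsyncResult(tag);
    FakeOverlapped* raw = PassThroughOs(act);  // implicit derived-to-base conversion
    // static_cast adjusts the pointer back to the enclosing derived object.
    FakeAsyncResult* back = static_cast<FakeAsyncResult*>(raw);
    assert(back == act);
    std::string result = back->m_tag;
    delete back;
    return result;
}
```

The cast is only valid when the pointer really came from an object of the derived type, which holds here because every operation is started with an ACT created by the application itself.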
Define the Completion Handler
Refer to event handlers in the Reactor pattern for a discussion of the issues involved in implementing completion handlers.
/* CompletionHandler is an ABC that defines an interface for processing async operation
   results. In particular, a proactor uses the HandleEvent() member function to send completion
   events to their associated completion handlers */
class AsyncResult;
class ProactorImp;

class CompletionHandler
{
// Constructors/Destructors
public:
    CompletionHandler( ProactorImp* pP ) : m_pProactor( pP ) {}
    virtual ~CompletionHandler() {}

// Public interface
public:
    virtual void HandleEvent(HANDLE h, AsyncResult& result) = 0;
    virtual HANDLE GetHandle() const = 0;

// Data members
protected:
    /* protected to allow access for derived classes */
    ProactorImp* m_pProactor;
};
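#include <iostream>
#include <sstream>
#include <stdexcept>

/* Concrete completion handler for async reads. The AsyncProcessor class used as a
   member is defined in the next step and must be visible before this class. */
class ReadHandler : public CompletionHandler
{
public:
    ReadHandler( ProactorImp* pP ) : CompletionHandler( pP ), m_hFile( INVALID_HANDLE_VALUE ) {}

// Public interface
public:
    void HandleEvent(HANDLE h, AsyncResult& result);
    HANDLE GetHandle() const;
    void Read();

// Data members
private:
    AsyncProcessor m_AsyncProcessor;
    HANDLE         m_hFile;
};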
/* Starts an async read operation */
void ReadHandler::Read()
{
    // TODO: Need to close file handle
    // Open the file to read (assume it's always the same file. Could pass as argument
    // to Read). CreateFileA is used so the narrow string literal also compiles in
    // Unicode builds.
    m_hFile = CreateFileA( "ReadMe.txt",         // File name
                           GENERIC_READ,         // Desired access: read only
                           FILE_SHARE_READ,      // Share mode: only read operations can succeed
                           NULL,                 // Security: handle cannot be inherited
                           OPEN_EXISTING,        // Creation disposition: open only existing files
                           FILE_FLAG_OVERLAPPED, // Asynchronous file operations
                           NULL );               // Handle to template file
    if (INVALID_HANDLE_VALUE == m_hFile)
    {
        std::ostringstream os;
        os << "Failed to open file: " << GetLastError();
        throw std::runtime_error( os.str() );
    }

    // Initialize the async processor
    m_AsyncProcessor.Open( this, m_hFile, m_pProactor );

    // Start an async read operation
    DWORD dwByteCount = 1024;
    char* pBuf = new char[dwByteCount];
    m_AsyncProcessor.AsyncRead( pBuf, dwByteCount );
    // TODO: Need to free memory. The buffer must stay alive until the read completes,
    // so it cannot simply be deleted here.
}
/* This method is dispatched by the proactor singleton after the asynchronous ReadFile
   is completed */
void ReadHandler::HandleEvent(HANDLE h, AsyncResult& result)
{
    std::cout << "handling read event" << std::endl;
    /* Do whatever processing is required on completion of ReadFile. Perhaps access
       file data and send over a socket to a remote client, etc. */
}
HANDLE ReadHandler::GetHandle() const
{
    return m_hFile;
}
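The "need to free memory" TODO in Read() exists because the buffer has to outlive the call that starts the read. One possible way to handle this, sketched here under stated assumptions rather than taken from the original code, is to let the ACT own the buffer so that both are released together once the completion has been handled. `BufferedReadResult` and `SimulateBufferedRead` are hypothetical names, and a `memcpy` stands in for the operating system writing into the buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical ACT that owns the read buffer for one async operation.
class BufferedReadResult {
public:
    explicit BufferedReadResult(std::size_t capacity) : m_buffer(capacity), m_used(0) {
        assert(capacity > 0);
    }
    // Pointer and size that would be passed to the async read call.
    char* Data() { return m_buffer.data(); }
    std::size_t Capacity() const { return m_buffer.size(); }
    // Called on completion with the number of bytes actually transferred.
    void SetBytesTransferred(std::size_t n) {
        assert(n <= m_buffer.size());
        m_used = n;
    }
    std::size_t BytesTransferred() const { return m_used; }
private:
    std::vector<char> m_buffer;  // freed automatically when the ACT is deleted
    std::size_t m_used;
};

// Simulates one read: the "OS" fills the buffer, then the event loop deletes the ACT.
std::size_t SimulateBufferedRead(const char* payload) {
    BufferedReadResult* act = new BufferedReadResult(1024);
    std::size_t n = std::strlen(payload);
    if (n > act->Capacity()) n = act->Capacity();
    std::memcpy(act->Data(), payload, n);  // stands in for the OS writing the data
    act->SetBytesTransferred(n);
    std::size_t transferred = act->BytesTransferred();
    delete act;  // the buffer is released together with the ACT
    return transferred;
}
```

With this arrangement the completion handler can read the data through the result object it receives, and nothing needs to be freed separately.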
Define the asynchronous operation interface
/* This class generalizes the invocation of async read and write operations.
   When an async operation (read or write) finishes, it generates a completion event
   that contains the ACT it received when the Win32 async operation was invoked (see
   AsyncRead). When the proactor's HandleEvents() method removes this event from its
   completion queue, it invokes the appropriate AsyncResult::Complete() method, which
   in turn dispatches the completion handler's HandleEvent() method */
class CompletionHandler;
class ProactorImp;

class AsyncProcessor
{
// Constructors/Destructors
public:
    /* Initializes memory */
    AsyncProcessor();

// Public interface
public:
    /* Initialization */
    void Open( CompletionHandler* pCH, HANDLE h, ProactorImp* pP );

    /* Invoke an async read operation */
    void AsyncRead( void* pBuf, DWORD ulByteCount );

    /* Invoke an async write operation */
    void AsyncWrite( const void* pBuf, DWORD ulByteCount );

// Data members
private:
    CompletionHandler* m_pCH;
    HANDLE             m_hHandle;
    ProactorImp*       m_pProactor;
};
AsyncProcessor::AsyncProcessor()
{
    m_pCH = NULL;
    m_hHandle = NULL;
    m_pProactor = NULL;
}

void AsyncProcessor::Open( CompletionHandler* pCH, HANDLE h, ProactorImp* pP )
{
    m_pCH = pCH;
    m_hHandle = h;
    m_pProactor = pP;
    pP->RegisterHandle( h );
}
void AsyncProcessor::AsyncRead( void* pBuf, DWORD dwByteCount )
{
    // Create an Asynchronous Completion Token (ACT)
    AsyncProcessorReadResult* pAPRR = new AsyncProcessorReadResult( m_pCH );
    pAPRR->Offset = 0;
    pAPRR->OffsetHigh = 0;
    pAPRR->hEvent = NULL;

    /* Perform the async read operation. Note that when this async method is
       completed, the proactor will be able to obtain the ACT and from it dispatch
       the appropriate completion handler. The byte-count argument is NULL because
       the count is reported with the completion event. */
    if (!ReadFile( m_hHandle, pBuf, dwByteCount, NULL, pAPRR ))
    {
        DWORD dwErr = GetLastError();
        if (dwErr != ERROR_IO_PENDING)
        {
            // A real failure: no completion event will be queued, so release the ACT here
            std::cout << "ReadFile() failed " << dwErr << std::endl;
            delete pAPRR;
        }
        // ERROR_IO_PENDING is the normal case: the read is in progress
    }
}
void AsyncProcessor::AsyncWrite( const void* pBuf, DWORD ulByteCount )
{
    // TODO
}
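When an overlapped ReadFile or WriteFile call returns FALSE, that does not necessarily mean failure: an error code of ERROR_IO_PENDING indicates that the operation was started and will complete later. The helper below is a hypothetical, portable sketch of that three-way decision. `ClassifyStart`, `StartOutcome` and `CheckClassifyStart` are names invented for illustration, and the numeric error codes are copied from winerror.h so the sketch compiles without <windows.h>.

```cpp
#include <cassert>

// Values of the Win32 error codes from winerror.h, copied here so that the
// sketch does not depend on <windows.h>.
const unsigned long kErrorAccessDenied = 5UL;    // ERROR_ACCESS_DENIED
const unsigned long kErrorIoPending    = 997UL;  // ERROR_IO_PENDING

enum StartOutcome {
    CompletedImmediately,  // the call returned TRUE
    Pending,               // the call returned FALSE with ERROR_IO_PENDING
    Failed                 // the call returned FALSE with any other error
};

// Hypothetical helper: 'succeeded' is the BOOL returned by ReadFile/WriteFile and
// 'lastError' is the value of GetLastError() read immediately afterwards.
StartOutcome ClassifyStart(bool succeeded, unsigned long lastError) {
    if (succeeded) return CompletedImmediately;
    return (lastError == kErrorIoPending) ? Pending : Failed;
}

// Self-check covering the three cases.
void CheckClassifyStart() {
    assert(ClassifyStart(true, 0UL) == CompletedImmediately);
    assert(ClassifyStart(false, kErrorIoPending) == Pending);
    assert(ClassifyStart(false, kErrorAccessDenied) == Failed);
}
```

Only the `Failed` case calls for releasing the ACT at the call site. In the other two cases a completion event is expected to reach the completion port, assuming the handle keeps its default completion notification settings, and the event loop releases the ACT after dispatching it.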
Define and implement the Proactor interface
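/* Proactor Implementation interface. This class corresponds to the
   Implementor class in the Bridge pattern */
class ProactorImp
{
public:
    /* Virtual destructor so that implementations can be destroyed through this interface */
    virtual ~ProactorImp() {}
    virtual void RegisterHandle(HANDLE h) = 0;
    virtual void HandleEvents( /* Time out value */ ) = 0;
};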
/* Proactor Implementation. This class corresponds to the ConcreteImplementor class in the
   Bridge pattern */
class ProactorImp_Win32 : public ProactorImp
{
// Constructors/Destructors
public:
    /* Constructor creates a completion port */
    ProactorImp_Win32();

// Public interface
public:
    /* Associate a handle with the proactor's completion event queue (system queue) */
    void RegisterHandle(HANDLE h);

    /* Entry point used by application into the event loop */
    void HandleEvents( /* Time out value */ );

// Data members
private:
    /* Win32 Completion Port handle */
    HANDLE hCompletionPort;
};
/* The proactor class is used by the main application to invoke an event loop that
   dequeues completion events from a system queue, retrieves the ACT, determines from the
   ACT the appropriate completion handler, and finally invokes the completion handler's
   HandleEvent() to finish processing the result of the async operation */
class Proactor
{
// Public interface
public:
    /* Associate a handle with the proactor's completion event queue (system queue) */
    void RegisterHandle(HANDLE h);

    /* Entry point used by application into the event loop. This method uses the system
       async event demultiplexer, GetQueuedCompletionStatus(), to wait for completion events
       to arrive */
    void HandleEvents( /* Time out value */ );

    /* Define a singleton instance */
    static ProactorImp* Instance();

// Data members
private:
    static ProactorImp* m_pImp;
};
// static member initialization
ProactorImp* Proactor::m_pImp = NULL;

void Proactor::RegisterHandle(HANDLE h)
{
    // Go through Instance() so that the implementation exists even on first use
    Instance()->RegisterHandle( h );
}

void Proactor::HandleEvents( /* Time out value */ )
{
    Instance()->HandleEvents();
}

ProactorImp* Proactor::Instance()
{
    // Note: this lazy initialization is not thread-safe
    if (!m_pImp)
        m_pImp = new ProactorImp_Win32;
    return m_pImp;
}
ProactorImp_Win32::ProactorImp_Win32() : hCompletionPort(NULL)
{
    // Create an IO completion port without associating it with a file
    hCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 );
    if (!hCompletionPort)
    {
        std::ostringstream os;
        os << "Failed to create IO Completion Port: " << GetLastError();
        throw std::runtime_error( os.str() );
    }
}
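void ProactorImp_Win32::RegisterHandle(HANDLE h)
{
    /* All completion events that result from async operations invoked via handle h
       will be inserted into the proactor's completion port by the OS */
    HANDLE h2 = CreateIoCompletionPort( h,               // File handle
                                        hCompletionPort, // Existing completion port
                                        0,               // Completion key
                                        0 );             // Number of concurrent threads (ignored for an existing port)
    if (!h2)
    {
        std::ostringstream os;
        os << "Failed to associate handle with IO Completion Port: " << GetLastError();
        throw std::runtime_error( os.str() );
    }
}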
void ProactorImp_Win32::HandleEvents( /* Time out value */ )
{
    // Dequeue the next completion event (wait at most one second)
    DWORD dwByteTransferredCount = 0;
    ULONG_PTR ulCompletionKey = 0;
    OVERLAPPED* pACT = NULL;
    BOOL bRet = GetQueuedCompletionStatus( hCompletionPort, &dwByteTransferredCount,
                                           &ulCompletionKey, &pACT, 1000 );
    // Capture the error code right away, before any other call can overwrite it
    DWORD dwErr = bRet ? ERROR_SUCCESS : GetLastError();

    // Get the async result. A NULL ACT means that no completion event was dequeued,
    // for example because the wait timed out
    AsyncResult* pAR = static_cast<AsyncResult*>(pACT);
    if (!pAR)
    {
        std::cout << "No completion event dequeued by GetQueuedCompletionStatus(): " << dwErr << std::endl;
        return;
    }

    // Update information in the AsyncResult class
    pAR->SetStatus( bRet );
    pAR->SetError( dwErr );
    pAR->SetBytesTransferred( dwByteTransferredCount );

    // From the AsyncResult object, invoke the associated completion handler
    pAR->Complete();
    delete pAR;
}
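The return value of GetQueuedCompletionStatus has to be read together with the OVERLAPPED pointer it hands back, since a FALSE return can mean either "a failed operation was dequeued" or "nothing was dequeued". The portable sketch below spells out that decision. `ClassifyDequeue`, `DequeueOutcome` and `CheckClassifyDequeue` are hypothetical names, and the sketch deliberately ignores packets posted manually with PostQueuedCompletionStatus.

```cpp
#include <cassert>
#include <cstddef>

enum DequeueOutcome {
    OperationSucceeded,  // TRUE: a packet for a successful operation was dequeued
    OperationFailed,     // FALSE, non-null OVERLAPPED*: a packet for a failed operation
    NothingDequeued      // FALSE, null OVERLAPPED*: timeout or a problem with the port
};

// Hypothetical helper: 'returnedTrue' is the BOOL result of the dequeue call and
// 'pOverlapped' is the pointer it stored in its OVERLAPPED** argument.
DequeueOutcome ClassifyDequeue(bool returnedTrue, const void* pOverlapped) {
    if (returnedTrue) return OperationSucceeded;
    return (pOverlapped != NULL) ? OperationFailed : NothingDequeued;
}

// Self-check covering the three cases.
void CheckClassifyDequeue() {
    int act = 0;  // any non-null address stands in for a dequeued ACT
    assert(ClassifyDequeue(true, &act) == OperationSucceeded);
    assert(ClassifyDequeue(false, &act) == OperationFailed);
    assert(ClassifyDequeue(false, NULL) == NothingDequeued);
}
```

In the `OperationFailed` case the ACT is still valid and should still be dispatched, so that the completion handler learns about the error.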
Sample main application (for illustration purposes only)
int main(int argc, char* argv[])
{
    // Invoke an async operation
    ReadHandler reader( Proactor::Instance() );
    reader.Read();

    // Handles the one and only async read operation. Typically this would be inside
    // another thread that runs in an infinite loop
    Proactor::Instance()->HandleEvents();
    return 0;
}
The Proactor pattern can have the following variations to support concurrency:
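Multiple threads can wait on the same completion port by calling GetQueuedCompletionStatus concurrently; see the Win32 API documentation for the details with respect to multithreading.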
Multiple asynchronous operations can share the same completion handler. To support this, the completion handler must know unambiguously which asynchronous operation has completed. The ACT can be reapplied to disambiguate each asynchronous operation.
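Both variations can be illustrated together in a portable sketch: a pool of worker threads drains one shared queue, and a single completion handler uses an identifier carried in the ACT to tell operations apart. This is a simulation under stated assumptions, not Win32 code. `OperationToken`, `SharedCompletionQueue`, `SharedHandler` and `RunSharedHandlerDemo` are hypothetical names, and a mutex-protected `std::queue` stands in for the completion port.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <map>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical ACT: records which operation completed and how many bytes moved.
struct OperationToken {
    int operationId;
    unsigned long bytesTransferred;
};

// Hypothetical thread-safe stand-in for a completion port.
class SharedCompletionQueue {
public:
    void Post(const OperationToken& token) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_tokens.push(token);
        }
        m_ready.notify_one();
    }
    // After Close(), workers drain the remaining tokens and then stop.
    void Close() {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_closed = true;
        }
        m_ready.notify_all();
    }
    // Blocks until a token is available; returns false once closed and empty.
    bool Dequeue(OperationToken& out) {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return m_closed || !m_tokens.empty(); });
        if (m_tokens.empty()) return false;
        out = m_tokens.front();
        m_tokens.pop();
        return true;
    }
private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<OperationToken> m_tokens;
    bool m_closed = false;
};

// One completion handler shared by every operation and every worker thread.
class SharedHandler {
public:
    void HandleEvent(const OperationToken& act) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_bytesByOperation[act.operationId] += act.bytesTransferred;
    }
    unsigned long BytesFor(int operationId) {
        std::lock_guard<std::mutex> lock(m_mutex);
        std::map<int, unsigned long>::const_iterator it = m_bytesByOperation.find(operationId);
        return it == m_bytesByOperation.end() ? 0UL : it->second;
    }
private:
    std::mutex m_mutex;
    std::map<int, unsigned long> m_bytesByOperation;
};

// Runs 'threadCount' workers over five completions that belong to two operations
// and returns the byte total the shared handler recorded for 'operationId'.
unsigned long RunSharedHandlerDemo(std::size_t threadCount, int operationId) {
    assert(threadCount > 0);
    SharedCompletionQueue queue;
    SharedHandler handler;
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < threadCount; ++i) {
        workers.emplace_back([&queue, &handler] {
            OperationToken token;
            while (queue.Dequeue(token)) handler.HandleEvent(token);
        });
    }
    const OperationToken completions[] = { {1, 100}, {2, 50}, {1, 100}, {2, 50}, {1, 100} };
    for (const OperationToken& token : completions) queue.Post(token);
    queue.Close();
    for (std::thread& worker : workers) worker.join();
    return handler.BytesFor(operationId);
}
```

Because any worker may dispatch any completion, the shared handler has to protect its own state, which is why the sketch guards the per-operation totals with a mutex.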