Event Handling

Proactor

Prerequisite Patterns

Adapter, Bridge, Template

Purpose

Allow an event-driven application to efficiently de-multiplex and dispatch service requests triggered by the completion of asynchronous operations.

Scenario

The Proactor pattern typically applies to event-driven applications in which a server must perform multiple operations simultaneously, such as a web server. For example, consider a file server that processes requests from multiple clients. To download content, a user establishes a connection with the file server (using sockets) and sends it the name of the required file. The file server receives the client's connection event, accepts the connection, initiates the read of the required file(s), and returns immediately. Once all file(s) have been read, the file server sends their contents back to the client.

There are at least three ways to implement an asynchronous file server:

  1. Use the Reactor pattern. The problem with this approach is that it does not scale up to support many simultaneous clients and/or long-duration requests (e.g., very large files). Recall that a reactor serializes all processing at the demultiplexing layer; therefore, only one request can be serviced at any given time.
  2. Potentially more scalable than the Reactor pattern is a form of synchronous multithreading, where a separate server thread processes each client request. Although this is a common concurrency model, problems with efficiency, scalability, and programming complexity may occur.
  3. Scalability can be improved further by using asynchronous I/O. Although more scalable than the previous two approaches, asynchronous programming tends to be more difficult due to the separation in time and space of method invocations and their subsequent completion.
Solution
Summary

Divide the server code into two parts: long-duration operations that execute asynchronously, and completion handlers that process the results of these asynchronous operations.
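
The split described above can be sketched in portable C++. This is only an illustration under stated assumptions: std::async stands in for the operating system, and ReadResult, StartAsyncRead, and OnReadComplete are hypothetical names that do not appear in the example code of this article. A real proactor would deliver the result through a completion queue rather than a future.

```cpp
#include <cassert>
#include <future>
#include <string>

// Hypothetical result type carrying the outcome of one asynchronous read.
struct ReadResult
{
    bool        ok;
    std::string data;
};

// Part 1: the long-duration operation. std::async stands in for the OS here;
// the call returns at once while the work runs on another thread.
std::future<ReadResult> StartAsyncRead( const std::string& fileName )
{
    assert( !fileName.empty() );
    return std::async( std::launch::async, [fileName]
    {
        ReadResult r;
        r.ok   = true;
        r.data = "contents of " + fileName; // placeholder for real file data
        return r;
    } );
}

// Part 2: the completion handler. It only ever sees a finished result.
std::string OnReadComplete( const ReadResult& r )
{
    return r.ok ? std::string( "send: " ) + r.data : std::string( "report error" );
}
```

A caller would invoke StartAsyncRead, carry on with other work, and later pass the finished ReadResult to OnReadComplete. Neither part needs to know how the other is implemented.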

Detail

For every service offered by an application, introduce asynchronous operations that initiate the processing of service requests, along with completion handlers that process the completion results. An asynchronous operation (e.g., ReadFile, WriteFile) is typically executed by the OS, which inserts a completion event containing the result into a system-provided completion event queue.

The proactor waits on this queue through a system-provided API, GetQueuedCompletionStatus, which removes a completion event from the queue once an asynchronous operation has completed. The proactor can then demultiplex and dispatch this completion event to the application-specific completion handler.
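
The queue-then-dispatch flow can be sketched without any Win32 calls. In the sketch below, CompletionQueue is a hypothetical stand-in for the system-provided queue, Wait plays the role of GetQueuedCompletionStatus, and HandleOneEvent is one iteration of the proactor's event loop. All of these names are assumptions made for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>

// Hypothetical completion event: the result plus the handler to dispatch to.
struct CompletionEvent
{
    unsigned long                      bytesTransferred;
    std::function<void(unsigned long)> handler;
};

// Stand-in for the system-provided completion event queue.
class CompletionQueue
{
public:
    // Called by the "asynchronous operation processor" when an operation ends.
    void Post( const CompletionEvent& ev )
    {
        {
            std::lock_guard<std::mutex> lock( m_mutex );
            m_events.push( ev );
        }
        m_ready.notify_one();
    }

    // Block until an event arrives or the timeout expires.
    // Returns false on timeout.
    bool Wait( CompletionEvent& ev, std::chrono::milliseconds timeout )
    {
        std::unique_lock<std::mutex> lock( m_mutex );
        if (!m_ready.wait_for( lock, timeout, [this] { return !m_events.empty(); } ))
            return false;
        ev = m_events.front();
        m_events.pop();
        return true;
    }

private:
    std::mutex                  m_mutex;
    std::condition_variable     m_ready;
    std::queue<CompletionEvent> m_events;
};

// Proactor step: dequeue one completion event and dispatch it to its handler.
bool HandleOneEvent( CompletionQueue& q, std::chrono::milliseconds timeout )
{
    CompletionEvent ev;
    if (!q.Wait( ev, timeout ))
        return false;
    assert( ev.handler != nullptr );
    ev.handler( ev.bytesTransferred );
    return true;
}
```

The handler travels with the event, so the dispatch step needs no lookup table. This is the same job the ACT performs in the Win32 example.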

Structure & UML
Components

Completion Handler
An ABC (abstract base class) that defines an interface for processing results of asynchronous operations.

Concrete Completion Handler
A derived class that specializes the implementation of the interface defined by the Completion Handler Abstract Base Class.

Handles
Handles are provided by the operating system to identify event sources. Under Windows, the handles are of type HANDLE.

Asynchronous Operation
An asynchronous operation is an operation that is initiated and returns immediately without blocking, while the work itself proceeds in the background. Results of the operation are typically retrieved via a callback function.

Asynchronous Operation Processor
Typically the OS kernel, which executes asynchronous operations. It also generates events to indicate the completion of each asynchronous operation.

Completion Event Queue
An OS-provided queue that buffers events indicating completion of an asynchronous operation. In Win32 this queue corresponds to a completion port.

Asynchronous Event Demultiplexer
An OS-provided function that waits for completion events to be inserted into the completion queue when an asynchronous operation is completed. In Win32 this corresponds to the GetQueuedCompletionStatus API function.

Proactor
A class that calls the asynchronous event demultiplexer to dequeue a completion event. It then demultiplexes and dispatches completion events to their corresponding completion handlers.

UML

 

Behavior
Example

Define a type to convey the results of asynchronous operations

The Adapter pattern can be applied to convert information stored in a completion event into a form suitable for dispatching to the associated completion handler.

/* This class is used to encapsulate information used by asynchronous operations. Objects of this class are used to convey async results to concrete completion handlers. Inheritance is used to allow applications to add custom state and methods to the results of async operations */
class CompletionHandler;

class AsyncResult : public OVERLAPPED
{
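// Constructor/Destructor
public:
    /* Zero the OVERLAPPED base and the result fields before the OS uses them */
    AsyncResult() : m_dwBytes( 0 ), m_bStatus( FALSE ), m_dwErr( 0 )
    {
        static_cast<OVERLAPPED&>( *this ) = OVERLAPPED();
    }
    virtual ~AsyncResult(){}

// Public interface
    /* Dispatch (i.e., call) the completion handler */
    virtual void Complete() = 0;

    /* Get/Set number of bytes transferred */
    DWORD GetBytesTransferred() const { return m_dwBytes; }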
    void SetBytesTransferred( DWORD dwBytes) {m_dwBytes = dwBytes; }

    /* Get/Set status of async operations */
    BOOL GetStatus() const { return m_bStatus; }
    void SetStatus(BOOL bStatus) { m_bStatus = bStatus; }

    /* Get/Set error values if async failed or was cancelled */
    DWORD GetError() const { return m_dwErr; }
    void SetError(DWORD dwErr) { m_dwErr = dwErr; }

// Data members
private:
    DWORD m_dwBytes;
    BOOL m_bStatus;
    DWORD m_dwErr;
};

/* Asynchronous Completion Token (ACT) for async read */
class AsyncProcessorReadResult : public AsyncResult
{
public:
    /* Cache completion handler */
    AsyncProcessorReadResult( CompletionHandler* pCH ) : m_pCompletionHandler( pCH ){}

public:
    /* Calls HandleEvent() on the cached completion handler */
    void Complete();

private:
    CompletionHandler* m_pCompletionHandler;
};

/* Use the ACT to complete processing of an async read operation. The ACT delegates to the cached completion handler */
void AsyncProcessorReadResult::Complete()
{
    m_pCompletionHandler->HandleEvent( m_pCompletionHandler->GetHandle(), *this);
}

Define the Completion Handler

Refer to event handlers in the Reactor pattern for a discussion of issues relating to implementing completion handlers.

/* CompletionHandler is an ABC that defines an interface for processing async operation results. In particular, a proactor uses the HandleEvent() member function to send completion events to their associated completion handlers */
class AsyncResult;
class ProactorImp;

class CompletionHandler
{
// Constructors/Destructors
public:
    CompletionHandler( ProactorImp* pP ) : m_pProactor( pP ) {}
    virtual ~CompletionHandler(){}

// Public interface
public:
    virtual void HandleEvent(HANDLE h, AsyncResult& result) = 0;
    virtual HANDLE GetHandle() const = 0;

// Data members
protected:
    /* protected to allow access for derived classes*/
    ProactorImp* m_pProactor;
};

class ReadHandler : public CompletionHandler
{
public:
    ReadHandler( ProactorImp* pP) : CompletionHandler( pP ), m_hFile( INVALID_HANDLE_VALUE ) {}

    /* Closes the file handle opened by Read() */
    ~ReadHandler();

// Public interface
public:
    void HandleEvent(HANDLE h, AsyncResult& result);
    HANDLE GetHandle() const;
    void Read();

// Data members
private:
    AsyncProcessor    m_AsyncProcessor;
    HANDLE            m_hFile;
    std::vector<char> m_Buffer;    /* Owned here so it outlives the pending read */
};

ReadHandler::~ReadHandler()
{
    if (INVALID_HANDLE_VALUE != m_hFile)
        CloseHandle( m_hFile );
}

/* Starts an async read operation */
void ReadHandler::Read()
{
    // Open the file to read (assume it's always the same file. Could pass as argument
    // to Read). CreateFileA is used because the file name is a narrow string
    m_hFile = CreateFileA( "ReadMe.txt",        // FileName
                           GENERIC_READ,        // Desired access: Read only
                           FILE_SHARE_READ,     // Share mode: only read operations can succeed
                           NULL,                // Security: Handle cannot be inherited
                           OPEN_EXISTING,       // Creation disposition: Open only existing files
                           FILE_FLAG_OVERLAPPED,// Asynchronous file operations
                           NULL);               // Handle to template file

    if (INVALID_HANDLE_VALUE == m_hFile)
    {
        std::ostringstream os;
        os << "Failed to open file: " << GetLastError();
        throw std::runtime_error( os.str() );
    }

    // Initialize the async processor
    m_AsyncProcessor.Open( this, m_hFile, m_pProactor);

    // Start an async read operation into a buffer owned by this handler
    m_Buffer.resize( 1024 );
    m_AsyncProcessor.AsyncRead( &m_Buffer[0], static_cast<DWORD>( m_Buffer.size() ) );
}

/* This method is dispatched by the proactor singleton after the asynchronous ReadFile is completed */
void ReadHandler::HandleEvent(HANDLE h, AsyncResult& result)
{
    std::cout << "handling read event" << std::endl;

    /* Do whatever processing is required on completion of ReadFile. Perhaps access
    file data and send over a socket to a remote client, etc. */
}

HANDLE ReadHandler::GetHandle() const
{
    return m_hFile;
}

Define the asynchronous operation interface

/* This class generalizes the invocation of async read and write operations. When an async operation (read or write) finishes, the OS generates a completion event that contains the ACT it received when the Win32 async operation was invoked (see AsyncRead). When the proactor's HandleEvents() method removes this event from its completion queue, it invokes the appropriate AsyncResult::Complete() method, which in turn dispatches the completion handler's HandleEvent() method */
class CompletionHandler;
class ProactorImp;

class AsyncProcessor
{
// Constructors/Destructors
public:
    /* Initializes memory */
    AsyncProcessor();

// Public interface
public:
    /* Initialization */
    void Open( CompletionHandler* pCH, HANDLE h, ProactorImp *pP);

    /* Invoke an async read operation */
    void AsyncRead( void* pBuf, DWORD ulByteCount );

    /* Invoke an async write operation */
    void AsyncWrite( const void* pBuf, DWORD ulByteCount );

// Data members
private:
    CompletionHandler* m_pCH;
    HANDLE             m_hHandle;
    ProactorImp*       m_pProactor;
};

AsyncProcessor::AsyncProcessor()
{
    m_pCH       = NULL;
    m_hHandle   = NULL;
    m_pProactor = NULL;
}

void AsyncProcessor::Open( CompletionHandler* pCH, HANDLE h, ProactorImp *pP)
{
    m_pCH       = pCH;
    m_hHandle   = h;
    m_pProactor = pP;

    pP->RegisterHandle( h );
}

void AsyncProcessor::AsyncRead( void* pBuf, DWORD dwByteCount )
{
    // Create an Asynchronous Completion Token (ACT)
    AsyncProcessorReadResult* pAPRR = new AsyncProcessorReadResult( m_pCH );
    pAPRR->Offset     = 0;
    pAPRR->OffsetHigh = 0;
    pAPRR->hEvent     = NULL;

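    /* Perform the async read operation. Note that when this async operation
    completes, the proactor will be able to obtain the ACT and from it dispatch
    the appropriate completion handler. The byte count is reported with the
    completion event, so no byte-count pointer is passed here */
    if (!ReadFile( m_hHandle, pBuf, dwByteCount, NULL, pAPRR))
    {
        // ERROR_IO_PENDING means the read was queued successfully
        DWORD dwErr = GetLastError();
        if (ERROR_IO_PENDING != dwErr)
        {
            std::cout << "ReadFile() failed " << dwErr << std::endl;
            delete pAPRR;    // No completion event will arrive for this ACT
        }
    }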
}

void AsyncProcessor::AsyncWrite( const void* pBuf, DWORD ulByteCount )
{
    // TODO
}

Define and implement the Proactor interface

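/* Proactor Implementation. This class corresponds to the Implementor class in the Bridge pattern */
class ProactorImp
{
public:
    /* Virtual destructor so that implementations can be destroyed through this interface */
    virtual ~ProactorImp() {}

    virtual void RegisterHandle(HANDLE h) = 0;
    virtual void HandleEvents( /* Time out value */ ) = 0;
};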

/* Proactor Implementation. This class corresponds to the ConcreteImplementor class in the Bridge pattern */
class ProactorImp_Win32 : public ProactorImp
{
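// Constructors/Destructors
public:
    /* Constructor creates a completion port */
    ProactorImp_Win32();

    /* Destructor closes the completion port */
    ~ProactorImp_Win32() { if (hCompletionPort) CloseHandle( hCompletionPort ); }

// Public interface
public: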
    /* Associate a handle with the proactor's completion event queue (system queue) */
    void RegisterHandle(HANDLE h);

    /* Entry point used by application into the event loop */
    void HandleEvents( /* Time out value */);

// Data members
private:
    /* Win32 Completion Port handle */
    HANDLE hCompletionPort;
};

/* The Proactor class is used by the main application to invoke an event loop that dequeues completion events from a system queue, retrieves the ACT, determines from the ACT the appropriate completion handler, and finally invokes the completion handler's HandleEvent() to finish processing the result of the async operation */
class Proactor
{
// Public interface
public:
    /* Associate a handle with the proactor's completion event queue (system queue) */
    void RegisterHandle(HANDLE h);

    /* Entry point used by application into the event loop. This method uses the system
    async event demultiplexer, GetQueuedCompletionStatus() to wait for completion events
    to arrive */
    void HandleEvents( /* Time out value */ );

    /* Define a singleton instance */
    static ProactorImp* Instance();

// Data members
private:
    static ProactorImp* m_pImp;
};

// static member initialization
ProactorImp* Proactor::m_pImp = NULL;

void Proactor::RegisterHandle(HANDLE h)
{
    m_pImp->RegisterHandle( h );
}

void Proactor::HandleEvents( /* Time out value */ )
{
    m_pImp->HandleEvents();
}

ProactorImp* Proactor::Instance()
{
    if (!m_pImp)
        m_pImp = new ProactorImp_Win32;

    return m_pImp;
}

ProactorImp_Win32::ProactorImp_Win32() : hCompletionPort(NULL)
{
    // Create an IO completion port without associating it with a file
    hCompletionPort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, 0, 0, 0 );
    if (!hCompletionPort)
    { 
        std::ostringstream os;
        os<< "Failed to create IO Completion Port";
        throw std::runtime_error( os.str() );
    }
}

void ProactorImp_Win32::RegisterHandle(HANDLE h)
{
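    /* All completion events that result from async operations invoked via handle h
    will be inserted into the proactor's completion port by the OS */
    HANDLE h2 = CreateIoCompletionPort( h,                 // File Handle
                                        hCompletionPort,   // Existing completion port
                                        0,                 // Completion key
                                        0                  // Number of concurrent threads
                                      );
    if (!h2)
    {
        std::ostringstream os;
        os << "Failed to associate handle with IO Completion Port: " << GetLastError();
        throw std::runtime_error( os.str() );
    }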
}

void ProactorImp_Win32::HandleEvents( /* Time out value */)
{
    // Dequeue the next completion event, waiting at most one second
    DWORD         dwByteTransferredCount = 0;
    ULONG_PTR     ulCompletionKey        = 0;
    OVERLAPPED*   pACT                   = NULL;
    BOOL bRet = GetQueuedCompletionStatus( hCompletionPort, &dwByteTransferredCount, &ulCompletionKey, &pACT, 1000);

    // Capture the error code right away, before any other call can overwrite it
    DWORD dwErr = bRet ? ERROR_SUCCESS : GetLastError();

    // Get the async result. pACT stays NULL if nothing was dequeued (e.g., on timeout)
    AsyncResult *pAR = static_cast<AsyncResult*>(pACT);
    if (!pAR)
    {
        std::cout << "Failed to get AsyncResult from GetQueuedCompletionStatus()" << std::endl;
        return;
    }

    // Update information in the AsyncResult class
    pAR->SetStatus( bRet );
    if (!bRet)
        pAR->SetError( dwErr );
    else
        pAR->SetBytesTransferred( dwByteTransferredCount );

    // From the AsyncResult object, invoke the associated completion handler
    pAR->Complete();

    delete pAR;
}

Sample main application (for illustration purposes only)

int main(int argc, char* argv[])
{
    // Invoke an async operation
    ReadHandler reader( Proactor::Instance() );
    reader.Read();

    // Handles the one and only async read operation. Typically this would be inside another thread
    // that runs in an infinite loop
    Proactor::Instance()->HandleEvents();

    return 0;
}
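
The comment in main mentions running the event loop on another thread. A minimal portable sketch of that arrangement follows. EventLoopThread is a hypothetical helper, not part of the example above, and the callable it receives is assumed to wrap a call such as ProactorImp::HandleEvents() that blocks with a timeout, so the stop flag is checked regularly.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <thread>

// Runs a caller-supplied "handle events" step on its own thread until stopped.
class EventLoopThread
{
public:
    explicit EventLoopThread( std::function<void()> handleEvents ) : m_stop( false )
    {
        assert( handleEvents != nullptr );
        // The loop re-checks the stop flag after every handleEvents() call.
        m_thread = std::thread( [this, handleEvents] { while (!m_stop) handleEvents(); } );
    }

    // Stops the loop if the owner forgot to, so the thread is always joined.
    ~EventLoopThread() { Stop(); }

    // Request shutdown and wait for the loop thread to finish. Safe to call twice.
    void Stop()
    {
        m_stop = true;
        if (m_thread.joinable())
            m_thread.join();
    }

private:
    std::atomic<bool> m_stop;
    std::thread       m_thread;
};
```

With this helper, main would construct an EventLoopThread around the HandleEvents call, start its reads, and call Stop() during shutdown. The timeout on the wrapped call bounds how long Stop() can block.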

Variants

The Proactor pattern can have the following variations to support concurrency:

Concurrent Asynchronous Event Demultiplexer

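See the GetQueuedCompletionStatus Win32 API with respect to multithreading: several threads can wait on the same completion port, and the OS hands each completion event to exactly one of them.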
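
As a rough, portable illustration of this variant, the sketch below lets several threads drain one shared queue and records how often each event was handled. SharedCompletionQueue and PostAndDrain are hypothetical names, and the non-blocking TryGet is a simplification: a thread using a real completion port would block in GetQueuedCompletionStatus instead of polling.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Stand-in for a completion port that several threads may drain at once.
class SharedCompletionQueue
{
public:
    void Post( int eventId )
    {
        std::lock_guard<std::mutex> lock( m_mutex );
        m_events.push( eventId );
    }

    // Non-blocking dequeue; returns false when the queue is empty.
    bool TryGet( int& eventId )
    {
        std::lock_guard<std::mutex> lock( m_mutex );
        if (m_events.empty())
            return false;
        eventId = m_events.front();
        m_events.pop();
        return true;
    }

private:
    std::mutex      m_mutex;
    std::queue<int> m_events;
};

// Posts eventCount events, drains them with threadCount threads, and returns
// how many times each event was handled.
std::vector<int> PostAndDrain( int eventCount, int threadCount )
{
    assert( eventCount >= 0 && threadCount > 0 );
    SharedCompletionQueue q;
    for (int i = 0; i < eventCount; ++i)
        q.Post( i );

    std::vector<int>         handled( eventCount, 0 );
    std::mutex               handledMutex;
    std::vector<std::thread> workers;
    for (int t = 0; t < threadCount; ++t)
    {
        workers.emplace_back( [&q, &handled, &handledMutex]
        {
            int id = 0;
            while (q.TryGet( id ))
            {
                // A completion handler would run here.
                std::lock_guard<std::mutex> lock( handledMutex );
                ++handled[id];
            }
        } );
    }
    for (std::thread& w : workers)
        w.join();
    return handled;
}
```

Because the dequeue is serialized by the queue's mutex, every event reaches exactly one thread. Completion handlers that touch shared state still need their own synchronization, as the handledMutex shows.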

Shared Completion Handlers

Multiple asynchronous operations can share the same completion handler. To support this, the completion handler must know unambiguously which asynchronous operation has completed. The ACT can be reapplied to disambiguate each asynchronous operation.
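
A small sketch of how an ACT can disambiguate operations that share one handler. Act, SharedReadHandler, and the operationId field are assumptions introduced for illustration. In the Win32 example above, the equivalent information would be carried by a class derived from AsyncResult.

```cpp
#include <cassert>
#include <map>

// Hypothetical ACT: identifies which asynchronous operation completed.
struct Act
{
    int operationId;
};

// One handler instance shared by several outstanding operations.
class SharedReadHandler
{
public:
    // Dispatch target: the ACT says which operation this completion belongs to.
    void HandleEvent( const Act& act, unsigned long bytesTransferred )
    {
        assert( act.operationId >= 0 );
        m_bytesByOperation[act.operationId] += bytesTransferred;
    }

    // Total bytes reported so far for one operation (0 if none completed yet).
    unsigned long BytesFor( int operationId ) const
    {
        std::map<int, unsigned long>::const_iterator it = m_bytesByOperation.find( operationId );
        return it == m_bytesByOperation.end() ? 0UL : it->second;
    }

private:
    std::map<int, unsigned long> m_bytesByOperation;
};
```

Completions may arrive in any order. Each one carries its own ACT, so the handler attributes the bytes to the right operation without relying on arrival order.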

Benefits
Liabilities