Event Handling

Reactor

Prerequisite Patterns

Adapter, State, Template Method, Singleton, Bridge

Purpose

Allow an event-driven application to efficiently de-multiplex and dispatch service requests arriving concurrently from many clients.

Scenario

The rector pattern is typically applicable in event-drive applications where a server receives multiple service requests concurrently but processes them synchronously and serially. For example, consider a distributed system where a central server logs status information arriving concurrently from remote clients. Clients communicate with the server using a connection-oriented protocol (i.e., TCP/IP):

The logging server can be accessed simultaneously by many remote clients, each of which maintains its own connection with the server. A new client connection is indicated to the server using a CONNECT event, while a request to log client records is indicated to the server using a READ event.

A possible solution might to be use threading: a 'thread per client connection' that allocates a dedicated thread for each connected client. However, using threading in this scenario introduces the following liabilities:

Solution
Summary

Wait synchronously for the arrival of each event (CONNECT or READ) on one or more event sources (handles). De-multiplex and dispatch those events to services that handle them

Detail

Add an event handler for each for each service an application offers. In our example, there will be two handlers, a ConnectHandler to handle connection requests, and a ReadHandler to handle logging requests. Register each event handler with a reactor that uses synchronous event de-multiplexer (i.e., WaitForSingleObject) to wait for events to occur on one or more event sources (i.e., handles). When such an event occurs,  the synchronous event de-multiplexer (WaitForSingleObject) notifies the reactor which then synchronously dispatches (i.e., calls) the event handler associated with the event.

Structure & UML
Components

It is possible to discern five components from the proposed solutions:

Handles
Handles are provided by the operating system to identify event sources. Under Windows, the handles are of type HANDLE.

Synchronous event de-multiplexer
These are provided by the operating system to wait for one or more events to occur on a set of handles. Under Windows, these correspond to WaitForSingleObject / WaitForMultipleObjects.

Event Handler
An abstract base class that defines an interface for processing events that occur on handles

Concrete Event Handler
A class that implements the interface define by Event Handler in an application-specific manner.

Reactor
A class that defines an abstraction for registering/removing event handlers, and running the application's event  loop reactively

UML

 

Behavior
Example

The logging server is connected to its remote clients via sockets handles (not shown in the example) which identify transport endpoint that receive remote client events. In this example, there are two kinds of events originating from remote clients: CONNECT to indicate a  new client connection request and READ to indicate a request to log records. Therefore, there are two events, and hence two handles and two associated handlers.

The socket part of the logging server appropriately sets one of those two handles to the signaled state whenever the corresponding event is received from any client. While the example does not implement any socket activities, it  simulates incoming events by simple function calls that signal the appropriate event handles.

Declaring  & Defining the Event Handlers

An Event Handler can be defined in two ways:

  1. Classes: The event handler is implemented as a  class. This approach makes it easy to subclass event handlers in order to reuse and extend existing components. It also makes it easy to integrate state and methods of any given service into a single component
  2. Functions: The event handler is implemented as a function rather than an object. A pointer to this function makes it easy to register callbacks without having to define new subclasses that inherit from an event handler base class.

Note that the Adapter pattern can be suitably used here to support both approaches simultaneously.

If an Event Handler was defined as a class, there are two approaches to implementing its public interface:

  1. Single Method: The event handler class has only one method, HandleEvent(),  to handle all types of events. The type of the event that has occurred is passed as a parameter. Although the single method approach makes it possible to add new types of events without changing the public interface, it may result in a large multi-conditional if statements or a bloated switch statement. Recall that the State pattern can typically be used to handle this situation.
  2. Multi-Method: The event handler class has a separate method for handling each type of event. This approach may be more extensible because the de-multiplexing is performed by the reactor rather than by a concrete event handler's HandlEvent() implementation.

Note that the Template Method pattern can be suitably used here implement both approaches.

/* EVENT HANDLERS */

/* This event handler is implemented as a class (as opposed to a function). using the Single Method */
class EventHandler
{
// Constructors/Destructors
public:
    virtual ~EventHandler(){}

// Public interface
public:
    // This hook method is dispatched (called) by the reactor to handle a specific event
   
virtual void HandleEvent(HANDLE hEvent /*, eEventTypes eType*/ ) = 0;

    // Returns the event handler's HANDLE
   
virtual HANDLE GetHandle() const = 0;
};

class ConnectHandler : public EventHandler
{
// Constructors/Destructors
public:
    ConnectHandler(Reactor* pReactor);
    ~ConnectHandler(){}

// Public interface
public:
    // This hook method is dispatched by the reactor to handle a specific event
   
void HandleEvent(HANDLE hEvent /*, eEventTypes eType*/ );

    // Returns the event handler's HANDLE
    HANDLE GetHandle() const;

    // Data members
    Reactor* m_pReactor;
    HANDLE m_hConnectHandle;
};

class ReadHandler : public EventHandler
{
// Constructors/Destructors
public:
    ReadHandler(Reactor* pReactor);
    ~ReadHandler(){}

// Public interface
public:
    // This hook method is dispatched by the reactor to handle a specific event
    void HandleEvent(HANDLE hEvent /*, eEventTypes eType*/ );

    // Returns the event handler's HANDLE
    HANDLE GetHandle() const;

// Data members
    Reactor* m_pReactor;
    HANDLE m_hReadHandle;
};

ConnectHandler::ConnectHandler(Reactor* pReactor) : m_pReactor(pReactor)
{
    /* Create a handle associated with this handler ( a manual reset event)
    Handle must be created before Reactor::RegisterHandle() is called */
    m_hConnectHandle = CreateEvent( NULL, TRUE, FALSE, NULL );
    if (!m_hConnectHandle)
        std::cout << "Failed to create ConnectHandler handle" << std::endl;

    // Register this handler with the reactor
    if (m_pReactor)
        m_pReactor->RegisterHandler( this );
}

HANDLE ConnectHandler::GetHandle() const
{
    return m_hConnectHandle;
}

void ConnectHandler::HandleEvent(HANDLE hEvent)
{
    std::cout << "Handling Connect event ..." << std::endl;
    // Do what ever handling necessary here
}

ReadHandler::ReadHandler(Reactor* pReactor) : m_pReactor(pReactor)
{
    /* Create a handle associated with this handler ( a manual reset event)
    Handle must be created before Reactor::RegisterHandle() is called */
    m_hReadHandle = CreateEvent( NULL, TRUE, FALSE, NULL );
    if (!m_hReadHandle)
        std::cout << "Failed to create ConnectHandler handle" << std::endl;

    // Register this handler with the reactor
    if (m_pReactor)
        m_pReactor->RegisterHandler( this );
}

HANDLE ReadHandler::GetHandle() const
{
    return m_hReadHandle;
}

void ReadHandler::HandleEvent(HANDLE hEvent)
{
    std::cout << "Handling Read event ..." << std::endl;
    // Do what ever handling necessary here
}

Declaring  & Defining the Reactor

Recall that the reactor is used by the application to add/remove event handlers and their associated handles, and to invoke the application's event loop:

Methods for adding event handlers can be defined using either or both of the following signatures:

Note that the methods that remove event handlers follow the same logic; the signatures can either be passed an event handler, or a handle.

/* REACTOR */

/* The reactor interface is used in the logging service to define an abstraction for registering/removing event handlers, and running the application's event loop reactively. In this example, the Bridge pattern was not applied in order to simplify the code (frankly, I'm just being lazy) */
class Reactor
{
// Constructors/Destructors
public: 
    virtual ~Reactor(){}

// Public interface
public:
    /* Methods to add/remove event handlers. Uses the Two Parameters approach */
    void RegisterHandler(EventHandler *pEH /*, eEventTypes eEvent*/);
    void RemoveHandler(EventHandler *pEH /*, eEventTypes eEvent*/);

    /* Entry point into the reactive loop */
    void HandleEvents( long lMillisecondsTimeout = 2000 );

    /* Define a single instance */
    static Reactor* Instance();

// Data members
private:
    static Reactor* m_pSingleInstance;

    // TODO: Add some comments on why value is EventHandler* and not EventHandler object.
    /* Each HANDLE serves as a key used to locate the associated event handler */
    typedef std::map<HANDLE, EventHandler*> MAP_HANDLE_EVENTHANDLER;
    MAP_HANDLE_EVENTHANDLER m_mapHandleToHandler;
};

Reactor* Reactor::m_pSingleInstance = 0;

/* Add event handlers. Uses the Two Parameters approach */
void Reactor::RegisterHandler(EventHandler *pEH /*, eEventTypes eEvent*/)
{
    const HANDLE h = pEH->GetHandle();
    m_mapHandleToHandler.insert( MAP_HANDLE_EVENTHANDLER::value_type( h, pEH) );
}

/* Remove event handlers. Uses the Two Parameters approach */
void Reactor::RemoveHandler(EventHandler *pEH /*, eEventTypes eEvent*/)
{
    try
    {
        // Get the event handler's handle
        const HANDLE h = pEH->GetHandle();

        // Use the handle to erase the associated event handler
        m_mapHandleToHandler.erase( h );
    }
    catch( const std::exception& e)
    {
        std::cout << e.what() << std::endl;
    }
}

/* Entry point into the reactive loop. Call a synchronous event de-multiplexer, WaitForMultipleObjects, to wait for one or more events to occur on the reactor's handle set. When a handle is signaled, this method reacts by calling the event  handler associated with the signaled event handle.

Note: In WaitForMultipleObjects, when fWaitAll is FALSE, and multiple objects are  in the signaled state, the function chooses one of the objects to satisfy the wait;  the states of the objects not selected are unaffected */

void Reactor::HandleEvents( long lMSTimeout)
{
    /* Create an array of all registered handles */
    int nIndex       = int();
    int nSize        = m_mapHandleToHandler.size();
    HANDLE *aHandles = new HANDLE[ nSize ];
    MAP_HANDLE_EVENTHANDLER::const_iterator itB = m_mapHandleToHandler.begin();
    const MAP_HANDLE_EVENTHANDLER::iterator itE = m_mapHandleToHandler.end();
    for( ; itB != itE; ++itB )
    {
        aHandles[nIndex] = itB->first;
        ++nIndex;
    }

    /* Wait for up to lMillisecondsTimeout for indication events to occur on the handle set */
    const DWORD dwRet = WaitForMultipleObjects( 
                                    nSize,             // size of array
                                    aHandles,          // array of handles
                                    FALSE,             // do not wait for all handles
                                    (DWORD) lMSTimeout // timeout period
                                            );
    std::cout << std::endl;
    std::cout << "WaitForMultipleObjects() returned!" << std::endl;
    
    if (WAIT_TIMEOUT == dwRet)
    {
        std::cout << "Timed out while waiting for events" << std::endl;
        // other actions ...

    }
    else if ( ( dwRet >= WAIT_OBJECT_0) && (dwRet < WAIT_OBJECT_0 + nSize) )
    {
        // A handle is signaled. Get the handle
        HANDLE h = aHandles[WAIT_OBJECT_0 + dwRet]; 

        std::cout << "Handle " << h << " has been signaled" << std::endl;

        /* Call the handler's event handler. Note how the map is used as an
        associative array */
        m_mapHandleToHandler[h]->HandleEvent( h );

        // Event handled. Reset event to non-signaled state
        if (!ResetEvent( h ) )
            std::cout << "Failed to reset handle" << std::endl;
    }
    else if ( ( dwRet >= WAIT_ABANDONED_0 ) && (dwRet < WAIT_ABANDONED_0 + nSize) )
    {
        // A handle was abandoned. Log error 
        std::cout << "Handle" << WAIT_ABANDONED_0 + dwRet << " was abandoned" << std::endl;

        // other actions ...
    }

    // Cleanup
    delete [] aHandles;
}

/* Define a single instance */
Reactor* Reactor::Instance()
{
    if (!m_pSingleInstance)
        m_pSingleInstance = new Reactor;

    return m_pSingleInstance; 
}

unsigned int __stdcall ThreadFunc(void *);

int main(int argc, char* argv[])
{
    /* Create two event handlers */
    ConnectHandler ch( Reactor::Instance() );
    ReadHandler rh( Reactor::Instance() );

    /* Now that handlers are created, we can start handling all events in 
    a separate thread */
    unsigned int uiThreadID = 0;
    unsigned long ulThreadHandle = _beginthreadex( NULL, 0, ThreadFunc, NULL, 0, &uiThreadID);

    /* Incoming events are simulated by entering characters c for Connect r for Read.
    Any other character terminates the programming */
    char c;
    while ( (std::cin >> c) )
    {
        if (c == 'c')
            SetEvent( ch.GetHandle() );
        else 
        if (c == 'r')
            SetEvent( rh.GetHandle() );
        else
            break; 
    }
    return 0;
}

unsigned int __stdcall ThreadFunc(void* pParam)
{
    // TODO: Need a proper way to terminate this thread
    for( ; ; )
    {
        Reactor::Instance()->HandleEvents( 10000 /*msec*/);    // enters into wait state
    }

    return 0;
}

 

Sample Output 

Entering character 'c' indicates the occurrence of a CONNECT event. Entering character 'r' indicates the occurrence of a READ event. 

       


Variants

The Reactor pattern can have the following variations to support concurrency, reentrancy, and timer-based events:

Thread-safe Reactor

In a multithreaded application, a specific thread may run the reactor's HandleEvents() loop, while other threads may concurrently register/remove event handlers with the reactor. In addition, an event handler called by the reactor, may share state with other threads. Therefore, the following issues must be addressed:

Concurrent Event Handlers

Event handlers can run in their thread of control. This allows the reactor to process (de-multiplex and dispatch) new events concurrently with the processing of event handlers. Active Object, Leader/Follower, and Half-Sync/Half Async patterns can be used to implement concurrent event handlers.

Benefits
Liabilities