Concurrency

Active Object

Prerequisite Patterns

Proxy, Factory, Command.

Purpose

Decouple method invocation from method execution to improve concurrency and simplify access to objects that reside in their own threads.

Scenario

Consider a software module that acts as a gateway between a data supplier (say an exchange supplying electronic market data) and data consumers (say traders using trading applications to view and interact with market data):

Suppliers, consumers, and the gateway communicate over TCP sockets. TCP is connection-oriented (similar to a telephone conversation) and uses flow control to keep a sender from flooding a slower receiver. When flow control engages on a connection, a send on that connection can block, which degrades performance. The gateway as a whole must therefore not block while waiting for TCP to resume data flow on any one connection.

An effective way to prevent blocking and improve scalability is to introduce concurrency into the gateway design: associate a separate thread with each TCP connection between the gateway and a data consumer. Threads whose TCP connections are flow controlled can block without affecting the performance of connections that are not flow controlled.

Solution
Summary

Assume that the gateway wraps each potential TCP connection to a remote machine in a single object. For each such object, decouple method invocation from method execution. Method invocation should occur in the client's (exchange gateway's) main thread, whereas method execution on that object, which sends data to the consumer, should occur in a separate working thread. Note that the working thread also runs on the client machine; if method execution blocks, only that thread blocks, with no effect on any other thread.

Detail

A proxy that represents the interface of the TCP connection object runs in the client's main thread, and a servant that provides the object's implementation runs in a separate working thread. To transfer information between the gateway and a trading machine, the gateway invokes a method on the proxy. The proxy transforms this invocation into a method request and hands it to a scheduler, which deposits it into an activation list. The scheduler's dispatch loop, which runs in the same working thread as the servant, continuously monitors the list and dispatches method requests to the servant. The client can obtain the result of the method invocation via a future returned by the proxy.
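
The flow described above can be sketched in portable C++. This is a minimal illustration only, assuming standard C++11 threads and futures rather than the Win32 API used in the full example later in this section; the names Servant, ActiveSender, and DemoSend are hypothetical, and the proxy and scheduler are folded into one class for brevity.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Servant: plain single-threaded logic. Only the worker thread calls it.
class Servant {
public:
    std::string Send(const std::string& msg) { return "sent:" + msg; }
};

// Proxy and scheduler combined (hypothetical class): callers enqueue
// requests; one worker thread dequeues them and runs them on the servant.
class ActiveSender {
public:
    ActiveSender() : done_(false), worker_([this] { Run(); }) {}

    ~ActiveSender() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }

    // Invocation: returns immediately with a future for the result.
    std::future<std::string> Send(const std::string& msg) {
        auto task = std::make_shared<std::packaged_task<std::string()>>(
            [this, msg] { return servant_.Send(msg); });
        std::future<std::string> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push([task] { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }

private:
    // Execution: runs in the worker thread until shutdown has been
    // requested and every pending request has been executed.
    void Run() {
        for (;;) {
            std::function<void()> request;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return done_ || !queue_.empty(); });
                if (queue_.empty()) return;  // done_ is set, nothing pending
                request = std::move(queue_.front());
                queue_.pop();
            }
            request();  // may block without stalling the callers
        }
    }

    Servant servant_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> queue_;
    bool done_;
    std::thread worker_;  // declared last so it starts after the other members
};

// Example use: both invocations return at once; results arrive via futures.
inline std::string DemoSend() {
    ActiveSender sender;
    std::future<std::string> first = sender.Send("one");
    std::future<std::string> second = sender.Send("two");
    assert(first.valid() && second.valid());
    return first.get() + "," + second.get();
}
```

Calling DemoSend() from main() exercises the whole path: the caller's thread only enqueues, and the worker thread is the only one that touches the servant.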

Structure & UML
Components

Six components can be discerned in the proposed solution:

Servant (active object)
A class that defines actual state and behavior. Methods are invoked on a servant object through the scheduler when it dequeues method requests from the activation list. The servant and the scheduler run in the same working thread.

Proxy
A class that provides a placeholder for the servant object in order to control access to it. The proxy resides in the client's (i.e., gateway) thread.

Method Request
A class that defines an interface for executing methods on the servant object. When the client invokes a method on the proxy, it creates a method request object and initializes it with context information such as the method's parameters.

Activation List
A container that maintains a bounded buffer of pending method requests created and inserted by the proxy. The activation list lives in the servant's thread.

Scheduler
Decides which method request to execute next on the servant object. The decision is based on criteria such as the order in which the methods were invoked and/or properties of the servant object such as its state.

Future
A class responsible for storing the result of a method call on the servant object.
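
As an illustration of the Future component, the sketch below uses std::promise and std::future from standard C++ in place of a hand-written future class. It is an assumption-laden example, not the implementation used later in this section: ResultOrTimeout, DemoFuture, and DemoTimeout are hypothetical names, and a plain thread stands in for the servant.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <string>
#include <thread>

// Waits up to `timeout` for the servant's result and returns it;
// returns a fallback text if the result is not ready in time.
inline std::string ResultOrTimeout(std::future<std::string>& fut,
                                   std::chrono::milliseconds timeout) {
    if (fut.wait_for(timeout) == std::future_status::ready)
        return fut.get();
    return "Timed Out!";
}

// A worker thread stands in for the servant and fulfils the promise.
inline std::string DemoFuture() {
    std::promise<std::string> promise;
    std::future<std::string> fut = promise.get_future();
    assert(fut.valid());
    std::thread servant([&promise] { promise.set_value("Message one"); });
    std::string text = ResultOrTimeout(fut, std::chrono::seconds(5));
    servant.join();
    return text;
}

// Nobody fulfils this promise, so the wait runs into the timeout.
inline std::string DemoTimeout() {
    std::promise<std::string> never_set;
    std::future<std::string> fut = never_set.get_future();
    return ResultOrTimeout(fut, std::chrono::milliseconds(10));
}
```

The client keeps the future and the servant side keeps the promise, so the two threads share nothing except the result slot.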

UML

 

Behavior
  1. Client invokes a method on the proxy which triggers the construction of the Future and MethodRequest objects. The MethodRequest object maintains the method arguments.
  2. The proxy passes the MethodRequest object to the scheduler which enqueues it on the activation list.
  3. The scheduler, which runs continuously in a different thread than its clients, monitors the activation list and determines which method can be invoked. When a method is runnable, the scheduler removes it from the list, and dispatches it to the servant.
  4. The servant executes the requested method and returns its result to be saved in the future object. The scheduler returns to monitor the list for runnable requests.
  5. Clients retrieve results via the future object.
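
Step 3 above, where the scheduler picks only the requests whose guard allows them to run, can be sketched without any threads. The following is a simplified, single-threaded illustration in standard C++; MessageQueue, Request, PutRequest, GetRequest, DispatchPass, and DemoDispatch are hypothetical names chosen for this sketch.

```cpp
#include <cassert>
#include <list>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

struct MessageQueue {                 // stands in for the servant
    std::queue<std::string> items;
};

struct Request {                      // method request with a guard
    virtual ~Request() = default;
    virtual bool IsRunnable() const = 0;
    virtual void Call() = 0;
};

struct PutRequest : Request {
    MessageQueue& q;
    std::string msg;
    PutRequest(MessageQueue& q_, std::string m) : q(q_), msg(std::move(m)) {}
    bool IsRunnable() const override { return true; }  // unbounded queue
    void Call() override { q.items.push(msg); }
};

struct GetRequest : Request {
    MessageQueue& q;
    std::vector<std::string>& out;
    GetRequest(MessageQueue& q_, std::vector<std::string>& o) : q(q_), out(o) {}
    bool IsRunnable() const override { return !q.items.empty(); }
    void Call() override { out.push_back(q.items.front()); q.items.pop(); }
};

// One scheduler pass: run and erase every runnable request, skip the rest.
// Returns the number of requests executed.
inline int DispatchPass(std::list<std::unique_ptr<Request>>& pending) {
    int executed = 0;
    for (auto it = pending.begin(); it != pending.end();) {
        if ((*it)->IsRunnable()) {
            (*it)->Call();
            it = pending.erase(it);  // erase returns the next valid iterator
            ++executed;
        } else {
            ++it;                    // leave it for a later pass
        }
    }
    return executed;
}

inline std::vector<std::string> DemoDispatch() {
    MessageQueue servant;
    std::vector<std::string> received;
    std::list<std::unique_ptr<Request>> pending;
    pending.push_back(std::unique_ptr<Request>(new GetRequest(servant, received)));
    pending.push_back(std::unique_ptr<Request>(new PutRequest(servant, "one")));
    int first = DispatchPass(pending);   // Get is skipped, Put runs
    int second = DispatchPass(pending);  // Get is now runnable
    assert(first == 1 && second == 1 && pending.empty());
    return received;
}
```

The Get request arrives first but is deferred until a Put has made the queue non-empty, which is the kind of synchronization constraint the scheduler enforces.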
Example

Assume electronic exchange data arrives at a gateway whose responsibility is to concurrently transmit this data to each registered remote consumer. Using the Active Object pattern, the gateway maintains a proxy for each consumer and a corresponding servant (active object) implemented as a queue for incoming messages. Incoming messages are placed in the active object's queue via the proxy. Concurrency is achieved when method invocation on the proxy and method execution on the servant can run concurrently. The proxy runs in the client thread, while the servant methods are executed in a separate thread.

For simplicity, error checking and exception handling have been kept to a minimum.

Thread Details
  1. Main thread: Each proxy object is wrapped in a consumer handler class which runs in the main thread. The main thread places messages in the active object queue via the proxy in the consumer handler.
  2. Scheduler Thread
    Scheduler and Servant objects are instantiated by the proxy object which itself lives in the main thread. While Scheduler and Servant objects live in the same thread as the proxy, the Scheduler spins its own thread in order to manage the Activation List. It is inside this thread that Scheduler dispatches calls to the Servant.
  3. Consumer Handler Thread
    Note from item 2 that the Scheduler thread executes calls on the Servant. These Servant calls add entries to or remove entries from the queue and return immediately. The Scheduler could also have been implemented to call a Transmit() method on the Servant that transmits messages over TCP to remote clients. In that case, because Transmit() might block, it would be necessary to introduce some event mechanism to inform the sending client of the result of the Transmit() call. To keep this example simple and avoid introducing an asynchronous event mechanism, we spin another thread, the Consumer Handler Thread. This thread is used as follows:

    The proxy is instantiated by a consumer handler class. However, the consumer handler spins its own working thread that gets messages from the active object queue in order to transmit them to remote consumers over TCP. This thread can block while transmitting over TCP, without affecting other consumer handlers.
Note on the given example

I tried to keep the example as simple as possible without obscuring it with error and thread management code. For example, it does not contain all the required error checking and handling, nor does it contain code to properly terminate threads via a PostThreadQuitMessage or some other appropriate means. This example, like all the others, is not production quality; it serves only to illustrate the problem at hand.

Usage

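/* Headers required by the listings below (Win32 threading API) */
#include <windows.h>   // CRITICAL_SECTION, CreateEvent, SetEvent, WaitForSingleObject
#include <process.h>   // _beginthreadex
#include <iostream>
#include <list>
#include <queue>
#include <string>
#include <typeinfo>    // typeid

/* Servant */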

/* DEFINITION: A servant class defines the state and behavior of the active object. A servant may contain predicate methods to determine when to execute method requests.

EXAMPLE: The servant is a queue of supplier messages. Concurrency is achieved when the working thread retrieves messages from the servant and transmits them to remote consumers. Blocking while sending messages will have no effect on the system. 

Note that the queue can be made bounded by an upper limit on its size (initialized on construction). Items can be pushed only if the current queue size is less than the limit; otherwise an exception can be thrown to indicate that the queue is full. This has not been implemented, for simplicity */

class AO_Servant
{
// Constructor/Destructor
public:
    AO_Servant(){}
    virtual ~AO_Servant() {}

// Public Interface
public:
    bool Empty() const { return queue.empty(); };

    // Add/remove message to/from queue
    void Put( const std::string & s )
    { 
        std::cout << "pushing " << s << " into queue" << std::endl;
        queue.push( s );
    }
    std::string Get()
    { 
        std::string strQueueItem = "";
        if ( !queue.empty() )
        {
            strQueueItem = queue.front();     // get front item ...
            queue.pop();                      // then remove it from queue
        }
        std::cout << "popping " << strQueueItem << " from queue" << std::endl;
        return strQueueItem;
    }

// Data members
private:
    // Messages are formatted as strings.
    std::queue<std::string> queue;
};

/* Proxy */

/* DEFINITION: A proxy provides an interface to the Servant's methods. For each method invocation by a client, the proxy creates a concrete Method Request object and uses the Scheduler to insert the method request object into the Activation List.

EXAMPLE: AO_Proxy provides an interface for AO_Servant. Note that the proxy need not serialize access to its methods because it does not change state after it is  created. Multiple client threads in a process can then  share the proxy. Note that it is the Scheduler and the Activation List that are responsible for the necessary synchronization */
class AO_Proxy
{
// Public Interface (Same as the AO_Servant)
public:
    /* Create a Put method request object, and insert into activation list via the 
    scheduler */
    void Put( const std::string & s );

    /* Create a Get method request object, and insert into activation list via the 
    scheduler. The Get method request object is initialized with a Message Future
    object to hold retrieved result */
    MessageHandle Get();

    /* Delegate the empty method invocation to the servant */
    bool Empty() const;

// Data members
private:
    AO_Servant    m_Servant;
    AO_Shceduler  m_Scheduler;
};

void AO_Proxy::Put( const std::string & s )
{
    std::cout << "Proxy: Putting " << s << " into a Method Request" << std::endl;

    MethodRequest* pobPut = new MRPut( &m_Servant, s);
    m_Scheduler.Insert( pobPut );
}

MessageHandle AO_Proxy::Get()
{
    std::cout << "Proxy: Getting a Method Request" << std::endl;

    MessageHandle result("");
    MethodRequest* pobGet = new MRGet( &m_Servant, result);
    m_Scheduler.Insert( pobGet );
    return result;
}

bool AO_Proxy::Empty() const
{
    return m_Servant.Empty();
}

/* Future */

/* Body: Represents the shared object. No class except the handle class can instantiate and destroy Body class. Therefore, constructors/destructors must be private */
class MessageBody
{
// Constructors/Destructors
private:
    /* private access to prevent uncontrolled instantiation AND deletion */
    MessageBody(const std::string & str) : strMessage(str) {}
    ~MessageBody(){}

// Friends
    /* Handle has access to Body class internals, including constructor/destructor */
    friend class MessageHandle;

// Public interface
public:
    std::string GetData() const             { return strMessage; }
    void SetData( const std::string & str)  { strMessage = str; }

// Data members
private:
    long         m_lRefCounter;
    std::string  strMessage;     /* Messages are implemented as strings */
};

/* Handle: Provides access to the Body class. Handle class is the only class allowed to access the Body class. Note that all Handle classes are passed in the main program by value, and are therefore allocated and destroyed automatically.

The message future is implemented as a counted pointer to simplify memory management for the dynamically allocated MessageBody objects. */
class MessageHandle
{
// Constructor/Destructor/Operators
public:
    MessageHandle( const std::string str );

    /* Destroying the MessageHandle means one less user of the MessageBody class. 
    If the reference count on the Body class is zero, the Body object must be deleted */
    ~MessageHandle();

    /* Copying the handle object */
    MessageHandle( const MessageHandle & src );

    /* Assigning the handle object */
    MessageHandle& operator=(const MessageHandle & rhs);

    /* Assigning a message result (sets the Data Ready event) */
    MessageHandle& operator=( const std::string & str);

    /* The Body class can only be accessed by the MessageHandle class */
    MessageBody* operator->();

// Public Interface
public:
    std::string result(DWORD dwTimeOut) const;

// Data members
private:
    MessageBody* m_pBody;       // The Body object can only be accessed by MessageHandle
    HANDLE       m_hDataReady;
};

MessageHandle::MessageHandle( const std::string str )
{
    // Create body
    m_pBody = new MessageBody( str );
    m_pBody->m_lRefCounter = 1;

    // Create data ready event
    m_hDataReady = CreateEvent( NULL, TRUE, FALSE, NULL);
    if (NULL == m_hDataReady)
        std::cout << "Failed to create Data Ready event" << std::endl;
}

/* Destroying the MessageHandle means one less user of the MessageBody class. If the reference count on the Body class is zero, the Body object must be deleted */
MessageHandle::~MessageHandle()
{
    --m_pBody->m_lRefCounter;
    if (m_pBody->m_lRefCounter == 0)
        delete m_pBody;
}

/* Copying the handle object  */
MessageHandle::MessageHandle( const MessageHandle & src )
{
    /* Both objects now share the same body. We can increment ref count on either m_pBody */
    m_hDataReady = src.m_hDataReady;
    m_pBody      = src.m_pBody;
    m_pBody->m_lRefCounter++;
}

/* Assigning the handle object */
MessageHandle& MessageHandle::operator=(const MessageHandle & rhs)
{
    rhs.m_pBody->m_lRefCounter++;
    if (--m_pBody->m_lRefCounter <= 0)
        delete m_pBody;

    m_pBody      = rhs.m_pBody;
    m_hDataReady = rhs.m_hDataReady;
    return *this;
}

/* Assigning a message result. Note how we set the event here to indicate to the consumer handler thread that the result is available */
MessageHandle& MessageHandle::operator=( const std::string & str) 
{
    // Initialize body with result
    m_pBody->SetData( str );

    // Inform waiting clients (if any) that result is available
    if (! SetEvent( m_hDataReady ) )
        std::cout << "Failed to set data ready event" << std::endl;

    return *this;
}

/* The Body class can only be accessed by the MessageHandle class */
MessageBody* MessageHandle::operator->()
{
    return m_pBody;
}

std::string MessageHandle::result(DWORD dwTimeOut) const
{
    std::string strResult;
    DWORD dwRet = WaitForSingleObject( m_hDataReady, INFINITE /*dwTimeOut*/ );
    switch( dwRet )
    {
        case WAIT_OBJECT_0:
            strResult = m_pBody->GetData();
            break;
        case WAIT_TIMEOUT:
            strResult = "Timed Out!";
            break;
        default:
            strResult = "Error waiting for result";
    }

    // Handle no longer required. Clean it up
    if ( !CloseHandle( m_hDataReady ) )
        std::cout << "Failed to close handle" << std::endl;

    return strResult;
}

/* Method Request */

/* DEFINITION: Method Request declares an interface for all concrete method request classes. This interface typically declares two methods: IsRunnable() and Call(). Note that there should be one concrete method request class for each method defined in the proxy.

Note that Method Request is similar to a Command object in the sense that it is used to call the servant from another thread. */
class MethodRequest
{
// Public interface
public:
    /* Checks whether it is possible to execute the Method Request object */
    virtual bool IsRunnable() = 0;

    /* Execute a method on the servant */
    virtual void Call() = 0;
};

/* Class MRPut is a concrete MethodRequest object that corresponds to AO_Proxy::Put(...) */
class MRPut : public MethodRequest
{
// Constructors/Destructor
public:
    MRPut(AO_Servant* ser, const std::string & msg) : 
        m_pServant(ser), m_strResult(msg) {}

// Public interface
public:
    /* Since the AO_Servant queue is not bounded by an upper limit, we can always
    call its Put() method (if the queue were size-bounded, we could only call the 
    servant's Put() when the queue is not full) */
    bool IsRunnable() { return true; }

    // Call the appropriate method on the servant
    void Call() { m_pServant->Put( m_strResult ); }

// Data members
private:
    AO_Servant* m_pServant;
    std::string m_strResult;
};

/* Class MRGet is a concrete MethodRequest object that corresponds to AO_Proxy::Get() */
class MRGet : public MethodRequest
{
// Constructors/Destructor
public:
    MRGet(AO_Servant* ser, const MessageHandle& msg) : m_pServant(ser), m_Result(msg) {}

// Public interface
public:
    // Cannot call a Get method until the queue is not empty
    bool IsRunnable() { return (!m_pServant->Empty()); }

    // Call the appropriate method on the servant
    void Call() { m_Result = m_pServant->Get(); }

// Data members
private:
    AO_Servant*   m_pServant;
    MessageHandle m_Result;
};

/* Activation List */
/*
DEFINITION: A synchronized buffer that is shared between client threads and the thread in which the active object and its scheduler run. A scheduler can be used with timing mechanisms to determine how long it should wait for a method to execute (this is particularly important if the Activation List is bounded by a specific size and needs to wait for its buffer to empty some of its entries).

EXAMPLE: The ActivationList class must be thread-safe because it is shared by multiple
threads. It uses a std::list<> as the internal queue */
class ActivationList
{
// Data members
private:
    typedef std::list<MethodRequest*> LST_METHODREQUEST; 
    LST_METHODREQUEST::iterator it;
    LST_METHODREQUEST           m_lst;
    CRITICAL_SECTION            m_cs;

// Constructors/Destructor
public:
    /* Constructors/Destructor: Create/Delete a critical section. Scoped locking is an 
    ideal pattern for using critical sections in classes */

    ActivationList()     { InitializeCriticalSection( &m_cs ); }
    ~ActivationList()    { DeleteCriticalSection( &m_cs ); }

// Public interface
public:
    void Insert( MethodRequest* pM );
    LST_METHODREQUEST::iterator Remove( LST_METHODREQUEST::iterator );

    friend class AO_Shceduler;
};


/* We must serialize all access to the list. Note that push_front is used to insert items into the front of the list. This is directly related to AO_Shceduler::Dispatch() which iterates continuously over the Activation list. Note that using push_back() would cause an infinite loop when AO_Shceduler::Dispatch() iterates over the activation list. */
void ActivationList::Insert( MethodRequest* pM )
{
    EnterCriticalSection( &m_cs );
        m_lst.push_front( pM );
    LeaveCriticalSection( &m_cs );
}

/* We must serialize all access to the list */
ActivationList::LST_METHODREQUEST::iterator ActivationList::Remove( LST_METHODREQUEST::iterator it )
{
    LST_METHODREQUEST::iterator itRet;

    EnterCriticalSection( &m_cs );
        itRet = m_lst.erase( it );
    LeaveCriticalSection( &m_cs );

    return itRet;
}

/* Scheduler */

/* DEFINITION: The scheduler manages the activation list and executes methods whose synchronization constraints are met. Typically, the scheduler's public interface provides one method for the proxy to insert method requests into the Activation List and another method that dispatches method requests to the servant */
class AO_Shceduler
{
// Constructors/Destructors
public:
    /* Constructor spins Scheduler thread */
    AO_Shceduler();

// public interface
public:
    /* Proxy method: The Scheduler and this method are handled in the proxy's thread */
    void Insert(MethodRequest* pMR) { m_ActList.Insert( pMR); }

    /* Retrieves method request objects from activation list and dispatches them
    to their Servant. This method runs in a separate thread */
    virtual void Dispatch();

// Helpers
private:
    /* Scheduler executes Dispatch() inside this thread function */
    static unsigned __stdcall SchedulerThread( void*);

// Data members
private:
    ActivationList m_ActList;
    unsigned long  m_ulThreadHandle;
    unsigned int   m_uThreadID;
};

/* Creates a new thread to manage the Activation List */
AO_Shceduler::AO_Shceduler()
{
    // Spin new thread
    m_ulThreadHandle = _beginthreadex( NULL,
                                       0,
                                       SchedulerThread,
                                       (void*)this,
                                       0,                 // Run immediately
                                       &m_uThreadID );
    if (0 == m_ulThreadHandle)
    {
        // Need proper error checking
        std::cout << "ERROR: Failed to create thread" << std::endl;
    }
}

/* Scheduler thread manages Activation List through Dispatch() */
unsigned __stdcall AO_Shceduler::SchedulerThread( void* pParam )
{
    AO_Shceduler *pOAScheduler = static_cast<AO_Shceduler*>(pParam);
    pOAScheduler->Dispatch();

    return 0;
}

void AO_Shceduler::Dispatch()
{
    /* Iterate continuously for entries in the Activation List. See Possible 
    Enhancements item 2 at end of this section to improve performance */
    for( ; ; )
    {
        /* Iterator over the activation list. Note how we remove list items while
        iterating over it. See lists in the STL section for an explanation */
        m_ActList.it = m_ActList.m_lst.begin();
        for ( ; m_ActList.it != (m_ActList.m_lst.end()) ; )
        {
            // Select a method request whose guard evaluates to true
            MethodRequest* pCurrent = (MethodRequest*)(*m_ActList.it);

            // Output some debugging information
            std::cout << "list size is " << m_ActList.m_lst.size() << std::endl;
            std::cout << "element type is: " << typeid( *pCurrent ).name() << std::endl;

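            if (pCurrent->IsRunnable())
            {
                // This method is runnable. Remove it from the list and run it
                m_ActList.it = m_ActList.Remove( m_ActList.it );
                pCurrent->Call();
                delete pCurrent;
            }
            else
            {
                // Not runnable yet: move on so that other requests can be
                // examined; this request is retried on the next pass
                ++m_ActList.it;
            }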
        }
    } // for(;;)
}

/* Wraps a proxy object and runs it in its own thread. See Consumer Handler Thread section */
class ConsumerHandler
{
// Constructors/Destructor
public:
    // Spawns the active object's thread
    ConsumerHandler();

// Public interface
public:
    void Put( const std::string & str ) { m_Proxy.Put( str ); }

// Data members
private:
    AO_Proxy     m_Proxy;
    unsigned int m_uiThreadID;

// Helpers
    static unsigned __stdcall ActiveObjectThread( void* );
};

/* Constructor spawns active object's thread*/
ConsumerHandler::ConsumerHandler()
{
    /* Need proper error checking and handling. Also need a proper way to terminate
    this thread */
    _beginthreadex( NULL, 0, ActiveObjectThread, this, 0, &m_uiThreadID);
}

unsigned __stdcall ConsumerHandler::ActiveObjectThread( void* pParam)
{
    ConsumerHandler *This = static_cast<ConsumerHandler*>(pParam);

    /* Enter an infinite loop to run active object. Again, need a proper method to 
    terminate this thread */
    for( ; ; )
    {
        // Obtain message text. result() blocks until the servant executes the
        // method; note that result() currently waits with INFINITE and ignores
        // the 3-second timeout passed here
        MessageHandle msgHandle = This->m_Proxy.Get();
        std::string strMessage = msgHandle.result( 3000 );

        // Now transmit message over to some remote client
        // ...
    }

    return 0;
}

int main(int argc, char* argv[])
{
    /* Create a consumer handler. Spawns a thread that waits on messages dispatched
    by the Scheduler */
    ConsumerHandler ob;

    /* Now send a couple of messages to the proxy via the handler. This could be 
    done inside a thread. In fact, there could be many threads to simulate sending
    simultaneous messages to the proxy to a specific consumer handler*/
    ob.Put( " (Message one) " );
    ob.Put( " (Message two) " );
    ob.Put( " (Message three) " );

    /* Infinite loop until user exits program with 'q' */
    char cUserInput;
    while (std::cin >> cUserInput)
    {
        if (cUserInput == 'q')
            break;
    } 

    return 0;
}

Possible Enhancements
  1. The active object queue and the Activation list can be made bounded by an upper size. If an upper size was used:
  2. The Scheduler thread invokes Dispatch(), which runs in an infinite loop while continuously checking for entries in the Activation List. This continuous checking can be optimized by giving the Activation List an event that it signals every time a MethodRequest object is inserted. AO_Shceduler::Dispatch() can then efficiently wait on this event.
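
Enhancement 2 can be sketched as follows. This is only an illustration under stated assumptions: it uses a standard C++ std::condition_variable in place of a Win32 event, plain strings stand in for MethodRequest objects, and SignalingList and DemoSignaling are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Activation list that signals on every insert, so the dispatcher
// sleeps until there is work instead of polling in a tight loop.
class SignalingList {
public:
    void Insert(const std::string& request) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(request);
        }
        ready_.notify_one();  // wake the dispatcher
    }

    // Blocks until an entry is available, then removes and returns it.
    std::string WaitAndRemove() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        std::string request = items_.front();
        items_.pop_front();
        return request;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<std::string> items_;
};

// The dispatcher thread consumes exactly three requests and then exits.
inline std::vector<std::string> DemoSignaling() {
    SignalingList list;
    std::vector<std::string> dispatched;
    std::thread dispatcher([&] {
        for (int i = 0; i < 3; ++i)
            dispatched.push_back(list.WaitAndRemove());
    });
    list.Insert("put one");
    list.Insert("put two");
    list.Insert("put three");
    dispatcher.join();
    assert(dispatched.size() == 3);
    return dispatched;
}
```

The predicate passed to wait() guards against spurious wakeups and against inserts that happen before the dispatcher starts waiting, so no signal is lost.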
Benefits
Liabilities