Decouple method invocation from method execution to improve concurrency and simplify access to objects that reside in their own threads.
Consider a software module that acts as a gateway between a data supplier (say an exchange supplying electronic market data) and data consumers (say traders using trading applications to view and interact with market data).
Suppliers, consumers, and the gateway communicate over TCP sockets. TCP is connection-oriented (similar to a telephone conversation) and uses flow control to ensure that a fast sender does not flood a slow receiver with data. When a connection is flow controlled, a send on that connection can block, which degrades performance. The gateway as a whole must therefore not block while waiting for TCP to resume data flow on any one connection.
An effective way to prevent blocking and improve scalability is to introduce concurrency into the gateway design: associate a separate thread with each TCP connection between the gateway and a data consumer. Threads whose TCP connections are flow controlled can then block without affecting the performance of connections that are not flow controlled.
Assume that the gateway wraps each potential TCP connection to a remote machine in a single object. For each such object, decouple method invocation from method execution. Method invocation should occur in the client's (exchange gateway's) main thread, whereas method execution on that object, which sends data to the consumer, should occur in a separate working thread. Note that the working thread also runs on the client machine; if method execution blocks, only that thread blocks, without any effect on the other threads.
A proxy that represents the interface of the TCP connection object runs in the client's main thread, and a servant that provides the object's implementation runs in a different working thread. To transfer information between the gateway and a trading machine, the gateway invokes a method on the proxy. The proxy transforms this invocation into a method request, which the scheduler deposits into an activation list. The scheduler, which runs in the same working thread as the servant, continuously monitors the list and dispatches method requests to the servant. The client can obtain the result of the method invocation via a future returned by the proxy.
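The flow described above (proxy, method request, activation list, scheduler, servant, future) can be sketched in portable C++11. This is only an illustrative sketch under stated assumptions, not the Win32 implementation developed later in this section: the class name ActiveSender, its Send method, and the use of message.size() as a stand-in for a blocking socket write are all hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>

// Hypothetical names throughout; this is not the article's Win32 listing.
class ActiveSender {
public:
    ActiveSender() : worker_([this] { Run(); }) {}

    ~ActiveSender() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        ready_.notify_one();
        worker_.join();              // Run() drains pending requests before returning
        assert(requests_.empty());
    }

    // Proxy method: runs in the caller's thread and returns immediately.
    std::future<std::size_t> Send(std::string message) {
        // The method request: a callable bound to its argument.
        auto request = std::make_shared<std::packaged_task<std::size_t()>>(
            [message] { return message.size(); });  // stands in for a blocking send
        std::future<std::size_t> result = request->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            requests_.push([request] { (*request)(); });
        }
        ready_.notify_one();
        return result;
    }

private:
    // Scheduler loop: runs in the worker thread, executes requests in FIFO order.
    void Run() {
        for (;;) {
            std::function<void()> request;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return done_ || !requests_.empty(); });
                if (requests_.empty()) return;  // done_ is set and nothing is pending
                request = std::move(requests_.front());
                requests_.pop();
            }
            request();  // executed outside the lock, so callers are never held up
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> requests_;
    bool done_ = false;
    std::thread worker_;  // declared last so the other members exist before Run() starts
};
```

A caller would write `std::future<std::size_t> sent = sender.Send("tick");` and later call `sent.get()`, which blocks only that caller until the worker thread has executed the request.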
It is possible to discern six components in the proposed solution:
Servant (active object)
A class that defines actual state and behavior. Methods are invoked on a
servant object through the scheduler when it dequeues method requests from the
activation list. The servant and the scheduler run in the same working thread.
Proxy
A class that provides a placeholder for the servant object in order
to control access to it. The proxy resides in the client's (i.e., gateway)
thread.
Method Request
A class that defines an interface for executing methods on the servant
object. When the client invokes a method on the proxy, it creates a method
request object and initializes it with context information such as the method's
parameters.
Activation List
A container that maintains a bounded buffer of pending method requests created and inserted
by the proxy. The activation list lives in the servant's thread.
Scheduler
Decides which method requests to execute on the servant object. The decision
is based on various criteria such as the order in which methods are called in
the servant object, and/or on certain properties on the servant object such as
its state.
Future
A class responsible for storing the result of a method call on the servant
object.
Assume electronic exchange data arrives at a gateway whose responsibility is to concurrently transmit this data to each registered remote consumer. Using the Active Object pattern, the gateway maintains a proxy for each consumer and a corresponding servant (active object) implemented as a queue for incoming messages. Incoming messages are placed in the active object's queue via the proxy. Concurrency is achieved because method invocation on the proxy and method execution on the servant run in different threads: the proxy runs in the client thread, while the servant's methods are executed in a separate thread.
For simplicity, error checking and exception handling have been kept to a minimum. I tried to keep the example as simple as possible without obscuring it with error and thread management code: it does not contain all the required error checking and handling, and it does not terminate its threads properly via a PostThreadQuitMessage or some other appropriate means. This example, like all the others, is not production quality; it only serves to illustrate the problem at hand.
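/* Headers required by the listings below (Win32 and the C++ standard library) */
#include <windows.h>
#include <process.h> // _beginthreadex
#include <iostream>
#include <list>
#include <queue>
#include <string>
#include <typeinfo>
/* Servant */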
/* DEFINITION: A servant class defines the state and behavior of the active object.
A servant may contain predicate methods to determine when to execute method requests.
EXAMPLE: The servant is a queue of supplier messages. Concurrency is achieved when
the working thread retrieves messages from the servant and transmits them to remote
consumers. Blocking while sending messages will have no effect on the system.
Note that the queue can be bounded by an upper limit on its size (set on construction). Items can
be pushed only if the current queue size is below that limit; otherwise an exception can be thrown to
indicate that the queue is full. This has not been implemented, for simplicity */
class AO_Servant
{
// Constructor/Destructor
public:
AO_Servant(){}
virtual ~AO_Servant() {}
// Public Interface
public:
bool Empty() const { return queue.empty(); };
// Add/remove message to/from queue
void Put( const std::string & s )
{
std::cout << "pushing " << s << " into queue" << std::endl;
queue.push( s );
}
std::string Get()
{
std::string strQueueItem = "";
if ( !queue.empty() )
{
strQueueItem = queue.front(); // get front item ...
queue.pop(); // then remove it from queue
}
std::cout << "popping " << strQueueItem << " from queue" << std::endl;
return strQueueItem;
}
// Data members
private:
// Messages are formatted as strings.
std::queue<std::string> queue;
};
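The servant's comment mentions that the queue could be bounded but leaves that out. A minimal sketch of what such a variant might look like follows. The class name BoundedServant, the Full() predicate, and the choice of std::length_error are assumptions made for illustration and are not part of the listing above.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <stdexcept>
#include <string>

// Hypothetical bounded variant of the servant; the capacity is fixed at construction.
class BoundedServant {
public:
    explicit BoundedServant(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);  // a zero-capacity queue could never accept a message
    }

    // Predicates that method-request guards can consult.
    bool Empty() const { return queue_.empty(); }
    bool Full() const { return queue_.size() >= capacity_; }

    // Rejects the message when the queue is already at capacity.
    void Put(const std::string& message) {
        if (Full()) throw std::length_error("servant queue is full");
        queue_.push(message);
    }

    // Returns the oldest message, or an empty string if there is none.
    std::string Get() {
        if (queue_.empty()) return std::string();
        std::string front = queue_.front();
        queue_.pop();
        return front;
    }

private:
    std::size_t capacity_;
    std::queue<std::string> queue_;
};
```

With a bounded servant, a Put method request would typically use `!Full()` as its guard, so the exception only fires if a caller bypasses the guard.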
/* Proxy */
/* DEFINITION: A proxy provides an interface to the Servant's methods. For each method
invocation by a client, the proxy creates a concrete Method Request object and uses
the Scheduler to insert the method request object into the Activation List.
EXAMPLE: AO_Proxy provides an interface for AO_Servant. Note that the proxy need
not serialize access to its methods because it does not change state after it is
created. Multiple client threads in a process can then share the proxy. Note
that it is the Scheduler and the Activation List that are responsible for the necessary
synchronization */
class AO_Proxy
{
// Public Interface (Same as the AO_Servant)
public:
/* Create a Put method request object, and insert it into the
activation list via the scheduler */
void Put( const std::string & s );
/* Create a Get method request object, and insert it into the
activation list via the scheduler. The Get method request object is
initialized with a Message Future object to hold the retrieved result */
MessageHandle Get();
/* Delegate the empty method invocation to the servant */
bool Empty() const;
// Data members
private:
AO_Servant m_Servant;
AO_Shceduler m_Scheduler;
};
void AO_Proxy::Put( const std::string & s )
{
std::cout << "Proxy: Putting " << s << " into a Method Request" << std::endl;
MethodRequest* pobPut = new MRPut( &m_Servant, s);
m_Scheduler.Insert( pobPut );
}
MessageHandle AO_Proxy::Get()
{
std::cout << "Proxy: Getting a Method Request" << std::endl;
MessageHandle result("");
MethodRequest* pobGet = new MRGet( &m_Servant, result);
m_Scheduler.Insert( pobGet );
return result;
}
bool AO_Proxy::Empty() const
{
return m_Servant.Empty();
}
/* Future */
/* Body: Represents the shared object. No class except the handle class can instantiate
and destroy Body class. Therefore, constructors/destructors must be private */
class MessageBody
{
// Constructors/Destructors
private:
/* private access to prevent uncontrolled instantiation AND deletion */
MessageBody(const std::string & str) : strMessage(str) {}
~MessageBody(){}
// Friends
/* Handle has access to Body class internals, including constructor/destructor */
friend class MessageHandle;
// Public interface
public:
std::string GetData() const
{ return strMessage; }
void SetData( const std::string & str) { strMessage = str; }
// Data members
private:
long m_lRefCounter;
std::string strMessage; /* Messages are implemented as strings */
};
/* Handle: Provides access to the Body class. Handle class is the only class
allowed to access the Body class. Note that all Handle classes are passed in the
main program by value, and are therefore allocated and destroyed automatically.
The message future is implemented as a counted pointer to simplify memory
management for dynamically allocated MessageHandle objects. */
class MessageHandle
{
// Constructor/Destructor/Operators
public:
MessageHandle( const std::string str );
/* Destroying the MessageHandle means one less user of the MessageBody class.
If the reference count on the Body class is zero, the Body object must be deleted */
~MessageHandle();
/* Copying the handle object */
MessageHandle( const MessageHandle & src );
/* Assigning the handle object */
MessageHandle& operator=(const MessageHandle & rhs);
/* Assigning a message result (sets the Data Ready event) */
MessageHandle& operator=( const std::string & str);
/* The Body class can only be accessed by the MessageHandle class */
MessageBody* operator->();
// Public Interface
public:
std::string result(DWORD dwTimeOut) const;
// Data members
private:
MessageBody* m_pBody; // The Body object can only be accessed by MessageHandle
HANDLE m_hDataReady;
};
MessageHandle::MessageHandle( const std::string str )
{
// Create body
m_pBody = new MessageBody( str );
m_pBody->m_lRefCounter = 1;
// Create data ready event
m_hDataReady = CreateEvent( NULL, TRUE, FALSE, NULL);
if (NULL == m_hDataReady)
std::cout << "Failed to create Data Ready event" << std::endl;
}
/* Destroying the MessageHandle means one less user of the MessageBody class.
If the reference count on the Body class is zero, the Body object must be deleted */
MessageHandle::~MessageHandle()
{
--m_pBody->m_lRefCounter;
if (m_pBody->m_lRefCounter == 0)
delete m_pBody;
}
/* Copying the handle object */
MessageHandle::MessageHandle( const MessageHandle & src )
{
/* Both objects now share the same body. We can increment ref count on either
m_pBody */
m_hDataReady = src.m_hDataReady;
m_pBody = src.m_pBody;
m_pBody->m_lRefCounter++;
}
/* Assigning the handle object */
MessageHandle& MessageHandle::operator=(const MessageHandle & rhs)
{
rhs.m_pBody->m_lRefCounter++;
if (--m_pBody->m_lRefCounter <= 0)
delete m_pBody;
m_pBody = rhs.m_pBody;
m_hDataReady = rhs.m_hDataReady;
return *this;
}
/* Assigning a message result. Note how we set the event here to indicate
to the consumer handler thread that the result is available */
MessageHandle& MessageHandle::operator=( const std::string & str)
{
// Initialize body with result
m_pBody->SetData( str );
// Inform waiting clients (if any) that result is available
if (! SetEvent( m_hDataReady ) )
std::cout << "Failed to set data ready event" << std::endl;
return *this;
}
/* The Body class can only be accessed by the MessageHandle class */
MessageBody* MessageHandle::operator->()
{
return m_pBody;
}
std::string MessageHandle::result(DWORD dwTimeOut) const
{
std::string strResult;
// Note: the wait is infinite; the dwTimeOut argument is currently not used
DWORD dwRet = WaitForSingleObject( m_hDataReady, INFINITE /*dwTimeOut*/ );
switch( dwRet )
{
case WAIT_OBJECT_0:
strResult = m_pBody->GetData();
break;
case WAIT_TIMEOUT:
strResult = "Timed Out!";
break;
default:
strResult = "Error waiting for result";
};
// Handle no longer required. Clean it up
if ( !CloseHandle( m_hDataReady ) )
std::cout << "Failed to close handle" << std::endl;
return strResult;
};
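For readers not on Win32, the same future idea (a shared body plus a "data ready" signal) can be sketched with the C++11 standard library. This is an assumption-laden sketch, not a port of MessageHandle: the names MessageFuture, Set, Result and DemoFuture are hypothetical, std::shared_ptr replaces the hand-written reference count, and a condition variable replaces the event object. Unlike the listing above, the timeout here is honoured.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical portable counterpart of the handle/body pair: copies share one
// state object, and std::shared_ptr does the reference counting.
class MessageFuture {
public:
    MessageFuture() : state_(std::make_shared<State>()) {}

    // Called on the servant side: stores the result and wakes any waiters.
    void Set(const std::string& value) {
        {
            std::lock_guard<std::mutex> lock(state_->mutex);
            state_->value = value;
            state_->ready = true;
        }
        state_->changed.notify_all();
    }

    // Called on the client side: waits up to `timeout` for a result.
    // Returns true and fills `out` on success, false on timeout.
    bool Result(std::chrono::milliseconds timeout, std::string& out) const {
        std::unique_lock<std::mutex> lock(state_->mutex);
        if (!state_->changed.wait_for(lock, timeout, [this] { return state_->ready; }))
            return false;
        out = state_->value;
        return true;
    }

private:
    struct State {
        std::mutex mutex;
        std::condition_variable changed;
        std::string value;
        bool ready = false;
    };
    std::shared_ptr<State> state_;
};

// Usage sketch: a copy handed to the servant side fulfils the caller's future.
inline void DemoFuture() {
    MessageFuture caller;
    MessageFuture servantCopy = caller;  // shares the same state
    std::string text;
    assert(!caller.Result(std::chrono::milliseconds(10), text));  // nothing set yet
    servantCopy.Set("tick");
    assert(caller.Result(std::chrono::milliseconds(10), text) && text == "tick");
}
```

Because the state is only read, never torn down, by Result(), it can be called more than once and from any copy of the future.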
/* Method Request */
/* DEFINITION: Method Request declares an interface for all concrete method request
classes. This interface typically declares two methods: IsRunnable() and Call().
Note that there should be one concrete method request class for each method defined
in the proxy.
Note that a Method Request is similar to a Command object in the sense that it is used
to call the servant from another thread. */
class MethodRequest
{
// Public interface
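public:
/* Virtual destructor: the scheduler deletes requests through a MethodRequest* */
virtual ~MethodRequest() {}
/* Checks when it is possible to execute the Method Request object*/
virtual bool IsRunnable() = 0;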
/* Execute a method on the servant */
virtual void Call() = 0;
};
/* Class MRPut is a concrete MethodRequest object that corresponds
to AO_Proxy::Put(...) */
class MRPut : public MethodRequest
{
// Constructors/Destructor
public:
MRPut(AO_Servant* ser, const std::string & msg) :
m_pServant(ser), m_strResult(msg) {}
// Public interface
public:
/* Since the AO_Servant queue is not bounded by an upper limit, we can always
call its Put() method (if the queue were size-bounded, we could only call the
servant's Put() when the queue is not full) */
bool IsRunnable() { return true; }
// Call the appropriate method on the servant
void Call() { m_pServant->Put( m_strResult ); }
// Data members
private:
AO_Servant* m_pServant;
std::string m_strResult;
};
/* Class MRGet is a concrete MethodRequest object that corresponds
to AO_Proxy::Get(...) */
class MRGet : public MethodRequest
{
// Constructors/Destructor
public:
MRGet(AO_Servant* ser, const MessageHandle& msg) : m_pServant(ser), m_Result(msg) {}
// Public interface
public:
// Cannot call a Get method until the queue is not empty
bool IsRunnable() { return !m_pServant->Empty(); }
// Call the appropriate method on the servant
void Call() { m_Result = m_pServant->Get(); }
// Data members
private:
AO_Servant* m_pServant;
MessageHandle m_Result;
};
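The role of the IsRunnable() guard is easiest to see in a single-threaded sketch of one scheduler pass: requests whose guard holds are executed and removed, and the others stay queued for a later pass. The Request struct and DispatchOnce function below are hypothetical simplifications (callables instead of the MethodRequest class hierarchy) and are not part of the listings in this section.

```cpp
#include <cassert>
#include <functional>
#include <list>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified method request: a guard plus an action.
struct Request {
    std::string name;
    std::function<bool()> isRunnable;  // guard: may the request execute now?
    std::function<void()> call;        // action performed on the servant
};

// One scheduler pass: run every request whose guard holds, skip the rest.
// Returns the names of the requests executed, in execution order.
std::vector<std::string> DispatchOnce(std::list<Request>& pending) {
    std::vector<std::string> executed;
    for (auto it = pending.begin(); it != pending.end();) {
        assert(it->isRunnable && it->call);  // both callables must be set
        if (it->isRunnable()) {
            Request request = std::move(*it);
            it = pending.erase(it);  // erase returns the next valid iterator
            request.call();
            executed.push_back(request.name);
        } else {
            ++it;  // leave the blocked request for a later pass
        }
    }
    return executed;
}
```

If a Get request is queued ahead of a Put on an empty servant, the first pass runs only the Put and the second pass runs the Get, which is the ordering the guards are meant to enforce.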
/* Activation List */
/*
DEFINITION: A synchronized buffer that is shared between client threads and the
thread in which the active object and its scheduler run. A scheduler can be used
with timing mechanisms to determine how long it should wait for a method to execute
(this is particularly important if the Activation List is bounded by a specific size
and it needs to wait for its buffer to empty some of its entries).
EXAMPLE: The ActivationList class must be thread-safe because it is shared by multiple
threads. It uses a std::list<> as the internal queue */
class ActivationList
{
// Data members
private:
typedef std::list<MethodRequest*> LST_METHODREQUEST;
LST_METHODREQUEST::iterator it;
LST_METHODREQUEST m_lst;
CRITICAL_SECTION m_cs;
// Constructors/Destructor
public:
/* Constructors/Destructor: Create/Delete a critical section. Scoped locking is an
ideal pattern for using critical sections in classes */
ActivationList() { InitializeCriticalSection( &m_cs ); }
~ActivationList() { DeleteCriticalSection( &m_cs ); }
// Public interface
public:
void Insert( MethodRequest* pM );
LST_METHODREQUEST::iterator Remove( LST_METHODREQUEST::iterator );
friend class AO_Shceduler;
};
/* We must serialize all access to the list. Note that push_front is used to insert
items into the front of the list. This is directly related to AO_Shceduler::Dispatch()
which iterates continuously over the Activation list. Note that using push_back() would
cause an infinite loop when AO_Shceduler::Dispatch() iterates over the activation list.
*/
void ActivationList::Insert( MethodRequest* pM )
{
EnterCriticalSection( &m_cs );
m_lst.push_front( pM );
LeaveCriticalSection( &m_cs );
}
/* We must serialize all access to the list */
ActivationList::LST_METHODREQUEST::iterator ActivationList::Remove( LST_METHODREQUEST::iterator it )
{
LST_METHODREQUEST::iterator itRet;
EnterCriticalSection( &m_cs );
itRet = m_lst.erase( it );
LeaveCriticalSection( &m_cs );
return itRet;
}
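The ActivationList comment mentions scoped locking without showing it. As a hedged illustration in standard C++11 (std::mutex and std::lock_guard instead of CRITICAL_SECTION), a synchronized list might look like the sketch below. SynchronizedList, TryRemoveFront and DemoConcurrentInsert are hypothetical names chosen for this example only.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <mutex>
#include <thread>

// Hypothetical portable counterpart of ActivationList. std::lock_guard plays the
// role of a scoped lock: it acquires the mutex on construction and releases it
// when the guard goes out of scope, including on early return or exception.
template <typename T>
class SynchronizedList {
public:
    void Insert(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push_back(item);
    }

    // Copies the oldest item into `out`; returns false if the list is empty.
    bool TryRemoveFront(T& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) return false;  // the guard unlocks here too
        out = items_.front();
        items_.pop_front();
        return true;
    }

    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;  // mutable so that Size() can lock in a const method
    std::list<T> items_;
};

// Usage sketch: two producer threads insert concurrently and no insert is lost.
inline std::size_t DemoConcurrentInsert() {
    SynchronizedList<int> list;
    auto produce = [&list] { for (int i = 0; i < 1000; ++i) list.Insert(i); };
    std::thread a(produce);
    std::thread b(produce);
    a.join();
    b.join();
    assert(list.Size() == 2000);
    return list.Size();
}
```

The design point is that no method has to remember a matching unlock call, which is the property the critical-section version has to maintain by hand on every exit path.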
/* Scheduler */
/* DEFINITION: The scheduler manages the activation list and executes methods whose
synchronization constraints are met. Typically, the scheduler's public interface
provides one method for the proxy to insert method requests into the Activation List
and another method that dispatches method requests to the servant */
class AO_Shceduler
{
// Constructors/Destructors
public:
/* Constructor spins Scheduler thread */
AO_Shceduler();
// public interface
public:
/* Proxy method: called by the proxy, so it runs in the proxy's (client) thread */
void Insert(MethodRequest* pMR) { m_ActList.Insert( pMR); }
/* Retrieves method request objects from activation list and dispatches them
to their Servant. This method runs in a separate thread */
virtual void Dispatch();
// Helpers
private:
/* Scheduler executes Dispatch() inside this thread function */
static unsigned __stdcall SchedulerThread( void*);
// Data members
private:
ActivationList m_ActList;
uintptr_t m_ulThreadHandle; // _beginthreadex returns a uintptr_t thread handle
unsigned int m_uThreadID;
};
/* Creates a new thread to manage the Activation List */
AO_Shceduler::AO_Shceduler()
{
// Spin new thread
m_ulThreadHandle = _beginthreadex( NULL, 0, SchedulerThread, (void*)this,
0, // Run immediately
&m_uThreadID );
if (0 == m_ulThreadHandle)
{
// Need proper error checking
std::cout << "ERROR: Failed to create thread" << std::endl;
}
}
/* Scheduler thread manages Activation List through Dispatch() */
unsigned __stdcall AO_Shceduler::SchedulerThread( void* pParam )
{
AO_Shceduler *pOAScheduler = static_cast<AO_Shceduler*>(pParam);
pOAScheduler->Dispatch();
return 0;
}
void AO_Shceduler::Dispatch()
{
/* Iterate continuously over the entries in the Activation List. See Possible
Enhancements item 2 at end of this section to improve performance */
for( ; ; )
{
/* Iterate over the activation list. Note how we remove list items while
iterating over it. See lists in the STL section for an explanation */
m_ActList.it = m_ActList.m_lst.begin();
for ( ; m_ActList.it != (m_ActList.m_lst.end()) ; )
{
// Select a method request whose guard evaluates to true
MethodRequest* pCurrent = (MethodRequest*)(*m_ActList.it);
// Output some debugging information
std::cout << "list size is " << m_ActList.m_lst.size() << std::endl;
std::cout << "element type is: " << typeid( *pCurrent ).name() << std::endl;
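if (pCurrent->IsRunnable())
{
// This method is runnable. Remove it from the list and run it
m_ActList.it = m_ActList.Remove( m_ActList.it );
pCurrent->Call();
delete pCurrent;
}
else
{
// Not runnable yet: move on so that other requests (e.g. a pending Put) get a chance
++m_ActList.it;
}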
}
} // for(;;)
}
/* Wraps a proxy object and runs it in its own thread. See Consumer Handler
Thread section */
class ConsumerHandler
{
// Constructors/Destructor
public:
// Spawns the active object's thread
ConsumerHandler();
// Public interface
public:
void Put( const std::string & str ) { m_Proxy.Put( str ); }
// Data members
private:
AO_Proxy m_Proxy;
unsigned int m_uiThreadID;
// Helpers
static unsigned __stdcall ActiveObjectThread( void* );
};
/* Constructor spawns active object's thread*/
ConsumerHandler::ConsumerHandler()
{
/* Need proper error checking and handling. Also need a proper way to terminate
this thread */
_beginthreadex( NULL, 0, ActiveObjectThread, this, 0, &m_uiThreadID);
}
unsigned __stdcall ConsumerHandler::ActiveObjectThread( void* pParam)
{
ConsumerHandler *This = static_cast<ConsumerHandler*>(pParam);
/* Enter an infinite loop to run active object. Again, need a proper method to
terminate this thread */
for( ; ; )
{
// Obtain message text. result() blocks until the servant executes the
// method (the 3000 ms timeout argument is currently ignored by result())
MessageHandle msgHandle = This->m_Proxy.Get();
std::string strMessage = msgHandle.result( 3000 );
// Now transmit message over to some remote client
// ...
}
return 0;
}
int main(int argc, char* argv[])
{
/* Create a consumer handler. Spawns a thread that waits on messages dispatched
by the Scheduler */
ConsumerHandler ob;
/* Now send a couple of messages to the proxy via the handler. This could be
done inside a thread. In fact, there could be many threads simulating
simultaneous messages sent through the proxy to a specific consumer handler */
ob.Put( " (Message one) " );
ob.Put( " (Message two) " );
ob.Put( " (Message three) " );
/* Infinite loop until user exits program with 'q' */
char cUserInput;
while (std::cin >> cUserInput)
{
if (cUserInput == 'q')
break;
}
return 0;
}