Extending .NET Remoting

Summary

Introduction

Using information from the last two sections, this section presents examples of how and where to extend sinks in order to perform custom processing. Examples of how sinks can be extended to perform custom processing include:

As a basic rule, before actually beginning to extend the .NET Remoting Framework, you need to determine whether you will be working on the original message (the underlying dictionary) or on the serialized message that you will get from the stream. For example, when creating a compression/encryption sink, the actual message content is of no importance, instead you need to work with the stream by compressing it at the client side and decompressing it at the server side. But for a debugging and tracing sinks, it is the actual content of the message that is important and not the stream.

As a general pattern, whenever you implement your own sink, you will need to create two classes: the sink itself which, depending on the location of the sink, should derive either from IClientChannelSink or IServerChannelSink, and a provider, which again, depending on the location of the sink, should derive either from IClientChannelSinkProvider or IServerChannelSinkProvider

Compression Sink

Note: The implementation files for the client-side compression sink and the server-side compression sink can be used as starting points to implement custom sinks along with their sink providers.

As mentioned previously, in this scenario it is the stream of the message rather than the message itself that is important. The planned client-side sink chain followed by the planned server-side sink chain are shown below:

From the above, we can identify classes that need to be written:

Implementing Client-Side Sink

Implementing Server-Side Sink

Creating the Sink Providers

Using the Sinks

The following screens illustrate output from both server and client with traces in the custom sinks. In the following screens, the server was started first, followed by the client:

Extending the Compression Sinks

There is at least one serious problem with the above server-side sink: it does not detect if a stream is compressed or not, and will always try to decompress it. If a stream is uncompressed, decompressing it will generate errors.

This leads to the general requirement: As the message object travels back and forth between a client and a server, how can you send extra information in a standard way such as both client and/or server can query those values in and then take appropriate action as and when required? Generally, this can be done by adding additional fields to the ITransportHeaders object that is passed as a parameter to ProcessMessage() and AsyncProcessRequest(). These headers are then transferred to the server-side sink and can be obtained by using the ITransportHeaders object that the it receives as a parameter to tis ProcessMessage() method. By convention, these additional headers should start with the prefix 'X'.

The following shows the relevant methods in the client-side sink (used in the CompressionSinks project) with code to update and check headers as required:

/* client sink. Because we derive from IClientChannelSink interface, we must implement all its methods (AsyncProcessRequest, AsyncProcessResponse, GetRequestStream, ProcessMessage) */
public class CompressionClientSink : BaseChannelSinkWithProperties, IClientChannelSink
{
    ...

    /* IClientChannelSink implementation */

    // Give this sink the chance to process the message synchronously
    public void ProcessMessage( IMessage msg, 
                                ITransportHeaders requestHeaders, Stream requestStream,
                                out ITransportHeaders responseHeaders, out Stream responseStream)
    {
        Console.WriteLine( "CompressionClientSink ProcessMessage" );

        /* TODO: Implement pre-processing before sending the message to the next sink in the chain
        In the compression case, requestStream must be sent to a compression function that returns
        a compressed request stream */
   
     requestStream = SomeCompressionFunction( requestStream );
        requestHeaders["X-Compress"] = "yes";

        /* Call the next sink in the chain */
        snkNext.ProcessMessage( msg, requestHeaders, requestStream, out responseHeaders, out responseStream );

        /* TODO: Processing from sink further in the chain has completed. Now do any post-processing
        before returning to the previous sink in the chain. In the compression case, uncompress the 
        response */
        string strCompress = (string)requestHeaders["X-Compress"];
        if (strCompress != null && strCompress == "yes")

            responseStream = SomeUnCompressionFunction( responseStream );
    }

    // Give this sink the chance to process the message asynchronously
    public void AsyncProcessRequest( IClientChannelSinkStack sinkStack, IMessage msg, 
                                     ITransportHeaders headers, Stream stream)
    {
        Console.WriteLine( "CompressionClientSink AsyncProcessRequest" );

        /* TODO: Implement pre-processing before sending the message to the next sink in the chain. 
        In the compression case, compress the given stream */
        stream = SomeCompressionFunction( stream );
        requestHeaders["X-Compress"] = "yes";

        /* Push this sink into the provided stack in order to be notified when the asynchronous processing
        returns */
        sinkStack.Push( this, null );

        /* Then call the next sink in the chain */
        snkNext.AsyncProcessRequest( sinkStack, msg, headers, stream );
    }

    // Give this sink the chance to process the message asynchronously
    public void AsyncProcessResponse( IClientResponseChannelSinkStack sinkStack, object state, 
                                      ITransportHeaders headers, Stream stream)
    {
        Console.WriteLine( "CompressionClientSink AsyncProcessResponse" );

        /* TODO: Processing from sink further in the chain has completed. Now do any post-processing
        before returning to the previous sink in the chain. In the compression case, uncompress the 
        response */

        string strCompress = (string)requestHeaders["X-Compress"];
        if (strCompress != null && strCompress == "yes")
            stream = SomeUnCompressionFunction( stream );

        /* Then call the next sink via the sink stack. IClientResponseChannelSinkStack.AsyncProcessResponse()
        pops one sink from the stack and calls its AsyncProcessRespone() */

        sinkStack.AsyncProcessResponse( headers, stream );
    }
}

The server-side sink's ProcessMessage() works differently. Recall that when a message reaches this method, it is not yet determined whether the method will be executed synchronously or asynchronously. Therefore, the sink pushes itself into the sink stack that will be used when replying asynchronously.

As the AsyncProcessResponse() method for the sink has to know whether the original request has been compressed or not, you will need to use the second parameter of sinkStack.Push() which is called during ProcessMessage(). In this parameter you can put any object that will let you later determine the state of the request. The server-side implementation for ProcessMessage() and AsyncProcessResponse() is shown below:

/* Server sink. Because we derive from IServerChannelSink interface, we must implement all its methods (AsyncProcessResponse, GetRequestStream, ProcessMessage) */
public class CompressionServerSink : BaseChannelSinkWithProperties, IServerChannelSink
{
    ...

    /* IServerChannelSink implementation */

    // Give this sink the chance to process the message
    public ServerProcessing ProcessMessage( IServerChannelSinkStack sinkStack,
                                            IMessage requestMsg, 
                                            ITransportHeaders requestHeaders, 
                                            Stream requestStream,
                                            out IMessage responseMsg,
                                            out ITransportHeaders responseHeaders, 
                                            out Stream responseStream)
    {
        bool bIsCompressed = false'
        Console.WriteLine( "CompressionServerSink.ProcessMessage" );

        /* TODO: Implement pre-processing before sending the message to the next sink in the chain
        In the compression case, requestStream must be decompressed */
        string strCompress = (string)requestHeaders["X-Compress"];
        if (strCompress != null && strCompress == "yes")
        {
            requestStream = SomeDecompressionFunction( requestStream );
            bIsCompressed = true;
        }

        /* Push this sink into the stack and forward the call to the next sink in the stack */
        sinkStack.Push( this, bIsCompressed );

        /* Call the next sink in the chain */
        ServerProcessing serverProcessing = snkNext.ProcessMessage( sinkStack, 
                                                                    requestMsg, 
                                                                    requestHeaders, 
                                                                    requestStream, 
                                                                    out responseMsg, 
                                                                    out responseHeaders, 
                                                                    out responseStream );

        /* TODO: Processing from sink further in the chain has completed. Now do any post-processing
        before returning to the previous sink in the chain. In the compression case, compress the 
        response */
        if (serverProcessing == ServerProcessing.Complete)
        {
            // Compress the response if necessary
            if (bIsCompressed)
            { 
                responseStream = SomeUnCompressionFunction( responseStream );
                requestHeaders["X-Compress"] = "yes";
            }
        }

        /* Return status information to the previous sink */
        return serverProcessing;
    }

    // Give this sink the chance to process the response to an asynchronous method call
    public void AsyncProcessResponse( IServerResponseChannelSinkStack sinkStack, object state, IMessage msg,
                                      ITransportHeaders headers, Stream stream)
    {
        Console.WriteLine( "CompressionServerSink.AsyncProcessResponse" );

        // Get the compression flag from the state object
        bool bCompressed = (bool)state;

        /* TODO: Do any post-processing before returning to the previous sink in the chain. In the compression case, 
        compress the response */

        if (bCompressed)
        { 
            stream = SomeUnCompressionFunction( stream );
            headers["X-Compress"] = "yes";
        }


        /* Then call the next sink via the sink stack */
        sinkStack.AsyncProcessResponse( msg, headers, stream );
    }

    ...
}

Extending the Sinks Further

What if wanted the compression sinks (or any other custom sink that we develop) to read certain parameters from the configuration file? The following shows a configuration file for a client-side sink that expects to read a parameter telling the sink whether to compress or not:

<configuration>
    <system.runtime.remoting>
        <application>
            <channels>
                <channel ref="http">
                    <clientProviders>
                        <formatter ref="soap"/>
                        <provider type="ClientSideCompressionSink.CompressionClientSinkProvider, Sinks" Compress="yes"/>
                    </clientProviders>
                </channel>
            </channels>

            <client>
                <wellknown type="Shared.SomeSAO, General" url="http://localhost:1234/SomeSAO.soap"/>
            </client>
        </application>
    </system.runtime.remoting>
</configuration>

Any parameters in the <provider> element like the Compress attribute should be accessed fro the sink provider's constructor as shown in the following code fragment:

public class CompressionClientSinkProvider : IClientChannelSinkProvider
{
    /* Data members: A sink provider must always provide a data member to point to the next sink provider
    in the chain */
    IClientChannelSinkProvider prvdrNext;
    string strCompress;

    /* Constructor */
    public CompressionClientSinkProvider(IDictionary properties, ICollection providerData)
    {
        Console.WriteLine( "CompressionClientSinkProvider constructor" );

        // Retrieve the value of the Compress attribute to determine whether the sink should compress streams
        strCompress= (string)properties["Compress"];
    }


    ...
}

Passing Runtime Information

The previous compression sinks were IClientChannelSink and IServerChannelSink, which meant that they worked on the resulting stream after the formatter has serialized the message object. Sinks that implement IMessageSink in contrast, work on the message object before it gets formatted. This means that any changes you make to the contents of the IMessage object will be serialized and therefore reflected in the resulting stream (note: attempting to change the contents of the IMessage object in an IClientChannelSink will not cause this change to be propagated to the server because you are changing the message object after it has been serialized). 

One practical usage would be for client-side IMessageSink objects to pass extra runtime information from the client to the server. In this example, we send the client's thread priority to the server so that remote method calls execute using the supplied thread priority.

Call Contexts 

To send arbitrary data from the client to the server, you can use the Message object's logical call context. A call context provides a set of properties that are carried with the execution code path. A call context is represented by the CallContext class which is a specialized collection object similar to TLS (Thread Local Storage) for method calls. Entries can be added to the CallContext object as it travels down and back up the execution code path.

With respect to .NET Remoting, a CallContext object is passed in the IMessage in the "__CallContext" property entry as it flows between contexts and application domains. Message sinks can add and look up entries in the __CallContext as it passes through the message sink chain. CallContext.SetData and CallContext.GetData are used to manage the call context slots in application code. Note that the data slots are unique per call path, that is, the state is not shared across call paths. When a remote method call is made on an object in another AppDomain, the CallContext class generates a LogicalCallContext instance that travels along with the remote call.

Overall Approach to implementing IMessageSink sinks

As mentioned previously, to send arbitrary data use the CallContext object. To implement a message sink derive a class from IMessageSink. But to implement the provider, you will find that there is no IMessageSinkProvider, so you must implement an IClientChannelSinkProvider (for client-side sinks) or IServerChannelSinkProvider (for server-side sinks), even though the sink is in reality an IMessageSink. The problem arises when looking at the CreateSink function of the IClientChannelSinkProvider (or IServerChannelSinkProvider):

IClientChannelSink CreateSink( ... );

This indicates that CreateSink must return an IClientChannelSink in any case. Therefore, we have to extend the sink to derive not only from IMessageSink but also from IClientChannelSink (or IServerChannelSink).  This means that method for IClientChannelSink (or IServerChannelSink) must be implemented as well. But because we are implementing a message sink that should not be positioned after the formatter, these methods should throw an exception. Therefore, based on the above, the overall approach for implementing an IMessageSink sink is:

The Runtime Info Sink project implements a client-side IMessageSink as discussed above to pass runtime information to the server. On the server, the information is picked up by an IServerChannelSink and not an IMessaeSink (to keep things simple), so the implementation of the server-side sink is very identical to that found in CompressionSink project: