Synchronous (request-response) messaging with NMS and ActiveMQ

ActiveMQ / Coding

Recently I’ve had the pleasure (?) of working with and learning a little about using Apache’s ActiveMQ with .NET using NMS.  One of the issues I ran into was how to do synchronous messaging.  This post is based on the following resources:

You’ll need to download the Apache.NMS.ActiveMQ library, which can be found at http://activemq.apache.org/nms/activemq-downloads.html.  When I tested this example I was using version 1.5.1.2341 of Apache.NMS.ActiveMQ and version 1.5.0.2194 of Apache.NMS.  The Apache.NMS library will be included in the “lib” folder of the binary download.  For both the server and client examples you’ll need to add references to Apache.NMS and Apache.NMS.ActiveMQ with .NET framework >= 3.5.

I’m assuming a default setup of ActiveMQ here – just start-it up and go!  I hope that the code comments pretty-much explain the flow of things, but if something doesn’t make sense feel free to ask.  To run the example, start-up the server and then the client and start sending messages!

The Server (Consumer)

First things first, we’ll create the server-end of the application.  This code will setup a consumer to listen for new messages on the send queue as well as a producer which will be used to send response messages back to the client.

public class ActiveMQServer
{
    private string uri, destinationQueue;
    private ISession session;
    private IMessageProducer replyProducer;

    public ActiveMQServer(string uri, string destinationQueue)
    {
        this.uri = uri;
        this.destinationQueue = destinationQueue;

        // Create the connection factory which will be used to create new connections to the queue.  We set the URI to that of the ActiveMQ server.
        var connectionFactory = new ConnectionFactory(uri);
        IConnection connection;

        try
        {
            // Create and open a new connection to ActiveMQ.
            connection = connectionFactory.CreateConnection();
            connection.Start();

            // Create a new session.
            this.session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

            // Get a handle to the destination queue by its queue name.
            var queue = this.session.GetDestination(destinationQueue);

            // Create and setup a producer to send response messages back to the client.  Note that we do not
            // set a destination here.
            this.replyProducer = this.session.CreateProducer();
            this.replyProducer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            // Create and setup a consumer to consume (receive) messages from the queue that the client is sending messages to.
            var consumer = this.session.CreateConsumer(queue);
            // Wire-up an event to be fired when a message is received from the queue.
            consumer.Listener += new MessageListener(consumer_Listener);
        }
        catch (NMSException ex)
        {
            Console.WriteLine("1: {0}", ex);
        }
    }

    /// <summary>
    /// This method is fired upon receipt of a message from the client queue.  It will print the received message to the console and then
    /// send a response message back to the client.
    /// </summary>
    void consumer_Listener(IMessage message)
    {
        try
        {
            // Create the response message.  We'll send a simple text-based message back.
            var response = this.session.CreateTextMessage();

            // Determine the text to send back to the client.
            var textMessage = message as ITextMessage;
            if (textMessage == null)
                response.Text = "Expected to receive an ITextMessage...";
            else
            {
                Console.WriteLine("Received: " + textMessage.Text);
                response.Text = "Thank you for your message: " + textMessage.Text;
            }

            // Set the correlation ID to that of the received message.
            response.NMSCorrelationID = message.NMSCorrelationID;

            // Send the response message to the reply-to destination as received in the message header.  This is the
            // temporary queue that we created in the client application.
            this.replyProducer.Send(message.NMSReplyTo, response);
        }
        catch (NMSException ex)
        {
            Console.WriteLine("2: {0}", ex);
        }
    }
}

The Client (Producer)

Next, we’ll create the client which will push messages into the queue and listen for responses.  This implementation allows for multiple messages to be sent before any responses are received because it uses a response buffer and correlation ID’s to map received messages to their original request counterparts.  Here we’re using temporary queues, which only survive in ActiveMQ as long as the connection is open, to receive messages from the server.  You could also use a permanent queue if you wanted and use a selector to consume the correct response messages for the client.

public class ActiveMQClient
{
    private string uri, destinationQueue;
    private IMessageProducer producer;
    private Dictionary<string, AsyncMessageHelper> responseBuffer;
    private ITemporaryQueue temporaryQueue;
    private ISession session;
    private IConnection connection;

    public ActiveMQClient(string uri, string destinationQueue)
    {
        this.uri = uri;
        this.destinationQueue = destinationQueue;
        this.responseBuffer = new Dictionary<string, AsyncMessageHelper>();

        // Create the connection factory which will be used to create new connections to the queue.  We set the URI to that of the ActiveMQ server.
        var connectionFactory = new ConnectionFactory(this.uri);

        try
        {
            // Create and open a new connection to ActiveMQ.
            connection = connectionFactory.CreateConnection();
            connection.Start();

            // Create a new session.
            this.session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

            // Get a handle to the destination queue by its queue name.
            var destination = session.GetDestination(destinationQueue);

            // Setup a message producer to send message to the queue the server is consuming from.
            this.producer = session.CreateProducer(destination);
            this.producer.DeliveryMode = MsgDeliveryMode.NonPersistent;

            // Create a temporary queue for this session.  This queue will survive until the connection is closed, and then it
            // is automatically deleted by ActiveMQ.  This temporary queue will be used by the consumer (server) to send response
            // messages to.
            this.temporaryQueue = session.CreateTemporaryQueue();

            // Create a consumer to listen for messages on the temporary queue we just created.
            var responseConsumer = session.CreateConsumer(temporaryQueue);
            // Wire up an event which will be fired upon receipt of response messages.
            responseConsumer.Listener += new MessageListener(responseConsumer_Listener);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
    }

    /// <summary>
    /// Disposes of the ActiveMQ connection.
    /// </summary>
    public void Stop()
    {
        try
        {
            this.connection.Dispose();
        }
        catch (Exception)
        {
        }
    }

    /// <summary>
    /// Sends a message and blocks until a response is received or the timeout expires.
    /// </summary>
    /// <param name="message"></param>
    /// <returns></returns>
    public IMessage SendMessage(IMessage message, int timeout = 10000)
    {
        // Create a unique correlation ID which we will use to map response messages to request messages.
        var correlationID = Guid.NewGuid().ToString();
        message.NMSCorrelationID = correlationID;

        // Set the reply-to header to the temporary queue that we created so that the server knows where to return messages.
        message.NMSReplyTo = this.temporaryQueue;

        // Create a new AsyncMessageHelper.  This class is a helper class to make it easier for us to map response messages to request messages.
        using (var asyncMessageHelper = new AsyncMessageHelper())
        {
            // Add the async helper to the response buffer.
            lock (this.responseBuffer)
                this.responseBuffer[correlationID] = asyncMessageHelper;

            // Send the message to the queue.
            this.producer.Send(message);

            // Wait for a response for up to [timeout] seconds.  This blocks until the timeout expires or a message is received (.Set() is called on the trigger then, allowing execution to continue).
            asyncMessageHelper.Trigger.WaitOne(timeout, true);

            // Either the timeout has expired, or a message was received with the same correlation ID as the request message.
            IMessage responseMessage;
            try
            {
                // The Message property on the async helper will not have been set if no message was received within the timeout period.
                if (asyncMessageHelper.Message == null)
                    throw new TimeoutException("Timed out while waiting for a response.");

                // We got the response message, cool!
                responseMessage = asyncMessageHelper.Message;
            }
            finally
            {
                // Remove the async helper from the response buffer.
                lock (this.responseBuffer)
                    this.responseBuffer.Remove(correlationID);
            }

            // Return the response message.
            return responseMessage;
        }
    }

    /// <summary>
    /// Creates a text message for use with the client.
    /// </summary>
    public ITextMessage CreateTextMessage(string text = null)
    {
        return text == null
            ? this.session.CreateTextMessage()
            : this.session.CreateTextMessage(text);
    }

    #region Event Handlers

    /// <summary>
    /// This event will fire when a response message is received from the temporary queue.  It will attempt to map received messages back
    /// to the response buffer.
    /// </summary>
    void responseConsumer_Listener(IMessage message)
    {
        // Look for an async helper with the same correlation ID.
        AsyncMessageHelper asyncMessageHelper;
        lock (this.responseBuffer)
        {
            // If no async helper with the same correlation ID exists, then we've received some erranious message that we don't care about.
            if (!this.responseBuffer.TryGetValue(message.NMSCorrelationID, out asyncMessageHelper))
                return;
        }

        // Set the Message property so we can access it in the send method.
        asyncMessageHelper.Message = message;

        // Fire the trigger so that the send method stops blocking and continues on its way.
        asyncMessageHelper.Trigger.Set();
    }

    #endregion

    #region Nested Classes

    /// <summary>
    /// Helper class which assists with keeping track of message timeouts, blocking the sending thread and carrying messages through the async eventing back
    /// to the synchronous call.
    /// </summary>
    private class AsyncMessageHelper : IDisposable
    {
        public IMessage Message { get; set; }
        public AutoResetEvent Trigger { get; private set; }

        public AsyncMessageHelper()
        {
            this.Trigger = new AutoResetEvent(false);
        }

        #region IDisposable Members

        public void Dispose()
        {
            this.Trigger.Dispose();
            this.Message = null;
        }

        #endregion
    }

    #endregion
}

As you can see, what we’re doing is using an AutoResetEvent to block in the SendMessage() call until either the timeout was reached or a message was received.  This is made possible by the use of the AsyncMessageHelper class which we create an instance of for each outbound message and map it to the correlation ID of the message, and then add it to the response buffer.  When a message is received from the temporary queue, it looks up the AsyncMessageHelper and sets the Message property, and then sets the AutoResetEvent once, releasing the block in the SendMessage() call.

The Final Result

When you run the examples at the bottom of this page, you’ll see the following:

Server

Client

 

Multithreading

I decided to run a test with multithreading using the client to see how it does, and it seems to work quite well out-of-the-box. I used the following code to generate the requests:

static void MultiThreadedTest()
{
    Console.WriteLine("Beginning parallel test...");
    Parallel.For(1, 11, i =>
        {
            var requestMessage = client.CreateTextMessage("Parallel test #" + i);
            var sw = Stopwatch.StartNew();
            try
            {
                Console.WriteLine("=> Sending #{0}", i);
                var responseMessage = client.SendMessage(requestMessage);
                Console.WriteLine("=> Response[{0}ms]: {1}", sw.ElapsedMilliseconds, ((ITextMessage)responseMessage).Text);
            }
            catch (TimeoutException)
            {
                Console.WriteLine(" => Timeout (#{0})!", i);
                return;
            }
        });
    Console.WriteLine("Parallel test completed.");
}

The results printed to the consoles of the client and server were as follows:

Client

Creating and starting the client...
Client started! Press CTRL+C to exit.

Beginning parallel test...
=> Sending #3
=> Sending #1
=> Sending #5
=> Sending #7
=> Sending #9
=> Response[48ms]: ECHO(Parallel test #7)
=> Sending #8
=> Response[56ms]: ECHO(Parallel test #1)
=> Sending #2
=> Response[54ms]: ECHO(Parallel test #5)
=> Sending #6
=> Response[50ms]: ECHO(Parallel test #9)
=> Sending #10
=> Response[3ms]: ECHO(Parallel test #8)
=> Sending #4
=> Response[57ms]: ECHO(Parallel test #3)
=> Response[3ms]: ECHO(Parallel test #2)
=> Response[3ms]: ECHO(Parallel test #6)
=> Response[3ms]: ECHO(Parallel test #10)
=> Response[4ms]: ECHO(Parallel test #4)
Parallel test completed.

Server

Creating and starting the server...
Server started! Press CTRL+C to exit.

Received: Parallel test #7
Received: Parallel test #1
Received: Parallel test #9
Received: Parallel test #5
Received: Parallel test #3
Received: Parallel test #8
Received: Parallel test #2
Received: Parallel test #6
Received: Parallel test #10
Received: Parallel test #4
Cyle


Programming enthusiast. I've been intrigued by computers since I was 12, staying in at recess to code QBASIC on the old Apple II. Background in the payment industry, particularly in card switching and related system architecture. Lover of high-performance distributed architecture. Huge fan of the new NoSQL wave. Open source fanatic.

6 Comments

  1. Chris B
    15 August 11, 12:38pm

    You may find that it behaves quite strangely when you’ve got multiple messages going through your client code simultaneously. You’ll need to ensure you’re using a correlation ID to ensure that you get the correct response to the message you’re sending.

    Ah the joys of async programming.

    My preferred route to this particular pattern is to have a single ActiveMQ session, and a pre-declared request destination in your client, then within your SendMessage() method I’d have:

                var tempQueue = session.CreateTemporaryQueue();
    
                using (IMessageConsumer consumer = session.CreateConsumer(tempQueue))
                using (IMessageProducer producer = session.CreateProducer(destination))
                {
    
                    ITextMessage request = session.CreateTextMessage();
                    request.NMSReplyTo = tempQueue;
    
                    producer.Send(request);
    
                    var message = consumer.Receive(TimeSpan.FromSeconds(60)) as ITextMessage; // Wait a minute for a response
    
                   // Do something with our response
                        tempQueue.Delete(); // Cleanup
    
    }
    

    … this way you let NMS do the donkey work of keeping things synchronised. There’s no noticable speed penalty for this either.

    • 15 August 11, 1:15pm

      There is a correlation ID in there :-)

      public IMessage SendMessage(IMessage message, int timeout = 10000)
          {
              // Create a unique correlation ID which we will use to map response messages to request messages.
              var correlationID = Guid.NewGuid().ToString();
              message.NMSCorrelationID = correlationID;
              ...
      

      So, multiple threads sending messages via the same client at the same time won’t be an issue in that regard.

      Your snippet is perfectly valid – the upside to the example code is that it only uses one temporary queue, consumer and producer for the life of the client vs. creating new ones with each outbound message, which involves several round-trips to the broker (which of course is not really an issue if the broker is local).

      Async programming is a twisted world :-)

  2. Jim Gomes
    22 August 11, 8:08pm

    Great article, and thanks for contributing to the community! About the only thing I would point out is that you have actually programmed to my preferred level of NMS API and referencing the Apache.NMS.ActiveMQ.dll assembly directly is not necessary. Because your example is programmed to the general NMS API, the particular provider can be chosen at runtime. There is only one change that needs to be made to facilitate this:

    var connectionFactory = new ConnectionFactory(uri);

    changes to

    var connectionFactory = NMSConnectionFactory.CreateConnectionFactory(uri);

    This then allows you to use connection URIs like the following:

    activemq:tcp://activemqhost:61616
    activemq:failover:tcp://activemqhost:61616

    or for TIBCO connections:

    ems:tcp://tibcohost:7222

    The NMSConnectionFactory class will parse the URI and instantiate the correct provider implementation at runtime. The rest of your code doesn’t have to change at all since you aren’t using anything that is provider specific. Once this change is made, I think you would then be able to remove the reference to Apache.NMS.ActiveMQ.dll from your projects, and just distribute it with the application.

    • 22 August 11, 8:22pm

      Thanks! That is a good point and I will update the article to reflect your suggestion as soon as I get a spare moment :-)

  3. Justin Teaw
    04 October 12, 9:25pm

    Hi Cyle,

    Thanks for the nice article. I am looking for articles to learn more about the nms api and spring nms. I was wondering if you looked at spring’s nmstemplate class? It seems to provide synchronous message.

    On a similar note, I have been looking to see if there is an easy way to guarantee processing of the message. If the consumer let’s say does go down, the message is not lost and will be processed again from the queue once it comes up again.

    Any thoughts on that?

    Thanks,

    Justin

    • 24 October 12, 2:53pm

      Hi Justin!

      Back when I wrote this article I had a quick look through Spring’s implementation, but my personal stance on Spring is that it is quite a bulky (albeit very feature-rich) framework. That’s why I wanted to go for something a little lighter-weight and straight-to-the-point. The above code will quite efficiently and effectively accomplish both synchronous and asynchronous messaging for you :-) Though note that it was written against an older version of the NMS API than is available today.

      It is actually the broker’s job to guarantee processing (well, delivery) of the message. Whether this works between broker restarts, however, will depend on your persistence/replication configuration in your broker. If your consumer does die, for whatever reason, the broker will queue those messages up and wait for your consumer to come back online and then (re)deliver any messages which it (1) never sent, and/or (2) didn’t receive an acknowledgement for (configuration dependent as well).

      Hope that helps to answer your questions!

      Cheers,
      Cyle

Respond to Justin Teaw