Friday 24 February 2012

Comparing old and new Asynchronous MSMQ Calls using IAsyncResult and the new System.Threading.Tasks.TaskFactory in .Net 4.0.


In this blog entry I thought I would describe the old way of doing Asynchronous Calls and the new way of doing Asynchronous Calls using System.Threading.Tasks.TaskFactory

Recently I have been doing some asynchronous calls using IAsyncResult and MSMQ.
Before I show the code I need to explain how it was done the old fashion way.

In the olden days of .NET 1.0 to 3.5 sp1 you would start by calling a BeginReceive(TimeSpan, Cursor, Object, AsyncCallback) or BeginPeek(TimeSpan, Cursor, Object, AsyncCallback). But because BeginReceive is asynchronous, you can call it to receive a message from the queue without blocking the current thread of execution.
EndPeek is used to read the message that caused the PeekCompleted event to be raised.
Once an asynchronous operation completes, you can call BeginPeek or BeginReceive again in the event handler to keep receiving notifications. The event handler here is the AsyncCallback(void PeekCompleted(IAsyncResult asyncResult)

In that callback of PeekComplete the EndPeek will return the Message from the MSMQ.

Here is the code for .NET 3.5 sp1 or earlier

msgqueueCurrentCursor = queue.CreateCursor();
 
queue.BeginPeek(new TimeSpan(0, 0, 0, 1, 0), msgqueueCurrentCursor, PeekAction.Current, 
0, new AsyncCallback(PeekCompleted));
while (true)
{
     Thread.Sleep(1000);
}
 
 
// Provides an event handler for the PeekCompleted event.
private void PeekCompleted(IAsyncResult asyncResult)
{
   try
   {
      bool bPeeking = false;
 
      // End the asynchronous peek operation.
      Message msg = queue.EndPeek(asyncResult);
 
      lock (queueMessagesToTransmit)
      {
         queueMessagesToTransmit.Add(msg.LookupId, msg);
      }
      bPeeking = true;
      queue.BeginPeek(new TimeSpan(0, 0, 0, 1, 0), msgqueueCurrentCursor, 
PeekAction.Next, 0, new AsyncCallback(PeekCompleted));
   }
   catch (System.Messaging.MessageQueueException msgqueueException)
   {
      if (msgqueueException.MessageQueueErrorCode == 
System.Messaging.MessageQueueErrorCode.IOTimeout)
      {
         queue.BeginPeek(new TimeSpan(0, 0, 0, 1, 0), msgqueueCurrentCursor, 
PeekAction.Next, 0, new AsyncCallback(PeekCompleted));
      }
      else
      {
         Trace.WriteLine(TraceName + string.Format("Message Queue Exception {0}", 
msgqueueException.ToString()));
         return;
      }
   }
   catch (ThreadAbortException thrdExc)
   {
      System.Diagnostics.Trace.WriteLine(TraceName + thrdExc.ToString());
      return;
   }
}

Now with the new Task.Factory in System.Threading.Tasks there is no need for the callback. This is managed with the function Task.Factory.FromAsync as shown below.

public Task<TResult> FromAsync<TResult>(
        IAsyncResult asyncResult,
        Func<IAsyncResult, TResult> endMethod
)

In our case for a BeginReceive from the message queue we could very easily do this without the callback function

Task<Message> ts = Task.Factory.FromAsync<Message>(queue.BeginReceive(), queue.EndReceive);
ts.Wait();
Message msg = ts.Result;

Not there is no typo above the EndReceive is a function parameter.

Now you might be asking this is a blocking call. And the quick answer is yes sort of. The Task.Wait() is actually acts like a Thread.Sleep(). So if you created a thread to do this you wont need to worry about the thread and the code is rather simple as shown above. And other threads can process while you wait for the transfer a message inside the MSMQ.
As with all threads commonly used I have seen they loop with a Thread.Sleep(). This is now no longer needed. This actually gives maximum throughput without blocking therefore getting the best performance for reading from the MSMQ.

Now I include the code in its entirety. This code would be the thread called function running in the background while you do other processing. You would do a Thread.abort() to terminate it.

Here is the code .NET 4.0 or later

try
{
Task<Message> ts = Task.Factory.FromAsync<Message>(queue.BeginPeek(new TimeSpan(1, 0, 0, 1, 0), msgqueueCurrentCursor, PeekAction.Current, 0, null), queue.EndPeek);
ts.Wait();
Message msg = ts.Result;
Console.WriteLine(msg.Id.ToString());
while (true)
{
Task<Message> tsnext = Task.Factory.FromAsync<Message>(queue.BeginPeek(new TimeSpan(1, 0, 0, 1, 0), msgqueueCurrentCursor, PeekAction.Next, 0, null), queue.EndPeek);
      tsnext.Wait();
      Message msg = tsnext.Result;
      Console.WriteLine(msg.Id.ToString());
}
}
catch (ThreadAbortException thrdExc)
{
   System.Diagnostics.Trace.WriteLine(TraceName + thrdExc.ToString());
   return;
}

Please refer to this forum entry where I was unsure of how to accomplish this.


Here is where the new Task.TaskFactory is documented. http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskfactory.aspx

P.S. On a side note I have done this for both sockets, sql and streams. If you need me to blog how to do that then post a comment and I will do my best.

As always happy asynchronous programming

No comments:

Post a Comment