MSMQ recovery with .NET

Using MSMQ with message recovery in .NET

MSMQ is fairly old technology, and there are a number of other products that perform better in most scenarios. Still, MSMQ is used in a lot of solutions, and Microsoft has not announced an end of support for it, so it will be around for some time yet.

One feature that MSMQ does not offer out of the box is recovery. When you take a message from the queue and processing fails, in some cases you want to try to process that message again. Of course, you can just re-invoke the processing method, but that is not enough if you want to retry after a specific delay (for example, when your processing depends on the result of some other remote resource) and only a limited number of times (rather than re-processing till the end of time).

One approach is to store failed messages in a temporary collection. Since queues are often read by multiple threads, access to that temporary recovery collection has to be thread safe. The easiest way is to use ConcurrentQueue, which is basically a simple in-memory queue you can use in code.
The first disadvantage of this approach is that recovery is not safe: if anything goes wrong and your code crashes, your in-memory queue is lost.
The second disadvantage is that with a high number of messages, if processing keeps failing, your in-memory queue will grow and use more and more memory. That can eventually crash your code, which leads back to the first disadvantage and the loss of recovery data.
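To make the in-memory approach concrete, here is a minimal sketch of it. The names FailedItem and InMemoryRecovery are hypothetical and are not part of the library described in this article. The current time is passed in as a parameter only to keep the sketch easy to test.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical types for illustration only; not part of the article's library.
public class FailedItem
{
    public string Payload { get; set; }
    public int RetryCount { get; set; }
    public DateTime FailedAt { get; set; }
}

public class InMemoryRecovery
{
    private readonly ConcurrentQueue<FailedItem> failed = new ConcurrentQueue<FailedItem>();
    private readonly TimeSpan delay;
    private readonly int maxRetryCount;

    public InMemoryRecovery(TimeSpan delay, int maxRetryCount)
    {
        this.delay = delay;
        this.maxRetryCount = maxRetryCount;
    }

    // Safe to call from any processing thread when processing fails.
    // Returns false when the retry limit is reached and the item is not stored.
    public bool Add(FailedItem item, DateTime now)
    {
        if (item.RetryCount >= maxRetryCount)
            return false;

        item.RetryCount += 1;
        item.FailedAt = now;
        failed.Enqueue(item);
        return true;
    }

    // Returns the oldest item only if it has waited for the configured delay.
    // Assumes a single consumer thread calls this method; with several
    // consumers the peeked item could be taken by another thread first.
    public bool TryTakeDue(DateTime now, out FailedItem item)
    {
        item = null;
        FailedItem head;
        if (failed.TryPeek(out head) && now - head.FailedAt >= delay)
        {
            return failed.TryDequeue(out item);
        }
        return false;
    }
}
```

Everything here lives in process memory, so both disadvantages described above still apply: a crash loses the queued items, and nothing limits how large the queue can grow.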

Recovery

The second approach I came up with is a bit more complex, but a lot safer in terms of storing recovery data. I decided to go with two MSMQ queues. The first is the processing queue, from which messages are read for processing. The second is the recovery queue, where failed messages wait and are checked for age and number of retries.
Once a message is old enough for re-processing, it is picked up from the recovery queue and moved back to the processing queue. The age of a message is checked by peeking at the recovery queue, so a message is not removed from it before it is due.

Note

To improve the performance of MSMQ operations and still keep the messages readable in the MSMQ MMC snap-in, I am using the JSON message formatter described in this article: /aspnet/2015/october/msmq-json-message-formatter/

To make all this data available, some additional steps need to be done. The MSMQ message itself does not carry a retry count, so this value needs to be part of the object which is carried in the MSMQ message body.


RecoverableMessage


namespace Msmq.MessageRecovery
{
    public abstract class RecoverableMessage
    {
        public DateTime OriginalTimeCreated { get; set;}
        public int RetryCount { get; set; }
        

        public RecoverableMessage()
        {
            
        }

        public void IncreaseRetryCount()
        {
            this.RetryCount += 1;
        }
    }
}

    

All objects stored in the processing queue must inherit this class, which ensures that the retry count value is available. One more step is limiting the add method of the wrapper class to types which inherit this class; the wrapper is described in detail below.

Since we have two MSMQ queues, the operations are split into two wrapper/manager classes, each taking care of its own queue (processing and recovery).

ProcessingManager


namespace Msmq.MessageRecovery.QueueManagers
{

    internal class ProcessingManager<T> : IDisposable where T : RecoverableMessage
    {
        public delegate void ProcessingMessageHandler(T message);
        public delegate void ProcessingFaileMessageHandler(T message, Exception ex);

        public event ProcessingMessageHandler MessageProcessed;
        public event ProcessingFaileMessageHandler MessageProcessingFailed;

        private String messageQueuePath;
        private String MessageQueuePath
        {
            get
            {
                return messageQueuePath;
            }
        }


        private int numberOfReadingTasks = 1;
        private int NumberOfReadingTasks
        {
            get
            {
                return numberOfReadingTasks;
            }
        }

        private bool isDisposing = false;

        private MessageQueue processingQueue;
        CancellationTokenSource cancelTokenSource;
        CancellationToken cancelToken;
        private IMessageProcessor<T> messageProcessor;


        public ProcessingManager(string messageQueuePath, IMessageProcessor<T> messageProcessor, int numberOfReadingTasks = 1)
        {
            if (numberOfReadingTasks < 1)
                throw new ArgumentOutOfRangeException("numberOfReadingTasks", "You need to have at least one task for reading from the processing queue");

            
            cancelTokenSource = new CancellationTokenSource();
            cancelToken = cancelTokenSource.Token;

            this.messageProcessor = messageProcessor;
            this.numberOfReadingTasks = numberOfReadingTasks;
            this.messageQueuePath = messageQueuePath;
            processingQueue = new MessageQueue(this.messageQueuePath);
            processingQueue.Formatter = new Formatters.JsonMessageFormatter<T>();


            for (int t = 0; t < this.numberOfReadingTasks; t++)
            {
                Task.Factory.StartNew(() =>
                {
                    while (!cancelToken.IsCancellationRequested)
                    {
                        Message processingMessage = processingQueue.Receive();

                        if (processingMessage != null)
                        {
                            T message = processingQueue.Formatter.Read(processingMessage) as T;

                            if (message != null)
                            {
                                if (!messageProcessor.ProcessMessage(message))
                                {
                                    //Put to recovery queue
                                    if (MessageProcessingFailed != null)
                                    {
                                        MessageProcessingFailed(message, messageProcessor.GetProcessingException());
                                    }
                                }
                                else
                                {
                                    //Processing succeeded
                                    if (MessageProcessed != null)
                                    {
                                        MessageProcessed(message);
                                    }
                                }

                            }
                        }
                    }
                }, cancelToken);
            }
        }


        public void AddMessage(T message)
        {
            this.processingQueue.Send(message);
        }

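        public void Dispose()
        {
            if (!isDisposing)
            {
                isDisposing = true;
                //Signal the reading tasks to stop, same as RecoveryManager does.
                //A task blocked in Receive() notices this only after its next message arrives.
                cancelTokenSource.Cancel();
            }
        }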
    }
}

    

The ProcessingManager class is the simpler of the two managers. Its responsibilities are:

  • Allow adding only objects of a type which inherits RecoverableMessage
  • Read messages from the processing queue and pass them to the processor class for processing
  • Raise an event on successful or failed processing, providing the message object itself (and, on failure, the exception) in the event arguments.
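One thing to keep in mind with the reading loop above is that MessageQueue.Receive() without arguments blocks until a message arrives, so a cancellation request is noticed only after the next message. A possible variation, shown here as a sketch and not as the code from the repository, is to receive with a timeout and treat the timeout as an empty queue. The class name ReceiveLoopSketch and the handle callback are hypothetical.

```csharp
using System;
using System.Messaging;
using System.Threading;

// Hypothetical helper for illustration; not part of the article's library.
public static class ReceiveLoopSketch
{
    // Reads messages until cancellation is requested.
    public static void Run(MessageQueue queue, Action<Message> handle, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                // Wait at most one second so the token is re-checked regularly.
                Message message = queue.Receive(TimeSpan.FromSeconds(1));
                handle(message);
            }
            catch (MessageQueueException ex)
            {
                // A timeout only means the queue was empty; anything else is a real error.
                if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                    throw;
            }
        }
    }
}
```

The one second timeout is an arbitrary value. A shorter timeout reacts to cancellation faster, at the cost of more exceptions being thrown and caught while the queue is empty.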

IMessageProcessor

The IMessageProcessor interface is implemented by the processor class which receives the object from the message body for processing. Since the constructor of the ProcessingManager class requires an IMessageProcessor, you can create your own class which implements your own processing logic for the data carried in the MSMQ message body.


namespace Msmq.MessageRecovery
{
    public interface IMessageProcessor<T> where T : RecoverableMessage
    {
        bool ProcessMessage(T message);
        Exception GetProcessingException();
    }
}
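As an illustration of how this interface could be implemented, here is a minimal sketch. OrderMessage and OrderProcessor are hypothetical names. Because ProcessingManager can run several reading tasks against the same processor instance, and calls GetProcessingException right after ProcessMessage on the same thread, the sketch keeps the last exception in a ThreadLocal slot instead of a plain field. That choice is my assumption about a reasonable implementation and is not taken from the repository.

```csharp
using System;
using System.Threading;

// Minimal copies of the article's types so the sketch compiles on its own.
public abstract class RecoverableMessage
{
    public DateTime OriginalTimeCreated { get; set; }
    public int RetryCount { get; set; }
}

public interface IMessageProcessor<T> where T : RecoverableMessage
{
    bool ProcessMessage(T message);
    Exception GetProcessingException();
}

// Hypothetical message type for illustration only.
public class OrderMessage : RecoverableMessage
{
    public int Quantity { get; set; }
}

// Hypothetical processor for illustration only.
public class OrderProcessor : IMessageProcessor<OrderMessage>
{
    // One slot per thread, because several reading tasks may share this instance.
    private readonly ThreadLocal<Exception> lastException = new ThreadLocal<Exception>();

    public bool ProcessMessage(OrderMessage message)
    {
        lastException.Value = null;
        try
        {
            if (message.Quantity <= 0)
                throw new ArgumentOutOfRangeException("Quantity", "Quantity must be positive");

            // Real processing work would go here.
            return true;
        }
        catch (Exception ex)
        {
            // Remember the failure so the caller can ask for it afterwards.
            lastException.Value = ex;
            return false;
        }
    }

    public Exception GetProcessingException()
    {
        return lastException.Value;
    }
}
```

Catching the exception inside ProcessMessage also keeps the reading task alive, since an unhandled exception thrown from the processor would end the task's loop.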

    

RecoveryManager

This class is responsible for managing failed messages and sending them back for re-processing:

  • Receive a failed message and increase its retry count
  • Check whether the message has reached the required age and should be sent for re-processing
  • Raise an event when a message is put back for re-processing

Note that in the code shown here, the check for the maximum retry count, and the event raised when that maximum is reached, are implemented in the MessageManager class described further down, not in RecoveryManager itself.

namespace Msmq.MessageRecovery.QueueManagers
{

    
    internal class RecoveryManager<T> : IDisposable where T : RecoverableMessage
    {
        public delegate void ReprocessMessageHandler(T message);
        public event ReprocessMessageHandler ReprocessMessage;

        private TimeSpan messageProcessDelay;
        public TimeSpan MessageProcessDelay
        {
            get
            {
                return messageProcessDelay;
            }
        }

        private String messageQueuePath;
        private String MessageQueuePath
        {
            get
            {
                return messageQueuePath;
            }
        }

        private bool isDisposing = false;
        private MessageQueue recoveryQueue;
        CancellationTokenSource cancelTokenSource;
        CancellationToken cancelToken;

        public RecoveryManager(string messageQueuePath, TimeSpan messageProcessDelay)
        {
            cancelTokenSource = new CancellationTokenSource();
            cancelToken = cancelTokenSource.Token;

            this.messageQueuePath = messageQueuePath;
            this.messageProcessDelay = messageProcessDelay;

            recoveryQueue = new MessageQueue(this.messageQueuePath);
            recoveryQueue.Formatter = new Formatters.JsonMessageFormatter<T>();
            recoveryQueue.MessageReadPropertyFilter.ArrivedTime = true;
            recoveryQueue.DefaultPropertiesToSend.Priority = MessagePriority.High;

            Task.Factory.StartNew(() =>
            {
                MessageEnumerator enumerator = recoveryQueue.GetMessageEnumerator2();

                while (!cancelToken.IsCancellationRequested)
                {

                    while (enumerator.MoveNext())
                    {
                        Message recoveryMessage = recoveryQueue.Peek();

                        if (recoveryMessage != null && DateTime.Now - recoveryMessage.ArrivedTime >= this.messageProcessDelay)
                        {
                            Message recoveryProcessMessage = recoveryQueue.ReceiveById(recoveryMessage.Id);
                            if (recoveryProcessMessage != null)
                            {
                                T message = recoveryMessage.Body as T;
                                //Put back to processing queue
                                if (message != null && ReprocessMessage != null)
                                {
                                    ReprocessMessage(message);
                                }
                            }
                        }
                    }

                    enumerator.Reset();

                    Thread.Sleep(this.messageProcessDelay);
                }

            }, cancelToken);
        }

        public void AddMessage(T message)
        {
            //Increase the count
            message.IncreaseRetryCount();
            this.recoveryQueue.Send(message);
        }

        public void Dispose()
        {
            if (!isDisposing)
            {
                cancelTokenSource.Cancel();
                isDisposing = true;
            }
        }
    }
}

    

MessageManager

To simplify the logic for the outside world, an additional class is added to orchestrate these two manager classes. If you check again, ProcessingManager and RecoveryManager are internal classes, accessible only inside the same library DLL.
MessageManager, on the other hand, is a public class which is accessible from outside. Its constructor requires all the constructor parameters of ProcessingManager and RecoveryManager, since it creates instances of both.


namespace Msmq.MessageRecovery
{

    public class MessageManager<T> where T : RecoverableMessage
    {
        #region Events

        public delegate void MessageEventHandler(T message);
        public delegate void MessageExceptionEventHandler(T message, Exception ex);

        /// <summary>
        /// Message processing failed and set for recovery
        /// </summary>
        public event MessageExceptionEventHandler ProcessingMessageFailed;

        /// <summary>
        /// Message processing failed with no recovery option
        /// </summary>
        public event MessageExceptionEventHandler ProcessingMessageFailedUnrecoverable;

        /// <summary>
        /// Message from processing queue processed
        /// </summary>
        public event MessageEventHandler ProcessingMessageSucceeded;

        /// <summary>
        /// Message taken from recovery queue to processing queue
        /// </summary>
        public event MessageEventHandler ReprocessMessage;

        /// <summary>
        /// Maximum recovery count reached
        /// </summary>
        public event MessageEventHandler MaxReprocessCountReached;
        #endregion

        #region Fields
        TimeSpan recoveryDelay;
        RecoveryManager<T> recoveryManager;
        ProcessingManager<T> processingManager;
        int MaxRetryCount;


        #endregion

        #region Properties

        public List<Type> RecoverableExceptions
        {
            get; set;
        }

        #endregion

        #region Constructors
        public MessageManager(String receivingQueuePath, String recoveryQueuePath, IMessageProcessor<T> messageProcessor) :
            this(receivingQueuePath, recoveryQueuePath, messageProcessor, TimeSpan.FromMinutes(1), 3)
        {

        }
        public MessageManager(String receivingQueuePath, String recoveryQueuePath, IMessageProcessor<T> messageProcessor, TimeSpan recoveryDelay, int maxRetryCount, int numberOfReadingTasks = 1)
        {
            if (messageProcessor == null)
                throw new ArgumentNullException("messageProcessor", "messageProcessor cannot be null");

            if (maxRetryCount <= 0)
                throw new ArgumentOutOfRangeException("maxRetryCount", "maxRetryCount must be an integer value starting from 1");

            this.recoveryDelay = recoveryDelay;
            this.MaxRetryCount = maxRetryCount;
            processingManager = new ProcessingManager<T>(receivingQueuePath, messageProcessor, numberOfReadingTasks);
            processingManager.MessageProcessingFailed += ProcessingManager_MessageProcessingFailed;
            processingManager.MessageProcessed += ProcessingManager_MessageProcessed;

            recoveryManager = new RecoveryManager<T>(recoveryQueuePath, this.recoveryDelay);
            recoveryManager.ReprocessMessage += RecoveryManager_ReprocessMessage;
        }

        #endregion

        #region Methods

        public void Add(T message)
        {
            this.processingManager.AddMessage(message);
        }

        #endregion



        private void RecoveryManager_ReprocessMessage(T message)
        {
            if (this.ReprocessMessage != null)
            {
                this.ReprocessMessage(message);
            }
            processingManager.AddMessage(message);
        }

        private void ProcessingManager_MessageProcessed(T message)
        {
            if (this.ProcessingMessageSucceeded != null)
            {
                this.ProcessingMessageSucceeded(message);
            }
        }

        private void ProcessingManager_MessageProcessingFailed(T message, Exception ex)
        {
            if (this.RecoverableExceptions != null &&
                this.RecoverableExceptions.Any() &&
                !this.RecoverableExceptions.Contains(ex.GetType())) //Check if exception type is recoverable
            {
                if (this.ProcessingMessageFailedUnrecoverable != null)
                {
                    this.ProcessingMessageFailedUnrecoverable(message, ex);
                }
            }
            else {
                if (message.RetryCount < MaxRetryCount)
                {
                    if (this.ProcessingMessageFailed != null)
                    {
                        this.ProcessingMessageFailed(message, ex);
                    }
                    recoveryManager.AddMessage(message);
                }
                else
                {
                    //Max retry count exceeded
                    if (this.MaxReprocessCountReached != null)
                    {
                        this.MaxReprocessCountReached(message);
                    }
                }
            }
        }
    }
}

    

In the end, to use the MSMQ recovery we just need to create an instance of MessageManager and add handler methods for its events.

Sample code

To start using the code, we need to make a few small adjustments to fit the logic of our application. First, we need to create our own message content class which inherits the RecoverableMessage class.

namespace MsmqRecoveryTest
{
    public class DummyMessageModel: RecoverableMessage
    {
        public Guid Key { get; set; }
    }
}

    

The second thing is the message processor which will process the messages from MSMQ. This processor needs to implement IMessageProcessor. For test purposes, we will just simulate a failure by returning false from the ProcessMessage method.


namespace MsmqRecoveryTest
{
    public class DummyMessageProcessor : IMessageProcessor<DummyMessageModel>
    {
        public Exception GetProcessingException()
        {
            return new ArgumentOutOfRangeException("Some dummy exception");
        }

        public bool ProcessMessage(DummyMessageModel message)
        {
            //Simulate failed processing so the message goes to the recovery queue
            return false;
        }

    }
}

    

Now we have all the necessary elements to start using the MSMQ recovery:

namespace MsmqRecoveryTest
{
    class Program
    {
        static void Main(string[] args)
        {
            string processQueuePath = @".\private$\messages.toprocess";
            string recoveryQueuePath = @".\private$\messages.torecover";


            var recMgr = new MessageManager<DummyMessageModel>(
                processQueuePath,
                recoveryQueuePath,
                new DummyMessageProcessor(),
                TimeSpan.FromSeconds(5),
                3
                );

            recMgr.RecoverableExceptions = new List<Type>() {
                typeof(ArgumentNullException),
                typeof(ArgumentOutOfRangeException)
            };

            recMgr.ProcessingMessageFailedUnrecoverable += RecMgr_ProcessingMessageFailedUnrecoverable;
            recMgr.ProcessingMessageFailed += RecMgr_ProcessingMessageFailed;
            recMgr.ReprocessMessage += RecMgr_ReprocessMessage;
            recMgr.MaxReprocessCountReached += RecMgr_MaxReprocessCountReached;

            for (int i = 0; i < 1; i++)
            {
                recMgr.Add(new DummyMessageModel() { Key = Guid.NewGuid() });
            }

            Console.ReadLine();
        }

     

        private static void RecMgr_ProcessingMessageFailed(DummyMessageModel message, Exception ex)
        {
            Console.WriteLine("{0} Message processing failed", DateTime.Now.ToLongTimeString());
        }

        private static void RecMgr_ReprocessMessage(DummyMessageModel message)
        {
            Console.WriteLine("{0} Reprocessing message from recovery queue", DateTime.Now.ToLongTimeString());
        }

        private static void RecMgr_MaxReprocessCountReached(DummyMessageModel message)
        {
            Console.WriteLine("{0} Max number of recovery attempts reached", DateTime.Now.ToLongTimeString());
        }

        private static void RecMgr_ProcessingMessageFailedUnrecoverable(DummyMessageModel message, Exception ex)
        {
            Console.WriteLine("{0} UNRECOVERABLE EXCEPTION OCCURRED", DateTime.Now.ToLongTimeString());
        }
    }
}
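
One detail about the RecoverableExceptions list used in the sample: MessageManager checks it with Contains(ex.GetType()), which matches the exact exception type only. An exception derived from a listed type is therefore treated as unrecoverable unless it is listed as well. The sketch below compares that exact check with a possible alternative which also accepts derived types. RecoverableCheck and its methods are hypothetical names and are not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration; not part of the article's library.
public static class RecoverableCheck
{
    // Exact type match, the same comparison Contains(ex.GetType()) performs.
    public static bool IsExactMatch(IEnumerable<Type> recoverable, Exception ex)
    {
        return recoverable.Contains(ex.GetType());
    }

    // Possible alternative: also accept exceptions derived from a listed type.
    public static bool IsMatchOrDerived(IEnumerable<Type> recoverable, Exception ex)
    {
        return recoverable.Any(t => t.IsAssignableFrom(ex.GetType()));
    }
}
```

For example, with only typeof(ArgumentException) in the list, an ArgumentNullException fails the exact check but passes the derived check, because ArgumentNullException inherits from ArgumentException.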

    

The latest code can be downloaded from this article or from the GitHub project at https://github.com/dejanstojanovic/MSMQ-Message-Recovery

Disclaimer

Purpose of the code contained in snippets or available for download in this article is solely for learning and demo purposes. Author will not be held responsible for any failure or damages caused due to any other usage.


About the author

DEJAN STOJANOVIC

Dejan is a passionate Software Architect/Developer. He is highly experienced in .NET programming platform including ASP.NET MVC and WebApi. He likes working on new technologies and exciting challenging projects
