Showing posts with label Queue Storage Service. Show all posts

Monday, July 22, 2013

Handling Azure Storage Queue poison messages

This post will talk about what to do now that we are handling poison messages in our Azure Storage Queues.

First, let's review what we've done so far.


Messages that continuously fail to process will end up in the error queue. Someone asked me why we need error queues at all. Couldn't we simply log the errors and delete the message? Well, if you have a really efficient and proactive DevOps team, I suppose logging errors along with the original messages ought to be enough.
Someone will review why the message failed, and if it was only a transient error they can send the original message to the queue again.

We could also store failed messages into an Azure Storage Table.
Then we could simply monitor new entries in this table and act on them. Again, the original message should be stored in the table so we can send it again if we choose.

I think the best reason to use a queue for error messages is if you want an administrative tool to monitor, review and re-send messages. In this case the queue mechanics let you handle those messages the same way any other queue-based process does.

For me, one of the unpleasant side effects of using error queues is that the number of queues in my system is now doubled (one normal queue and one error queue for each). It's not too bad if your naming scheme is consistent, but even then, if you do operational work using a tool like Cerebrata Azure Management Studio or even Visual Studio's Server Explorer, you will feel overwhelmed by the quantity of queues.

Managing queue messages with Cerebrata Azure Management Studio
Managing queue messages with Visual Studio Server Explorer

Whatever you do, I suggest you always at least log failures properly. Later, while debugging the issue, you will be thankful that you can easily match the failure logs with the message that caused them.

Friday, June 21, 2013

Windows Azure Storage Queue with error queues

Windows Azure Storage Queues are an excellent way to execute tasks asynchronously in Azure. For example, a web application could leave all the heavy processing to a Worker Role instead of doing it itself. That way requests will complete faster. Queues can also be used to decouple communication between two separate applications or two components of the same application.

One problem with asynchronous processing is what to do when the operation fails. With synchronous patterns like a direct call, we usually return an error code or a message, or throw an exception. When we use queues to delegate the execution to another process, we can't notify the originator directly. One thing we can do is send a message back to the originator through another queue, like a callback. This is interesting when the process normally produces a result; an error in this case is only another kind of result.


Another way is to create an error queue (or dead letter queue) for poison messages, where we put all messages that failed processing. This way we get a list of all the failing messages to review, find out what the problem was and figure out what to do about it. For example, we can retry a message by moving it back to the main queue so it can be processed again.


Now, let's see how we can implement an error queue using Windows Azure Storage Queue.

Implementation of an error queue


First we will initialize the queues. For each queue we also create a '<queuename>-error' queue.
var storageAccount = CloudStorageAccount.Parse("UseDevelopmentStorage=true");
var queueClient = storageAccount.CreateCloudQueueClient();

this.taskQueueReference = queueClient.GetQueueReference("task");
this.taskErrorQueueReference = queueClient.GetQueueReference("task-error");
 
this.taskQueueReference.CreateIfNotExists();
this.taskErrorQueueReference.CreateIfNotExists();
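To keep the '<queuename>-error' convention consistent across many queues, the name could be derived in one place. The following is only a sketch: `QueueNames` and `ErrorQueueFor` are hypothetical names that do not exist in the storage client library, and the length check assumes the 63-character limit on Azure queue names.

```csharp
using System;

// Hypothetical helper (not part of the Azure SDK): derives the error queue
// name from a normal queue name using the "-error" suffix convention.
public static class QueueNames
{
    private const string ErrorSuffix = "-error";

    // Azure queue names must be between 3 and 63 characters long.
    private const int MaxQueueNameLength = 63;

    public static string ErrorQueueFor(string queueName)
    {
        if (string.IsNullOrEmpty(queueName))
        {
            throw new ArgumentException("A queue name is required.", "queueName");
        }

        string errorName = queueName + ErrorSuffix;
        if (errorName.Length > MaxQueueNameLength)
        {
            throw new ArgumentException(
                "The queue name is too long to append the error suffix.", "queueName");
        }

        return errorName;
    }
}
```

With this in place, the error queue reference above could be obtained with queueClient.GetQueueReference(QueueNames.ErrorQueueFor("task")).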

Next we add a few messages, including one that will cause the processing to fail (to simulate failures):
this.taskQueueReference.AddMessage(
    new CloudQueueMessage("Message " + DateTime.UtcNow.Ticks));

this.taskQueueReference.AddMessage(
    new CloudQueueMessage("Message " + DateTime.UtcNow.Ticks));

this.taskQueueReference.AddMessage(
    new CloudQueueMessage("Error " + DateTime.UtcNow.Ticks));

Finally, here is the code that actually polls the queue for messages. Polling is usually done in an infinite loop, but when no message is fetched it is good practice to wait a while before polling again to avoid unnecessary transaction costs and I/O (each call to GetMessages is one transaction). Depending on how quickly the queue needs to react to new messages, this wait may range from a few seconds for critical tasks to a few minutes for non-critical tasks. I'm also using a retry mechanism here, meaning that I'll try to process a message a few times before I consider it to be in error (poison). If we don't delete a message after fetching it, it becomes visible in the queue again after some time and gets processed again. This means all tasks we process using queues should be idempotent.
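private void PollQueue()
{
    IEnumerable<CloudQueueMessage> messages;

    do
    {
        // Fetch up to 8 messages; they stay invisible to other consumers for 10 seconds.
        // ToList() materializes the batch so it is only enumerated from memory afterwards.
        messages = this.taskQueueReference
            .GetMessages(8, visibilityTimeout: TimeSpan.FromSeconds(10))
            .ToList();

        foreach (var message in messages)
        {
            bool result = false;
            try
            {
                result = this.ProcessMessage(message);

                // Success: remove the message so it is not processed again.
                if (result) this.taskQueueReference.DeleteMessage(message);
            }
            catch (Exception ex)
            {
                this.Log(message.AsString, ex);
            }

            // Too many failed attempts: treat the message as poison.
            if (!result && message.DequeueCount >= 3)
            {
                // Copy the content into a new message so the original keeps the
                // id and pop receipt needed to delete it from the task queue.
                this.taskErrorQueueReference.AddMessage(
                    new CloudQueueMessage(message.AsString));
                this.taskQueueReference.DeleteMessage(message);
            }
        }
    } while (messages.Any());
}

private bool ProcessMessage(CloudQueueMessage message)
{
    // Simulate a failure for messages whose content starts with "Error".
    if (message.AsString.StartsWith("Error")) throw new Exception("Error!");

    return true;
}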
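PollQueue returns as soon as a fetch comes back empty, so the wait between polls is left to the caller. As a rough sketch of how that wait could be chosen, the helper below doubles the delay after each empty poll and resets it when messages arrive. The doubling strategy is my assumption (the post only says to wait between a few seconds and a few minutes), and `PollingDelay` is a hypothetical name.

```csharp
using System;

// Hypothetical helper: picks how long to wait before the next poll.
public static class PollingDelay
{
    public static TimeSpan Next(TimeSpan current, bool gotMessages, TimeSpan min, TimeSpan max)
    {
        // The queue is active: go back to the shortest wait.
        if (gotMessages) return min;

        // The queue was empty: double the wait, starting from at least min.
        TimeSpan baseDelay = current < min ? min : current;
        TimeSpan doubled = TimeSpan.FromTicks(baseDelay.Ticks * 2);

        // Never wait longer than max.
        return doubled > max ? max : doubled;
    }
}
```

A caller could keep the current delay in a variable, sleep for that long after each call to PollQueue, and update it with PollingDelay.Next. A critical queue might use a minimum of one second and a maximum of a few seconds, while a non-critical one could go up to a few minutes.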

In PollQueue, I first fetch messages in batches of 8. In one transaction you can fetch between 1 and 32 messages. I also set the visibilityTimeout to 10 seconds, which means the messages won't be visible to anyone else during that time. Usually you want to set the visibility timeout based on how much time is required to process all the messages in the batch. If we don't manage to delete the messages from the queue before the timeout elapses, another worker could fetch them and start processing them again. So we should balance the time needed to process all the messages in one batch with how much time we want to allow between retries.
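One way to reason about that balance is to derive the timeout from the batch size. This is only a rule-of-thumb sketch: `VisibilityTimeouts.ForBatch`, the per-message estimate and the safety margin are all assumptions of mine, not values from the storage service.

```csharp
using System;

// Hypothetical helper: estimates a visibility timeout for a whole batch.
public static class VisibilityTimeouts
{
    public static TimeSpan ForBatch(int batchSize, TimeSpan perMessage, TimeSpan margin)
    {
        // A single fetch can return between 1 and 32 messages.
        if (batchSize < 1 || batchSize > 32)
        {
            throw new ArgumentOutOfRangeException("batchSize");
        }

        // Time to process every message in the batch, plus some slack
        // for the delete calls and any slow message.
        return TimeSpan.FromTicks(perMessage.Ticks * batchSize) + margin;
    }
}
```

For example, a batch of 8 messages that each take about one second, with a two-second margin, gives the 10 seconds used above. A longer timeout lowers the risk of duplicate processing but also delays the retry of a message that failed.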

Next we process the message. If the processing is successful we simply return true so the message can be deleted from the queue. If processing fails we have two options: return false or throw an exception. Most of the time, when the failure is expected, I simply return false instead of throwing an exception.

Finally, we check how many times the message has been dequeued and whether we have reached our limit (in this case 3 attempts). If we have, it's time to send that message to the error queue and delete it from the normal queue.
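The three possible outcomes for a fetched message can be summarized as a small decision function. This is a sketch that mirrors the conditions in PollQueue; the `MessageOutcome` enum and `PoisonPolicy` class are hypothetical names introduced only for illustration.

```csharp
using System;

// Hypothetical names: what should happen to a message after one processing attempt.
public enum MessageOutcome
{
    Delete,           // processed successfully, remove it from the queue
    LeaveForRetry,    // failed, let the visibility timeout expire so it is retried
    MoveToErrorQueue  // failed too many times, treat it as poison
}

public static class PoisonPolicy
{
    public static MessageOutcome Decide(bool processed, int dequeueCount, int maxAttempts)
    {
        if (processed) return MessageOutcome.Delete;

        // dequeueCount already includes the current attempt.
        return dequeueCount >= maxAttempts
            ? MessageOutcome.MoveToErrorQueue
            : MessageOutcome.LeaveForRetry;
    }
}
```

With a limit of 3, a message that fails on its first and second fetch is left for retry, and it is moved to the error queue when its third attempt fails.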

Next time we will look at how we want to handle the messages in the error queue. You can find this post here.