One problem with asynchronous processing is what to do when the operation fails. With synchronous patterns such as a direct call, we usually return an error code or a message, or throw an exception. When we use queues to delegate the execution to another process, we can't notify the originator directly. One thing we can do is send a message back to the originator through another queue, like a callback. This is interesting when the process normally produces a result: an error is then just another kind of result.
Now, let's see how we can implement an error queue using Windows Azure Storage Queue.
Implementation of an error queue
First we initialize the queues. For each queue we also create a '<queuename>-error' queue.
    // Connect to the local storage emulator.
    var storageAccount = CloudStorageAccount.Parse("UseDevelopmentStorage=true");
    var queueClient = storageAccount.CreateCloudQueueClient();

    // One queue for the tasks, one for the messages that could not be processed.
    this.taskQueueReference = queueClient.GetQueueReference("task");
    this.taskErrorQueueReference = queueClient.GetQueueReference("task-error");

    this.taskQueueReference.CreateIfNotExists();
    this.taskErrorQueueReference.CreateIfNotExists();
Next we add a few messages, including one that will cause the processing to fail (to simulate a failure):
    this.taskQueueReference.AddMessage(
        new CloudQueueMessage("Message " + DateTime.UtcNow.Ticks));
    this.taskQueueReference.AddMessage(
        new CloudQueueMessage("Message " + DateTime.UtcNow.Ticks));

    // ProcessMessage fails on any message whose content starts with "Error".
    this.taskQueueReference.AddMessage(
        new CloudQueueMessage("Error " + DateTime.UtcNow.Ticks));
Finally, here is the code that actually polls the queue for messages. Polling is usually done in an infinite loop, but when no message is fetched it is good practice to wait a while before polling again, to avoid unnecessary transaction costs and I/O (each call to GetMessages counts as one transaction). Depending on how quickly the queue must react to new messages, this wait can range from a few seconds for critical tasks to a few minutes for non-critical ones. I'm also using a retry mechanism here: I try to process a message a few times before I really consider it to be in error (a poison message). If we don't delete a message after fetching it, it goes back in the queue after some time and gets processed again. This means that all the tasks we want to process using queues should be idempotent.
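    private void PollQueue()
    {
        List<CloudQueueMessage> messages;
        do
        {
            // Fetch up to 8 messages; they stay hidden from other consumers for 10 seconds.
            // ToList() materializes the batch so it is enumerated only once.
            messages = this.taskQueueReference
                .GetMessages(8, visibilityTimeout: TimeSpan.FromSeconds(10))
                .ToList();

            foreach (var message in messages)
            {
                bool result = false;
                try
                {
                    result = this.ProcessMessage(message);
                    if (result)
                        this.taskQueueReference.DeleteMessage(message);
                }
                catch (Exception ex)
                {
                    this.Log(message.AsString, ex);
                }

                // After 3 unsuccessful attempts, treat the message as poison.
                if (!result && message.DequeueCount >= 3)
                {
                    // Add a copy rather than the fetched instance: depending on the
                    // SDK version, AddMessage may overwrite the Id and pop receipt
                    // that DeleteMessage below still needs.
                    this.taskErrorQueueReference.AddMessage(
                        new CloudQueueMessage(message.AsString));
                    this.taskQueueReference.DeleteMessage(message);
                }
            }
        } while (messages.Any());
    }

    private bool ProcessMessage(CloudQueueMessage message)
    {
        // Simulate a failure for the messages created with the "Error" prefix.
        if (message.AsString.StartsWith("Error"))
            throw new Exception("Error!");

        return true;
    }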
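PollQueue returns as soon as a fetch comes back empty, so the wait between polls has to live in whatever calls it. Here is a minimal sketch of such an outer loop, which is not part of the original sample. The names PollingDelay, Increase and Run are hypothetical, and so is the pollOnce delegate, which stands for a wrapper around something like PollQueue that reports whether any message was fetched. The 2 second and 1 minute bounds are assumptions to tune for each queue.

```csharp
using System;
using System.Threading;

public static class PollingDelay
{
    // Doubles the previous wait, never going below min or above max.
    public static TimeSpan Increase(TimeSpan current, TimeSpan min, TimeSpan max)
    {
        if (current.Ticks >= max.Ticks / 2) return max;
        var doubled = TimeSpan.FromTicks(current.Ticks * 2);
        return doubled < min ? min : doubled;
    }

    // pollOnce is a hypothetical delegate: it should drain the queue once and
    // return true if at least one message was fetched.
    public static void Run(Func<bool> pollOnce, CancellationToken token)
    {
        var min = TimeSpan.FromSeconds(2);  // assumed wait for a busy, critical queue
        var max = TimeSpan.FromMinutes(1);  // assumed upper bound for an idle queue
        var delay = min;

        while (!token.IsCancellationRequested)
        {
            if (pollOnce())
            {
                // Work was found: poll again right away and reset the wait.
                delay = min;
                continue;
            }

            // Empty poll: sleep, but wake up early if cancellation is requested.
            token.WaitHandle.WaitOne(delay);
            delay = Increase(delay, min, max);
        }
    }
}
```

With these bounds an idle queue is polled after 2, 4, 8, 16 and 32 seconds, then once a minute, and it goes back to the short wait as soon as a message shows up.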
First, I'm fetching messages in batches of 8 in this case. In one transaction you can fetch between 1 and 32 messages. I also set the visibilityTimeout to 10 seconds, which means the fetched messages won't be visible to anyone else during that time. Usually you want to set the visibility timeout based on how much time is required to process all the messages of the batch. If we don't have time to delete a message from the queue before the timeout elapses, another worker could fetch it and start processing it again. So we should balance the time needed to process all the messages in one batch against how much time we want to allow between retries.
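To make that trade-off concrete, here is a rough sketch of how the timeout could be derived from the batch size. It is only an illustration: VisibilityTimeoutEstimate and ForBatch are hypothetical names, and the per-message duration and the 1.5 safety factor are assumptions you would replace with your own measurements.

```csharp
using System;

public static class VisibilityTimeoutEstimate
{
    // Estimates a visibility timeout long enough to process a whole batch,
    // given an assumed average processing time per message.
    public static TimeSpan ForBatch(int batchSize, TimeSpan perMessage, double safetyFactor = 1.5)
    {
        if (batchSize < 1 || batchSize > 32)
            throw new ArgumentOutOfRangeException("batchSize", "A batch holds between 1 and 32 messages.");
        if (safetyFactor < 1.0)
            throw new ArgumentOutOfRangeException("safetyFactor", "The safety factor must be at least 1.");

        // Total processing time for the batch, padded by the safety factor.
        return TimeSpan.FromTicks((long)(perMessage.Ticks * batchSize * safetyFactor));
    }
}
```

For a batch of 8 messages that take about one second each, ForBatch(8, TimeSpan.FromSeconds(1)) gives 12 seconds. A larger factor makes duplicate processing less likely, but it also delays the retry of a message that really failed.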
Next we process the message. If the processing is successful we simply return true so the message can be deleted from the queue. If the processing fails we have two options: return false or throw an exception. Most of the time I simply return false instead of throwing an exception when the failure is expected. In the sample, ProcessMessage throws only to simulate a failure.
Finally, we check how many times we have unsuccessfully tried to process the message and whether we have reached our limit (3 times in this case). If we have, it's time to send that message to the error queue and delete it from the normal queue.
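The three possible outcomes for a message can also be written as a small pure function, which is easier to unit test than the polling loop. This is just a sketch of the same rule, not code from the sample: MessageAction, PoisonPolicy and Decide are hypothetical names, and the default of 3 attempts mirrors the limit used above.

```csharp
using System;

public enum MessageAction
{
    Delete,            // processed successfully, remove it from the task queue
    LeaveForRetry,     // failed, let the visibility timeout expire so it is fetched again
    MoveToErrorQueue   // failed too many times, treat it as a poison message
}

public static class PoisonPolicy
{
    // dequeueCount is the number of times the message has been fetched so far,
    // the current attempt included.
    public static MessageAction Decide(bool succeeded, int dequeueCount, int maxAttempts = 3)
    {
        if (succeeded) return MessageAction.Delete;

        return dequeueCount >= maxAttempts
            ? MessageAction.MoveToErrorQueue
            : MessageAction.LeaveForRetry;
    }
}
```

For example, Decide(false, 2) leaves the message in the queue for another attempt, while Decide(false, 3) sends it to the error queue.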
Next time we will look at how we want to handle the messages in the error queue. You can find this post here.