Monday, 22 January 2018

Queue based load levelling using Azure Functions

On a current project I've been looking to employ a cloud design pattern known as queue based load levelling, and to implement it using Azure storage components and functions.

The Queue based load levelling pattern

The pattern is useful in a range of situations where there's a need for timely and reliable integration between different software systems. In short, the pattern utilises a queue service as a buffer for messages from one system to the other, allowing them to be passed to the destination system for processing in a controlled manner, and at a rate that won’t overwhelm available resources.

It can be adopted where one software system must send messages to another but for various reasons we want to avoid a direct connection between the two. One good reason we might want to do this is simply to reduce coupling – the two systems will need to agree on a message format, but ideally we don’t want to tie them to each other’s implementation details any further than that.

We may also have a situation where the messages come in a variable or bursting pattern – perhaps few or none for a period of time and then a lot may arrive in one go. If processing the messages at the destination is a relatively expensive operation, there’s a danger of overwhelming it, leading to timeouts and lost messages. By introducing a queue, we decouple the source system from the destination – the source posts messages to the queue that are accepted at whatever speed they arrive. The destination system can then be fed messages at a controlled and consistent rate; one that allows messages to be reliably processed.

The specific scenario to support is a series of messages that come from a client's internal system in XML format. The details contained within them need to be applied to a Sitecore CMS instance in order to update various content items.

Implementing with Azure functions and storage components

We've implemented this initially using two Azure functions, queue, and table storage as illustrated in the following diagram.




The first function project – the "message receiver" – contains an HTTP triggered function that responds to an incoming HTTP POST request accepting an XML message. It performs some validation on it and add, if passing, adds it to the queue. A record is written to the log table in table storage. It will also accept a GET request via a second function to return the status of the message.

The second project – the "message processor" – contains a function set up on a queue trigger, firing off as new messages are detected on the queue. It will be responsible for taking the validated message and passing it to the destination system for processing (in our case by posting it to a Sitecore API end-point).

This was working nicely in initial testing, but we started to find some edge cases where duplicate content was getting created in Sitecore. We narrowed this down to race conditions - a check would be made for a node at a particular path, and if it wasn't there it would be created. But in some cases the processing of another message would get in there first, and we'd end up with two nodes of the same name.

Controlling the speed of queue processing

This I thought should have been covered via some settings available on the queue triggered function to manage how many messages are dequeued at a time and how many instances the function app will scale out to. But seems I may have got confused with what's available for web jobs. In the end I came across this Github issue that indicates that, at least as of the time of writing, supporting singleton (or "process one queue message at a time") isn't supported directly on the functions platform.

So we needed an alternative - some way of having a function first check to see if other messages are already being processed. I read the Azure Web Jobs platform makes use of blob storage leases for this purpose, so tackling the problem in a similar way seemed sensible.

The solution we used in the end was to have the function triggered by the queue message to first try to acquire a lease on a particular blob. If it could get one, the message would be processed and, just before the function terminates as part of a try/finally block, the lease was released. If a second message was processed whilst the first was running, it won't be able to acquire the lease and so instead exits the function just after putting the message back on the queue. Note that it's important here to explicitly put the message back on the on the queue rather than just throw an exception. Doing this will leave the message back on the queue but with the dequeue count incremented, and when this reaches a configured level the platform will deem that message can't be processed and migrate it to a "poison" queue.



Setting these types of global flags can be risky as if an unexpected exception occurs there's a danger that the flag doesn't get reset, and if that were the case, no messages would get processed at all. Fortunately though blob leases when initially acquired can be set to expire after a period of time, so even if not explicitly released there's no risk of the system getting blocked in this way.

We coupled this solution with a second step which was to add messages to the queue initially with a short, random delay in the time that the message will be visible for processing. That way, even if we get a load of messages at once, their presence on the queue and the time at which they are processed (or put back for processing) will be staggered.

I've pulled out a simplified version of our implementation of this pattern in this Github repository.

No comments:

Post a Comment