The Ruminant Pattern
This post is inspired by the Gang of Four's brilliant work Design Patterns. Also I need to thank Heath Turnage for suggesting the solution that I now call the Ruminant Pattern.
Intent
Provide a way to resume failed steps of a non-atomic sequence with minimal resources.
Motivation
Often times I find myself needing to execute a sequence of operations that cannot be made atomic, where the operations may fail for various reasons, where the failed operations can be re-attempted, and where successful operations should not be re-run. Consider for instance a series of operations like
{
var orderRequest = GetNextOrderFromQueue();
SendClientReceivedNotification(orderRequest);
var order = CreateOrder(orderRequest);
NotifyVendors(order);
SendClientInProgressNotification(order);
}
Imagine a scenario where each one of those operations involved a separate API call, and that a transaction solution that rolls back past operations is impracticle or impossible. If the CreateOrder step threw an exception because a database was unavailable, you could theoretically recover from the error by re-sending the same message to the queue to start the whole process all over again. But you'd re-send the received notification to the client, and this might be less than ideal. What if the vendor API could not be notified? If you re-send the same message to the queue after the failing vendor API came back online, you'd create a new order and re-send the client a received notification. One way to solve this might be to create as many queues and consumers as there are operations:
Then if an operation failed at any point in the sequence, you could recover from the error by re-sending the message that failed to the queue where it failed. The code may look something like the following spread across several consumer assemblies:
{
var orderRequest = GetNextOrderFromQueue();
SendClientReceivedNotification(orderRequest);
SendOrderRequestToCreateOrderQueue(orderRequest);
}
{
var orderRequest = GetNextOrderFromCreateOrderQueue();
var order = CreateOrder(orderRequest);
SendOrderToNotifyVendorsQueue(order);
}
{
var order = GetNextOrderFromNotifyVendorsQueue();
NotifyVendor(order);
SendOrderToNotifyClientOrderInProgressQueue(order);
}
{
var order = GetNextOrderFromNotifyClientOrderInProgressQueue();
SendClientInProgressNotification(order);
}
Now the number of queues and consumers is equal to the number of atomic operations. You can probably see how this looks a little resource intensive. But what if an order can have multiple vendors like this?
{
var order = GetNextOrderFromNotifyVendorsQueue();
foreach (var service in order.Services)
NotifyVendor(service);
SendOrderToNotifyClientOrderInProgressQueue(order);
}
If one vendor notification API temporarily fails and we attempt to recover by re-queueing the message, we re-notify all previously successful vendor APIs. Could we try to make as many queues as there are vendors? How would we coordinate when they all succeed? This seems impractical.
A better solution might be to create a data structure that can retain the state of the sequence of operations and pass it back to the same queue that started the sequence. Then we only have one queue and one queue-consumer for this sequence of operations, and any temporary failure can be re-attempted by re-sending the message associated with the last failure.
This idea of regurgitating a message back up to the queue from which it originated for the purposes of moving on to the a next stage of processing reminds me of a cow chewing its cud.
Applicability
Use the Ruminant pattern when
- Queues or consumers are limited
- A sequence of operations cannot be made atomic, failed operations can be re-attempted but successful operations should not be re-attempted
Structure
Participants
Producer
Sends a message to the queue.
Queue
Persists messages and releases them to a consumer in the order that they were added.
Consumer
Receives messages from a queue and performs a sequence of operations based on the content of the message.
Collaborations
- Normally a single factory produces strategies to handle the messages consumed from the queue based on the state of the consumed message.
- A strategy handles a message to perform an atomic sequence of operations that can be re-attempted if there is a temporary exception.
Consequences
The Ruminant pattern has the following benefits and liabilities:
- Limits the number of resources necessary to retry parts of a sequence that cannot be made atomic. We only need the only queue and the one queue-consumer.
- Limits the scaffolding code and maintenance of multiple consumers used to complete a sequence of operations. We don't have to duplicate the code necessary to scaffold out multiple queue-consumers.
- It creates bulkier messages designed to describe what stage of the sequence they apply to. The messages now include properties like "ClientReceivedNotificationSent": true.
- It obscures the sequence of operations and transfers that knowledge to something like a strategy factory. A strategy factory may be employed to determine which strategy should operate on a message based on properties like "ClientReceivedNotificationSent": true. The order of operations will be based on message properties and not necessarily listed in sequential order.
Implementation
Here are some useful techniques for implementing the Ruminant pattern.
- Message handling strategy factory. Depending on the state of the message passed to the queue consumer, produce a single, testable strategy to handle the message that matches that state.
- Send the same message that was consumed back to the originating queue after first modifying a property that guarantees the next operation in the sequence. Do this if there is another operation that follows the current atomic-operation-that-could-be-re-attempted-if-their-was-a-failure. For instance if a message was recieived like { "orderRequest": {...} } and the current strategy sent the client a "received" notification, you should re-queue to the originating queue a message that looked like { "orderRequest": {...}, "receivedNotification": true }.
Sample
We'll apply the Ruminant pattern to creating the order discussed earlier.
A consumer that accepts messages from a queue would be scaffolded to receive messages and deserialize them before handing them off to a message handler. The implementation may look something like this. For simplicity, the following example assumes dependencies are injected by a dependency injection framework.
class MessageHandler : IMessageHandler
{
IHandlerFactory _factory;
MessageHandler(IHandlerFactory factory)
{
_factory = factory;
}
void Handle(CreateOrderDto message)
{
var strategy = _factory.Get(message);
strategy.Handle(message);
}
}
class HandlerFactory : IHandlerFactory
{
IServiceProvider _services;
HandlerFactory(IServiceProvider services)
{
_services = services;
}
IMessageHandler Get(CreateOrderDto message)
{
if (message.Services.All(x => x.VendorNotified))
{
return _services.Get(NotifyClientOrderInProgressMessageHandler);
}
else if (message.OrderCreated)
{
return _services.Get(NotifyVendorMessageHandler);
}
else if (message.SentClientReceivedNotification)
{
return _services.Get(CreateOrderMessageHandler);
}
else
{
return _services.Get(SendClientReceivedNotificationMessageHandler);
}
}
}
class SendClientReceivedNotificationMessageHandler : IMessageHandler
{
ISender _sender;
IClientOrderReceivedNotifier _notifier;
MessageHandler(ISender sender, IClientOrderReceivedNotifier notifier)
{
_sender = sender;
_notifier = notifier;
}
void Handle(CreateOrderDto message)
{
_notifier.Notify(message.Client);
message.SentClientReceivedNotification = true;
_sender.Send(message);
}
}
class CreateOrderMessageHandler : IMessageHandler
{
ISender _sender;
ICreateOrderService _service;
MessageHandler(ISender sender, ICreateOrderService service)
{
_sender = sender;
_service = service;
}
void Handle(CreateOrderDto message)
{
_service.Create(message.Order);
message.OrderCreated = true;
_sender.Send(message);
}
}
class NotifyVendorMessageHandler : IMessageHandler
{
ISender _sender;
IVendorNotifier _notifier;
MessageHandler(ISender sender, IVendorNotifier notifier)
{
_sender = sender;
_notifier = notifier;
}
void Handle(CreateOrderDto message)
{
var service = message.Services.First(x => !x.VendorNotified)
_notifier.Notify(service);
service.VendorNotified = true;
_sender.Send(message);
}
}
class NotifyVendorMessageHandler : IMessageHandler
{
ISender _sender;
IVendorNotifier _notifier;
MessageHandler(ISender sender, IVendorNotifier notifier)
{
_sender = sender;
_notifier = notifier;
}
void Handle(CreateOrderDto message)
{
var service = message.Services.First(x => !x.VendorNotified)
_notifier.Notify(service);
service.VendorNotified = true;
_sender.Send(message);
}
}
class NotifyClientOrderInProgressMessageHandler : IMessageHandler
{
IOrderInProgressNotifier _notifier;
MessageHandler(IOrderInProgressNotifier notifier)
{
_notifier = notifier;
}
void Handle(CreateOrderDto message)
{
_notifier.Notify(message.Client);
}
}