Scaling the Outbox Pattern
Read time: 14 minutes
The .NET Weekly
In last week's newsletter, I talked about implementing the Outbox pattern. It's a crucial tool for reliable distributed messaging. But implementing it is just the first step.
The real challenge? Scaling it to handle massive message volumes.
Today, we're taking it up a notch. We'll start with a basic Outbox processor and transform it into a high-performance engine capable of handling over 2 billion messages daily.
Let's dive in!
Starting Point
Our starting point is an OutboxProcessor that polls for unprocessed messages and publishes them to a queue. The first things we can tweak are the polling frequency and the batch size.
Let's assume that we run the OutboxProcessor continuously. I also increased the batch size to 1,000.
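Here's a minimal, in-memory sketch of that baseline loop. It isn't the original code: a dictionary stands in for the outbox table, message ids stand in for the occurred_on_utc ordering, and PublishAsync is a hypothetical placeholder for MassTransit's Publish. The shape follows the description above: fetch a batch of up to 1,000 unprocessed messages, then publish and mark each one processed individually.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

const int BatchSize = 1000;

// In-memory stand-in for the outbox table (assumption): message id -> processed
// timestamp, where null means "not processed yet".
var outbox = Enumerable.Range(1, 2_500).ToDictionary(id => id, _ => (DateTime?)null);
var published = new List<int>();

// Hypothetical publish; the real processor awaits IPublishEndpoint.Publish.
Task PublishAsync(int id)
{
    published.Add(id);
    return Task.CompletedTask;
}

// One pass of the baseline processor: query a batch, then publish and mark
// each message processed one at a time.
async Task<int> ExecuteAsync()
{
    List<int> batch = outbox
        .Where(entry => entry.Value is null)
        .Select(entry => entry.Key)
        .OrderBy(id => id)
        .Take(BatchSize)
        .ToList();

    foreach (int id in batch)
    {
        await PublishAsync(id);       // one broker round trip per message
        outbox[id] = DateTime.UtcNow; // one UPDATE statement per message
    }

    return batch.Count;
}

// Keep polling until a pass finds no work. A real background job would
// wait briefly and poll again instead of stopping.
int passes = 0;
while (await ExecuteAsync() > 0)
{
    passes++;
}

Console.WriteLine($"Published {published.Count} messages in {passes} passes"); // Published 2500 messages in 3 passes
```

Every per-message await and per-message update in this loop is a candidate for the optimizations that follow.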
How many messages are we able to process?
I'll run the Outbox processing for 1 minute and count how many messages were processed.
The baseline implementation processed 81,000 messages in one minute, or 1,350 MPS (messages per second).
Not bad, but let's see how much we can improve this.
Measuring Each Step
You can't improve what you can't measure. Right? So, I'll use a Stopwatch to measure the total execution time and the time each step takes.
Notice that I also split the publish and update steps so I can time them separately. This will be important later, because I want to optimize each step on its own.
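A minimal sketch of that per-step timing, assuming each step can be wrapped in a Func&lt;Task&gt;. MeasureAsync is a hypothetical helper name, and the Task.Delay calls are placeholders for the real query, publish, and update work.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Times a single step of the processor and returns how long it took.
async Task<TimeSpan> MeasureAsync(Func<Task> step)
{
    var stopwatch = Stopwatch.StartNew();
    await step();
    return stopwatch.Elapsed;
}

// A separate Stopwatch tracks the total execution time of the whole pass.
var total = Stopwatch.StartNew();
TimeSpan queryTime = await MeasureAsync(() => Task.Delay(5));    // placeholder: fetch batch
TimeSpan publishTime = await MeasureAsync(() => Task.Delay(30)); // placeholder: publish batch
TimeSpan updateTime = await MeasureAsync(() => Task.Delay(20));  // placeholder: mark processed
total.Stop();

Console.WriteLine(
    $"Query: {queryTime.TotalMilliseconds:F0} ms, " +
    $"Publish: {publishTime.TotalMilliseconds:F0} ms, " +
    $"Update: {updateTime.TotalMilliseconds:F0} ms, " +
    $"Total: {total.Elapsed.TotalMilliseconds:F0} ms");
```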
With the baseline implementation, here are the execution times for each step:
- Query time: ~70ms
- Publish time: ~320ms
- Update time: ~300ms
Now, onto the fun part!
Optimizing Read Queries
The first thing I want to optimize is the query for fetching unprocessed messages. Performing a SELECT * query will have an impact if we don't need all the columns (hint: we don't).
Here's the current SQL query:
We can modify the query to return only the columns we need. This will save us some bandwidth but will not significantly improve performance.
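A sketch of the narrowed query, assuming the table is named outbox_messages and the batch size is passed as a Dapper-style @BatchSize parameter (both assumptions; the column names come from this article). It's written as a C# constant, the way it would be handed to Dapper.

```csharp
using System;

// Select only the columns the processor reads, instead of SELECT *.
// Assumption: table outbox_messages, batch size bound as @BatchSize.
const string FetchUnprocessedSql = @"
SELECT id, type, content
FROM outbox_messages
WHERE processed_on_utc IS NULL
ORDER BY occurred_on_utc
LIMIT @BatchSize";

Console.WriteLine(FetchUnprocessedSql.Trim());
```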
Let's examine the execution plan for this query. I'm running this on PostgreSQL, and EXPLAIN ANALYZE shows that it performs a sequential scan (Seq Scan) over the table.
Now, I'll create an index that "covers" the query for fetching unprocessed messages. A covering index contains all the columns needed to satisfy a query, so the database can answer it without reading the table rows.
The index key will be the occurred_on_utc and processed_on_utc columns, and it will include the id, type, and content columns. Lastly, we'll make it a partial index with a filter, so that only unprocessed messages are indexed.
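Putting those decisions together, the index definition might look like the following sketch. The index name and the outbox_messages table name are assumptions; the INCLUDE clause requires PostgreSQL 11 or later.

```csharp
using System;

// Covering, partial index sketch (index and table names are assumptions):
// - key columns match the ORDER BY and WHERE of the fetch query,
// - INCLUDE stores the selected columns in the index entries,
// - the WHERE predicate indexes only unprocessed rows.
const string CreateOutboxIndexSql = @"
CREATE INDEX IF NOT EXISTS idx_outbox_messages_unprocessed
ON outbox_messages (occurred_on_utc, processed_on_utc)
INCLUDE (id, type, content)
WHERE processed_on_utc IS NULL";

Console.WriteLine(CreateOutboxIndexSql.Trim());
```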
Let me explain the reasoning behind each decision:
- Indexing the occurred_on_utc column stores the index entries in ascending order. This matches the ORDER BY occurred_on_utc clause in the query, so the query can read the index without sorting the results: they are already in the correct order.
- Including the columns we select in the index allows us to return them from the index entry. This avoids reading the values from the table rows.
- Filtering the index to unprocessed messages satisfies the WHERE processed_on_utc IS NULL clause.
Caveat: PostgreSQL limits the size of a B-tree index row to roughly 2.7 KB (2,704 bytes for the B-tree version 4 format used by indexes created on PostgreSQL 12 and later with the default 8 KB pages, or 2,712 bytes for older index versions). Don't ask how I know. The columns in the INCLUDE list are also stored in the index row (the B-tree tuple). The content column contains the serialized JSON message, so it's the most likely culprit to push us over this limit. There's no way around it, so my advice is to keep your messages as small as possible. Alternatively, you could exclude this column from the INCLUDE list and accept a minor performance hit.
Here's the updated execution plan after creating this index:
Because we have a covering index, the execution plan contains only an Index Only Scan and a Limit node. No filtering or sorting needs to happen, which is why we see a massive performance improvement.
What's the performance impact on the query time?
- Query time: 70ms → 1ms (-98.5%)
Optimizing Message Publishing
The next thing we can optimize is how we're publishing messages to the queue. I'm using the IPublishEndpoint from MassTransit to publish to RabbitMQ.
To be more precise here, we're publishing to an exchange. The exchange will then route the message to the appropriate queue.
But how can we optimize this?
A micro-optimization we can do is cache the message types used during deserialization. Resolving a message type through reflection for every message is expensive, so we'll do the reflection once per type and store the result.
The cache can be a ConcurrentDictionary, and we'll use GetOrAdd to retrieve the cached types.
I'll extract this piece of code to the GetOrAddMessageType helper method:
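A minimal sketch of such a helper, assuming the outbox's type column stores an assembly-qualified type name that Type.GetType can resolve (an assumption about the schema). The cache variable name is illustrative, and the demo uses int[] only because it needs no custom message type.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;

// Cache: serialized type name -> resolved System.Type.
var messageTypes = new ConcurrentDictionary<string, Type>();

// Resolve the CLR type once per distinct name; later calls are a dictionary lookup.
// Under contention the factory may run more than once, which is harmless here.
Type GetOrAddMessageType(string typeName) =>
    messageTypes.GetOrAdd(
        typeName,
        name => Type.GetType(name)
            ?? throw new InvalidOperationException($"Unknown message type '{name}'."));

// Usage: deserialize an outbox row's JSON content into its runtime type.
string storedType = typeof(int[]).AssemblyQualifiedName!;
var message = JsonSerializer.Deserialize("[1,2,3]", GetOrAddMessageType(storedType));
Console.WriteLine(message?.GetType().Name); // Int32[]
```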
In the current message publishing step, the biggest problem is that we wait for each Publish call to complete by awaiting it. Publish takes some time because it waits for a confirmation from the message broker, and doing this in a loop, one message at a time, makes it even less efficient.
We can improve this by publishing the messages in a batch. In fact, MassTransit provides a PublishBatch extension method for IPublishEndpoint. If we peek inside, we find that it starts a Publish call for every message and awaits all of them together.
So we can transform the collection of messages into a list of publishing tasks and await them all with Task.WhenAll.
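The difference between the two approaches can be sketched without a broker. PublishAsync below is a hypothetical stand-in for IPublishEndpoint.Publish, with a 20 ms delay playing the role of the confirmation round trip. The point is only that awaiting each call serializes the waits, while Task.WhenAll lets them overlap.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for IPublishEndpoint.Publish: the delay simulates
// waiting for the broker to confirm the message.
var published = new ConcurrentBag<int>();
async Task PublishAsync(int message)
{
    await Task.Delay(20);
    published.Add(message);
}

int[] batch = Enumerable.Range(1, 20).ToArray();

// Before: one awaited publish at a time, so the round trips add up.
var sequential = Stopwatch.StartNew();
foreach (int message in batch)
{
    await PublishAsync(message);
}
sequential.Stop();

// After: start every publish, then await them together, so the round trips overlap.
var concurrent = Stopwatch.StartNew();
await Task.WhenAll(batch.Select(m => PublishAsync(m)));
concurrent.Stop();

Console.WriteLine($"Sequential: {sequential.ElapsedMilliseconds} ms, Task.WhenAll: {concurrent.ElapsedMilliseconds} ms");
```

With a real broker the gain is smaller, as the measurement below shows, because the broker still has to confirm every message.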
What's the improvement for the message publishing step?
- Publish time: 320ms → 289ms (-9.8%)
As you can see, it's not significantly faster. But this is needed for us to benefit from other optimizations I have in store.
Optimizing Update Queries
The next step in our optimization journey is addressing the query updating the processed Outbox messages.
The current implementation is inefficient because we send one query to the database for each Outbox message.
If you didn't get the memo by now, batching is the name of the game. We want a way to send one large UPDATE query to the database.
We have to construct the SQL for this batch query manually. We'll use the DynamicParameters type from Dapper to provide all the parameters.
This will produce a SQL query that looks something like this:
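As a sketch, not the original implementation, the builder and the query it produces could look like this. It assumes an outbox_messages table with an error column next to processed_on_utc (an assumption), and uses a plain Dictionary as a stand-in for Dapper's DynamicParameters so the example runs without packages. UPDATE ... FROM (VALUES ...) is one common PostgreSQL way to update many rows with different values in a single statement.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical results of one processed batch: (id, processed timestamp, error or null).
var processed = new List<(Guid Id, DateTime ProcessedOnUtc, string? Error)>
{
    (Guid.NewGuid(), DateTime.UtcNow, null),
    (Guid.NewGuid(), DateTime.UtcNow, "Broker unavailable"),
    (Guid.NewGuid(), DateTime.UtcNow, null),
};

// Builds one UPDATE for the whole batch. With Dapper, each dictionary entry
// would be added to a DynamicParameters instance instead.
(string Sql, Dictionary<string, object?> Parameters) BuildBatchUpdate(
    IReadOnlyList<(Guid Id, DateTime ProcessedOnUtc, string? Error)> rows)
{
    var paramMap = new Dictionary<string, object?>();
    var valueRows = new List<string>(rows.Count);

    for (int i = 0; i < rows.Count; i++)
    {
        // Numbered parameters keep every value out of the SQL text itself.
        paramMap[$"Id{i}"] = rows[i].Id;
        paramMap[$"ProcessedOn{i}"] = rows[i].ProcessedOnUtc;
        paramMap[$"Error{i}"] = rows[i].Error;
        valueRows.Add($"(@Id{i}, @ProcessedOn{i}, @Error{i})");
    }

    string values = string.Join(",\n    ", valueRows);
    string updateSql = $@"UPDATE outbox_messages
SET processed_on_utc = v.processed_on_utc,
    error = v.error
FROM (VALUES
    {values}
) AS v(id, processed_on_utc, error)
WHERE outbox_messages.id = v.id";

    return (updateSql, paramMap);
}

var (sql, parameters) = BuildBatchUpdate(processed);
Console.WriteLine(sql);
Console.WriteLine($"{parameters.Count} parameters for {processed.Count} messages");
```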
Instead of sending one update query per message, we can send one query to update all messages.
This will obviously give us a noticeable performance benefit:
- Update time: 300ms → 52ms (-82.6%)
How Far Did We Get?
Let's test out the performance improvement with the current optimizations. The changes we've made so far focus on improving the speed of a single OutboxProcessor.
Here are the rough numbers I'm seeing for the individual steps:
- Query time: ~1ms
- Publish time: ~289ms
- Update time: ~52ms
I'll run the Outbox processing for 1 minute and count the number of processed messages.
The optimized implementation processed 162,000 messages in one minute or 2,700 MPS.
For reference, this allows us to process more than 230 million messages per day.
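The per-second and per-day figures are simple arithmetic on the one-minute test runs. Here's a tiny sketch, assuming the rate stays constant for the whole day; integer division rounds down, which is exact for the two runs shown.

```csharp
using System;

// Converts a one-minute test run into messages per second (MPS),
// and an MPS rate into a projected daily volume.
long MessagesPerSecond(long processedInOneMinute) => processedInOneMinute / 60;
long MessagesPerDay(long mps) => mps * 60 * 60 * 24;

long baseline = MessagesPerSecond(81_000);   // 1,350 MPS
long optimized = MessagesPerSecond(162_000); // 2,700 MPS

Console.WriteLine($"Baseline: {baseline:N0} MPS, about {MessagesPerDay(baseline):N0} per day");
Console.WriteLine($"Optimized: {optimized:N0} MPS, about {MessagesPerDay(optimized):N0} per day");
```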
But we're just getting started.
Parallel Outbox Processing
If we want to take this further, we have to scale out the OutboxProcessor. The problem we could face here is two instances processing the same message. So, we need to implement some form of locking on the current batch of messages.
PostgreSQL has a convenient FOR UPDATE clause that we can use here. It locks the selected rows for the duration of the current transaction. However, we must add the SKIP LOCKED option so that other queries skip the locked rows. Otherwise, any other query trying to lock those rows will be blocked until the current transaction completes.
Here's the updated query:
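A sketch of that query, assuming a table named outbox_messages and a Dapper-style @BatchSize parameter. The row locks only last as long as the surrounding transaction, so the publish and the batch UPDATE have to run inside that same transaction before it commits.

```csharp
using System;

// Fetch-and-lock sketch (table name and parameter name are assumptions).
// Intended flow per worker: BEGIN -> this SELECT -> publish -> batch UPDATE -> COMMIT.
// FOR UPDATE locks the returned rows; SKIP LOCKED makes other workers
// skip rows that are already locked instead of waiting for them.
const string FetchAndLockSql = @"
SELECT id, type, content
FROM outbox_messages
WHERE processed_on_utc IS NULL
ORDER BY occurred_on_utc
LIMIT @BatchSize
FOR UPDATE SKIP LOCKED";

Console.WriteLine(FetchAndLockSql.Trim());
```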
To scale out the OutboxProcessor, we simply run multiple instances of the background job.
I'll simulate this using Parallel.ForEachAsync, where I can control the MaxDegreeOfParallelism.
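Here's a runnable simulation of that setup (Parallel.ForEachAsync needs .NET 6 or later). The database is replaced by an in-memory ConcurrentQueue, an assumption for illustration only: dequeuing a message claims it for one worker, which mimics what FOR UPDATE SKIP LOCKED provides, namely that no two workers hold the same row at the same time and nobody waits on rows another worker holds.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

const int BatchSize = 1000;
const int Workers = 5;

// In-memory stand-in for the outbox table (assumption, for illustration only).
var outbox = new ConcurrentQueue<int>(Enumerable.Range(1, 10_000));
var processedBy = new ConcurrentDictionary<int, int>(); // message id -> worker

// Each iteration plays the role of one OutboxProcessor instance.
await Parallel.ForEachAsync(
    Enumerable.Range(0, Workers),
    new ParallelOptions { MaxDegreeOfParallelism = Workers },
    async (worker, cancellationToken) =>
    {
        while (true)
        {
            // Claim up to BatchSize messages that no other worker holds.
            var batch = new List<int>(BatchSize);
            while (batch.Count < BatchSize && outbox.TryDequeue(out int next))
            {
                batch.Add(next);
            }

            if (batch.Count == 0)
            {
                return; // no unclaimed messages left
            }

            await Task.Delay(10, cancellationToken); // simulated publish + update
            foreach (int id in batch)
            {
                processedBy.TryAdd(id, worker);
            }
        }
    });

Console.WriteLine($"{processedBy.Count} messages processed by {processedBy.Values.Distinct().Count()} workers");
```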
We can process 179,000 messages in one minute or 2,983 MPS with five (5) workers.
I thought this was supposed to be much faster. What gives?
Without parallel processing, we were able to get ~2,700 MPS.
A new bottleneck appears: publishing the messages in batches.
The publish time went from ~289ms to ~1,540ms.
Interestingly, if you multiply the base publish time for one worker by the number of workers, you get roughly the new publish time (~289ms × 5 ≈ 1,445ms, versus the measured ~1,540ms).
We're wasting a lot of time waiting for the acknowledgment from the message broker.
How can we fix this?
Batching Message Publishing
RabbitMQ supports publishing messages in batches. We can enable this feature when configuring MassTransit by calling the ConfigureBatchPublish method. MassTransit will then buffer messages and send them to RabbitMQ in batches to increase throughput.
With only this small change, let's rerun our test with five workers.
This time around, we're able to process 1,956,000 messages in one minute.
Which gives us a blazing ~32,500 MPS.
This is more than 2.8 billion processed messages per day.
I could call it a day here, but there's one more thing I want to show you.
Turning Off Publisher Confirmation (Dangerous)
One more thing you can do (which I don't recommend) is turn off publisher confirmation. This means that calling Publish won't wait until the broker confirms (acks) the message. It can lead to reliability issues, including lost messages.
That being said, I did manage to get ~37,000 MPS with publisher confirmation turned off.
Key Considerations for Scaling
While we've achieved impressive throughput, consider these factors when implementing these techniques in a real-world system:
- Consumer Capacity: Can your consumers keep up? Boosting producer throughput without matching consumer capacity can create backlogs. Consider the entire pipeline when scaling.
- Delivery Guarantees: Our optimizations maintain at-least-once delivery. Design consumers to be idempotent to handle occasional duplicate messages.
- Message Ordering: Parallel processing with FOR UPDATE SKIP LOCKED may cause out-of-order messages. For strict ordering, consider the Inbox pattern on the consumer side to buffer messages. An Inbox allows us to process messages in the correct order, even if they arrive out of sequence.
- Reliability vs. Performance Trade-offs: Turning off publisher confirmation increases speed but risks message loss. Weigh performance against reliability based on your specific needs.
By addressing these factors, you'll create a high-performance Outbox processor that integrates smoothly with your system architecture.
Summary
We've come a long way from our initial Outbox processor. Here's what we accomplished:
- Optimized database queries with smart indexing
- Improved message publishing with batching
- Streamlined database updates with batching
- Scaled out Outbox processing with parallel workers
- Leveraged RabbitMQ's batch publishing feature
The result? We boosted processing from 1,350 messages per second to an impressive 32,500 MPS. That's over 2.8 billion messages per day!
Scaling isn't just about raw speed; it's about identifying and addressing bottlenecks at each step. By measuring, optimizing, and rethinking our approach, we achieved massive performance gains.
That's all for today. Hope this was helpful.
P.S. You can find the source code here.