|
Building a Better MediatR Publisher With Channels (and why you shouldn't)
Read on: my website / Read time: 6 minutes
|
|
|
|
The .NET Weekly is brought to you by:
|
|
|
I've been meaning to write this article for a while now. This problem has been bugging me, and I finally found the time to address it.
What problem is that?
Well, it's about MediatR's notification publishing mechanism.
MediatR supports simple in-process publish/subscribe capabilities. This lets you broadcast notifications to multiple handlers without coupling them directly to the publisher.
While MediatR's notification system appears asynchronous at first glance, it's not.
By asynchronous, I mean that the publishing thread should not wait for all handlers to complete. Instead, it should return immediately after queuing the notification for processing.
In this article, we'll understand MediatR's notification publishing mechanics. We'll use distributed tracing to examine its execution model, and explore alternatives for true asynchronous processing.
|
|
|
The Notification Publisher
MediatR provides two built-in implementations of its INotificationPublisher interface. They each have distinct characteristics but share one crucial trait: they block the publishing thread until the handlers complete.
Here's the INotificationPublisher interface:
public interface INotificationPublisher
{
Task Publish(
IEnumerable<NotificationHandlerExecutor> handlerExecutors,
INotification notification,
CancellationToken cancellationToken);
}
This interface provides the contract for executing notification handlers, but the execution strategy is left to the implementing classes.
By default, MediatR uses the ForeachAwaitPublisher :
public class ForeachAwaitPublisher : INotificationPublisher
{
public async Task Publish(
IEnumerable<NotificationHandlerExecutor> handlerExecutors,
INotification notification,
CancellationToken cancellationToken)
{
foreach (var handler in handlerExecutors)
{
await handler.HandlerCallback(notification, cancellationToken).ConfigureAwait(false);
}
}
}
This implementation processes handlers sequentially, ensuring a predictable order of execution.
The alternative TaskWhenAllPublisher offers concurrent execution:
public class TaskWhenAllPublisher : INotificationPublisher
{
public Task Publish(
IEnumerable<NotificationHandlerExecutor> handlerExecutors,
INotification notification,
CancellationToken cancellationToken)
{
var tasks = handlerExecutors
.Select(handler => handler.HandlerCallback(notification, cancellationToken))
.ToArray();
return Task.WhenAll(tasks);
}
}
While this publisher executes handlers concurrently, it's crucial to understand that "concurrent" doesn't mean "background processing". The publishing thread still waits for all handlers to complete before continuing.
|
|
|
Proving the Point with OpenTelemetry
To demonstrate the blocking nature of both publishers, let's set up a simple example with OpenTelemetry tracing:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMediatR(cfg =>
{
cfg.RegisterServicesFromAssemblyContaining<Program>();
cfg.NotificationPublisherType = typeof(ForeachAwaitPublisher);
});
builder.Services
.AddOpenTelemetry()
.ConfigureResource(r => r.AddService(DiagnosticConfig.Source.Name))
.WithTracing(tracing =>
tracing
.AddAspNetCoreInstrumentation()
.AddSource(DiagnosticConfig.Source.Name))
.UseOtlpExporter();
var app = builder.Build();
app.MapPost("orders", async (IMediator mediator) =>
{
using var activity = DiagnosticConfig.Source.StartActivity("CreateOrder");
var orderId = Guid.NewGuid();
await mediator.Publish(new OrderCreatedNotification
{
OrderId = orderId,
ParentId = activity?.Id
});
return Results.Ok(orderId);
});
app.Run();
public class OrderCreatedNotification : INotification
{
public Guid OrderId { get; set; }
public string? ParentId { get; set; }
}
public class SlowOrderCreatedHandler(ILogger<SlowOrderCreatedHandler> logger)
: INotificationHandler<OrderCreatedNotification>
{
public async Task Handle(OrderCreatedNotification notification, CancellationToken token)
{
using var activity = DiagnosticConfig.Source.StartActivity(
"SlowOrderCreatedHandler.Handle",
ActivityKind.Internal,
notification.ParentId);
await Task.Delay(2000, token);
logger.LogInformation(
"Slow handler completed for order {OrderId}",
notification.OrderId);
}
}
internal static class DiagnosticConfig
{
internal static readonly ActivitySource Source = new("Order.Service");
}
When we examine the resulting traces, we'll see that the handler execution spans are contained within the HTTP request span, indicating that the request thread is blocked until all handlers complete.
Now, let's see how these publishers behave in practice. I'll add a few more handlers to the mix to make the example more interesting.
ForeachAwaitPublisher Traces
You can see the sequential execution of handlers in the trace visualization. The request span encompasses all handler execution, demonstrating the blocking nature of the ForeachAwaitPublisher .
Limitations of Offset Pagination:
Similarly, the TaskWhenAllPublisher shows concurrent handler execution within the request span. We do get a slight improvement in handler execution time, but the request thread still waits for all handlers to complete before returning.
|
|
|
Building an Async Notification Publisher with Channels
How can we make MediatR's notification publishing truly asynchronous?
We'll implement a custom INotificationPublisher that leverages System.Threading.Channels for true asynchronous processing. This implementation will queue notifications for background processing, allowing the publishing thread to return immediately.
Here's the ChannelPublisher :
public class ChannelPublisher(NotificationsQueue queue) : INotificationPublisher
{
public async Task Publish(
IEnumerable<NotificationHandlerExecutor> handlerExecutors,
INotification notification,
CancellationToken cancellationToken)
{
await queue.Writer.WriteAsync(
new NotificationEntry(handlerExecutors.ToArray(), notification),
cancellationToken);
}
}
public class NotificationsQueue(int capacity = 100)
{
private readonly Channel<NotificationEntry> _queue =
Channel.CreateBounded<NotificationEntry>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
});
public ChannelReader<NotificationEntry> Reader => _queue.Reader;
public ChannelWriter<NotificationEntry> Writer => _queue.Writer;
}
public record NotificationEntry(NotificationHandlerExecutor[] Handlers, INotification Notification);
builder.Services.AddSingleton<NotificationsQueue>();
But this is just part of the solution. We need a background service to process the queued notifications:
public class ChannelPublisherWorker(NotificationsQueue queue) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach (NotificationEntry entry in queue.Reader.ReadAllAsync(stoppingToken))
{
await Parallel.ForEachAsync(entry.Handlers, stoppingToken, async (executor, token) =>
{
await executor.HandlerCallback(entry.Notification, token);
});
}
}
}
builser.Services.AddHostedService<ChannelPublisherWorker>();
This implementation offers several advantages:
- True background processing - the publisher returns immediately after queueing the notification
- Backpressure handling through bounded channel capacity
- Independent handler execution
To use this publisher, register it with MediatR by setting the NotificationPublisherType to be ChannelPublisher :
services.AddMediatR(cfg =>
{
cfg.NotificationPublisherType = typeof(ChannelPublisher);
});
Let's see how this implementation performs in practice.
|
|
|
Comparing Approaches With OpenTelemetry
When we examine the traces with our ChannelPublisher , we'll see a significant difference:
- The HTTP request span completes quickly after queueing the notification
- Handler execution spans appear as separate traces
- Overall system responsiveness improves
This visualization clearly demonstrates the non-blocking nature of our implementation.
But is it worth it?
Here's what you should consider first before adopting this approach:
- The
ChannelPublisher introduces additional complexity compared to the built-in publishers
- Error handling is your responsibility (e.g., retrying failed handlers, dead-letter queue)
- And did I mention idempotent consumers? Yeah... you need those too
- Channels aren't durable - messages are lost if the application crashes
Before you know it, you might find yourself reinventing the wheel with a custom message queueing system.
Instead, consider using a real message broker like RabbitMQ. Combine it with a library like MassTransit or NServiceBus for a robust, scalable, and reliable messaging solution.
|
|
|
Summary
MediatR's notification system is great for simple in-process pub/sub scenarios. However, the built-in publishers can become a bottleneck in high-throughput applications due to their blocking nature.
The ChannelPublisher implementation we explored offers true asynchronous processing. However, it also comes with extra complexity around message handling and delivery guarantees. Managing message persistence, error handling, retries, and idempotency quickly becomes challenging.
If your application requires these features, you'll be better off adopting a mature solution like RabbitMQ or Amazon SQS.
That's all for today. Hope this was helpful.
|
|
|
Whenever you're ready, there are 3 ways I can help you:
|
|
Pragmatic REST APIs: You will learn how to build production-ready REST APIs using the latest ASP .NET Core features and best practices. Join the waitlist to get a special launch discount. Coming out Monday - 24.02.2025! |
|
|
Pragmatic Clean Architecture: This comprehensive course will teach you the system I use to ship production-ready applications using Clean Architecture. Learn how to apply the best practices of modern software architecture. Join 3,900+ engineers |
|
|
|
You received this email because you subscribed to our list. You can unsubscribe at any time.
Update your profile | Dragiše Cvetkovića 2, Niš, - 18000
|
|
|
|
|