Aggregator
Aggregator is the pattern for “don’t handle each message immediately — wait until I have enough of them, then process the group.” Telemetry slices that should be rolled up every 10 seconds. Line items that should be billed in one statement per order. A reconciliation pass that only makes sense once the day’s feeds are all in.
It is the collecting counterpart to Scatter-Gather: scatter-gather collects on the sender side (I sent, I wait for replies); aggregator collects on the receiver side (I consume, I buffer, I flush).
When to use it
- Handling each message one at a time is the wrong unit of work; batching is what’s natural.
- You have a clear flush condition: either a size threshold, a time window, or both.
- Holding messages briefly before processing is acceptable. Aggregators trade latency for batching.
If each message is independently meaningful and complete, don’t aggregate. If the flush condition is “when every expected message has arrived,” the discrete-correlation shape of a Process Manager fits better than a rolling window.
The contract
One message type feeds the aggregator:

    using ServiceConnect.Interfaces;

    public sealed class TelemetrySlice(Guid correlationId) : Message(correlationId)
    {
        public string Source { get; init; } = string.Empty;
        public int Value { get; init; }
    }

Messages in a batch don’t need a shared correlation id; each message carries its own. The aggregator groups by arrival, not by correlation.
The aggregator
An aggregator derives from Aggregator<T> and overrides three methods: when to flush by size, when to flush by time, and what to do with the batch:

    public sealed class TelemetrySliceAggregator : Aggregator<TelemetrySlice>
    {
        public override int BatchSize() => 100;
        public override TimeSpan Timeout() => TimeSpan.FromSeconds(10);

        public override Task ExecuteAsync(IReadOnlyList<TelemetrySlice> messages, CancellationToken cancellationToken = default)
        {
            var total = messages.Sum(m => m.Value);
            Console.WriteLine($"rolled up {messages.Count} slices, total {total}");
            return Task.CompletedTask;
        }
    }

Three things to notice:
- ExecuteAsync is async-first. It runs on a background flush and returns a batch outcome, not a handler result. The supplied CancellationToken flows through from the dispatcher; honour it for long-running I/O. Await async work directly; no sync-over-async bridges are needed.
- Both flush paths are mandatory. BatchSize() must return a positive int; Timeout() must return a positive TimeSpan (neither TimeSpan.Zero nor Timeout.InfiniteTimeSpan). The registry rejects subclasses that violate either at startup with InvalidOperationException, because the framework needs both paths to guarantee buffered messages always have a route to dispatch. The batch flushes on whichever trigger fires first.
- There is no correlation key. One aggregator instance buffers every message of its type that arrives on its queue. If you need per-group batching (“aggregate per order”), use a Process Manager keyed on the group id; aggregator is the right shape only when one big pool is what you want.
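The “whichever trigger fires first” rule can be modelled in a few lines. This is a standalone sketch, not ServiceConnect’s implementation: ShouldFlush and its parameters are hypothetical names, and it assumes the time window is measured from the oldest buffered message, which the text above does not specify.

```csharp
using System;

// Hypothetical helper (not part of ServiceConnect): decides whether a buffer
// should flush, given the two mandatory triggers described above.
// Assumption: the timeout is measured from the oldest buffered message.
static bool ShouldFlush(int buffered, DateTime oldestArrivalUtc, DateTime nowUtc,
                        int batchSize, TimeSpan timeout)
{
    if (buffered == 0) return false;              // nothing to dispatch
    if (buffered >= batchSize) return true;       // size trigger
    return nowUtc - oldestArrivalUtc >= timeout;  // time trigger
}

var start = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var timeout = TimeSpan.FromSeconds(10);

// 100 messages after 2 seconds: the size trigger fires first.
bool bySize = ShouldFlush(100, start, start.AddSeconds(2), 100, timeout);

// 3 messages after 10 seconds: the time trigger fires first.
bool byTime = ShouldFlush(3, start, start.AddSeconds(10), 100, timeout);

// 3 messages after 2 seconds: neither trigger has fired yet.
bool neither = ShouldFlush(3, start, start.AddSeconds(2), 100, timeout);

Console.WriteLine($"{bySize} {byTime} {neither}"); // True True False
```

With BatchSize() at 100 and Timeout() at 10 seconds, a busy queue flushes on size and a quiet one flushes on time, so a small trickle of messages is never stranded in the buffer.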
Generic subclasses are not supported
Generic aggregator subclasses, such as class MyAggregator<T> : Aggregator<MyMessage>, are rejected at registry construction with InvalidOperationException. A generic subclass’s FullName embeds the assembly-qualified name of its type argument (including version and public-key token), which defeats the version-stable name derivation the persistence layer depends on. Use a concrete (non-generic) class for every aggregator.
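The naming problem is a property of .NET reflection and can be seen without ServiceConnect. The sketch below compares Type.FullName for a non-generic type and a closed generic type; List<int> and Uri are arbitrary stand-ins, not aggregator types.

```csharp
using System;
using System.Collections.Generic;

// A non-generic type: FullName is just namespace + type name.
string concrete = typeof(Uri).FullName!;

// A closed generic type: FullName embeds the assembly-qualified name of the
// type argument, including its Version and PublicKeyToken.
string generic = typeof(List<int>).FullName!;

Console.WriteLine(concrete);
Console.WriteLine(generic);
```

The second line printed contains "Version=" and "PublicKeyToken=" segments for the type argument’s assembly, so the string changes whenever that assembly’s version does.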
Registration and persistence
An aggregator is registered like a handler (it consumes a message type from a queue), but it is bound to its base class, not to IMessageHandler<T>:

    services.AddTransient<Aggregator<TelemetrySlice>, TelemetrySliceAggregator>();

    services.AddSingleton<IReadOnlyList<HandlerReference>>(new List<HandlerReference>
    {
        new() { HandlerType = typeof(TelemetrySliceAggregator), MessageType = typeof(TelemetrySlice) },
    });

    services.AddServiceConnect(builder =>
    {
        builder.UseRabbitMQ(t => { t.Host = "localhost"; });
        builder.ConfigureQueues(q => q.QueueName = "telemetry-rollup");
        builder.UseMongoDbPersistence(options =>
        {
            options.ConnectionString = "mongodb://localhost:27017";
            options.DatabaseName = "telemetry";
        });
    });

The MongoDB persistence provider is what makes aggregators safe across restarts. Each buffered message is inserted into a collection keyed by the aggregator name; on restart, the buffer is restored. Flushing removes the batch. If you omit the persistence configuration, the bus has no IAggregatorPersistor; each arriving aggregatable message logs a Warning ("IAggregatorPersistor not registered. Cannot aggregate {MessageType}") and is then discarded rather than buffered. Wire up persistence, or remove the aggregator registration, before deploying.
Don’t keep state in private fields of the aggregator. The instance is recreated per dispatch — the buffer lives in persistence, not in memory.
Flush semantics
Once a flush condition is met, the aggregator:
- Loads the current batch from the persistor.
- Deserialises it back into T instances.
- Calls ExecuteAsync(batch, cancellationToken).
- Removes the flushed records from the persistor.
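The four steps above can be sketched with an in-memory list standing in for the persistor. Everything here is hypothetical scaffolding (store, FlushAsync, and the use of plain int payloads serialised as JSON); it illustrates the ordering of the steps, not the framework’s actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the persistor: one serialised payload per message.
var store = new List<string> { "3", "4", "5" };

// Runs one flush: load, deserialise, execute, and remove only if execute completed.
async Task<bool> FlushAsync(Func<IReadOnlyList<int>, CancellationToken, Task> execute,
                            CancellationToken ct = default)
{
    var raw = store.ToList();                                      // 1. load
    var batch = raw.Select(j => JsonSerializer.Deserialize<int>(j)) // 2. deserialise
                   .ToList();
    try
    {
        await execute(batch, ct);                                  // 3. execute
    }
    catch (Exception)
    {
        return false;  // records stay in the store for the next trigger
    }
    store.RemoveRange(0, raw.Count);                               // 4. remove flushed records
    return true;
}

// First attempt: the handler throws, so nothing is removed.
bool firstOk = await FlushAsync((_, _) => throw new InvalidOperationException("downstream unavailable"));
int remainingAfterFailure = store.Count;

// Second attempt: the handler succeeds, so the batch is removed.
int total = 0;
bool secondOk = await FlushAsync((items, _) => { total = items.Sum(); return Task.CompletedTask; });
int remainingAfterSuccess = store.Count;

Console.WriteLine($"{firstOk} {remainingAfterFailure} {secondOk} {total} {remainingAfterSuccess}");
// False 3 True 12 0
```

Because removal happens last, a crash or exception in step 3 leaves the records in place, which is why the same batch can be delivered more than once.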
If ExecuteAsync throws, the batch is not removed — it flushes again on the next trigger. This is the expected shape: ExecuteAsync should be idempotent where possible (write to a system that upserts, or tag the flush with a batch id you can deduplicate on).
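One way to tag a flush with a deduplicable batch id is to hash the ids of the messages it contains. The sketch below is an assumption-laden illustration: BatchId, Apply, and the in-memory appliedBatches set are hypothetical, and a real system would keep the applied ids in the target store (for example as a unique key) rather than in process memory.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Hypothetical: ids of batches the downstream system has already applied.
var appliedBatches = new HashSet<string>();
int writes = 0;

// Derives a stable id from the message ids in the batch, independent of arrival order.
static string BatchId(IEnumerable<Guid> messageIds)
{
    var joined = string.Join("|", messageIds.OrderBy(id => id).Select(id => id.ToString("N")));
    return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(joined)));
}

// Applies the batch once; a re-flush of the same batch becomes a no-op.
bool Apply(IReadOnlyList<Guid> messageIds)
{
    if (!appliedBatches.Add(BatchId(messageIds))) return false; // duplicate flush, skip
    writes++;                                                   // the real side effect goes here
    return true;
}

var ids = new[] { Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid() };
bool first = Apply(ids);
bool replay = Apply(new[] { ids[2], ids[1], ids[0] }); // same batch, different order

Console.WriteLine($"{first} {replay} {writes}"); // True False 1
```

This only deduplicates a re-flush of exactly the same set of messages. If the batch contents could differ between attempts, per-message upserts are likely the safer of the two options.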
A batch that arrives out-of-order is still one batch. The aggregator does not sort; if ordering within the batch matters, sort inside ExecuteAsync.
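Sorting inside ExecuteAsync is a one-line LINQ call once the message carries something to order by. The sketch below uses tuples with a hypothetical RecordedAt field; the TelemetrySlice contract shown earlier has no timestamp, so that field is an assumption added for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical payloads. RecordedAt is an assumed field, not part of TelemetrySlice.
var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var batch = new List<(string Source, int Value, DateTime RecordedAt)>
{
    ("sensor-a", 7, t0.AddSeconds(2)),
    ("sensor-b", 3, t0),
    ("sensor-a", 5, t0.AddSeconds(1)),
};

// Sort a copy by the field that defines order; the incoming list is left untouched.
var ordered = batch.OrderBy(m => m.RecordedAt).ToList();

Console.WriteLine(string.Join(",", ordered.Select(m => m.Value))); // 3,5,7
```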
When not to aggregate
Two anti-patterns worth naming:
- Accumulating indefinitely. If no flush condition ever fires, messages pile up. The registry already requires both a positive batch size and a positive timeout, so choose values that will actually trigger under your real traffic.
- Using the aggregator as a poor-man’s queue. If your real need is “process these in the background,” a normal queue with a competing-consumer worker is simpler and clearer. Reach for aggregator when the batch is the unit of work, not when you want deferred processing.
Reference
- Aggregator<T>: aggregator base class and snapshot
- IAggregatorPersistor: extension point for custom persistence
What comes next
- Scatter-Gather: the sender-side counterpart.
- Process Manager: when the batch is keyed by a correlation id rather than arrival.
- Competing Consumers: when you want parallelism, not batching.