Aggregator

Aggregator is the pattern for “don’t handle each message immediately — wait until I have enough of them, then process the group.” Telemetry slices that should be rolled up every 10 seconds. Line items that should be billed in one statement per order. A reconciliation pass that only makes sense once the day’s feeds are all in.

It is the collecting counterpart to Scatter-Gather: scatter-gather collects on the sender side (I sent, I wait for replies); aggregator collects on the receiver side (I consume, I buffer, I flush).

Reach for an aggregator when:

  • Handling each message one at a time is the wrong unit of work; batching is what’s natural.
  • You have a clear flush condition: either a size threshold, a time window, or both.
  • Holding messages briefly before processing is acceptable. Aggregators trade latency for batching.

If each message is independently meaningful and complete, don’t aggregate. If the flush condition is “when every expected message has arrived,” the discrete-correlation shape of a Process Manager fits better than a rolling window.

One message type feeds the aggregator:

Contracts/TelemetrySlice.cs
using ServiceConnect.Interfaces;

public sealed class TelemetrySlice(Guid correlationId) : Message(correlationId)
{
    public string Source { get; init; } = string.Empty;
    public int Value { get; init; }
}

Messages in a batch don’t need a shared correlation id — each message carries its own. The aggregator groups by arrival, not by correlation.

An aggregator derives from Aggregator<T> and overrides three methods: when to flush by size, when to flush by time, and what to do with the batch:

Consumer/TelemetrySliceAggregator.cs
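public sealed class TelemetrySliceAggregator : Aggregator<TelemetrySlice>
{
    // Size trigger: flush once 100 slices are buffered.
    public override int BatchSize() => 100;

    // Time trigger: flush when the 10-second timeout elapses, even if fewer than 100 slices are buffered.
    public override TimeSpan Timeout() => TimeSpan.FromSeconds(10);

    // Receives the flushed batch; replace the console write with the real rollup.
    public override Task ExecuteAsync(IReadOnlyList<TelemetrySlice> messages, CancellationToken cancellationToken = default)
    {
        var total = messages.Sum(m => m.Value);
        Console.WriteLine($"rolled up {messages.Count} slices, total {total}");
        return Task.CompletedTask;
    }
}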

Three things to notice:

  • ExecuteAsync is async-first. It runs on a background flush and returns a batch outcome, not a handler result. The supplied CancellationToken flows through from the dispatcher — honour it for long-running I/O. Await async work directly; no sync-over-async bridges are needed.
  • Both flush paths are mandatory. BatchSize() must return a positive int; Timeout() must return a positive TimeSpan (neither TimeSpan.Zero nor Timeout.InfiniteTimeSpan). The registry rejects subclasses that violate either at startup with InvalidOperationException — the framework needs both paths to guarantee buffered messages always have a route to dispatch. The batch flushes on whichever trigger fires first.
  • There is no correlation key. Each aggregator has a single buffer that holds every message of its type arriving on its queue. If you need per-group batching (“aggregate per order”), use a Process Manager keyed on the group id; aggregator is the right shape only when one big pool is what you want.
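The "whichever trigger fires first" rule and the startup validation can be pictured with a small standalone sketch. FlushPolicy is a hypothetical helper written for illustration, not a ServiceConnect type, and it assumes the timeout is measured from the arrival of the oldest buffered message, which the text above does not specify.

```csharp
using System;

// Usage with the values from the example above: 100 messages or 10 seconds.
var policy = new FlushPolicy(batchSize: 100, timeout: TimeSpan.FromSeconds(10));
var start = DateTimeOffset.UnixEpoch;

Console.WriteLine(policy.ShouldFlush(buffered: 100, oldestArrival: start, now: start.AddSeconds(1)));  // True: size trigger
Console.WriteLine(policy.ShouldFlush(buffered: 3, oldestArrival: start, now: start.AddSeconds(10)));   // True: time trigger
Console.WriteLine(policy.ShouldFlush(buffered: 3, oldestArrival: start, now: start.AddSeconds(2)));    // False: neither yet

// Hypothetical helper for illustration only; not part of ServiceConnect.
public sealed class FlushPolicy
{
    private readonly int _batchSize;
    private readonly TimeSpan _timeout;

    public FlushPolicy(int batchSize, TimeSpan timeout)
    {
        // Mirrors the rule that both flush paths must be usable.
        if (batchSize <= 0)
            throw new InvalidOperationException("BatchSize must be a positive int.");
        // Timeout.InfiniteTimeSpan is -1 ms, so this check rejects it along with TimeSpan.Zero.
        if (timeout <= TimeSpan.Zero)
            throw new InvalidOperationException("Timeout must be a positive TimeSpan.");

        _batchSize = batchSize;
        _timeout = timeout;
    }

    // Assumption: the time window starts at the oldest buffered message.
    public bool ShouldFlush(int buffered, DateTimeOffset oldestArrival, DateTimeOffset now)
    {
        if (buffered == 0) return false; // nothing to flush
        return buffered >= _batchSize || now - oldestArrival >= _timeout;
    }
}
```

Because the two conditions are joined with a logical OR, a quiet queue still drains on the timeout and a busy queue never grows past the size threshold.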

Generic aggregator subclasses — class MyAggregator<T> : Aggregator<MyMessage> — are rejected at registry construction with InvalidOperationException. A generic subclass’s FullName embeds the assembly-qualified name of its type argument (including version and public-key token), which defeats the version-stable name derivation the persistence layer depends on. Use a concrete (non-generic) class for every aggregator.
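The FullName behaviour is standard .NET reflection and can be seen without ServiceConnect. The two classes below are hypothetical stand-ins, not real aggregators; they only show how a closed generic type's name differs from a concrete one.

```csharp
using System;

// A concrete class has a short, stable FullName (namespace plus type name).
Console.WriteLine(typeof(ConcreteAggregator).FullName);

// A closed generic type embeds the assembly-qualified name of its type argument,
// including Version, Culture, and PublicKeyToken of the runtime assembly that defines int.
Console.WriteLine(typeof(GenericAggregator<int>).FullName);

// Hypothetical stand-ins for illustration; they do not derive from Aggregator<T>.
public class ConcreteAggregator { }
public class GenericAggregator<T> { }
```

The second line of output carries the version of the runtime assembly, so the exact string changes whenever the runtime is upgraded. A name derived from it would not match records persisted under the earlier version.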

An aggregator is registered like a handler — it consumes a message type from a queue — but it is bound to its base class, not to IMessageHandler<T>:

Consumer/Program.cs
services.AddTransient<Aggregator<TelemetrySlice>, TelemetrySliceAggregator>();

services.AddSingleton<IReadOnlyList<HandlerReference>>(new List<HandlerReference>
{
    new() { HandlerType = typeof(TelemetrySliceAggregator), MessageType = typeof(TelemetrySlice) },
});

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(t => { t.Host = "localhost"; });
    builder.ConfigureQueues(q => q.QueueName = "telemetry-rollup");
    builder.UseMongoDbPersistence(options =>
    {
        options.ConnectionString = "mongodb://localhost:27017";
        options.DatabaseName = "telemetry";
    });
});

The MongoDB persistence provider is what makes aggregators safe across restarts. Each buffered message is inserted into a collection keyed by the aggregator name; on restart, the buffer is restored. Flushing removes the batch. If you omit the persistence configuration, the bus has no IAggregatorPersistor: each arriving aggregatable message logs a Warning ("IAggregatorPersistor not registered. Cannot aggregate {MessageType}") and is discarded rather than buffered, with no exception raised. Wire up persistence, or remove the aggregator registration, before deploying.

Don’t keep state in private fields of the aggregator. The instance is recreated per dispatch — the buffer lives in persistence, not in memory.

Once a flush condition is met, the aggregator:

  1. Loads the current batch from the persistor.
  2. Deserialises it back into T instances.
  3. Calls ExecuteAsync(batch, cancellationToken).
  4. Removes the flushed records from the persistor.

If ExecuteAsync throws, the batch is not removed — it flushes again on the next trigger. This is the expected shape: ExecuteAsync should be idempotent where possible (write to a system that upserts, or tag the flush with a batch id you can deduplicate on).
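One way to make a re-flushed batch harmless is to derive a deterministic batch id and write through an upsert. The sketch below is a standalone illustration under stated assumptions: Slice, BatchId, and RollupStore are hypothetical names, the store is an in-memory stand-in for a real upserting system, and the retried batch is assumed to contain exactly the same messages as the failed one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

var batch = new List<Slice>
{
    new(Guid.Parse("11111111-1111-1111-1111-111111111111"), 4),
    new(Guid.Parse("22222222-2222-2222-2222-222222222222"), 6),
};

var store = new RollupStore();
store.Upsert(BatchId.For(batch), batch.Sum(s => s.Value));
// Simulate the retry after a failed flush: same messages, same id, so the total is overwritten.
store.Upsert(BatchId.For(batch), batch.Sum(s => s.Value));
Console.WriteLine(store.GrandTotal); // 10, not 20

// Hypothetical stand-in for TelemetrySlice, reduced to the fields the sketch needs.
public sealed record Slice(Guid CorrelationId, int Value);

public static class BatchId
{
    // Hashes the sorted message ids so the id does not depend on arrival order.
    public static string For(IEnumerable<Slice> batch)
    {
        var ids = batch.Select(s => s.CorrelationId.ToString("N"))
                       .OrderBy(id => id, StringComparer.Ordinal);
        var bytes = Encoding.UTF8.GetBytes(string.Join("|", ids));
        return Convert.ToHexString(SHA256.HashData(bytes));
    }
}

// In-memory stand-in for a store that upserts by key.
public sealed class RollupStore
{
    private readonly Dictionary<string, int> _totals = new();

    public void Upsert(string batchId, int total) => _totals[batchId] = total;

    public int GrandTotal => _totals.Values.Sum();
}
```

If a retried flush can pick up messages that arrived after the failure, the batch id changes and this scheme no longer deduplicates; in that case, deduplicating per message id is the safer design.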

A batch that arrives out-of-order is still one batch. The aggregator does not sort; if ordering within the batch matters, sort inside ExecuteAsync.
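Sorting inside ExecuteAsync needs something to sort on, and the TelemetrySlice contract above carries no timestamp. The sketch below assumes a hypothetical RecordedAt field set by the producer; TimedSlice and BatchOrdering are illustrative names, not ServiceConnect types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var t0 = DateTimeOffset.UnixEpoch;

// Arrival order differs from recording order.
var arrived = new List<TimedSlice>
{
    new("sensor-a", 7, t0.AddSeconds(2)),
    new("sensor-a", 3, t0),
    new("sensor-a", 5, t0.AddSeconds(1)),
};

var ordered = BatchOrdering.ByRecordedAt(arrived);
Console.WriteLine(string.Join(", ", ordered.Select(s => s.Value))); // 3, 5, 7

// Hypothetical message shape: RecordedAt is an assumed producer-side timestamp.
public sealed record TimedSlice(string Source, int Value, DateTimeOffset RecordedAt);

public static class BatchOrdering
{
    // Returns a new list sorted by timestamp; OrderBy is stable, so ties keep arrival order.
    public static IReadOnlyList<TimedSlice> ByRecordedAt(IReadOnlyList<TimedSlice> batch) =>
        batch.OrderBy(s => s.RecordedAt).ToList();
}
```

Inside a real aggregator, the same call would be the first line of ExecuteAsync, applied to the messages parameter.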

Two anti-patterns worth naming:

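  • Accumulating indefinitely. The registry already rejects an aggregator that lacks a positive BatchSize() or Timeout(), but a batch size and timeout that are both too large to fire in practice have the same effect: messages pile up. Choose values that bound how long a message can sit in the buffer.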
  • Using the aggregator as a poor-man’s queue. If your real need is “process these in the background,” a normal queue with a competing-consumer worker is simpler and clearer. Reach for aggregator when the batch is the unit of work, not when you want deferred processing.