Skip to content

Aggregator<T>

Aggregator<T> is the base class for a consumer that buffers messages of a single type and processes them as a batch once a flush condition (size, time, or both) is met. AggregatorSnapshot is the record the persistence layer returns when it materialises the buffered records back into messages plus a count of records it could not deserialise. Reach for an aggregator when the natural unit of work is the batch — telemetry rollups, periodic reconciliations, the Scatter-Gather fan-in — rather than the individual message.

See Aggregator for the conceptual tour.

public abstract class Aggregator<T> where T : Message

Aggregator<T> is an abstract class: the framework resolves one per dispatch, invokes the configured flush triggers, and calls ExecuteAsync with the buffered batch.

public abstract TimeSpan Timeout()

Gets the maximum amount of time to wait before dispatching the current batch.

Returns. A strictly positive TimeSpan. TimeSpan.Zero and Timeout.InfiniteTimeSpan are rejected by the registry at startup — every aggregator must declare a finite time-based flush path so buffered messages always have a route to dispatch.


public abstract int BatchSize()

Gets the maximum number of messages to buffer before dispatching the batch.

Returns. A strictly positive integer. Zero and negative values are rejected by the registry at startup.


public abstract Task ExecuteAsync(IReadOnlyList<T> messages, CancellationToken cancellationToken = default)

Processes a completed batch. The framework calls this on a background flush after materialising the buffered records through the configured IAggregatorPersistor; throwing from ExecuteAsync leaves the batch in the persistor so the next flush retries it.

Parameters

  • messages — the messages collected for the batch, in the order the persistor returned them. Sort inside ExecuteAsync if intra-batch ordering matters.
  • cancellationToken — token to observe for cancellation; flows through from the dispatcher.

Remarks. ExecuteAsync is async-first — it runs on a background flush timer and returns a batch outcome, not a handler result. The CancellationToken flows through so long-running I/O can be cancelled cleanly; await async work directly rather than reaching for sync-over-async bridges.

public interface IAggregatorSnapshot
{
IReadOnlyList<IHasCorrelationId> ResolvedMessages { get; }
IReadOnlyList<Guid> ResolvedIds { get; }
int UnresolvedCount { get; }
}

The contract returned by IAggregatorPersistor.GetSnapshotAsync — implement it on your own snapshot type if a custom persistor needs to surface fields the built-in record does not carry (for example, a per-record timestamp). Most persistors return the framework-supplied AggregatorSnapshot record below.

public sealed record AggregatorSnapshot(
IReadOnlyList<IHasCorrelationId> ResolvedMessages,
IReadOnlyList<Guid> ResolvedIds,
int UnresolvedCount) : IAggregatorSnapshot

The default IAggregatorSnapshot implementation. A point-in-time capture of aggregator messages returned by the configured IAggregatorPersistor. Carries the deserialised messages, the ids of the underlying storage records, and the count of records that could not be resolved — typically because the CLR type the record was serialised against has been renamed or removed.

IReadOnlyList<IHasCorrelationId> ResolvedMessages { get; init; }

Gets the deserialised messages recovered from the persistor. Typed as IReadOnlyList<IHasCorrelationId> because every aggregated record carries a correlation id; the framework casts to T (where T : Message and Message implements IHasCorrelationId) before calling Aggregator<T>.ExecuteAsync.


IReadOnlyList<Guid> ResolvedIds { get; init; }

Gets the storage-record ids for the resolved messages. The persistor uses these ids to remove the flushed records after a successful ExecuteAsync.


int UnresolvedCount { get; init; }

Gets the number of storage records that could not be resolved to a CLR type. A non-zero value usually indicates a message contract rename since the records were written; the unresolved rows remain in storage and do not participate in the flush.


public static AggregatorSnapshot Empty { get; } = new([], [], 0);

Gets an empty snapshot with no resolved or unresolved records. Persistor implementations return Empty instead of allocating a fresh empty instance per call.

Collecting carrier quotes before acting on them

Section titled “Collecting carrier quotes before acting on them”
public sealed class QuoteAggregator : Aggregator<ShippingQuote>
{
private readonly IBus _bus;
public QuoteAggregator(IBus bus) => _bus = bus;
public override int BatchSize() => 25;
public override TimeSpan Timeout() => TimeSpan.FromSeconds(30);
public override async Task ExecuteAsync(IReadOnlyList<ShippingQuote> messages, CancellationToken cancellationToken = default)
{
if (messages.Count == 0) return;
var correlationId = messages[0].CorrelationId;
var cheapest = messages.OrderBy(q => q.Price).First();
await _bus.PublishAsync(new ShippingQuotesReceived(correlationId)
{
CarrierCount = messages.Count,
SelectedCarrier = cheapest.Carrier,
SelectedPrice = cheapest.Price,
});
}
}

QuoteAggregator flushes either once 25 quotes have arrived or every 30 seconds — whichever happens first. ExecuteAsync publishes the fan-in event ShippingQuotesReceived carrying the winning quote; the async publish is awaited directly, and the supplied CancellationToken flows through to cooperating work. If PublishAsync throws, the batch remains in the persistor and the next flush retries it.