IAggregatorPersistor
Overview
IAggregatorPersistor is the storage contract that an Aggregator<T> uses to buffer incoming messages between deliveries. When a message arrives, the framework calls InsertDataAsync to persist it. Once the aggregation condition is met, it calls GetSnapshotAsync to obtain the resolved records, fires the aggregator callback, and then removes those records via RemoveSnapshotAsync. The interface also exposes CountAsync, GetDataAsync, and bulk-removal methods for management and recovery paths. Two further members, ReleaseSnapshotAsync and CountResolvedAsync, have default interface implementations.
ServiceConnect ships in-memory and MongoDB implementations out of the box. Swap in a custom implementation — Postgres, Redis, DynamoDB — by registering it via ServiceConnectBuilder.AddRegistration.
See Aggregator for the conceptual model.
Reference
InsertDataAsync
```csharp
Task InsertDataAsync(IHasCorrelationId data, string name, string idempotencyKey, CancellationToken cancellationToken = default);
```
Persists a single message payload for the named aggregator instance. The write is idempotent on idempotencyKey within the aggregator's active row set.
Parameters
- data: the message payload to store. Must implement IHasCorrelationId (the Message base class does, so any concrete Message works).
- name: the logical aggregator name (for example, "MyNamespace.ShippingAggregator"). Used as a partition key, so all operations for one aggregator are scoped to this value. The framework derives this value from handlerType.FullName (the concrete user subclass, e.g. "MyNamespace.TelemetrySliceAggregator"), not from the closed-generic base type.
- idempotencyKey: a stable per-message identifier (typically the broker-side MessageId) used to reject re-inserts of the same delivery. A retry-queue redelivery that arrives between the insert and the dispatcher's broker ack re-enters InsertDataAsync with the same key while the prior insert's row is still buffered. The persistor must skip the second write so that the aggregator's Execute sees each delivery exactly once. Once the row has been removed (snapshot dispatched), the key is no longer tracked.
- cancellationToken: cancels the storage operation.
Remarks. Each call corresponds to one message delivery. The store must be able to hold multiple records under the same name, distinguishable by a per-record identifier (typically the correlation id embedded in the message). The idempotency-key contract makes the operation safe under at-least-once redelivery.
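The following minimal in-memory sketch makes the idempotency-key contract concrete. IHasCorrelationId is redeclared as a simplified stand-in so that the block compiles on its own. InMemoryBuffer and Telemetry are hypothetical names, and this is not ServiceConnect's shipped in-memory persistor. Each partition is keyed by idempotencyKey, so the active row set also acts as the duplicate detector, and removing a row frees its key.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for ServiceConnect's IHasCorrelationId.
public interface IHasCorrelationId
{
    Guid CorrelationId { get; }
}

// Hypothetical message type used only for this sketch.
public sealed record Telemetry(Guid CorrelationId, double Value) : IHasCorrelationId;

// Hypothetical buffer illustrating the idempotency-key contract.
public sealed class InMemoryBuffer
{
    private readonly object _gate = new();

    // name -> (idempotencyKey -> payload). The keys are exactly the active row set.
    private readonly Dictionary<string, Dictionary<string, IHasCorrelationId>> _rows = new();

    // Returns false when the key is already buffered, i.e. the redelivery was skipped.
    public bool Insert(IHasCorrelationId data, string name, string idempotencyKey)
    {
        lock (_gate)
        {
            if (!_rows.TryGetValue(name, out var partition))
            {
                partition = new Dictionary<string, IHasCorrelationId>();
                _rows[name] = partition;
            }
            return partition.TryAdd(idempotencyKey, data);
        }
    }

    public int Count(string name)
    {
        lock (_gate) return _rows.TryGetValue(name, out var partition) ? partition.Count : 0;
    }

    // Removing the row also forgets its key, so the key is no longer tracked.
    public bool Remove(string name, string idempotencyKey)
    {
        lock (_gate) return _rows.TryGetValue(name, out var partition) && partition.Remove(idempotencyKey);
    }
}
```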
GetDataAsync
```csharp
Task<IReadOnlyList<IHasCorrelationId>> GetDataAsync(string name, CancellationToken cancellationToken = default);
```
Returns every buffered record for the named aggregator as a list of deserialised payloads.
Parameters
- name: the logical aggregator name to load.
- cancellationToken: cancels the load.
Returns. All stored records; an empty list when none are present (never null).
Remarks. This method is used by recovery paths and administrative tooling. For normal aggregation flow, GetSnapshotAsync is preferred because it separates resolved records from those that could not be deserialised.
GetSnapshotAsync
```csharp
Task<IAggregatorSnapshot> GetSnapshotAsync(string name, CancellationToken cancellationToken = default);
```
Returns a snapshot of the buffered records, separating those that deserialised successfully from those that could not.
Parameters
- name: the logical aggregator name.
- cancellationToken: cancels the load.
Returns. An IAggregatorSnapshot describing the resolved and unresolved portions of the buffer. See IAggregatorSnapshot below for the member details.
Remarks. The snapshot is a point-in-time view, not a version token. After the aggregator fires, the same snapshot is passed to RemoveSnapshotAsync to remove exactly the records captured in it — any records that arrived after the snapshot was taken remain in the store.
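The point-in-time behaviour can be illustrated with a hypothetical in-memory buffer that tracks only record ids. Payloads are omitted, and Snapshot stands in for IAggregatorSnapshot. The snapshot copies the ids at capture time, and removal deletes exactly that set, so a record inserted afterwards survives.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical snapshot shape mirroring IAggregatorSnapshot.ResolvedIds; payloads omitted.
public sealed record Snapshot(IReadOnlyList<Guid> ResolvedIds);

public sealed class SnapshotBuffer
{
    private readonly object _gate = new();
    private readonly List<Guid> _ids = new();

    public void Insert(Guid id) { lock (_gate) _ids.Add(id); }

    // Copies the current ids: later inserts cannot change an already-taken snapshot.
    public Snapshot GetSnapshot() { lock (_gate) return new Snapshot(_ids.ToArray()); }

    // Deletes only the ids captured in the snapshot, never the whole partition.
    public void RemoveSnapshot(Snapshot snapshot)
    {
        var captured = new HashSet<Guid>(snapshot.ResolvedIds);
        lock (_gate) _ids.RemoveAll(captured.Contains);
    }

    public IReadOnlyList<Guid> Ids { get { lock (_gate) return _ids.ToArray(); } }
}
```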
IAggregatorSnapshot
IAggregatorSnapshot is the companion interface returned by GetSnapshotAsync. It separates the buffered records into those the store could deserialise (resolved) and those it could not (unresolved).
IAggregatorSnapshot.ResolvedMessages
```csharp
IReadOnlyList<IHasCorrelationId> ResolvedMessages { get; }
```
The deserialised message payloads for all records that could be hydrated. The aggregator callback receives this list. Every element implements IHasCorrelationId (the Message base class does); cast to the concrete message type when consuming.
IAggregatorSnapshot.ResolvedIds
```csharp
IReadOnlyList<Guid> ResolvedIds { get; }
```
The storage identifiers corresponding to ResolvedMessages, in the same order. RemoveSnapshotAsync uses these ids as the delete key set, so implementations that store records by (name, correlationId) pairs should match on correlationId.
IAggregatorSnapshot.UnresolvedCount
```csharp
int UnresolvedCount { get; }
```
The number of stored records that could not be deserialised to CLR objects. A positive value indicates schema drift or missing type registrations. The framework counts these towards the total when evaluating count-based conditions but cannot pass them to the aggregator callback.
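The helper below sketches how a persistor might split resolved records from unresolved ones. It assumes rows are stored as an id, an assembly-qualified type name, and a JSON payload, and it uses System.Text.Json for hydration. StoredRow, SnapshotResolver, and Telemetry are hypothetical, and IHasCorrelationId is a simplified stand-in. If Type.GetType can no longer resolve a type name (for example, after a rename), the helper increments the unresolved count instead of failing the whole load.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Simplified stand-in for ServiceConnect's IHasCorrelationId.
public interface IHasCorrelationId { Guid CorrelationId { get; } }

// Hypothetical message type used only for this sketch.
public sealed class Telemetry : IHasCorrelationId
{
    public Guid CorrelationId { get; set; }
    public double Value { get; set; }
}

// A stored row as a persistor might hold it: id, type name, JSON payload.
public sealed record StoredRow(Guid Id, string TypeName, string Payload);

public static class SnapshotResolver
{
    public static (List<IHasCorrelationId> Messages, List<Guid> Ids, int Unresolved) Resolve(
        IEnumerable<StoredRow> rows)
    {
        var messages = new List<IHasCorrelationId>();
        var ids = new List<Guid>();
        var unresolved = 0;

        foreach (var row in rows)
        {
            // Type.GetType(string) returns null, rather than throwing, when the type is not found.
            var type = Type.GetType(row.TypeName);
            if (type is null || JsonSerializer.Deserialize(row.Payload, type) is not IHasCorrelationId message)
            {
                unresolved++; // counted toward the total, never passed to the callback
                continue;
            }
            messages.Add(message);
            ids.Add(row.Id);
        }
        return (messages, ids, unresolved);
    }
}
```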
RemoveDataAsync
```csharp
Task RemoveDataAsync(string name, Guid correlationId, CancellationToken cancellationToken = default);
```
Removes a single buffered record by its correlation id.
Parameters
- name: the logical aggregator name.
- correlationId: the correlation id of the record to remove.
- cancellationToken: cancels the operation.
Throws ConcurrencyException when the (name, correlationId) row cannot be located — either because another writer concurrently removed it, or because the caller supplied a mismatched key. All first-party persistors raise this contract on no-op delete; third-party implementations should match it so callers can distinguish a concurrent-removal race from a structural persistence failure.
Remarks. Used by compensating or administrative flows that need to evict one specific message from the buffer without triggering aggregation.
RemoveAllAsync
```csharp
Task RemoveAllAsync(string name, CancellationToken cancellationToken = default);
```
Removes all buffered records for the named aggregator unconditionally.
Parameters
- name: the logical aggregator name.
- cancellationToken: cancels the operation.
Remarks. Use with care. This is a bulk delete — it removes every record regardless of whether they have been resolved. Useful during saga cancellation or partition reset scenarios.
RemoveSnapshotAsync
```csharp
Task RemoveSnapshotAsync(string name, IAggregatorSnapshot snapshot, CancellationToken cancellationToken = default);
```
Removes the records captured in a previously loaded snapshot.
Parameters
- name: the logical aggregator name.
- snapshot: the snapshot returned by GetSnapshotAsync. Implementations should delete the records whose ids are listed in snapshot.ResolvedIds.
- cancellationToken: cancels the operation.
Remarks. The contract here is important for correctness under at-least-once delivery. If RemoveSnapshotAsync fails after the aggregator has already fired, the framework retries the delivery; the surviving records are presented again on the next GetSnapshotAsync call and the aggregator fires a second time — duplicate delivery results. Implement this method atomically (for example, inside a transaction) wherever possible, and make the aggregator callback idempotent.
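One way to make the callback tolerate a replayed snapshot is to track which records it has already applied. The handler below is hypothetical: its Execute signature is not ServiceConnect's Aggregator<T> API, and it keeps the applied set in memory purely for illustration. A real handler would need to persist that set atomically with its side effect, for example in the same transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical handler; ServiceConnect's Aggregator<T> callback signature may differ.
public sealed class IdempotentTotaliser
{
    // In production this set must be durable and written atomically with the side effect.
    private readonly HashSet<Guid> _applied = new();

    public int Total { get; private set; }

    public void Execute(IReadOnlyList<(Guid CorrelationId, int Quantity)> batch)
    {
        foreach (var (correlationId, quantity) in batch)
        {
            // Skip records already applied by an earlier firing whose snapshot was not removed.
            if (!_applied.Add(correlationId))
                continue;
            Total += quantity;
        }
    }
}
```

The handler dedupes per record rather than per batch because a replayed snapshot can also contain records that arrived after the first firing. Those records still need to be applied.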
ReleaseSnapshotAsync
```csharp
Task ReleaseSnapshotAsync(string name, IAggregatorSnapshot snapshot, CancellationToken cancellationToken = default);
```
Releases the lease held by the supplied snapshot so the rows become immediately re-claimable by a subsequent GetSnapshotAsync.
Parameters
- name: the logical aggregator name.
- snapshot: the snapshot whose lease should be released.
- cancellationToken: cancels the operation.
Default implementation. Returns Task.CompletedTask immediately — a no-op. This is correct for any persistor that does not stamp a lease during GetSnapshotAsync (the in-memory store and third-party implementations that predate this method fall into this category). The no-op semantics match a persistor where rows are always re-claimable by id alone.
Override requirement. Persistors that stamp a LockedBy/LockExpiresAt pair on rows during snapshot acquisition — the MongoDB persistor does — must override this method to clear those columns for the snapshot’s session id. Without an override, a handler failure leaves the rows leased until the persistor’s lease TTL expires (5 minutes on the MongoDB persistor by default). During that window the next redelivery’s GetSnapshotAsync returns an empty snapshot, and the handler is never re-invoked with the buffered records.
Remarks. Called by the aggregator processor immediately on handler failure, before the delivery is nacked back to the broker. The retry-queue redelivery arrives after the explicit release rather than after the TTL expiry, keeping the aggregate’s observed latency close to the broker’s retry interval.
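The lease lifecycle can be sketched with an in-memory buffer that stamps LockedBy and LockExpiresAt on claimed rows. LeasingBuffer, LeasedRow, and LeasedSnapshot are hypothetical and do not reflect the MongoDB persistor's implementation. The clock is injectable only so that the TTL can be exercised deterministically. Without a release, a second claim finds nothing until the TTL passes; after ReleaseSnapshot, the rows are claimable at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class LeasedRow
{
    public Guid Id { get; init; }
    public Guid? LockedBy { get; set; }
    public DateTimeOffset? LockExpiresAt { get; set; }
}

public sealed record LeasedSnapshot(Guid SessionId, IReadOnlyList<Guid> ResolvedIds);

// Hypothetical lease-stamping buffer; not the MongoDB persistor.
public sealed class LeasingBuffer
{
    private readonly object _gate = new();
    private readonly List<LeasedRow> _rows = new();
    private readonly TimeSpan _ttl;
    private readonly Func<DateTimeOffset> _clock;

    public LeasingBuffer(TimeSpan ttl, Func<DateTimeOffset> clock) { _ttl = ttl; _clock = clock; }

    public void Insert(Guid id) { lock (_gate) _rows.Add(new LeasedRow { Id = id }); }

    // Claims every row that is unleased or whose lease has expired, under a new session id.
    public LeasedSnapshot GetSnapshot()
    {
        lock (_gate)
        {
            var now = _clock();
            var session = Guid.NewGuid();
            var claimed = _rows.Where(r => r.LockedBy is null || r.LockExpiresAt <= now).ToList();
            foreach (var row in claimed)
            {
                row.LockedBy = session;
                row.LockExpiresAt = now + _ttl;
            }
            return new LeasedSnapshot(session, claimed.Select(r => r.Id).ToList());
        }
    }

    // Clears the lease only on rows this snapshot's session still holds.
    public void ReleaseSnapshot(LeasedSnapshot snapshot)
    {
        lock (_gate)
        {
            foreach (var row in _rows.Where(r => r.LockedBy == snapshot.SessionId))
            {
                row.LockedBy = null;
                row.LockExpiresAt = null;
            }
        }
    }
}
```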
CountAsync
```csharp
Task<int> CountAsync(string name, CancellationToken cancellationToken = default);
```
Returns the number of buffered records currently held for the named aggregator.
Parameters
- name: the logical aggregator name.
- cancellationToken: cancels the operation.
Returns. The count of persisted records; 0 when the buffer is empty.
Remarks. Used by the framework to evaluate count-based aggregation conditions without loading every record.
CountResolvedAsync
```csharp
Task<int> CountResolvedAsync(string name, CancellationToken cancellationToken = default);
```
A default interface method that counts persisted messages whose CLR type is currently resolvable. It drives the batch-size flush gate, so batches containing only unresolved records do not trigger flushes that would produce no work.
Parameters
- name: the logical aggregator name.
- cancellationToken: cancels the operation.
Returns. The number of stored records whose CLR type is currently resolvable.
Default implementation. Delegates to CountAsync. This is correct for any persistor whose stored records are always type-resolvable (e.g. an in-memory store that holds deserialised IHasCorrelationId instances) and for any deployment where every registered type still has a live CLR mapping. It is safe but not optimal. A persistor with a meaningful resolved/unresolved split (e.g. Mongo across a type-rename rollout) should override it with a cheap typed predicate. Otherwise, unresolvable rows count toward the gate and trigger flushes that produce no work.
Implementers MUST NOT override with a method that mutates state. This method runs on every InsertDataAsync as the batch-size flush gate; an implementation that claims a lease (e.g. by delegating to GetSnapshotAsync on a snapshot-claims-lease persistor) would rotate the lease on every insert and break the per-flush lease invariant.
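The flush gate can be sketched as a read-only count evaluated after each insert. GatedBuffer and BufferedRow are hypothetical, the block is single-threaded, and the removal inside Insert is a simplified stand-in for the snapshot-and-remove cycle. Rows whose type name no longer resolves never open the gate on their own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed record BufferedRow(Guid Id, string TypeName);

// Hypothetical, single-threaded illustration of a resolved-only flush gate.
public sealed class GatedBuffer
{
    private readonly List<BufferedRow> _rows = new();
    private readonly int _batchSize;

    public int Flushes { get; private set; }

    public GatedBuffer(int batchSize) => _batchSize = batchSize;

    // Read-only predicate: counts rows whose type resolves; claims no lease, mutates nothing.
    public int CountResolved() => _rows.Count(r => Type.GetType(r.TypeName) is not null);

    public void Insert(BufferedRow row)
    {
        _rows.Add(row);
        // The gate runs on every insert, which is why CountResolved must stay side-effect free.
        if (CountResolved() >= _batchSize)
        {
            Flushes++;
            // Stand-in for snapshot + remove: only resolvable rows leave the buffer.
            _rows.RemoveAll(r => Type.GetType(r.TypeName) is not null);
        }
    }
}
```

Calling Type.GetType per row is acceptable for an illustration. A production store would express the predicate in its query instead.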
Implementing
Concurrency
The framework may invoke InsertDataAsync and CountAsync concurrently from multiple consumer threads, potentially for the same name when an aggregator runs with multiple partitions. Implementations must be safe under concurrent access to the same name. A relational store should rely on database-level row locking; an in-memory store needs a ConcurrentDictionary or a per-name SemaphoreSlim.
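For an in-memory store, the per-name SemaphoreSlim option might look like the sketch below. PartitionedBuffer is hypothetical. Task.Yield stands in for an awaited storage write. The sketch uses an async-friendly SemaphoreSlim rather than a lock statement because C# does not allow await inside lock. Writers to different names never contend, and writers to the same name are serialised.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory buffer guarded by one semaphore per aggregator name.
public sealed class PartitionedBuffer
{
    private readonly ConcurrentDictionary<string, SemaphoreSlim> _locks = new();
    private readonly ConcurrentDictionary<string, List<Guid>> _rows = new();

    private SemaphoreSlim LockFor(string name) =>
        _locks.GetOrAdd(name, _ => new SemaphoreSlim(1, 1));

    public async Task InsertAsync(string name, Guid id, CancellationToken ct = default)
    {
        var gate = LockFor(name);
        await gate.WaitAsync(ct);
        try
        {
            await Task.Yield(); // stand-in for an awaited write while the partition is held
            _rows.GetOrAdd(name, _ => new List<Guid>()).Add(id);
        }
        finally
        {
            gate.Release();
        }
    }

    public async Task<int> CountAsync(string name, CancellationToken ct = default)
    {
        var gate = LockFor(name);
        await gate.WaitAsync(ct);
        try { return _rows.TryGetValue(name, out var list) ? list.Count : 0; }
        finally { gate.Release(); }
    }
}
```

The semaphores are never disposed. This is acceptable when the set of aggregator names is small and fixed.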
Transactional expectations
RemoveSnapshotAsync is the critical path for correctness. If the aggregator callback completes but RemoveSnapshotAsync fails (network timeout, deadlock, process crash), the records remain in the store. On the next GetSnapshotAsync the same records appear again, and the aggregator fires a second time. Design aggregator callbacks to be idempotent. Where the storage engine permits it, wrap the aggregator callback and RemoveSnapshotAsync in a single database transaction, or use an outbox pattern.
Snapshot semantics
A snapshot is a view, not a lock. Records written between GetSnapshotAsync and RemoveSnapshotAsync are not included in ResolvedIds and survive the delete. The implementation must therefore delete by id set rather than truncating the partition.
Skeletal implementation sketch
```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class PostgresAggregatorPersistor : IAggregatorPersistor
{
    private readonly string _connectionString;

    public PostgresAggregatorPersistor(string connectionString) => _connectionString = connectionString;

    public Task InsertDataAsync(IHasCorrelationId data, string name, string idempotencyKey, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    public Task<IReadOnlyList<IHasCorrelationId>> GetDataAsync(string name, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    public Task<IAggregatorSnapshot> GetSnapshotAsync(string name, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    public Task RemoveDataAsync(string name, Guid correlationId, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    public Task RemoveAllAsync(string name, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    public Task RemoveSnapshotAsync(string name, IAggregatorSnapshot snapshot, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    public Task<int> CountAsync(string name, CancellationToken cancellationToken = default) => throw new NotImplementedException();

    // ReleaseSnapshotAsync has a default implementation on the interface that returns
    // Task.CompletedTask (a no-op). For a store that does not lease rows during
    // GetSnapshotAsync, like this Postgres sketch, the default is correct and no
    // override is needed. Persistors that stamp a lock on snapshot acquisition (e.g.
    // MongoDB) must override it to clear the lock on failure so rows are immediately
    // re-claimable.

    // CountResolvedAsync has a default implementation on the interface that delegates
    // to CountAsync. Override it if you want a cheaper "type-resolvable rows only"
    // predicate (this matters under type renames).
}
```
Postgres-backed aggregator buffer using Npgsql
The following example implements IAggregatorPersistor against a Postgres table, using Npgsql with Dapper's extension methods. It has an INSERT ... ON CONFLICT DO NOTHING insert that honours the idempotency key, and a set-based delete for snapshot removal.
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Npgsql;

// Schema (run once during migration):
// CREATE TABLE aggregator_messages (
//     id              UUID        NOT NULL,
//     name            TEXT        NOT NULL,
//     payload         JSONB       NOT NULL,
//     type_name       TEXT        NOT NULL,
//     idempotency_key TEXT        NOT NULL,
//     created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
//     PRIMARY KEY (id, name),
//     -- Required by the ON CONFLICT (name, idempotency_key) clause below.
//     UNIQUE (name, idempotency_key)
// );

public sealed class PostgresAggregatorPersistor : IAggregatorPersistor
{
    private readonly string _connectionString;
    private readonly JsonSerializerOptions _jsonOptions;

    public PostgresAggregatorPersistor(string connectionString, JsonSerializerOptions jsonOptions)
    {
        _connectionString = connectionString;
        _jsonOptions = jsonOptions;
    }

    // Dapper's (sql, param) overloads take no CancellationToken; CommandDefinition flows it.
    private static CommandDefinition Command(string sql, object param, CancellationToken ct) =>
        new(sql, param, cancellationToken: ct);

    public async Task InsertDataAsync(
        IHasCorrelationId data, string name, string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        var typeName = data.GetType().AssemblyQualifiedName!;
        var payload = JsonSerializer.Serialize(data, data.GetType(), _jsonOptions);

        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        // A redelivery whose row is still buffered hits the (name, idempotency_key)
        // constraint and is skipped, which gives the at-least-once-safe contract the
        // interface requires. Once the row is deleted, the key can be inserted again.
        await conn.ExecuteAsync(Command(
            @"INSERT INTO aggregator_messages (id, name, payload, type_name, idempotency_key)
              VALUES (@id, @name, @payload::jsonb, @typeName, @idempotencyKey)
              ON CONFLICT (name, idempotency_key) DO NOTHING",
            new { id = data.CorrelationId, name, payload, typeName, idempotencyKey },
            cancellationToken));
    }

    public async Task<IReadOnlyList<IHasCorrelationId>> GetDataAsync(
        string name, CancellationToken cancellationToken = default)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        var rows = await conn.QueryAsync<(Guid Id, string Payload, string TypeName)>(Command(
            "SELECT id, payload::text, type_name FROM aggregator_messages WHERE name = @name ORDER BY created_at",
            new { name }, cancellationToken));

        // Unlike GetSnapshotAsync, this throws if a stored type can no longer be resolved.
        return rows
            .Select(r => (IHasCorrelationId)JsonSerializer.Deserialize(
                r.Payload, Type.GetType(r.TypeName, throwOnError: true)!, _jsonOptions)!)
            .ToList();
    }

    public async Task<IAggregatorSnapshot> GetSnapshotAsync(
        string name, CancellationToken cancellationToken = default)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        var rows = await conn.QueryAsync<(Guid Id, string Payload, string TypeName)>(Command(
            "SELECT id, payload::text, type_name FROM aggregator_messages WHERE name = @name ORDER BY created_at",
            new { name }, cancellationToken));

        var resolved = new List<IHasCorrelationId>();
        var resolvedIds = new List<Guid>();
        var unresolved = 0;

        foreach (var row in rows)
        {
            // A missing CLR type (e.g. after a rename) counts as unresolved instead of failing the load.
            var type = Type.GetType(row.TypeName);
            if (type is null) { unresolved++; continue; }

            if (JsonSerializer.Deserialize(row.Payload, type, _jsonOptions) is not IHasCorrelationId obj)
            { unresolved++; continue; }

            resolved.Add(obj);
            resolvedIds.Add(row.Id);
        }

        return new AggregatorSnapshot(resolved, resolvedIds, unresolved);
    }

    public async Task RemoveDataAsync(
        string name, Guid correlationId, CancellationToken cancellationToken = default)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        var affected = await conn.ExecuteAsync(Command(
            "DELETE FROM aggregator_messages WHERE name = @name AND id = @id",
            new { name, id = correlationId }, cancellationToken));

        // Honour the interface contract: a no-op delete raises ConcurrencyException.
        // (Assumes ServiceConnect's ConcurrencyException offers a message constructor.)
        if (affected == 0)
            throw new ConcurrencyException(
                $"Aggregator '{name}' has no buffered record with correlation id {correlationId}.");
    }

    public async Task RemoveAllAsync(
        string name, CancellationToken cancellationToken = default)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);
        await conn.ExecuteAsync(Command(
            "DELETE FROM aggregator_messages WHERE name = @name",
            new { name }, cancellationToken));
    }

    public async Task RemoveSnapshotAsync(
        string name, IAggregatorSnapshot snapshot, CancellationToken cancellationToken = default)
    {
        if (snapshot.ResolvedIds.Count == 0) return;

        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        // Delete exactly the ids captured in the snapshot; records that arrived after
        // GetSnapshotAsync was called are unaffected. A single DELETE is atomic.
        await conn.ExecuteAsync(Command(
            "DELETE FROM aggregator_messages WHERE name = @name AND id = ANY(@ids)",
            new { name, ids = snapshot.ResolvedIds.ToArray() }, cancellationToken));
    }

    public async Task<int> CountAsync(
        string name, CancellationToken cancellationToken = default)
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);
        var count = await conn.ExecuteScalarAsync<long>(Command(
            "SELECT COUNT(*) FROM aggregator_messages WHERE name = @name",
            new { name }, cancellationToken));
        return (int)count;
    }

    // Minimal snapshot value type used above.
    private sealed record AggregatorSnapshot(
        IReadOnlyList<IHasCorrelationId> ResolvedMessages,
        IReadOnlyList<Guid> ResolvedIds,
        int UnresolvedCount) : IAggregatorSnapshot;
}
```
Register the implementation during bus startup:
```csharp
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    // connectionString and jsonOptions are assumed to be in scope.
    builder.AddRegistration(services =>
        services.AddSingleton<IAggregatorPersistor>(_ =>
            new PostgresAggregatorPersistor(connectionString, jsonOptions)));
});
```
MongoDB aggregator contract
The MongoDB implementation (MongoDbAggregatorPersistor) adds several behaviours and exception contracts beyond the base interface.
WriteConcern.Unacknowledged rejected
MongoDbAggregatorPersistor requires the IMongoClient to be configured with an acknowledged write concern (w:1 or higher). Construction throws InvalidOperationException for WriteConcern.Unacknowledged. Under w:0, RemoveDataAsync's IsAcknowledged-gated branch silently succeeds. This breaks the documented ConcurrencyException contract on stale-version updates and admits duplicate aggregate dispatch.
See Persistence Configuration → MongoDB provider contract for the full guard set across all three Mongo stores.
Insertion order semantics
Records inserted with identical InsertedAtTicks values (for example, two messages stamped within the same clock tick) are ordered by a per-process monotonic counter (InsertSequence). Within a single process, records therefore appear in their actual insertion order regardless of clock resolution. Cross-process ties (two processes inserting with the same tick value simultaneously) remain unresolved and produce an arbitrary but stable order.
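The tie-break amounts to a composite sort key. The sketch below is hypothetical: OrderedRecord and OrderStamp are not the MongoDB persistor's types. Ticks are passed in explicitly so that a same-tick collision can be simulated, and Interlocked.Increment supplies the per-process monotonic sequence.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public sealed record OrderedRecord(string Payload, long InsertedAtTicks, long InsertSequence);

// Hypothetical helper illustrating the (ticks, sequence) ordering.
public static class OrderStamp
{
    // Per-process counter; Interlocked makes concurrent stamps unique and increasing.
    private static long _sequence;

    public static OrderedRecord Stamp(string payload, long ticks) =>
        new(payload, ticks, Interlocked.Increment(ref _sequence));

    // Sort by timestamp first; the sequence breaks ties caused by coarse clock resolution.
    public static IEnumerable<OrderedRecord> InInsertionOrder(IEnumerable<OrderedRecord> rows) =>
        rows.OrderBy(r => r.InsertedAtTicks).ThenBy(r => r.InsertSequence);
}
```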
RemoveDataAsync exception contract
RemoveDataAsync(name, correlationId) raises three exception types:
- KeyNotFoundException: thrown when no records exist for the given name at all. This indicates a caller error (the aggregator partition does not exist) or a cleanup race where the partition was already fully removed.
- ConcurrencyException: thrown when records exist for name but none match the given correlationId. This is a genuine concurrent-removal race, and the exception message includes the row count for the partition. The caller may retry or treat it as resolved, depending on its semantics.
- PersistenceException: thrown when the underlying BSON serialization layer raises a BsonException (for example, BsonSerializationException on schema drift). The original BsonException is wrapped as the inner exception.
Custom implementations targeting a different storage engine should map their equivalent error conditions to the same exception types to maintain cross-provider behavioural consistency.
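A custom persistor might map its own failure modes onto these exception types as sketched below. MappedBuffer is hypothetical. ConcurrencyException is redeclared locally only so the block compiles; a real implementation would throw ServiceConnect's own type. KeyNotFoundException is the standard System.Collections.Generic exception.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in; a real persistor would throw ServiceConnect's ConcurrencyException.
public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical store showing the partition-missing vs row-missing distinction.
public sealed class MappedBuffer
{
    private readonly Dictionary<string, HashSet<Guid>> _partitions = new();

    public void Insert(string name, Guid correlationId)
    {
        if (!_partitions.TryGetValue(name, out var rows))
            _partitions[name] = rows = new HashSet<Guid>();
        rows.Add(correlationId);
    }

    public void RemoveData(string name, Guid correlationId)
    {
        // No rows at all for the name: caller error, or the partition was already cleaned up.
        if (!_partitions.TryGetValue(name, out var rows) || rows.Count == 0)
            throw new KeyNotFoundException($"No buffered records for aggregator '{name}'.");

        // The partition exists but the row is gone: a concurrent-removal race.
        if (!rows.Remove(correlationId))
            throw new ConcurrencyException(
                $"Record {correlationId} not found in '{name}' ({rows.Count} rows remain).");
    }
}
```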
Index caching
EnsureIndexesAsync is called on the first operation and its result cached for the lifetime of the persistor instance. Subsequent operations bypass the index-creation round-trip, eliminating per-message overhead on the hot path.
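Below is a minimal sketch of caching the index setup, assuming the index creation is supplied as a delegate. IndexedStore is hypothetical. Unlike a plain Lazy<Task>, this version starts a fresh attempt if the cached task faulted or was cancelled, so a transient failure is not cached for the instance's lifetime. Whether the shipped persistor retries in this way is not stated here.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical store that creates its indexes once and reuses the cached task.
public sealed class IndexedStore
{
    private readonly Func<CancellationToken, Task> _ensureIndexes;
    private readonly object _gate = new();
    private Task? _indexesReady;

    public IndexedStore(Func<CancellationToken, Task> ensureIndexes) => _ensureIndexes = ensureIndexes;

    // The first caller starts index creation; later callers await the same cached task.
    private Task EnsureIndexesOnceAsync()
    {
        lock (_gate)
        {
            // Start a new attempt only if none exists or the previous one faulted or was cancelled.
            if (_indexesReady is null || _indexesReady.IsFaulted || _indexesReady.IsCanceled)
                _indexesReady = _ensureIndexes(CancellationToken.None);
            return _indexesReady;
        }
    }

    public async Task InsertAsync()
    {
        await EnsureIndexesOnceAsync();
        // The actual write would follow here; calls after the first skip index creation.
    }
}
```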
See also
- Aggregator: concept
- Aggregator<T>: related reference
- IPersistenceConfiguration: related reference