IProcessManagerFinder
Overview
IProcessManagerFinder is the storage interface the framework calls to resolve and persist process-manager (saga) state. For every inbound message that targets a saga, the dispatch pipeline calls FindDataAsync<T> to load the correlated state record, invokes the handler, then calls either UpdateDataAsync<T> or DeleteDataAsync<T> depending on whether the saga is still active. InsertDataAsync is called the first time a correlation id is seen and there is no existing record.
ServiceConnect ships in-memory and MongoDB implementations. Substitute a custom implementation — Postgres, SQL Server, DynamoDB — by registering it via ServiceConnectBuilder.AddRegistration.
See Process Manager for the conceptual model.
Reference
FindDataAsync<T>

Task<IPersistenceData<T>?> FindDataAsync<T>(
    IProcessManagerPropertyMapper mapper,
    Message message,
    CancellationToken cancellationToken = default)
    where T : class, IProcessManagerData;

Locates the persisted state record that correlates with message according to the rules in mapper.
Parameters
- mapper: the mapper that holds the registered (data property, message property) correlation expressions for this handler. A production finder walks mapper.Mappings to extract the lookup value from the message; in straightforward cases the lookup key is simply message.CorrelationId.
- message: the inbound message being dispatched; the correlation key is derived from it.
- cancellationToken: cancels the database round-trip.
Returns. An IPersistenceData<T> wrapper if a matching record exists; null if no record is found. Never throw for the not-found case — the dispatcher treats null as the trigger to call InsertDataAsync and create a fresh record.
Remarks. The returned IPersistenceData<T> wrapper is passed back verbatim to UpdateDataAsync<T> or DeleteDataAsync<T>. Implementations can carry concurrency metadata (a version column value, an ETag, a row lock handle) on the concrete type that implements IPersistenceData<T> — the interface only exposes Data, so any extra fields are invisible to the framework but available to the implementation when the wrapper comes back.
Fresh-copy contract
Every call to FindDataAsync<T> must return a fresh IPersistenceData<T>.Data object. Two successive calls for the same correlation id must produce two independent references: mutations made to the Data of the first call's return value must not propagate to the second call's return value. Both built-in providers (InMemory and MongoDB) comply: InMemoryProcessManagerFinder performs a deep clone on every read; MongoDbProcessManagerFinder deserialises a new object from the wire.
Custom implementations that cache and return the same mutable Data reference across calls violate this contract. The violation is silent during development (the first call’s mutations appear to persist “for free”) but causes state corruption when the same saga record is accessed by more than one concurrent message in the same process.
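One way a custom in-process store could satisfy the fresh-copy contract is to keep serialized snapshots instead of live objects, so that every read materialises a new instance. The sketch below is only an illustration under that assumption: SnapshotStore<T> and OrderState are hypothetical names, not ServiceConnect types, and the JSON round-trip stands in for whatever cloning strategy an implementation actually uses.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;

// Hypothetical state type standing in for an IProcessManagerData implementation.
public sealed class OrderState
{
    public Guid CorrelationId { get; set; }
    public int ItemsShipped { get; set; }
}

// Hypothetical store: holds JSON snapshots rather than object references,
// so callers can never share a mutable instance.
public sealed class SnapshotStore<T> where T : class
{
    private readonly ConcurrentDictionary<Guid, string> _rows = new();

    // Serializes the state at write time; later mutations of `state` are not stored.
    public void Save(Guid id, T state) => _rows[id] = JsonSerializer.Serialize(state);

    // Returns null when no row exists; otherwise a freshly deserialized copy.
    public T? Load(Guid id) =>
        _rows.TryGetValue(id, out var json) ? JsonSerializer.Deserialize<T>(json) : null;
}
```

Two Load calls for the same id return distinct objects, so a handler mutating one copy cannot affect another handler working on the same saga in the same process.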
InsertDataAsync
Task InsertDataAsync(IProcessManagerData data, CancellationToken cancellationToken = default);

Persists a freshly-constructed state record the first time a correlation id is seen.
Parameters
- data: the new state object; the implementation should derive the storage key from data.CorrelationId.
- cancellationToken: cancels the insert.
Remarks. Called exactly once per correlation id lifetime. After the first insert, all subsequent state changes go through UpdateDataAsync<T>.
UpdateDataAsync<T>
Task UpdateDataAsync<T>(
    IPersistenceData<T> data,
    CancellationToken cancellationToken = default)
    where T : class, IProcessManagerData;

Persists mutations to an existing state record.
Parameters
- data: the IPersistenceData<T> wrapper previously returned by FindDataAsync<T>. The implementation reads concurrency metadata from the wrapper's concrete type to guard against lost updates.
- cancellationToken: cancels the update.
Remarks. Implementations using optimistic concurrency should verify that the record’s stored version matches the version captured at FindDataAsync<T> time. If the versions differ — another handler has written the record in the interim — throw a concurrency exception so the dispatch pipeline can retry.
DeleteDataAsync<T>
Task DeleteDataAsync<T>(
    IPersistenceData<T> data,
    CancellationToken cancellationToken = default)
    where T : class, IProcessManagerData;

Removes the persisted state record, ending the saga's lifetime.
Parameters
- data: the wrapper previously returned by FindDataAsync<T>. Delete by the storage id or correlation id embedded in the concrete type.
- cancellationToken: cancels the delete.
IPersistenceData<T>
IPersistenceData<T> is the thin wrapper that FindDataAsync<T> returns and that UpdateDataAsync<T> / DeleteDataAsync<T> accept. The interface exposes only Data; the concrete type carries whatever concurrency metadata the implementation requires.

public interface IPersistenceData<T> where T : class, IProcessManagerData
{
    T Data { get; set; }
}

Data: gets or sets the process-manager state object. The handler mutates this in place; the framework writes it back by passing the same wrapper to UpdateDataAsync<T>. Because Data is mutable, any changes made inside HandleAsync are observed by the finder on the next UpdateDataAsync<T> call without requiring a separate copy step.
Implementations add fields such as int Version, string ETag, or Guid RowId on the concrete class. Because the framework only interacts with the interface, those extra fields survive the round-trip from FindDataAsync<T> through to UpdateDataAsync<T> without the framework touching them.
IVersioned
ServiceConnect.Interfaces.IVersioned exposes the optimistic-concurrency version counter from a wrapper without requiring a concrete-type cast:

public interface IVersioned
{
    long Version { get; }
}

Both MemoryData<T> (the InMemory provider's wrapper) and MongoDbData<T> (the MongoDB provider's wrapper) implement IVersioned. Use it to read the version number in persistence-agnostic code, for example in a shared test helper that asserts the version was incremented.
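A shared test helper of the kind mentioned above might look like the following sketch. To keep it compilable on its own, it redeclares a stand-in IVersioned with the same shape as the interface shown earlier; VersionAssert and FakeWrapper are hypothetical names introduced purely for illustration, and a real test would pass the wrappers returned by the finder instead.

```csharp
using System;

// Stand-in mirroring the IVersioned shape shown above so the sketch compiles alone;
// a real project would reference ServiceConnect.Interfaces.IVersioned instead.
public interface IVersioned
{
    long Version { get; }
}

// Hypothetical wrapper used only to exercise the helper.
public sealed record FakeWrapper(long Version) : IVersioned;

public static class VersionAssert
{
    // Reads the version from an opaque wrapper without knowing its concrete type.
    public static long VersionOf(object wrapper) =>
        wrapper is IVersioned versioned
            ? versioned.Version
            : throw new InvalidOperationException(
                $"{wrapper.GetType().Name} does not implement IVersioned.");

    // Fails unless the wrapper read after a write carries a higher version
    // than the wrapper read before it.
    public static void Incremented(object before, object after)
    {
        long previous = VersionOf(before), current = VersionOf(after);
        if (current <= previous)
            throw new InvalidOperationException(
                $"Expected a version above {previous}, got {current}.");
    }
}
```

Because the helper depends only on the interface, the same assertion can run against either built-in provider or a custom one whose wrapper implements IVersioned.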
IIdentified
ServiceConnect.Interfaces.IIdentified exposes a stable storage Id from a wrapper without requiring a concrete-type cast:

public interface IIdentified
{
    Guid Id { get; }
}

Both MemoryData<T> and MongoDbData<T> implement IIdentified. The Id is a storage-level surrogate key; it is distinct from IProcessManagerData.CorrelationId (the business correlation key). Use IIdentified to extract the Id from an opaque stored row in persistence-agnostic code: logging, diagnostics, or a test helper that verifies the Id is stable across updates.
MemoryData<T> Id stability contract. InsertDataAsync stamps a fresh Guid.NewGuid() Id. Every subsequent UpdateDataAsync<T> preserves the same Id. Each FindDataAsync<T> returns a deep-cloned wrapper carrying that Id, so the Id you observe via IIdentified is identical across successive reads. This mirrors MongoDB’s _id contract.
InMemory saga store isolation
InMemoryProcessManagerFinder uses a private SagaProvider instance, separate from the public ICacheProvider / IKeyValueStore registered for general cache use by application code. User code consuming IKeyValueStore cannot read, modify, or delete saga state through that interface. This isolation matches the MongoDB persistor's behaviour, where saga state lives in dedicated collections that the user-facing API does not expose.
Multi-saga limitation. FindMatchingItem<T> iterates every entry in the partitioned saga provider and the keys are bare correlation-id strings (no type prefix). A caller that runs multiple saga types through a single finder instance will see InvalidOperationException from FindData as soon as the iterator visits a row whose wrapper type does not match T. For multi-saga topologies, run one finder instance per saga type or use the MongoDB persistor.
Implementing
Concurrency contract
Multiple handlers may call FindDataAsync<T> for the same correlation id concurrently, for example when two messages arrive together for the same saga instance. Implementations must protect against lost updates using one of:
- Optimistic concurrency: store a version integer or ETag. FindDataAsync<T> reads and captures it in the returned wrapper. UpdateDataAsync<T> issues UPDATE … WHERE id = @id AND version = @version and throws ConcurrencyException (or equivalent) if zero rows are affected. The dispatch pipeline retries on concurrency failure.
- Pessimistic locking: FindDataAsync<T> acquires a row-level lock (SELECT … FOR UPDATE). The lock is held until the transaction completes in UpdateDataAsync<T> or DeleteDataAsync<T>. Requires that FindDataAsync<T> and the subsequent write run inside the same transaction scope.
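The optimistic option can be illustrated without a database. The following sketch uses ConcurrentDictionary.TryUpdate as an in-memory analogue of the conditional UPDATE: the write succeeds only if the stored row is still the one the caller loaded. OptimisticStore, VersionedRow, and StaleVersionException are hypothetical names, and the exception type in particular is an assumption, since the text above only asks for ConcurrencyException or an equivalent.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical exception standing in for "ConcurrencyException (or equivalent)".
public sealed class StaleVersionException : Exception
{
    public StaleVersionException(string message) : base(message) { }
}

// Immutable row: the payload plus the version it was written at.
// Record equality compares both fields, which TryUpdate relies on below.
public sealed record VersionedRow(string State, long Version);

public sealed class OptimisticStore
{
    private readonly ConcurrentDictionary<Guid, VersionedRow> _rows = new();

    // First write for an id starts at version 0; returns false if the id already exists.
    public bool TryInsert(Guid id, string state) =>
        _rows.TryAdd(id, new VersionedRow(state, 0));

    // Returns null for the not-found case instead of throwing.
    public VersionedRow? Find(Guid id) =>
        _rows.TryGetValue(id, out var row) ? row : null;

    // Compare-and-swap: replaces the row only if it still equals the loaded one.
    public void Update(Guid id, VersionedRow loaded, string newState)
    {
        var next = new VersionedRow(newState, loaded.Version + 1);
        if (!_rows.TryUpdate(id, next, loaded))
            throw new StaleVersionException(
                $"Row {id} changed since version {loaded.Version} was loaded.");
    }
}
```

When two callers load the same row, the first Update wins and the second throws, which is the signal a retrying pipeline needs in order to reload and reapply the handler.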
Not-found contract
FindDataAsync<T> must return null when no record exists; never throw a KeyNotFoundException or equivalent. The dispatcher distinguishes null (first message, call InsertDataAsync) from a non-null wrapper (existing saga, call UpdateDataAsync<T> or DeleteDataAsync<T>).
Insert vs update
InsertDataAsync is called only for the first message that starts a saga. All subsequent messages, including the message that completes the saga, go through FindDataAsync<T> followed by UpdateDataAsync<T> or DeleteDataAsync<T>. Implementations must not call InsertDataAsync from within UpdateDataAsync<T>.
Skeletal Postgres implementation sketch
public sealed class PostgresProcessManagerFinder : IProcessManagerFinder
{
    private readonly string _connectionString;

    public async Task<IPersistenceData<T>?> FindDataAsync<T>(
        IProcessManagerPropertyMapper mapper,
        Message message,
        CancellationToken cancellationToken = default)
        where T : class, IProcessManagerData
    {
        // Production finders walk mapper.Mappings to extract the lookup value;
        // we correlate on CorrelationId directly here for brevity.
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        // CommandDefinition carries the token so the query itself is cancellable.
        var row = await conn.QuerySingleOrDefaultAsync<(Guid Id, string State, long Version)?>(
            new CommandDefinition(
                "SELECT id, state::text, version FROM process_manager_state WHERE id = @id",
                new { id = message.CorrelationId },
                cancellationToken: cancellationToken));

        // Not found: return null rather than throwing.
        if (row is null)
            return null;

        var (_, state, version) = row.Value;
        var data = JsonSerializer.Deserialize<T>(state)!;
        return new PostgresPersistenceData<T>(data, version);
    }

    public async Task UpdateDataAsync<T>(
        IPersistenceData<T> data,
        CancellationToken cancellationToken = default)
        where T : class, IProcessManagerData
    {
        var wrapper = (PostgresPersistenceData<T>)data;
        var state = JsonSerializer.Serialize(data.Data);
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        // The version predicate makes the write conditional on the row being unchanged.
        var rows = await conn.ExecuteAsync(
            new CommandDefinition(
                "UPDATE process_manager_state SET state = @state::jsonb, version = version + 1 WHERE id = @id AND version = @version",
                new { id = data.Data.CorrelationId, state, version = wrapper.Version },
                cancellationToken: cancellationToken));

        if (rows == 0)
            throw new ConcurrencyException($"Optimistic concurrency failure for saga {data.Data.CorrelationId}.");
    }

    // ... InsertDataAsync, DeleteDataAsync omitted for brevity
}

Postgres-backed finder with optimistic concurrency
The following example implements all four members against a process_manager_state table with a version column. Concurrent updates are detected via UPDATE … WHERE id = @id AND version = @version; zero rows affected signals a lost update, and the finder throws so the pipeline retries.
// Schema (run once during migration):
// CREATE TABLE process_manager_state (
//     id         UUID        NOT NULL PRIMARY KEY,
//     state      JSONB       NOT NULL,
//     version    INTEGER     NOT NULL DEFAULT 0,
//     updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
// );

using System.Text.Json;
using Dapper;
using Npgsql;
// Also import the ServiceConnect namespaces that declare IProcessManagerFinder and its related types.

// Wrapper that carries the version captured at read time back to the write.
public sealed class PostgresPersistenceData<T> : IPersistenceData<T>
    where T : class, IProcessManagerData
{
    public T Data { get; set; }
    public long Version { get; }

    public PostgresPersistenceData(T data, long version)
    {
        Data = data;
        Version = version;
    }
}

public sealed class PostgresProcessManagerFinder : IProcessManagerFinder
{
    private readonly string _connectionString;
    private readonly JsonSerializerOptions _jsonOptions;

    public PostgresProcessManagerFinder(string connectionString, JsonSerializerOptions jsonOptions)
    {
        _connectionString = connectionString;
        _jsonOptions = jsonOptions;
    }

    public async Task<IPersistenceData<T>?> FindDataAsync<T>(
        IProcessManagerPropertyMapper mapper,
        Message message,
        CancellationToken cancellationToken = default)
        where T : class, IProcessManagerData
    {
        // Production finders walk mapper.Mappings to extract the lookup value;
        // we correlate on CorrelationId directly here for brevity.
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        // CommandDefinition carries the token so the query itself is cancellable.
        var row = await conn.QuerySingleOrDefaultAsync<(Guid Id, string State, long Version)?>(
            new CommandDefinition(
                "SELECT id, state::text, version FROM process_manager_state WHERE id = @id",
                new { id = message.CorrelationId },
                cancellationToken: cancellationToken));

        // Not found: return null rather than throwing.
        if (row is null)
            return null;

        // Deserializing on every read yields a fresh Data object per call.
        var (_, state, version) = row.Value;
        var data = JsonSerializer.Deserialize<T>(state, _jsonOptions)!;
        return new PostgresPersistenceData<T>(data, version);
    }

    public async Task InsertDataAsync(
        IProcessManagerData data,
        CancellationToken cancellationToken = default)
    {
        // Serialize by runtime type so derived-type properties are included.
        var state = JsonSerializer.Serialize(data, data.GetType(), _jsonOptions);

        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        await conn.ExecuteAsync(
            new CommandDefinition(
                @"INSERT INTO process_manager_state (id, state, version, updated_at)
                  VALUES (@id, @state::jsonb, 0, now())",
                new { id = data.CorrelationId, state },
                cancellationToken: cancellationToken));
    }

    public async Task UpdateDataAsync<T>(
        IPersistenceData<T> data,
        CancellationToken cancellationToken = default)
        where T : class, IProcessManagerData
    {
        var wrapper = (PostgresPersistenceData<T>)data;
        var state = JsonSerializer.Serialize(data.Data, _jsonOptions);

        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        // The version predicate makes the write conditional on the row being unchanged.
        var rows = await conn.ExecuteAsync(
            new CommandDefinition(
                @"UPDATE process_manager_state
                  SET state = @state::jsonb, version = version + 1, updated_at = now()
                  WHERE id = @id AND version = @version",
                new { id = data.Data.CorrelationId, state, version = wrapper.Version },
                cancellationToken: cancellationToken));

        if (rows == 0)
            throw new ConcurrencyException(
                $"Optimistic concurrency failure for saga {data.Data.CorrelationId}. " +
                "The record was modified by another handler since it was loaded.");
    }

    public async Task DeleteDataAsync<T>(
        IPersistenceData<T> data,
        CancellationToken cancellationToken = default)
        where T : class, IProcessManagerData
    {
        await using var conn = new NpgsqlConnection(_connectionString);
        await conn.OpenAsync(cancellationToken);

        await conn.ExecuteAsync(
            new CommandDefinition(
                "DELETE FROM process_manager_state WHERE id = @id",
                new { id = data.Data.CorrelationId },
                cancellationToken: cancellationToken));
    }
}

Register during bus startup:
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    // The inner lambda parameter is named differently from the outer `services`
    // so it cannot clash with an enclosing local of the same name.
    builder.AddRegistration(registrations =>
        registrations.AddSingleton<IProcessManagerFinder>(_ =>
            new PostgresProcessManagerFinder(connectionString, jsonOptions)));
});

When the PaymentSaga receives a PaymentAuthorised message, the dispatch pipeline calls FindDataAsync<PaymentSagaData> to load the correlating record, invokes the handler, and on a successful return calls UpdateDataAsync<PaymentSagaData> with the same wrapper. If another consumer processed a concurrent message and incremented version in the interim, UpdateDataAsync<PaymentSagaData> throws and the pipeline retries the whole handler invocation, starting again from FindDataAsync<PaymentSagaData>.
MongoDB saga store contract
The MongoDB implementation (MongoDbProcessManagerFinder) adds several requirements and behaviours beyond the base interface contract.
WriteConcern.Unacknowledged rejected
If the IMongoClient supplied to the MongoDB provider is configured with WriteConcern.Unacknowledged (w:0), MongoDbProcessManagerFinder throws InvalidOperationException during startup, before any consumer host begins polling. Unacknowledged writes disable the optimistic concurrency version checks, which would silently advance the version on missed updates and wedge sagas on the next real conflict.
The same guard applies to ITimeoutStore and IAggregatorPersistor MongoDB implementations; see Persistence Configuration → MongoDB provider contract for the consolidated description.
CorrelationId uniqueness via pre-created unique index
A startup IHostedService pre-creates a unique index on CorrelationId for each saga data type registered via AddProcessManager<TSaga> before consumer polling starts. This closes the cross-process startup race that could admit duplicate saga rows. The index creation is idempotent: running it again on an existing index is safe.
Property-hierarchy type coercion
FindDataAsync<T> wraps the message-side correlation property value in Expression.Convert against the saga property's declared type before building the filter. Without this, a message-side int matched against a saga-side long, Nullable<long>, or interface-typed property silently produces zero results. The conversion is applied for all property mappings registered via IProcessManagerPropertyMapper.
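The coercion step can be sketched with plain System.Linq.Expressions. This is not the provider's actual code: CorrelationFilter and InvoiceState are hypothetical names, and the sketch compiles the predicate for in-memory use rather than handing it to the MongoDB driver. It only shows how Expression.Convert lifts an int value to the property's declared type before the equality node is built.

```csharp
using System;
using System.Linq.Expressions;

// Hypothetical saga state with a long-typed correlation property.
public sealed class InvoiceState
{
    public long InvoiceNumber { get; set; }
}

public static class CorrelationFilter
{
    // Builds x => x.<propertyName> == (PropertyType)value.
    // Expression.Constant(value) takes the runtime type of the boxed value (Int32 for
    // an int), so the explicit Convert is what aligns it with the property's type.
    public static Expression<Func<T, bool>> Build<T>(string propertyName, object value)
    {
        var row = Expression.Parameter(typeof(T), "x");
        var property = Expression.Property(row, propertyName);
        var coerced = Expression.Convert(Expression.Constant(value), property.Type);
        return Expression.Lambda<Func<T, bool>>(Expression.Equal(property, coerced), row);
    }
}
```

A caller holding an int from the message can then match a long saga property, for instance `CorrelationFilter.Build<InvoiceState>("InvoiceNumber", 42)`, without widening the value by hand.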
Generic saga collection-name sanitization
Mongo collection names are derived from typeof(T).FullName. For generic saga types, FullName contains characters (+, backticks, [, ], ,) that are illegal in mongosh autocomplete and many Mongo tooling contexts. The MongoDB provider sanitizes all of these characters to _.
Migration required for existing deployments with generic saga types. If your saga data type is generic (for example, OrderSagaData<OrderState>), the unsanitised collection name used by earlier releases contained these illegal characters. After upgrading, rename the existing collection to the sanitized form before the new version starts consuming:
// renameCollection must be issued against the admin database, hence adminCommand.
db.adminCommand({
  renameCollection: "mydb.OrderSagaData`1[[OrderState, MyAssembly]]",
  to: "mydb.OrderSagaData_1__OrderState__MyAssembly__"
})

Run this during a maintenance window or blue/green cutover. The exact old and new names depend on the type's FullName; apply the substitution rule (+, `, [, ], , → _) to derive both sides.
See also
- Process Manager: concept
- IProcessHandler: related reference
- IProcessManagerData: related reference
- IPersistenceConfiguration: related reference