Process Manager

Process Manager (sometimes called saga) is the pattern for workflows that span several messages over time. “When we’ve seen inventory reserved and payment captured for this order, ship it.” The bus correlates each incoming message to a persisted state object, hands the handler the state, and saves any changes the handler makes when it returns.

Unlike Routing Slip, the path is not fixed up front. The process manager decides what happens next based on what it has already seen — which makes it the right shape for workflows with branches, races, and timeouts.

Use a process manager when:

  • The workflow spans multiple messages arriving at different times, possibly in different orders.
  • Deciding what to do next requires knowing what has already happened.
  • You need the coordination state to survive process restarts.

If the steps are a fixed ordered chain, prefer Routing Slip — it carries the itinerary in the message and needs no persistence. If the steps are independent, Pub/Sub is enough.

Four types: the three messages that drive the workflow, and the persisted state:

Contracts/Workflow.cs
using ServiceConnect.Interfaces;
public sealed class OrderSubmitted(Guid correlationId) : Message(correlationId)
{
    public string OrderNumber { get; init; } = string.Empty;
}
public sealed class InventoryReserved(Guid correlationId) : Message(correlationId)
{
    public string OrderNumber { get; init; } = string.Empty;
}
public sealed class PaymentCaptured(Guid correlationId) : Message(correlationId) { }
public sealed class FulfillmentState : IProcessManagerData
{
    public Guid CorrelationId { get; set; }
    public string OrderNumber { get; set; } = string.Empty;
    public bool IsSubmitted { get; set; }
    public bool InventoryReserved { get; set; }
    public bool PaymentCaptured { get; set; }
    public bool IsCompleted { get; set; }
}

IProcessManagerData requires exactly one thing — a Guid CorrelationId. Everything else is your workflow’s own state: what step we’re on, what was submitted, what has come back. The messages carry the same CorrelationId, which is how the bus finds the right state record.

A process handler implements IProcessHandler<TData, TMessage> for each message type in the workflow. One class commonly handles them all — the state object is the shared piece:

Orchestrator/FulfillmentProcessHandler.cs
// WorkflowQueues is a record registered as a singleton in DI and injected here.
public sealed class FulfillmentProcessHandler(WorkflowQueues queues) :
    IProcessHandler<FulfillmentState, OrderSubmitted>,
    IProcessHandler<FulfillmentState, InventoryReserved>,
    IProcessHandler<FulfillmentState, PaymentCaptured>
{
    private readonly WorkflowQueues _queues = queues;
    public async Task HandleAsync(OrderSubmitted message, FulfillmentState data, IConsumeContext context, CancellationToken cancellationToken = default)
    {
        if (data.IsSubmitted) return; // idempotent — saw this already
        data.OrderNumber = message.OrderNumber;
        data.IsSubmitted = true;
        await context.Bus.SendAsync(
            new ReserveInventory(message.CorrelationId) { OrderNumber = message.OrderNumber },
            new SendOptions { EndPoint = _queues.InventoryQueueName });
    }
    public async Task HandleAsync(InventoryReserved message, FulfillmentState data, IConsumeContext context, CancellationToken cancellationToken = default)
    {
        if (data.InventoryReserved) return;
        data.InventoryReserved = true;
        await context.Bus.SendAsync(
            new CapturePayment(message.CorrelationId),
            new SendOptions { EndPoint = _queues.PaymentQueueName });
    }
    public Task HandleAsync(PaymentCaptured message, FulfillmentState data, IConsumeContext context, CancellationToken cancellationToken = default)
    {
        if (data.PaymentCaptured) return Task.CompletedTask;
        data.PaymentCaptured = true;
        data.IsCompleted = true;
        return Task.CompletedTask;
    }
}

Three things to notice:

  • The bus hands you data pre-loaded. If a state record already exists for this CorrelationId, you receive it. If not, you receive a freshly-constructed empty one (hence the new() constraint on TData). The first message in a workflow effectively creates the record.
  • Mutations persist automatically. When HandleAsync returns cleanly, the bus writes the state back. You don’t call save.
  • State flags carry intent. IsSubmitted, InventoryReserved, PaymentCaptured let the handler recognise a replay — an at-least-once redelivery of the same message — and noop instead of double-processing. The flag checks at the top of each handler are the idempotency gate.
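The load, handle, save cycle behind those three points can be sketched without the bus. This is a minimal illustration, not ServiceConnect's implementation: DemoState, DemoStore and DemoDispatcher are hypothetical stand-ins, and a counter stands in for the outbound ReserveInventory send.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration; none of these are ServiceConnect types.
public sealed class DemoState
{
    public Guid CorrelationId { get; set; }
    public bool IsSubmitted { get; set; }
    public int ReserveCommandsSent { get; set; } // stands in for the outbound send
}

public sealed class DemoStore
{
    private readonly Dictionary<Guid, DemoState> _records = new();

    public int Count => _records.Count;

    // Load-or-create: an unseen correlation id yields a fresh, empty state.
    public DemoState Load(Guid correlationId) =>
        _records.TryGetValue(correlationId, out var saved)
            ? Copy(saved)
            : new DemoState { CorrelationId = correlationId };

    public void Save(DemoState state) => _records[state.CorrelationId] = Copy(state);

    private static DemoState Copy(DemoState s) => new()
    {
        CorrelationId = s.CorrelationId,
        IsSubmitted = s.IsSubmitted,
        ReserveCommandsSent = s.ReserveCommandsSent,
    };
}

public static class DemoDispatcher
{
    // Load state, run the handler, and save only if the handler returns cleanly.
    public static DemoState Dispatch(DemoStore store, Guid correlationId, Action<DemoState> handler)
    {
        var state = store.Load(correlationId);
        handler(state); // an exception here propagates and skips the save
        store.Save(state);
        return state;
    }

    // Same idempotency gate as the OrderSubmitted handler above.
    public static void OnOrderSubmitted(DemoState data)
    {
        if (data.IsSubmitted) return; // replay: do nothing
        data.IsSubmitted = true;
        data.ReserveCommandsSent++;
    }
}

public static class DispatchDemo
{
    public static void Main()
    {
        var store = new DemoStore();
        var id = Guid.NewGuid();

        DemoDispatcher.Dispatch(store, id, DemoDispatcher.OnOrderSubmitted);             // creates the record
        var state = DemoDispatcher.Dispatch(store, id, DemoDispatcher.OnOrderSubmitted); // redelivery

        // prints "1 record(s), 1 command(s) sent"
        Console.WriteLine($"{store.Count} record(s), {state.ReserveCommandsSent} command(s) sent");
    }
}
```

The second dispatch finds the record the first one created, so the gate sees IsSubmitted already set and the handler does nothing.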

By default, the bus maps message.CorrelationId to data.CorrelationId. That is what Messages means when it says the correlation id is load-bearing.

When the default isn’t right — a message correlates on OrderNumber, for instance, because it was produced by a service that doesn’t know the workflow id — override ConfigureMapper:

void IProcessHandler<FulfillmentState, OrderSubmitted>.ConfigureMapper(IProcessManagerPropertyMapper mapper)
{
    mapper.ConfigureMapping<FulfillmentState, OrderSubmitted>(
        d => d.OrderNumber,
        m => m.OrderNumber);
}

The bus will look up the state record by OrderNumber instead. Use this sparingly — correlation id is cheaper and doesn’t risk collisions.
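To see why a property-based lookup can collide, here is a small sketch of matching state records by a mapped property. It is an assumption about the general shape of such a lookup, not how ServiceConnect or its providers actually query; MapDemoState, MapDemoMessage and FindByMapping are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration; not ServiceConnect types.
public sealed class MapDemoState
{
    public Guid CorrelationId { get; set; }
    public string OrderNumber { get; set; } = "";
}

public sealed class MapDemoMessage
{
    public string OrderNumber { get; set; } = "";
}

public static class MapperDemo
{
    // Returns every record whose mapped property equals the message's mapped property.
    public static List<TData> FindByMapping<TData, TMessage, TKey>(
        IEnumerable<TData> records,
        Func<TData, TKey> dataKey,
        Func<TMessage, TKey> messageKey,
        TMessage message)
    {
        var wanted = messageKey(message);
        return records
            .Where(r => EqualityComparer<TKey>.Default.Equals(dataKey(r), wanted))
            .ToList();
    }

    public static void Main()
    {
        var records = new List<MapDemoState>
        {
            new() { CorrelationId = Guid.NewGuid(), OrderNumber = "A-100" },
            new() { CorrelationId = Guid.NewGuid(), OrderNumber = "A-100" }, // reused order number
            new() { CorrelationId = Guid.NewGuid(), OrderNumber = "B-200" },
        };
        var message = new MapDemoMessage { OrderNumber = "A-100" };

        var matches = FindByMapping(records, d => d.OrderNumber, m => m.OrderNumber, message);
        Console.WriteLine(matches.Count); // prints 2: the mapping is ambiguous
    }
}
```

A Guid correlation id is unique per workflow, so the equivalent lookup by id can match at most one record.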

State has to live somewhere that survives crashes. ServiceConnect ships a MongoDB provider; register it on the bus builder:

Orchestrator/Program.cs
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(t => { t.Host = "localhost"; });
    builder.ConfigureQueues(q => q.QueueName = "fulfillment-orchestrator");
    builder.UseMongoDbPersistence(options =>
    {
        options.ConnectionString = "mongodb://localhost:27017";
        options.DatabaseName = "fulfillment";
    });
});

The first handler invocation for a given CorrelationId inserts the document; subsequent ones update it. Completing the workflow does not delete it — completed sagas are a record of what happened, which you may want to query or archive.

Without a persistence provider, the process manager has nowhere to put state and the bus will fail fast at startup. This is the correct behaviour — silently forgetting state between messages would be worse.

Fresh-copy contract for custom persistence

Every call to FindDataAsync<T> must return a fresh IPersistenceData<T>.Data reference. Two successive loads for the same correlation id must produce independent objects — mutations to one must not affect the other. Both built-in providers comply: the InMemory provider deep-clones on every read; MongoDB deserialises a new object from the wire. Custom implementations (Postgres, SQL Server, Redis, …) must do the same. See IProcessManagerFinder for details.
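One way a custom store could meet the fresh-copy contract is to keep serialised data and deserialise on every read. The sketch below shows the idea with System.Text.Json. FreshCopyStore and CloneDemoState are hypothetical, and the real IProcessManagerFinder surface (including IPersistenceData<T>) is deliberately not reproduced here.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical saga data and store for illustration; not ServiceConnect types.
public sealed class CloneDemoState
{
    public Guid CorrelationId { get; set; }
    public List<string> ReservedItems { get; set; } = new();
}

public sealed class FreshCopyStore
{
    // Keeping serialised JSON rather than live objects means every read
    // has to materialise a new object graph.
    private readonly Dictionary<Guid, string> _json = new();

    public void Save(CloneDemoState state) =>
        _json[state.CorrelationId] = JsonSerializer.Serialize(state);

    public CloneDemoState? Find(Guid correlationId) =>
        _json.TryGetValue(correlationId, out var json)
            ? JsonSerializer.Deserialize<CloneDemoState>(json)
            : null;
}

public static class FreshCopyDemo
{
    public static void Main()
    {
        var store = new FreshCopyStore();
        var id = Guid.NewGuid();
        store.Save(new CloneDemoState { CorrelationId = id });

        var first = store.Find(id)!;
        var second = store.Find(id)!;
        first.ReservedItems.Add("sku-1"); // mutate one copy only

        Console.WriteLine(ReferenceEquals(first, second)); // False
        Console.WriteLine(second.ReservedItems.Count);     // 0
    }
}
```

A store that cached and returned the same object instance on both reads would print True and 1, which is exactly the sharing the contract forbids.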

The InMemory provider’s deep-clone round-trips saga data through System.Text.Json (STJ). That covers the plain public-property shape every saga in this codebase uses, but STJ does not introspect types the way the BSON layer does. Two specific shapes are unsupported by the InMemory provider and will silently lose state through the round-trip:

  • Polymorphic nested values. A property declared as a base type or interface that holds a derived runtime instance round-trips as the declared type, losing the derived properties. Annotate the base with [JsonDerivedType(typeof(Derived), "discriminator")] to teach STJ about the polymorphism, or store the runtime type as a discriminator field and reconstruct on read.
  • Explicit-interface auto-properties. A property declared as an explicit-interface implementation (Guid IFoo.FooId { get; set; }) is invisible to STJ and round-trips as the type’s default value. If you need it preserved through InMemory, store the value in a public property and have the explicit-interface member forward to it.

The MongoDB provider has neither limitation — BSON’s _t discriminator handles polymorphism, and BsonClassMap discovers explicit-interface auto-properties. Use the MongoDB provider for any saga that depends on these shapes.
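Both losses can be reproduced with System.Text.Json alone. The sketch below round-trips a hypothetical state object with default serializer options; the InMemory provider's actual options are not shown in this page, so treat that as an assumption. All type names are illustrative, and [JsonDerivedType] requires .NET 7 or later.

```csharp
#nullable enable
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Unannotated hierarchy: the derived properties are lost on round-trip.
public class Payment { public decimal Amount { get; set; } }
public sealed class CardPayment : Payment { public string Last4 { get; set; } = ""; }

// Annotated hierarchy: STJ writes a "$type" discriminator and restores the derived type.
[JsonDerivedType(typeof(AnnotatedCard), "card")]
public class AnnotatedPayment { public decimal Amount { get; set; } }
public sealed class AnnotatedCard : AnnotatedPayment { public string Last4 { get; set; } = ""; }

public interface IHasTicket { Guid TicketId { get; set; } }

public sealed class ShapeDemoState : IHasTicket
{
    public Payment Plain { get; set; } = new();
    public AnnotatedPayment Annotated { get; set; } = new();
    Guid IHasTicket.TicketId { get; set; } // explicit-interface auto-property
}

public static class ShapeDemo
{
    // Serialise then deserialise, as a JSON-based deep clone would.
    public static ShapeDemoState RoundTrip(ShapeDemoState state) =>
        JsonSerializer.Deserialize<ShapeDemoState>(JsonSerializer.Serialize(state))!;

    public static void Main()
    {
        var state = new ShapeDemoState
        {
            Plain = new CardPayment { Amount = 10m, Last4 = "4242" },
            Annotated = new AnnotatedCard { Amount = 10m, Last4 = "4242" },
        };
        ((IHasTicket)state).TicketId = Guid.NewGuid();

        var copy = RoundTrip(state);
        Console.WriteLine(copy.Plain is CardPayment);                 // False
        Console.WriteLine(copy.Annotated is AnnotatedCard);           // True
        Console.WriteLine(((IHasTicket)copy).TicketId == Guid.Empty); // True
    }
}
```

None of the three cases throws, which is why the loss is easy to miss in tests that only run against InMemory.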

Each message type needs its own DI registration, because the bus resolves per message:

services.AddTransient<IProcessHandler<FulfillmentState, OrderSubmitted>, FulfillmentProcessHandler>();
services.AddTransient<IProcessHandler<FulfillmentState, InventoryReserved>, FulfillmentProcessHandler>();
services.AddTransient<IProcessHandler<FulfillmentState, PaymentCaptured>, FulfillmentProcessHandler>();
services.AddSingleton<IReadOnlyList<HandlerReference>>(new List<HandlerReference>
{
    new() { HandlerType = typeof(FulfillmentProcessHandler), MessageType = typeof(OrderSubmitted) },
    new() { HandlerType = typeof(FulfillmentProcessHandler), MessageType = typeof(InventoryReserved) },
    new() { HandlerType = typeof(FulfillmentProcessHandler), MessageType = typeof(PaymentCaptured) },
});

The HandlerReference list tells the dispatch pipeline which message types route to which handler type. The transient DI registrations tell the container how to build one when a message arrives. Both are needed.

Some workflows need to react to nothing happening. “If we haven’t received PaymentCaptured within 24 hours, cancel the order.” The pattern for that is RequestTimeoutAsync:

public async Task HandleAsync(InventoryReserved message, FulfillmentState data, IConsumeContext context, CancellationToken cancellationToken = default)
{
    data.InventoryReserved = true;
    // Use data.CorrelationId — the saga's own id — not message.CorrelationId.
    // CancellationToken.None: the token here cancels the timeout-store insert, not the
    // scheduled delivery. Once the row is in the store the delay fires regardless of
    // bus shutdown — passing context.CancellationToken would abandon the insert mid-shutdown
    // and the timeout would never be scheduled. Pass None when the schedule must survive
    // graceful shutdown; pass context.CancellationToken only if you genuinely want the
    // schedule to be cooperatively abandoned on bus stop.
    await context.Bus.RequestTimeoutAsync(data.CorrelationId, TimeSpan.FromHours(24), CancellationToken.None);
}
public Task HandleAsync(TimeoutMessage message, FulfillmentState data, IConsumeContext context, CancellationToken cancellationToken = default)
{
    if (data.IsCompleted) return Task.CompletedTask; // payment already arrived — ignore
    // compensate: release inventory, notify the customer, mark cancelled
    data.IsCompleted = true;
    return Task.CompletedTask;
}

RequestTimeoutAsync schedules a TimeoutMessage carrying the same correlation id to be delivered back to this queue after the delay. Handle it like any other message — add IProcessHandler<FulfillmentState, TimeoutMessage> to the class.

Timeouts require the bus to be told they’re in use, because a background service polls the schedule. Turn it on in bus configuration:

builder.ConfigureBus(bus =>
{
    bus.EnableProcessManagerTimeouts = true;
});

If the workflow completes before the timeout fires, the state flag check in the timeout handler is what stops you from compensating a workflow that already succeeded.
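The effect of that flag check is easy to see in isolation. The sketch below runs a stand-in timeout handler against two hypothetical states: one where payment already completed the workflow, and one where the timeout arrives twice. TimeoutDemoState and its Compensations counter are illustrative only.

```csharp
using System;

// Hypothetical state for illustration; not a ServiceConnect type.
public sealed class TimeoutDemoState
{
    public bool IsCompleted { get; set; }
    public int Compensations { get; set; } // counts how often compensation ran
}

public static class TimeoutDemo
{
    // Same gate as the timeout handler above: a completed workflow is left alone.
    public static void OnTimeout(TimeoutDemoState data)
    {
        if (data.IsCompleted) return;
        data.Compensations++; // release inventory, notify the customer, ...
        data.IsCompleted = true;
    }

    public static void Main()
    {
        var paid = new TimeoutDemoState { IsCompleted = true }; // payment arrived first
        OnTimeout(paid);

        var unpaid = new TimeoutDemoState();
        OnTimeout(unpaid);
        OnTimeout(unpaid); // the same timeout delivered a second time

        Console.WriteLine($"{paid.Compensations} {unpaid.Compensations}"); // prints "0 1"
    }
}
```

Because the handler sets IsCompleted itself, the same check also absorbs a duplicate delivery of the timeout.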

Timeout dispatch is at-most-once while the lease holds

The timeout polling service operates under a lease that prevents two competing instances from dispatching the same TimeoutMessage simultaneously. Within a single lease period, dispatch is at-most-once: after sending the timeout message the polling service performs a second check to confirm the lease still holds before advancing the record. If the lease was concurrently acquired by another instance between the send and the re-check, the duplicate send is detected and the record is not advanced, so the second instance does not resend. This does not guarantee exactly-once across process restarts — a crash between send and re-check can cause redelivery, which is why the idempotency flag check in the timeout handler (if (data.IsCompleted) return) is always necessary.

See also:

  • Routing Slip — simpler alternative when the sequence is fixed.
  • Aggregator — collect a window of messages and act on the batch. Conceptually related: both accumulate state across messages.
  • Messages — why the correlation id has to be right.