IProcessHandler<TData, TMessage>
Overview
IProcessHandler<TData, TMessage> is the contract you implement for each message type that drives a process manager (saga). The dispatch pipeline loads the correlated TData from the configured IProcessManagerFinder, invokes HandleAsync (passing the per-message context directly as a parameter), and persists any mutations to data once the returned task completes successfully. Implement the interface once per (TData, TMessage) pair in your workflow; a single class commonly implements the interface several times, once for each message it reacts to.
See Process Manager for the conceptual tour.
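As a rough illustration of one class implementing the interface for several messages, the sketch below handles two message types against the same state record. The framework types are replaced by trimmed-down stand-ins whose shapes are assumptions, and ShipmentDispatched, TrackingNumber, and the string-typed OrderId are hypothetical names invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the framework types; their shapes are assumptions for this sketch.
public abstract class Message { public Guid CorrelationId { get; set; } }
public interface IProcessManagerData { Guid CorrelationId { get; set; } }
public interface IConsumeContext { }

public interface IProcessHandler<TData, TMessage>
    where TData : class, IProcessManagerData, new()
    where TMessage : Message
{
    Task HandleAsync(TMessage message, TData data, IConsumeContext context,
        CancellationToken cancellationToken = default);
}

// Hypothetical message and state types.
public sealed class OrderPlaced : Message { public string OrderId { get; set; } = ""; }
public sealed class ShipmentDispatched : Message { public string TrackingNumber { get; set; } = ""; }

public sealed class ShippingSagaData : IProcessManagerData
{
    public Guid CorrelationId { get; set; }
    public string OrderId { get; set; } = "";
    public string TrackingNumber { get; set; } = "";
}

// One class, two implementations of the interface: one per message type.
public sealed class ShippingSaga :
    IProcessHandler<ShippingSagaData, OrderPlaced>,
    IProcessHandler<ShippingSagaData, ShipmentDispatched>
{
    public Task HandleAsync(OrderPlaced message, ShippingSagaData data, IConsumeContext context,
        CancellationToken cancellationToken = default)
    {
        data.OrderId = message.OrderId;
        return Task.CompletedTask;
    }

    public Task HandleAsync(ShipmentDispatched message, ShippingSagaData data, IConsumeContext context,
        CancellationToken cancellationToken = default)
    {
        data.TrackingNumber = message.TrackingNumber;
        return Task.CompletedTask;
    }
}
```

The two HandleAsync overloads differ only in their message parameter, so each one implicitly implements the matching closed interface and both operate on the same ShippingSagaData record.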
Reference
The interface is generic over the persisted state type and the message type; both type parameters are constrained:

    public interface IProcessHandler<TData, TMessage>
        where TData : class, IProcessManagerData, new()
        where TMessage : Message

The new() constraint lets the framework construct a fresh state object the first time a correlation id is seen.
HandleAsync
    Task HandleAsync(TMessage message, TData data, IConsumeContext context, CancellationToken cancellationToken = default);

Invoked once the framework has correlated message to the data record. The per-message context is passed directly so the handler is safe across concurrent dispatches. Mutations to data are persisted when the returned task completes successfully; returning a faulted task (or throwing) prevents persistence and propagates to the pipeline’s retry and error-handling policy.
Parameters
- message: the deserialised message; guaranteed non-null when invoked by the dispatcher.
- data: the persisted state record for this correlation id. The first message in a workflow receives a freshly-constructed, default-valued instance; subsequent messages receive the record as last written.
- context: the per-message consume context (bus handle, correlation id, reply helper); guaranteed non-null when invoked by the dispatcher.
- cancellationToken: sourced from the transport consume context; signals cooperative shutdown. Pass it through to downstream awaits so long-running steps unwind cleanly when the bus stops consuming.
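The advice on cancellationToken can be sketched as follows. This is a minimal example under stated assumptions: the framework types are stand-ins with assumed shapes, StockSagaData and ReserveStockHandler are hypothetical, and Task.Delay stands in for whatever slow downstream call a real handler would await.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the framework types; their shapes are assumptions for this sketch.
public abstract class Message { public Guid CorrelationId { get; set; } }
public interface IProcessManagerData { Guid CorrelationId { get; set; } }
public interface IConsumeContext { }

public interface IProcessHandler<TData, TMessage>
    where TData : class, IProcessManagerData, new()
    where TMessage : Message
{
    Task HandleAsync(TMessage message, TData data, IConsumeContext context,
        CancellationToken cancellationToken = default);
}

// Hypothetical message and state types.
public sealed class OrderPlaced : Message { public string OrderId { get; set; } = ""; }

public sealed class StockSagaData : IProcessManagerData
{
    public Guid CorrelationId { get; set; }
    public bool StockReserved { get; set; }
}

public sealed class ReserveStockHandler : IProcessHandler<StockSagaData, OrderPlaced>
{
    public async Task HandleAsync(OrderPlaced message, StockSagaData data, IConsumeContext context,
        CancellationToken cancellationToken = default)
    {
        // Task.Delay stands in for a slow downstream call (HTTP request, database query).
        // Forwarding the token lets the await end early when the bus stops consuming.
        await Task.Delay(TimeSpan.FromMilliseconds(50), cancellationToken);

        // Reached only if the await above was not cancelled.
        data.StockReserved = true;
    }
}
```

If the token is already cancelled when the handler runs, the await throws OperationCanceledException and StockReserved is never set, so the state object is left as it was when the handler started.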
ConfigureMapper
    void ConfigureMapper(IProcessManagerPropertyMapper mapper)
    {
        mapper.ConfigureMapping<TData, TMessage>(d => d.CorrelationId, m => m.CorrelationId);
    }

Configures the correlation between TMessage and TData. The default implementation maps CorrelationId on both sides; override it when a message correlates on some other property (for example, an OrderId produced by a service that does not know the workflow id).
Parameters
- mapper: the mapper the framework uses to register message-to-data correlation expressions for this handler.
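A handler that correlates on OrderId instead of CorrelationId might look roughly like the sketch below. Several things here are assumptions: the framework types are stand-ins, the ConfigureMapping parameter types (plain Func delegates) are a guess at the real signature, and PaymentReceived, PaymentHandler, and RecordingMapper are hypothetical names used only to make the example self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the framework types; their shapes are assumptions for this sketch.
public abstract class Message { public Guid CorrelationId { get; set; } }
public interface IProcessManagerData { Guid CorrelationId { get; set; } }
public interface IConsumeContext { }

public interface IProcessManagerPropertyMapper
{
    // Assumed signature: the real mapper may take expression trees instead.
    void ConfigureMapping<TData, TMessage>(
        Func<TData, object> dataProperty, Func<TMessage, object> messageProperty);
}

public interface IProcessHandler<TData, TMessage>
    where TData : class, IProcessManagerData, new()
    where TMessage : Message
{
    Task HandleAsync(TMessage message, TData data, IConsumeContext context,
        CancellationToken cancellationToken = default);

    // Default implementation: correlate on CorrelationId on both sides.
    void ConfigureMapper(IProcessManagerPropertyMapper mapper)
    {
        mapper.ConfigureMapping<TData, TMessage>(d => d.CorrelationId, m => m.CorrelationId);
    }
}

// Hypothetical message and state types.
public sealed class PaymentReceived : Message { public string OrderId { get; set; } = ""; }

public sealed class ShippingSagaData : IProcessManagerData
{
    public Guid CorrelationId { get; set; }
    public string OrderId { get; set; } = "";
    public bool Paid { get; set; }
}

public sealed class PaymentHandler : IProcessHandler<ShippingSagaData, PaymentReceived>
{
    public Task HandleAsync(PaymentReceived message, ShippingSagaData data, IConsumeContext context,
        CancellationToken cancellationToken = default)
    {
        data.Paid = true;
        return Task.CompletedTask;
    }

    // Replaces the interface's default mapping: correlate on OrderId instead.
    public void ConfigureMapper(IProcessManagerPropertyMapper mapper)
    {
        mapper.ConfigureMapping<ShippingSagaData, PaymentReceived>(d => d.OrderId, m => m.OrderId);
    }
}

// Hypothetical mapper that records the selectors so the mapping can be inspected.
public sealed class RecordingMapper : IProcessManagerPropertyMapper
{
    public Func<object, object> DataKey { get; private set; } = _ => "";
    public Func<object, object> MessageKey { get; private set; } = _ => "";

    public void ConfigureMapping<TData, TMessage>(
        Func<TData, object> dataProperty, Func<TMessage, object> messageProperty)
    {
        DataKey = d => dataProperty((TData)d);
        MessageKey = m => messageProperty((TMessage)m);
    }
}
```

Because PaymentHandler declares a public ConfigureMapper with the same signature, calls made through the interface reach the handler's version and the default CorrelationId mapping is not used.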
Reacting to OrderPlaced in a shipping saga
    public sealed class ShippingSaga : IProcessHandler<ShippingSagaData, OrderPlaced>
    {
        public async Task HandleAsync(OrderPlaced message, ShippingSagaData data, IConsumeContext context, CancellationToken cancellationToken = default)
        {
            if (data.Status != OrderStatus.Pending)
            {
                // Idempotent: a redelivery of OrderPlaced for a saga that has already
                // advanced past acknowledgement is a no-op.
                return;
            }

            data.OrderId = message.OrderId;
            data.Status = OrderStatus.Acknowledged;

            await context.Bus.PublishAsync(new ShipmentRequested(message.CorrelationId)
            {
                OrderId = message.OrderId,
            });
        }
    }

The handler mutates data in place (the framework writes the record back when HandleAsync returns) and publishes ShipmentRequested as the next step of the workflow. The state-flag check at the top of the method is the idempotency gate for at-least-once redelivery. IBus is reached through context.Bus; no constructor injection of IBus is needed.
See also
- Process Manager (concept)
- IProcessManagerData (related reference)
- IProcessManagerPropertyMapper (related reference)
- IProcessManagerFinder (related reference)