IMessageProcessor
Overview
IMessageProcessor is a chain-of-responsibility hook that IMessageDispatcher invokes during the consume pipeline. Each processor participates in exactly one phase, never both: pre-deserialization if RunBeforeDeserialization returns true, post-deserialization otherwise. The dispatcher runs the pre-deserialization iteration first, then (after type resolution, before-consuming filters, and deserialization) runs the post-deserialization iteration. Multiple implementations can be registered; within each phase the dispatcher iterates them in DI registration order and short-circuits on the first that returns ProcessResult.Handled.
Built-in processors handle the framework’s own concerns — reply routing, aggregator accumulation, saga dispatch, stream delivery, and standard handler invocation. Custom processors extend the same chain to implement bespoke consume patterns without replacing the dispatcher or modifying existing processors.
Typical reasons to add a custom IMessageProcessor:
- A header-driven short-circuit for deprecated wire versions that should be silently dropped.
- Custom routing logic that redirects messages before the handler chain runs.
- A tracing processor that records per-phase activity IDs alongside handler dispatch.
See Error Handling for how processor failures interact with the nack and retry pipeline.
Reference
ProcessResult
```csharp
public enum ProcessResult
{
    Handled,
    NotHandled
}
```
The return value that controls whether the dispatcher continues iterating processors.
- Handled: the processor consumed the message; the dispatcher stops iterating and no subsequent processor runs for this delivery.
- NotHandled: the processor did not act on the message; the dispatcher continues to the next registered processor.
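The iteration and short-circuit behaviour described here can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the dispatcher's actual implementation: ProcessorChainSketch and RunPhaseAsync are hypothetical names, and the sketch assumes the dispatcher selects the processors for a phase by comparing RunBeforeDeserialization against the phase being run.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

// Hypothetical helper for illustration only; not part of ServiceConnect.
public static class ProcessorChainSketch
{
    // Runs one phase of the chain and reports whether any processor handled the message.
    public static async Task<ProcessResult> RunPhaseAsync(
        IEnumerable<IMessageProcessor> processors,
        bool beforeDeserialization,
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object? message,
        IDictionary<string, object> headers,
        Envelope envelope,
        CancellationToken cancellationToken)
    {
        // Keep only the processors that opted into this phase.
        // Where() preserves the input order, so registration order is retained.
        var phaseProcessors = processors
            .Where(p => p.RunBeforeDeserialization == beforeDeserialization);

        foreach (var processor in phaseProcessors)
        {
            var result = await processor.ProcessAsync(
                messageBytes, messageType, message, headers, envelope, cancellationToken);

            if (result == ProcessResult.Handled)
            {
                // Short-circuit: later processors never see this delivery.
                return ProcessResult.Handled;
            }
        }

        // Nobody claimed the message in this phase.
        return ProcessResult.NotHandled;
    }
}
```

In this model the caller would invoke RunPhaseAsync once with beforeDeserialization set to true and message set to null, and only if that returns NotHandled deserialize the body and invoke it again with beforeDeserialization set to false.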
RunBeforeDeserialization
```csharp
bool RunBeforeDeserialization => false;
```
When true, this processor runs before the message body is deserialized. The default implementation returns false. Override and return true to opt into pre-deserialization execution.
Pre-deserialization processors receive message = null; messageType is whatever value the dispatcher chooses to pass at this phase. The bundled default dispatcher passes typeof(Message), but a custom IMessageDispatcher is free to pass a more specific resolved type if it can do so cheaply. Treat messageType as informational only in this phase — code that needs the concrete CLR type should opt out of pre-deserialization processing and let the dispatcher resolve the type itself.
ProcessAsync
```csharp
Task<ProcessResult> ProcessAsync(
    ReadOnlyMemory<byte> messageBytes,
    Type messageType,
    object? message,
    IDictionary<string, object> headers,
    Envelope envelope,
    CancellationToken cancellationToken = default);
```
Processes one message delivery within the dispatcher’s pipeline.
Parameters
- messageBytes: the raw serialized payload. Available in both pre- and post-deserialization phases.
- messageType: the resolved CLR Type for the message. After deserialization it is the concrete message type (for example typeof(OrderPlaced)). In the pre-deserialization phase the value is dispatcher-defined; the bundled default dispatcher passes typeof(Message) as a placeholder.
- message: the deserialized message instance, or null for pre-deserialization processors.
- headers: the transport headers accompanying the delivery.
- envelope: the constructed Envelope containing correlation id, reply address, and other routing metadata.
- cancellationToken: propagated from the transport consumer.
Returns. ProcessResult.Handled to stop further processor iteration, or ProcessResult.NotHandled to continue.
Implementing
Ordering
Within each phase, processors are iterated in DI registration order, so register them in the order you want them to run. Pre-deserialization processors are always iterated before post-deserialization processors, regardless of registration order.
Pre- vs post-deserialization
| Phase | RunBeforeDeserialization | message | messageType |
|---|---|---|---|
| Pre-deserialization | true | null | dispatcher-defined; typeof(Message) with the default dispatcher |
| Post-deserialization | false (default) | deserialized instance | concrete type |
Use the pre-deserialization phase for:
- Header-driven short-circuits where you want to avoid the cost of deserialization entirely.
- Raw-bytes inspection or routing based on wire format metadata.
Use the post-deserialization phase for:
- Logic that needs the strongly typed message object.
- Aggregation, saga dispatch, handler invocation, or reply routing.
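As an illustration of post-deserialization logic that depends on the typed instance, the sketch below drops messages belonging to blocked tenants. Everything specific to it is an assumption made for the example: ITenantScoped, its TenantId property, and BlockedTenantProcessor are hypothetical and not part of ServiceConnect. Only the IMessageProcessor members come from the reference above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

// Hypothetical marker interface, defined here only for illustration.
public interface ITenantScoped
{
    string TenantId { get; }
}

// Hypothetical processor: consumes (and thereby drops) messages for blocked tenants.
public sealed class BlockedTenantProcessor : IMessageProcessor
{
    private readonly ISet<string> _blockedTenants;

    public BlockedTenantProcessor(ISet<string> blockedTenants) =>
        _blockedTenants = blockedTenants;

    // Post-deserialization (the default): the typed instance is required.
    public bool RunBeforeDeserialization => false;

    public Task<ProcessResult> ProcessAsync(
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object? message,
        IDictionary<string, object> headers,
        Envelope envelope,
        CancellationToken cancellationToken = default)
    {
        // Pattern matching is only meaningful here because message is non-null
        // in the post-deserialization phase.
        if (message is ITenantScoped scoped && _blockedTenants.Contains(scoped.TenantId))
        {
            // Handled stops the chain, so no handler sees this message.
            return Task.FromResult(ProcessResult.Handled);
        }

        // Not tenant-scoped, or tenant is allowed: let the chain continue.
        return Task.FromResult(ProcessResult.NotHandled);
    }
}
```

The same check could not run pre-deserialization, because message is null in that phase and the tenant would have to be read from a header or the raw bytes instead.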
Short-circuit semantics
Returning ProcessResult.Handled prevents all subsequent processors, including the built-in handler processor, from running. Use this intentionally. If a custom processor returns Handled without the message actually reaching a handler, the delivery still counts as a success (Success = true), so the transport acknowledges it to the broker and the message is silently dropped.
Error contract
Exceptions thrown from ProcessAsync bubble up to IMessageDispatcher, which converts them to ConsumeEventResult { Success = false, Exception = ex }. The transport then nacks or dead-letters the delivery. Do not catch exceptions inside a processor unless you plan to recover and return a meaningful ProcessResult. Swallowing exceptions here masks failures and produces confusing ack behaviour.
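One way to follow this contract is to catch only the failure you can genuinely recover from and let everything else propagate. The sketch below assumes a best-effort audit copy whose I/O failures should not block normal dispatch. IAuditSink and AuditCopyProcessor are hypothetical names introduced for the example, not ServiceConnect types.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using ServiceConnect.Interfaces;

// Hypothetical best-effort sink, defined here only for illustration.
public interface IAuditSink
{
    Task WriteAsync(ReadOnlyMemory<byte> payload, CancellationToken cancellationToken);
}

// Hypothetical processor: copies raw bytes to an audit sink, then lets the chain continue.
public sealed class AuditCopyProcessor : IMessageProcessor
{
    private readonly IAuditSink _sink;
    private readonly ILogger<AuditCopyProcessor> _logger;

    public AuditCopyProcessor(IAuditSink sink, ILogger<AuditCopyProcessor> logger)
    {
        _sink = sink;
        _logger = logger;
    }

    // Only the raw bytes are needed, so run before deserialization.
    public bool RunBeforeDeserialization => true;

    public async Task<ProcessResult> ProcessAsync(
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object? message,
        IDictionary<string, object> headers,
        Envelope envelope,
        CancellationToken cancellationToken = default)
    {
        try
        {
            await _sink.WriteAsync(messageBytes, cancellationToken);
        }
        catch (IOException ex)
        {
            // Recoverable by design: the audit copy is best-effort, so log and carry on.
            _logger.LogWarning(ex, "Audit copy failed; continuing with normal dispatch.");
        }
        // No general catch: any other exception reaches the dispatcher unchanged.

        // This processor never consumes the message.
        return ProcessResult.NotHandled;
    }
}
```

The narrow catch is the design choice here: an IOException maps to a deliberate NotHandled result, while unexpected failures still surface as Success = false and go through the nack and retry pipeline.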
Skeletal stub
```csharp
using ServiceConnect.Interfaces;

public sealed class CustomMessageProcessor : IMessageProcessor
{
    // Return true to run before deserialization.
    public bool RunBeforeDeserialization => false;

    public Task<ProcessResult> ProcessAsync(
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object? message,
        IDictionary<string, object> headers,
        Envelope envelope,
        CancellationToken cancellationToken = default)
    {
        // Return Handled to stop the chain; NotHandled to continue.
        return Task.FromResult(ProcessResult.NotHandled);
    }
}
```
Header-driven short-circuit for deprecated wire versions
The following processor drops messages that carry an unsupported X-Wire-Version header. It runs before deserialization, since there is no point paying the deserialization cost for messages that will be discarded.
```csharp
using Microsoft.Extensions.Logging;
using ServiceConnect.Interfaces;

public sealed class DeprecatedVersionProcessor : IMessageProcessor
{
    private const string WireVersionHeader = "X-Wire-Version";
    private const int MinSupportedVersion = 2;

    private readonly ILogger<DeprecatedVersionProcessor> _logger;

    public DeprecatedVersionProcessor(ILogger<DeprecatedVersionProcessor> logger)
        => _logger = logger;

    // Run before deserialization: no point paying the cost for deprecated messages.
    public bool RunBeforeDeserialization => true;

    public Task<ProcessResult> ProcessAsync(
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object? message,
        IDictionary<string, object> headers,
        Envelope envelope,
        CancellationToken cancellationToken = default)
    {
        if (headers.TryGetValue(WireVersionHeader, out var raw)
            && int.TryParse(raw?.ToString(), out var version)
            && version < MinSupportedVersion)
        {
            _logger.LogWarning(
                "Dropping message with unsupported wire version {Version} " +
                "(minimum supported: {Min}). CorrelationId: {CorrelationId}",
                version, MinSupportedVersion, envelope.CorrelationId);

            // Return Handled to short-circuit: no further processors or handlers run.
            return Task.FromResult(ProcessResult.Handled);
        }

        // Version is acceptable; let the standard processor chain continue.
        return Task.FromResult(ProcessResult.NotHandled);
    }
}
```
Register the processor during bus startup. Processors are iterated in registration order, so placing this one first ensures deprecated messages are dropped before any other work:
```csharp
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    builder.AddRegistration(svc =>
        svc.AddSingleton<IMessageProcessor, DeprecatedVersionProcessor>());
});
```
When OrderService receives an OrderPlaced message carrying X-Wire-Version: 1, DeprecatedVersionProcessor fires before deserialization, logs the drop, and returns Handled. The transport acknowledges the delivery (it was processed, just not dispatched), and no subsequent processor runs. Messages with version 2 or higher pass through transparently, as do messages whose X-Wire-Version header is missing or not an integer.
See also
- Error Handling: how processor failures interact with retry and dead-letter policy
- IMessageDispatcher: the orchestrator that iterates processors
- IHandlerRegistry: the marker interface for startup-time handler validation
- IMessageProcessingMiddleware: the middleware chain that wraps post-deserialization processor execution