IMessageDispatcher
Overview
IMessageDispatcher is the top-level orchestrator that the transport consumer calls for every incoming message. When a raw delivery arrives from the broker, the framework invokes DispatchAsync with the raw bytes, the wire type name, and the transport headers. From that point on, the dispatcher is responsible for the complete consume pipeline:
1. Extract FullTypeName from the headers and build the Envelope. Throw if the header is absent.
2. Run IMessageProcessor entries where RunBeforeDeserialization == true. If any returns ProcessResult.Handled, short-circuit with Success = true.
3. Resolve the CLR Type via IMessageTypeRegistry.TryResolve(messageType, out type). If the type is not registered and the message is not a reply, return Success = false.
4. Run the IFilter before-consuming filters. If any returns false, short-circuit with Success = true.
5. If the delivery carries a ResponseMessageId header and a ReplyProcessor is registered, invoke it. If it returns Handled, return Success = true; otherwise return Success = false with an exception.
6. Deserialise the message body via IMessageSerializer.
7. Run IMessageProcessor entries where RunBeforeDeserialization == false through the IMessageProcessingMiddleware chain.
8. Run after-consuming filters in a finally block so they fire even on failure.
9. Return a ConsumeEventResult describing the outcome.
ServiceConnect ships a default implementation (ServiceConnect.Services.MessageDispatcher) that follows this exact order. Replace it only when every other extension point — IFilter, IMessageProcessor, IMessageProcessingMiddleware — is insufficient for your use case.
See Handlers for the broader dispatch model.
Reference
DispatchAsync

```csharp
Task<ConsumeEventResult> DispatchAsync(
    ReadOnlyMemory<byte> messageBytes,
    string messageType,
    IReadOnlyDictionary<string, object> headers,
    CancellationToken cancellationToken = default);
```

Executes the full consume pipeline for one message delivery.
Parameters
- messageBytes: the raw serialised payload as received from the broker.
- messageType: the wire type name of the message (for example "OrderService.Messages.OrderPlaced"). Used to resolve the CLR type via IMessageTypeRegistry.
- headers: the transport headers accompanying the delivery. Values are typed as object because some broker clients expose typed header values rather than plain strings.
- cancellationToken: propagated from the transport consumer; signals that processing should stop cleanly.
Returns. A ConsumeEventResult describing whether the message was processed successfully. Success = true causes the transport to acknowledge the delivery; Success = false or a non-null Exception causes the transport to nack, requeue, or dead-letter according to its policy.
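As a rough illustration of how a transport might act on this result, the sketch below maps a result to an acknowledgement decision. ConsumeEventResult here is a local stand-in that models only the two members described above, and AckDecision and AckPolicySketch.Decide are hypothetical names that do not exist in ServiceConnect. The real transport applies its own requeue and dead-letter policy.

```csharp
#nullable enable
using System;

// Local stand-in for the real ConsumeEventResult; only the two members
// described in the text are modelled.
public sealed class ConsumeEventResult
{
    public bool Success { get; set; }
    public Exception? Exception { get; set; }
}

// Hypothetical outcome type, not part of ServiceConnect.
public enum AckDecision { Ack, Nack }

public static class AckPolicySketch
{
    // Ack only when the result reports success and carries no exception.
    // Everything else is left to the transport's nack/requeue/dead-letter policy.
    public static AckDecision Decide(ConsumeEventResult result)
    {
        if (result.Success && result.Exception == null)
        {
            return AckDecision.Ack;
        }

        return AckDecision.Nack;
    }
}
```

Note that a short-circuit (a handled pre-processor or a filter returning false) still reports Success = true, so under this reading it is acknowledged like any other successful delivery.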
Implementing
Order of operations
A custom IMessageDispatcher must preserve the default order of operations. The built-in filters, processors, and middleware all assume this sequence; departing from it produces undefined behaviour:
1. Extract the FullTypeName header; throw InvalidOperationException if absent. Build the Envelope.
2. Pre-deserialization processors (RunBeforeDeserialization == true): short-circuit with Success = true on ProcessResult.Handled.
3. Type resolution via IMessageTypeRegistry: return Success = false for unknown, non-reply types.
4. Before-consuming filters: short-circuit with Success = true on false.
5. Reply short-circuit: if the ResponseMessageId header is present and a ReplyProcessor is registered, invoke it. If it returns Handled, return Success = true; otherwise return Success = false with an exception.
6. Deserialise via IMessageSerializer.
7. Post-deserialization processors (RunBeforeDeserialization == false) through the middleware chain.
8. After-consuming filters in finally.
9. Return ConsumeEventResult.
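The ordering and short-circuit rules above can be sketched without any ServiceConnect types. In the sketch below every phase is reduced to a delegate on a hypothetical PipelineHooks class, and the result is a plain tuple standing in for ConsumeEventResult. Two points are assumptions rather than documented behaviour: the missing-header exception is caught and converted into a failed result, and the finally block wraps the whole method so the after-consuming step runs on every path.

```csharp
#nullable enable
using System;
using System.Collections.Generic;

// Hypothetical stand-ins: each phase is a delegate so that only the ordering
// and short-circuit rules are visible. None of these are ServiceConnect APIs.
public sealed class PipelineHooks
{
    public Func<bool> PreProcessorsHandled { get; set; } = () => false;         // step 2
    public Func<string, Type?> ResolveType { get; set; } = _ => typeof(object); // step 3
    public Func<bool> BeforeFilters { get; set; } = () => true;                 // step 4
    public Func<bool>? ReplyProcessor { get; set; }                             // step 5, null = not registered
    public Action<Type?> DeserialiseAndProcess { get; set; } = _ => { };        // steps 6 and 7
    public Action AfterFilters { get; set; } = () => { };                       // step 8
}

public static class OrderingSketch
{
    // Returns (Success, Exception), mirroring the two members of ConsumeEventResult.
    public static (bool Success, Exception? Exception) Dispatch(
        string messageType,
        IReadOnlyDictionary<string, object> headers,
        PipelineHooks hooks)
    {
        try
        {
            // 1. The FullTypeName header is mandatory.
            if (!headers.ContainsKey("FullTypeName"))
            {
                throw new InvalidOperationException("FullTypeName header is missing.");
            }

            // 2. Pre-deserialization processors may handle the message outright.
            if (hooks.PreProcessorsHandled()) return (true, null);

            // 3. Unknown types are rejected unless the message is a reply.
            bool isReply = headers.ContainsKey("ResponseMessageId");
            Type? type = hooks.ResolveType(messageType);
            if (type == null && !isReply) return (false, null);

            // 4. A filter returning false stops processing but still reports success.
            if (!hooks.BeforeFilters()) return (true, null);

            // 5. Replies are routed to the reply processor and never reach step 6.
            if (isReply && hooks.ReplyProcessor != null)
            {
                if (hooks.ReplyProcessor()) return (true, null);
                return (false, new InvalidOperationException("Reply was not handled."));
            }

            // 6 and 7. Deserialise, then run post-deserialization processors.
            hooks.DeserialiseAndProcess(type);
            return (true, null);
        }
        catch (Exception ex)
        {
            // Assumption: failures are converted rather than allowed to escape.
            return (false, ex);
        }
        finally
        {
            // 8. Runs after every return and every exception above.
            hooks.AfterFilters();
        }
    }
}
```

Swapping in hooks that record their invocation makes it easy to check that, for example, a before-consuming filter returning false prevents deserialisation while the after-consuming step still runs.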
Error-handling contract
Exceptions thrown inside DispatchAsync should be caught and surfaced via ConsumeEventResult { Success = false, Exception = ex }. The transport layer reads Success and Exception to decide the ack/nack outcome; if an exception escapes the method entirely, the transport has no structured way to act on it and may crash the consumer loop. Catch at the outermost boundary and convert.
After-filter guarantee
After-consuming filters must run in a finally block, unconditionally. They carry diagnostics and cleanup logic (trace span completion, metrics recording, resource release) that must fire even when processing fails. Omitting the finally wrapper is a silent correctness bug.
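To make the failure mode concrete, the following sketch contrasts the two shapes using plain delegates. AfterFilterSketch and both method names are hypothetical, and the Action parameters stand in for the real processing and filter calls.

```csharp
using System;

public static class AfterFilterSketch
{
    // Buggy shape: the cleanup call is skipped whenever process() throws.
    public static bool WithoutFinally(Action process, Action afterFilters)
    {
        try
        {
            process();
            afterFilters();
            return true;
        }
        catch (Exception)
        {
            return false;
        }
    }

    // Correct shape: cleanup runs on success and on failure alike.
    public static bool WithFinally(Action process, Action afterFilters)
    {
        try
        {
            process();
            return true;
        }
        catch (Exception)
        {
            return false;
        }
        finally
        {
            afterFilters();
        }
    }
}
```

Both variants return the same value for the same input, which is why the missing finally goes unnoticed: the only observable difference is that the cleanup delegate is never invoked on the failing path.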
Observability
The dispatcher is the natural home for a root ActivitySource span that wraps the entire consume pipeline. Starting the activity before step 1 and stopping it in the same finally as the after-consuming filters gives a single, bounded span covering every phase of message processing.
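A minimal sketch of that shape, using only System.Diagnostics, might look like the following. RootSpanSketch, the "Example.Messaging" source name, and the two Action parameters are placeholders for illustration. StartActivity returns null when no listener is sampling the source, which is why every call on the activity is null-conditional.

```csharp
#nullable enable
using System;
using System.Diagnostics;

public static class RootSpanSketch
{
    // Placeholder source name; use whatever name your tracing setup listens to.
    public static readonly ActivitySource Source = new ActivitySource("Example.Messaging");

    public static bool Run(string messageType, Action pipeline, Action afterFilters)
    {
        // Started before any pipeline work; null if nothing is listening.
        Activity? root = Source.StartActivity("consume " + messageType, ActivityKind.Consumer);
        try
        {
            pipeline();
            return true;
        }
        catch (Exception ex)
        {
            root?.SetStatus(ActivityStatusCode.Error, ex.Message);
            return false;
        }
        finally
        {
            // After-consuming filters run while the span is still open...
            afterFilters();
            // ...and disposing the activity stops it, closing the span last.
            root?.Dispose();
        }
    }
}
```

Stopping the activity after the filters means any tags or events they add still land on the root span.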
Lifetime
Register a custom IMessageDispatcher as a singleton. The dispatcher is invoked on every message delivery; registering as transient or scoped creates unnecessary allocations on a hot path. The default MessageDispatcher implementation is registered as a singleton.
Skeletal stub
```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

public sealed class CustomMessageDispatcher : IMessageDispatcher
{
    private readonly IMessageTypeRegistry _typeRegistry;
    private readonly IMessageSerializer _serializer;

    public CustomMessageDispatcher(
        IMessageTypeRegistry typeRegistry,
        IMessageSerializer serializer)
    {
        _typeRegistry = typeRegistry;
        _serializer = serializer;
    }

    public async Task<ConsumeEventResult> DispatchAsync(
        ReadOnlyMemory<byte> messageBytes,
        string messageType,
        IReadOnlyDictionary<string, object> headers,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // 1–7: implement full pipeline here.
            throw new NotImplementedException();
        }
        catch (Exception ex)
        {
            // Convert every failure into a structured result for the transport.
            return new ConsumeEventResult { Success = false, Exception = ex };
        }
        finally
        {
            // 8: run after-consuming filters here, unconditionally.
        }
    }
}
```

Tracing dispatcher with a root Activity span
The following example delegates to the default IMessageDispatcher and wraps it in a single root Activity that covers the entire consume pipeline. Because it does not re-implement the pipeline, only the root span is available, not per-phase spans. That is the right trade-off when all you need is one bounded trace entry per delivery.
```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

public sealed class TracingMessageDispatcher : IMessageDispatcher
{
    private static readonly ActivitySource Source = new("OrderService.Messaging");

    private readonly IMessageDispatcher _inner;

    public TracingMessageDispatcher(IMessageDispatcher inner) => _inner = inner;

    public async Task<ConsumeEventResult> DispatchAsync(
        ReadOnlyMemory<byte> messageBytes,
        string messageType,
        IReadOnlyDictionary<string, object> headers,
        CancellationToken cancellationToken = default)
    {
        // Null when no listener is sampling this source, hence the ?. calls below.
        using var root = Source.StartActivity(
            $"consume {messageType}", ActivityKind.Consumer);

        root?.SetTag("messaging.message_type", messageType);

        ConsumeEventResult result;
        try
        {
            result = await _inner.DispatchAsync(messageBytes, messageType, headers, cancellationToken);
        }
        catch (Exception ex)
        {
            // The inner dispatcher should not throw, but convert if it does.
            root?.SetStatus(ActivityStatusCode.Error, ex.Message);
            return new ConsumeEventResult { Success = false, Exception = ex };
        }

        if (!result.Success)
            root?.SetStatus(ActivityStatusCode.Error, result.Exception?.Message ?? "dispatch failed");

        return result;
    }
}
```

Register the custom dispatcher during bus startup. Registering a custom IMessageDispatcher replaces the default implementation entirely:
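```csharp
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    builder.AddRegistration(svc => svc.AddSingleton<IMessageDispatcher, TracingMessageDispatcher>());
});
```

Note that TracingMessageDispatcher itself takes an IMessageDispatcher in its constructor. With a plain type registration like the one above, a standard dependency injection container would typically resolve that dependency to the decorator itself, so the inner default dispatcher (ServiceConnect.Services.MessageDispatcher) has to be supplied explicitly, for instance through a factory delegate.

See also
- Handlers: the handler model and how dispatch reaches handler invocation
- IHandlerRegistry: the marker interface for startup-time handler validation
- IMessageProcessor: the chain-of-responsibility hook invoked at each phase of dispatch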