IMessageDispatcher

IMessageDispatcher is the top-level orchestrator that the transport consumer calls for every incoming message. When a raw delivery arrives from the broker, the framework invokes DispatchAsync with the raw bytes, the wire type name, and the transport headers. From that point on, the dispatcher is responsible for the complete consume pipeline:

  1. Extract FullTypeName from headers and build the Envelope. Throw if the header is absent.
  2. Run IMessageProcessor entries where RunBeforeDeserialization == true. If any returns ProcessResult.Handled, short-circuit with Success = true.
  3. Resolve the CLR Type via IMessageTypeRegistry.TryResolve(messageType, out type). If the type is not registered and the message is not a reply, return Success = false.
  4. Run IFilter before-consuming filters. If any returns false, short-circuit with Success = true.
  5. If the delivery carries a ResponseMessageId header and a ReplyProcessor is registered, invoke it. On Handled, return Success = true; otherwise return Success = false with an exception.
  6. Deserialise the message body via IMessageSerializer.
  7. Run IMessageProcessor entries where RunBeforeDeserialization == false through the IMessageProcessingMiddleware chain.
  8. Run after-consuming filters in a finally block so they fire even on failure.
  9. Return a ConsumeEventResult describing the outcome.

ServiceConnect ships a default implementation (ServiceConnect.Services.MessageDispatcher) that follows this exact order. Replace it only when every other extension point — IFilter, IMessageProcessor, IMessageProcessingMiddleware — is insufficient for your use case.

See Handlers for the broader dispatch model.

Task<ConsumeEventResult> DispatchAsync(
    ReadOnlyMemory<byte> messageBytes,
    string messageType,
    IReadOnlyDictionary<string, object> headers,
    CancellationToken cancellationToken = default);

Executes the full consume pipeline for one message delivery.

Parameters

  • messageBytes — the raw serialised payload as received from the broker.
  • messageType — the wire type name of the message (for example "OrderService.Messages.OrderPlaced"). Used to resolve the CLR type via IMessageTypeRegistry.
  • headers — the transport headers accompanying the delivery. Values are typed as object because some broker clients expose typed header values rather than plain strings.
  • cancellationToken — propagated from the transport consumer; signals that processing should stop cleanly.
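
Because header values arrive as object, a custom dispatcher usually has to normalise them before use. The following is a minimal sketch of such a normalisation, not part of ServiceConnect: HeaderReader and GetString are hypothetical names, and the assumption that a broker client may hand back a string header as UTF-8 bytes is illustrative rather than a statement about any specific transport.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper (not a ServiceConnect type): reads a transport header as text.
public static class HeaderReader
{
    // Returns the header as a string, or null when the key is absent or the value is null.
    public static string? GetString(IReadOnlyDictionary<string, object> headers, string key)
    {
        if (!headers.TryGetValue(key, out var value) || value is null)
            return null;

        return value switch
        {
            string text => text,                            // already a plain string
            byte[] bytes => Encoding.UTF8.GetString(bytes), // assumed: raw UTF-8 bytes from the client
            _ => Convert.ToString(value, CultureInfo.InvariantCulture) // numbers, booleans, and so on
        };
    }
}
```

Returning null for a missing key leaves the decision to the caller, for example throwing when FullTypeName is absent while treating a missing ResponseMessageId as "not a reply".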

Returns. A ConsumeEventResult describing whether the message was processed successfully. Success = true causes the transport to acknowledge the delivery; Success = false or a non-null Exception causes the transport to nack, requeue, or dead-letter according to its policy.
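
The acknowledgement rule described above can be written as a single predicate. This is only a sketch of the rule as stated on this page: the ConsumeEventResult class below is a stand-in that models the two members named here, AckPolicy is a hypothetical name, and the real transport may apply further policy (requeue limits, dead-lettering) that is not shown.

```csharp
#nullable enable
using System;

// Stand-in for the library type; only the two members this page names are modelled.
public sealed class ConsumeEventResult
{
    public bool Success { get; init; }
    public Exception? Exception { get; init; }
}

// Hypothetical helper illustrating the rule; not the transport's actual code.
public static class AckPolicy
{
    // Acknowledge only when Success is true and no exception is attached.
    public static bool ShouldAcknowledge(ConsumeEventResult result)
        => result.Success && result.Exception is null;
}
```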

A custom IMessageDispatcher must preserve the default order of operations. The built-in filters, processors, and middleware all assume this sequence; departing from it produces undefined behaviour:

  1. Extract FullTypeName header; throw InvalidOperationException if absent. Build Envelope.
  2. Pre-deserialization processors (RunBeforeDeserialization == true) — short-circuit with Success = true on ProcessResult.Handled.
  3. Type resolution via IMessageTypeRegistry — return Success = false for unknown, non-reply types.
  4. Before-consuming filters — short-circuit with Success = true on false.
  5. Reply short-circuit — if ResponseMessageId header is present and a ReplyProcessor is registered, invoke it. On Handled, return Success = true; otherwise return Success = false with an exception.
  6. Deserialise via IMessageSerializer.
  7. Post-deserialization processors (RunBeforeDeserialization == false) through the middleware chain.
  8. After-consuming filters in finally.
  9. Return ConsumeEventResult.

Exceptions thrown inside DispatchAsync should be caught and surfaced via ConsumeEventResult { Success = false, Exception = ex }. The transport layer reads Success and Exception to decide the ack/nack outcome; if an exception escapes the method entirely, the transport has no structured way to act on it and may crash the consumer loop. Catch at the outermost boundary and convert.

After-consuming filters must run in a finally block, unconditionally. They carry diagnostics and cleanup logic (trace span completion, metrics recording, resource release) that must fire even when processing fails. Omitting the finally wrapper is a silent correctness bug.
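
The interaction between the outer catch and the finally block is easiest to see in isolation. The sketch below models only steps 6 to 8 with plain delegates; ProcessingPhaseSketch, Process, and AfterFilters are hypothetical names, and ConsumeEventResult is a stand-in rather than the library type.

```csharp
#nullable enable
using System;
using System.Threading.Tasks;

// Stand-in result type; the real ConsumeEventResult may carry more members.
public sealed class ConsumeEventResult
{
    public bool Success { get; init; }
    public Exception? Exception { get; init; }
}

// Hypothetical shape: each phase is a delegate so the control flow stays visible.
public sealed class ProcessingPhaseSketch
{
    public Func<Task> Process { get; init; } = () => Task.CompletedTask; // steps 6 and 7
    public Action AfterFilters { get; init; } = () => { };               // step 8

    public async Task<ConsumeEventResult> RunAsync()
    {
        try
        {
            await Process();
            return new ConsumeEventResult { Success = true };
        }
        catch (Exception ex)
        {
            // Convert the failure instead of letting it escape to the transport.
            return new ConsumeEventResult { Success = false, Exception = ex };
        }
        finally
        {
            // Runs after the try or catch has produced its result, on success and on failure.
            AfterFilters();
        }
    }
}
```

Note that an exception thrown by the after-consuming step itself would still escape from the finally block in this sketch, so a real implementation may want to guard that call as well.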

The dispatcher is the natural home for a root ActivitySource span that wraps the entire consume pipeline. Starting the activity before step 1 and stopping it in the same finally as the after-consuming filters gives a single, bounded span covering every phase of message processing.

Register a custom IMessageDispatcher as a singleton. The dispatcher is invoked on every message delivery; registering as transient or scoped creates unnecessary allocations on a hot path. The default MessageDispatcher implementation is registered as a singleton.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

public sealed class CustomMessageDispatcher : IMessageDispatcher
{
    private readonly IMessageTypeRegistry _typeRegistry;
    private readonly IMessageSerializer _serializer;

    public CustomMessageDispatcher(
        IMessageTypeRegistry typeRegistry,
        IMessageSerializer serializer)
    {
        _typeRegistry = typeRegistry;
        _serializer = serializer;
    }

    public async Task<ConsumeEventResult> DispatchAsync(
        ReadOnlyMemory<byte> messageBytes,
        string messageType,
        IReadOnlyDictionary<string, object> headers,
        CancellationToken cancellationToken = default)
    {
        try
        {
            // Steps 1-7: implement the pipeline here, in the documented order.
            throw new NotImplementedException();
        }
        catch (Exception ex)
        {
            // Outermost boundary: convert every failure into a result the transport can act on.
            return new ConsumeEventResult { Success = false, Exception = ex };
        }
        finally
        {
            // Step 8: run after-consuming filters here so they fire even on failure.
        }
    }
}

Tracing dispatcher with a root Activity span

The following example delegates to the default IMessageDispatcher and wraps it in a single root Activity that covers the entire consume pipeline. Because it does not re-implement the pipeline, only the root span is available — not per-phase spans. That is the right trade-off when all you need is one bounded trace entry per delivery.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

public sealed class TracingMessageDispatcher : IMessageDispatcher
{
    private static readonly ActivitySource Source = new("OrderService.Messaging");
    private readonly IMessageDispatcher _inner;

    public TracingMessageDispatcher(IMessageDispatcher inner)
        => _inner = inner;

    public async Task<ConsumeEventResult> DispatchAsync(
        ReadOnlyMemory<byte> messageBytes,
        string messageType,
        IReadOnlyDictionary<string, object> headers,
        CancellationToken cancellationToken = default)
    {
        // The using declaration stops the activity when the method exits, on any path.
        using var root = Source.StartActivity(
            $"consume {messageType}",
            ActivityKind.Consumer);
        root?.SetTag("messaging.message_type", messageType);

        ConsumeEventResult result;
        try
        {
            result = await _inner.DispatchAsync(messageBytes, messageType, headers, cancellationToken);
        }
        catch (Exception ex)
        {
            // The inner dispatcher should not throw, but convert anyway so nothing escapes.
            root?.SetStatus(ActivityStatusCode.Error, ex.Message);
            return new ConsumeEventResult { Success = false, Exception = ex };
        }

        if (!result.Success)
        {
            root?.SetStatus(ActivityStatusCode.Error, result.Exception?.Message ?? "dispatch failed");
        }

        return result;
    }
}
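
ActivitySource.StartActivity returns null when no listener is subscribed to the source, which is why the example guards every call with root?. During local testing, a small ActivityListener can confirm that the span is actually produced. The sketch below uses only System.Diagnostics APIs; TracingDiagnostics and Listen are hypothetical names, and in production an OpenTelemetry exporter or similar would normally take the listener's place.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical test helper: collects activities stopped on one ActivitySource.
public static class TracingDiagnostics
{
    // Subscribes to the named source and adds every stopped activity to the collection.
    // Dispose the returned listener to unsubscribe.
    public static ActivityListener Listen(string sourceName, ICollection<Activity> stopped)
    {
        var listener = new ActivityListener
        {
            // Only listen to the source the dispatcher uses.
            ShouldListenTo = source => source.Name == sourceName,
            // Record everything; a real setup would usually sample.
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = activity => stopped.Add(activity)
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }
}
```

Calling TracingDiagnostics.Listen("OrderService.Messaging", list) before dispatching a message should leave one stopped activity in the list per delivery, with its Status set to Error when the dispatch failed.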

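Register the custom dispatcher during bus startup. Registering a custom IMessageDispatcher replaces the default implementation entirely, so a decorator such as TracingMessageDispatcher cannot obtain the default through its own IMessageDispatcher constructor parameter: with Microsoft.Extensions.DependencyInjection, that registration would resolve the decorator as its own inner dispatcher. Build the inner default explicitly in a factory instead. The registration below assumes that ServiceConnect.Services.MessageDispatcher is public and that the container can satisfy its constructor:

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    builder.AddRegistration(svc =>
        svc.AddSingleton<IMessageDispatcher>(sp =>
            new TracingMessageDispatcher(
                // Assumption: the default MessageDispatcher is public and constructible from the container.
                ActivatorUtilities.CreateInstance<MessageDispatcher>(sp))));
});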
  • Handlers — the handler model and how dispatch reaches handler invocation
  • IHandlerRegistry — the marker interface for startup-time handler validation
  • IMessageProcessor — the chain-of-responsibility hook invoked at each phase of dispatch