ISendMessageMiddleware

ISendMessageMiddleware is the outbound counterpart to IMessageProcessingMiddleware — it wraps every publish and send with an await next() chain so you can inspect or mutate headers, add correlation metadata, or surround the transport call in a try/finally. ISendMessagePipeline is the orchestrator that composes the registered outbound middleware and is invoked by the bus for every outgoing message.

See Filters for the conceptual model and when to reach for a middleware over an outgoing IFilter.

Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken);

Processes an outgoing message and optionally hands off to the next middleware by awaiting next.

Parameters

  • context — a SendContext value that bundles all data for the outgoing message (see properties below).
  • next — the delegate that invokes the remaining middleware and, eventually, the transport. Not calling it short-circuits the send.
  • cancellationToken — cancels the operation before the transport acknowledges.
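
As a rough illustration of the short-circuit behaviour described for next, the sketch below drops a send when a marker header is present. The types declared at the top are simplified stand-ins that mirror the shapes documented on this page, not the library's own definitions. The header name X-Suppress, the SuppressingMiddleware class, and the string-to-string header dictionary are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in types for this sketch only; the real ones ship with the library.
public sealed class SendContext
{
    public IDictionary<string, string> Headers { get; } = new Dictionary<string, string>();
}

public delegate Task SendMessageDelegate(SendContext context, CancellationToken cancellationToken);

public interface ISendMessageMiddleware
{
    Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken);
}

// Hypothetical middleware: skips the send when the "X-Suppress" header is set.
public sealed class SuppressingMiddleware : ISendMessageMiddleware
{
    public Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken)
    {
        if (context.Headers.ContainsKey("X-Suppress"))
        {
            // next is never invoked, so nothing downstream (including the transport) runs.
            return Task.CompletedTask;
        }
        return next(context, cancellationToken);
    }
}

public static class ShortCircuitDemo
{
    // Returns how many of the two sends reached the stand-in transport.
    public static async Task<int> RunAsync()
    {
        var transportCalls = 0;
        SendMessageDelegate transport = (ctx, ct) => { transportCalls++; return Task.CompletedTask; };
        var middleware = new SuppressingMiddleware();

        await middleware.ProcessAsync(new SendContext(), transport, CancellationToken.None);

        var suppressed = new SendContext();
        suppressed.Headers["X-Suppress"] = "true";
        await middleware.ProcessAsync(suppressed, transport, CancellationToken.None);

        return transportCalls;
    }
}
```

In this sketch only the first call reaches the stand-in transport. A middleware that swallows a send this way gives the caller no signal that the message was dropped, so a real implementation would probably log or count the suppression.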

SendContext properties

  • Message — the strongly-typed outgoing Message instance; always present (required).
  • MessageType — the CLR Type of the outgoing message, resolved before the call.
  • MessageBytes — the already-serialized payload that will be written to the transport.
  • Headers — the mutable string-valued header dictionary for the outgoing envelope.
  • EndPoint — the explicit destination endpoint for a point-to-point send, or null for a broadcast publish.
  • RoutingKey — the routing key used when publishing, or null for a direct send.
  • Operation — a SendOperation discriminator (Publish, Send, or Request) so middleware can branch by operation type without inspecting other fields. Request covers the three request/reply call sites — SendRequestAsync, SendRequestMultiAsync, and PublishRequestAsync — which thread their typed message through the same outgoing pipeline.
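
Branching on Operation might look like the sketch below. The types are minimal stand-ins that carry only the two properties the example touches. The header names X-Operation and X-Expects-Reply, and the OperationTaggingMiddleware class itself, are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in types for this sketch only; the real ones ship with the library.
public enum SendOperation { Publish, Send, Request }

public sealed class SendContext
{
    public IDictionary<string, string> Headers { get; } = new Dictionary<string, string>();
    public SendOperation Operation { get; set; }
}

public delegate Task SendMessageDelegate(SendContext context, CancellationToken cancellationToken);

public interface ISendMessageMiddleware
{
    Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken);
}

// Hypothetical middleware: records the operation kind on the outgoing envelope.
public sealed class OperationTaggingMiddleware : ISendMessageMiddleware
{
    public Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken)
    {
        // Branch on the discriminator instead of inferring the shape from EndPoint or RoutingKey.
        context.Headers["X-Operation"] = context.Operation switch
        {
            SendOperation.Publish => "publish",
            SendOperation.Send => "send",
            SendOperation.Request => "request",
            _ => "unknown"
        };

        if (context.Operation == SendOperation.Request)
        {
            context.Headers["X-Expects-Reply"] = "true";
        }

        return next(context, cancellationToken);
    }
}
```

The default arm of the switch keeps the middleware compiling and behaving sensibly if further SendOperation values are added later.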

Remarks. Middleware runs in registration order on the way out and unwinds in reverse. Because context.Headers is passed through the chain, middleware composes cleanly — each stage can stamp its own metadata before delegating.

Why SendContext? Bundling the outgoing data into a single parameter object lets observability middleware see the strongly typed Message alongside RoutingKey, EndPoint, and Operation for every outgoing message. That is all the context a tracing or auditing hook needs, with no separate parameters to thread through. It also keeps future additions to the outgoing contract non-breaking: new fields appear on SendContext and existing middleware continues to compile unchanged.


public delegate Task SendMessageDelegate(SendContext context, CancellationToken cancellationToken);

The continuation passed to each middleware as next. Invoking it advances the send pipeline one step toward the transport.
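
One way to picture where each next comes from is to fold the registered middleware, last to first, around a terminal delegate that stands in for the transport. The sketch below does that with simplified stand-in types and is not the library's actual pipeline implementation. PipelineSketch, Compose, and LoggingMiddleware are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in types for this sketch only; the real ones ship with the library.
public sealed class SendContext
{
    public IDictionary<string, string> Headers { get; } = new Dictionary<string, string>();
}

public delegate Task SendMessageDelegate(SendContext context, CancellationToken cancellationToken);

public interface ISendMessageMiddleware
{
    Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken);
}

// Records when control enters and leaves this stage.
public sealed class LoggingMiddleware : ISendMessageMiddleware
{
    private readonly string _name;
    private readonly List<string> _log;

    public LoggingMiddleware(string name, List<string> log) { _name = name; _log = log; }

    public async Task ProcessAsync(SendContext context, SendMessageDelegate next, CancellationToken cancellationToken)
    {
        _log.Add(_name + ":before");
        try { await next(context, cancellationToken); }
        finally { _log.Add(_name + ":after"); }
    }
}

public static class PipelineSketch
{
    // Wraps the terminal delegate with each middleware, starting from the last registered.
    public static SendMessageDelegate Compose(IReadOnlyList<ISendMessageMiddleware> middleware, SendMessageDelegate transport)
    {
        var next = transport;
        for (var i = middleware.Count - 1; i >= 0; i--)
        {
            var current = middleware[i];
            var downstream = next; // fresh variable per iteration so each lambda keeps its own continuation
            next = (ctx, ct) => current.ProcessAsync(ctx, downstream, ct);
        }
        return next;
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        var chain = Compose(
            new ISendMessageMiddleware[] { new LoggingMiddleware("A", log), new LoggingMiddleware("B", log) },
            (ctx, ct) => { log.Add("transport"); return Task.CompletedTask; });

        await chain(new SendContext(), CancellationToken.None);
        return log; // A:before, B:before, transport, B:after, A:after
    }
}
```

The resulting log shows stages entered in registration order and unwound in reverse, with the stand-in transport call in the middle.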


Task ExecutePublishMessagePipelineAsync(
    SendContext context,
    CancellationToken cancellationToken = default);

Runs the configured outbound middleware chain for a publish and hands off to the transport.

Parameters

  • context — the SendContext describing the outgoing publish (see SendContext properties above). The bus populates Operation = SendOperation.Publish before invoking the pipeline.
  • cancellationToken — cancels the pipeline before the broker acknowledges.

Remarks. Called by the bus inside PublishAsync. Middleware sees every published message before the transport does.


Task ExecuteSendMessagePipelineAsync(
    SendContext context,
    CancellationToken cancellationToken = default);

Runs the configured outbound middleware chain for a point-to-point send.

Parameters

  • context — the SendContext describing the outgoing send. The bus populates Operation = SendOperation.Send (or SendOperation.Request for request/reply call sites) and an explicit EndPoint before invoking the pipeline.
  • cancellationToken — cancels the pipeline before the broker acknowledges.

Remarks. Called by the bus inside SendAsync and the request/reply send paths. Shares its middleware chain with ExecutePublishMessagePipelineAsync, so a single middleware sees every outbound message whether it is a publish, a send, or a request.


ValueTask DisposeAsync();

Disposes the pipeline and any middleware scopes it owns. Inherited from IAsyncDisposable; called by the host during bus shutdown.

Stamping a correlation header before publish

using System;
using System.Diagnostics; // Activity
using System.Threading;
using System.Threading.Tasks;

public sealed class CorrelationStampingMiddleware : ISendMessageMiddleware
{
    public Task ProcessAsync(
        SendContext context,
        SendMessageDelegate next,
        CancellationToken cancellationToken)
    {
        // Only stamp if the caller has not already supplied a correlation id.
        if (!context.Headers.ContainsKey("CorrelationHeader"))
        {
            var activityId = Activity.Current?.Id;
            if (!string.IsNullOrEmpty(activityId))
            {
                context.Headers["CorrelationHeader"] = activityId;
            }
        }

        // Record the send time in round-trip ("O") format, always in UTC.
        context.Headers["SentAt"] = DateTimeOffset.UtcNow.ToString("O");

        // Hand off to the rest of the chain and, eventually, the transport.
        return next(context, cancellationToken);
    }
}

// Registration during startup.
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    builder.ConfigureQueues(queues => queues.QueueName = "payment-processor");
    builder.AddSendMessageMiddleware<CorrelationStampingMiddleware>();
});

CorrelationStampingMiddleware runs for every outbound message from PaymentProcessor. When Activity.Current is set (typically by an inbound consume span or an ASP.NET Core request) and the caller has not already supplied a CorrelationHeader, the activity id is copied onto the outgoing envelope as CorrelationHeader so downstream consumers can stitch the trace together. The middleware also stamps a SentAt header with the current UTC time in round-trip ("O") format, then delegates to next, which eventually hands the envelope to the transport.