IMessageProcessingMiddleware
Overview
Section titled “Overview”IMessageProcessingMiddleware wraps the inbound consume pipeline with an await next() chain, so you can observe and/or transform each message before and after the handler runs. It is the right tool whenever you need to surround the handler in a try/finally, open a DI scope, record an Activity, or implement retry, logging, and outbox patterns.
See Filters for the conceptual tour and the trade-offs against an IFilter predicate.
Reference
Section titled “Reference”ProcessAsync
Section titled “ProcessAsync”Task<ConsumeEventResult> ProcessAsync( ReadOnlyMemory<byte> messageBytes, Type messageType, object message, IDictionary<string, object> headers, Envelope envelope, MessageProcessingDelegate next, CancellationToken cancellationToken);Processes an inbound message and optionally hands off to the next stage in the middleware chain by invoking next.
Parameters
messageBytes— the raw payload as it arrived from the transport, before deserialization.messageType— the resolved CLR type ofmessage.message— the deserialized message instance that will eventually reach the handler.headers— the mutable header dictionary, carried through the chain.envelope— the current message envelope, shared across every stage.next— the delegate that invokes the remaining middleware and, eventually, the handler. Not calling it short-circuits the chain.cancellationToken— cancels processing when the consumer loop is stopping.
Returns. A ConsumeEventResult — typically the value returned by next, though middleware may surface its own result (for example, a failure surfaced from a catch).
Remarks. Middleware runs in registration order on the way in and unwinds in reverse on the way out, mirroring the familiar ASP.NET Core request pipeline. Use try/finally around the next call to guarantee teardown work runs even when the handler throws.
MessageProcessingDelegate
Section titled “MessageProcessingDelegate”public delegate Task<ConsumeEventResult> MessageProcessingDelegate( ReadOnlyMemory<byte> messageBytes, Type messageType, object message, IDictionary<string, object> headers, Envelope envelope, CancellationToken cancellationToken);The continuation passed to each middleware as next. Invoking it advances the pipeline one step; awaiting the result lets the caller observe what the remainder of the chain returned.
Recording an Activity span per consumed message
Section titled “Recording an Activity span per consumed message”public sealed class OpenTelemetryConsumeMiddleware : IMessageProcessingMiddleware{ private static readonly ActivitySource Source = new("ServiceConnect.Consume");
public async Task<ConsumeEventResult> ProcessAsync( ReadOnlyMemory<byte> messageBytes, Type messageType, object message, IDictionary<string, object> headers, Envelope envelope, MessageProcessingDelegate next, CancellationToken cancellationToken) { using var activity = Source.StartActivity( $"Consume {messageType.Name}", ActivityKind.Consumer);
activity?.SetTag("messaging.system", "serviceconnect"); activity?.SetTag("messaging.message.type", messageType.FullName); if (headers.TryGetValue("MessageId", out var messageId)) { activity?.SetTag("messaging.message.id", messageId?.ToString()); }
try { var result = await next(messageBytes, messageType, message, headers, envelope, cancellationToken); activity?.SetStatus(ActivityStatusCode.Ok); return result; } catch (Exception ex) { activity?.SetStatus(ActivityStatusCode.Error, ex.Message); activity?.AddException(ex); throw; } }}
// Registration during startup.services.AddServiceConnect(builder =>{ builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example"); builder.ConfigureQueues(queues => queues.QueueName = "shipping-service"); builder.AddMessageProcessingMiddleware<OpenTelemetryConsumeMiddleware>();});OpenTelemetryConsumeMiddleware sits outermost in the consume pipeline of the ShippingSaga service. Every inbound message opens a consumer Activity span, tags it with the message type and id, and closes it on the way out — propagating an exception also marks the span as errored before rethrowing so downstream middleware still observes the failure.
See also
Section titled “See also”- Filters — concept
- Observability — concept
ISendMessageMiddleware— related referenceIFilter— related reference