Consume and outgoing event args
Overview
The bus raises a handful of event-argument DTOs that let observers inspect messages as they pass through the consume and outgoing pipelines. Telemetry, custom logging, and diagnostic middleware are the typical consumers. This page documents ConsumeEventArgs, ConsumeEventResult, OutgoingEventArgs, and the two concrete outgoing types (PublishEventArgs, SendEventArgs).
See Observability for how these DTOs slot into a diagnostic pipeline.
Reference
ConsumeEventArgs
Carries the raw message data received by the telemetry consume pipeline.
Message
public byte[] Message { get; init; } = [];
Gets the raw message body bytes as received from the transport, before any deserialisation.
BodySize
public int BodySize { get; init; }
Gets the on-wire body length in bytes. The consume middleware always populates it, even when Message is the empty sentinel array because no enricher requested the materialised bytes. It is used to stamp the OTel messaging.message.body.size attribute correctly on every consume span.
Type
public string Type { get; init; } = string.Empty;
Gets the message type name taken from the transport headers. Useful when an observer needs to branch on the type without paying to decode the body.
Headers
public IReadOnlyDictionary<string, object> Headers { get; init; }
Gets the transport headers associated with the consumed message. Defaults to an empty case-sensitive dictionary when no headers are supplied.
Remarks. Header values are typed as object because transports expose strings, byte arrays, and numeric values through the same channel. Decode defensively when reading. The dictionary is read-only; middleware that needs to produce a modified header bag must construct a new ConsumeEventArgs with a fresh dictionary.
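The defensive decoding described in the remarks could look roughly like the sketch below. HeaderReader and ReadString are hypothetical names, not part of ServiceConnect, and treating byte-array values as UTF-8 text is an assumption that may not hold for every transport or header.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper (not a ServiceConnect API): reads a header value as text
// without assuming which runtime type the transport chose for it.
public static class HeaderReader
{
    public static string? ReadString(IReadOnlyDictionary<string, object> headers, string key)
    {
        if (!headers.TryGetValue(key, out var value) || value is null)
        {
            return null; // header absent or explicitly null
        }

        return value switch
        {
            string s => s,
            // Assumption: byte-array header values carry UTF-8 encoded text.
            byte[] bytes => Encoding.UTF8.GetString(bytes),
            // Numeric values: format culture-invariantly so output is stable.
            IFormattable f => f.ToString(null, CultureInfo.InvariantCulture),
            _ => value.ToString(),
        };
    }
}
```

An observer would call HeaderReader.ReadString(args.Headers, "MessageType") and handle a null return as "header not present".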
ConsumeEventResult
Represents the outcome of invoking a consumer callback. The framework produces instances and consumers only observe them; the properties are init-only, and mutating them after construction has no defined meaning.
Success
public bool Success { get; init; }
Whether the consumer callback completed successfully. A false value with a populated Exception is the failure shape.
NotHandled
public bool NotHandled { get; init; }
Whether the dispatcher ran to completion but no processor claimed the message. Distinct from Success: a handler-less message is not a failure, but callers may want to route it to the error exchange rather than silently acking it; see IBusConfiguration.DeadLetterUnhandledMessages.
Exception
public Exception? Exception { get; init; }
The exception raised by the consumer, if any. Null on success.
TerminalFailure
public bool TerminalFailure { get; init; }
Whether the failure is terminal, meaning the message is permanently malformed (e.g. a JSON parse failure) and retrying will produce the identical failure. When true, transports bypass the retry queue and route the message directly to the error exchange rather than burning the retry budget on a poison payload.
Remarks. Distinct from Success=false: a terminal failure reflects a structural fault in the payload itself (deserialisation failure), not a transient handler error. Handler-thrown exceptions are non-terminal and remain eligible for retry. Transports that do not honour this flag fall back to the normal retry path.
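An observer that labels metrics or log lines may want to collapse these flags into a single outcome. The sketch below is one way to do that. The ConsumeEventResult class here is a stand-in that mirrors the documented properties so the snippet compiles alone; ConsumeOutcome and ConsumeOutcomeClassifier are hypothetical names, and the order in which the flags are checked is an assumption rather than documented framework behaviour.

```csharp
using System;

// Stand-in mirroring the documented properties so this sketch is self-contained.
// In an application, use the real ConsumeEventResult type instead.
public sealed class ConsumeEventResult
{
    public bool Success { get; init; }
    public bool NotHandled { get; init; }
    public Exception? Exception { get; init; }
    public bool TerminalFailure { get; init; }
}

// Hypothetical labels for metrics or logging.
public enum ConsumeOutcome
{
    Completed,
    NotHandled,
    TerminalFailure,
    RetryableFailure,
}

public static class ConsumeOutcomeClassifier
{
    public static ConsumeOutcome Classify(ConsumeEventResult result)
    {
        // Assumption: NotHandled is reported separately from failures, so check it first.
        if (result.NotHandled) return ConsumeOutcome.NotHandled;

        // A malformed payload: retrying would reproduce the same failure.
        if (result.TerminalFailure) return ConsumeOutcome.TerminalFailure;

        // Remaining failures (for example handler exceptions) stay eligible for retry.
        return result.Success ? ConsumeOutcome.Completed : ConsumeOutcome.RetryableFailure;
    }
}
```

The classification is for observation only; it does not change how the transport acks, retries, or dead-letters the message.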
OutgoingEventArgs
public abstract class OutgoingEventArgs
{
    protected OutgoingEventArgs();
    // ...members below
}

Base event payload for outgoing publish and send telemetry. The class is abstract with a protected constructor, so only the concrete subtypes (PublishEventArgs, SendEventArgs) can be instantiated. This keeps user code from raising synthetic outgoing events through the framework's event surface.
Message
public Message? Message { get; init; }
Gets the outgoing message instance, when the pipeline supplied it. May be null for telemetry raised from paths that operate on raw bytes.
Headers
public IDictionary<string, string> Headers { get; init; }
Gets the outgoing transport headers. Unlike the consume side, outgoing headers are a plain string-valued dictionary. The property is init-only: a subscriber can mutate individual entries (for example, a telemetry hook stamping traceparent) but cannot swap out the entire dictionary and strip the framework's required MessageType/CorrelationId entries before transport send. The initialiser rejects null with ArgumentNullException.
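A hook that stamps trace context into the mutable header dictionary could be sketched as follows. TraceHeaderStamper is a hypothetical helper, not a ServiceConnect type, and how a subscriber obtains the outgoing event args is left out. It relies on the .NET behaviour that Activity.Id is already in traceparent form when the activity uses the W3C ID format.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical helper (not a ServiceConnect API): copies W3C trace context
// from an Activity into an outgoing header dictionary, entry by entry.
public static class TraceHeaderStamper
{
    public static void Stamp(IDictionary<string, string> headers, Activity? activity)
    {
        // Only W3C-format activities expose an Id shaped like a traceparent value.
        if (activity is null || activity.IdFormat != ActivityIdFormat.W3C || activity.Id is null)
        {
            return;
        }

        // Mutates individual entries; the dictionary instance itself is untouched.
        headers["traceparent"] = activity.Id;

        if (!string.IsNullOrEmpty(activity.TraceStateString))
        {
            headers["tracestate"] = activity.TraceStateString;
        }
    }
}
```

A subscriber would call TraceHeaderStamper.Stamp(args.Headers, Activity.Current). Applications that call builder.AddTelemetry() get this propagation from the built-in middleware, so a helper like this is only relevant to custom hooks.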
PublishEventArgs
PublishEventArgs : OutgoingEventArgs
Outgoing telemetry payload for published messages.
Exchange
public string Exchange { get; init; } = string.Empty;
The transport-side exchange name the message was published to. Empty when the publish was anonymous (e.g. a routing-key-only publish through the default exchange). Telemetry uses this as the messaging.destination.name span attribute.
RoutingKey
public string RoutingKey { get; init; } = string.Empty;
The routing key used when publishing the message. For broadcast publishes without a key override, the value is empty.
SendEventArgs
SendEventArgs : OutgoingEventArgs
Outgoing telemetry payload for point-to-point sends. Multi-endpoint fan-out (IBus.SendToManyAsync) raises one SendEventArgs per destination, each with its own per-delivery EndPoint. Subscribers that need to correlate fan-out deliveries should match on the message CorrelationId, which stays stable across the per-endpoint events.
EndPoint
public string EndPoint { get; init; } = string.Empty;
The destination endpoint for this delivery.
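Correlating the per-endpoint events of a fan-out send could be sketched as below. FanOutTracker is a hypothetical class, not a ServiceConnect type. It assumes the caller has already extracted the correlation id and endpoint from each SendEventArgs, and that events may arrive concurrently, hence the lock.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not a ServiceConnect API): groups per-delivery send
// events by correlation id so one fan-out can be viewed as a whole.
public sealed class FanOutTracker
{
    private readonly object _gate = new();
    private readonly Dictionary<string, List<string>> _endPointsByCorrelation =
        new(StringComparer.Ordinal);

    // Call once per SendEventArgs with its correlation id and EndPoint.
    public void Record(string correlationId, string endPoint)
    {
        lock (_gate)
        {
            if (!_endPointsByCorrelation.TryGetValue(correlationId, out var endPoints))
            {
                endPoints = new List<string>();
                _endPointsByCorrelation[correlationId] = endPoints;
            }

            endPoints.Add(endPoint);
        }
    }

    // Returns a snapshot of the endpoints seen so far for one correlation id.
    public IReadOnlyList<string> EndPointsFor(string correlationId)
    {
        lock (_gate)
        {
            return _endPointsByCorrelation.TryGetValue(correlationId, out var endPoints)
                ? endPoints.ToArray()
                : Array.Empty<string>();
        }
    }
}
```

The tracker never evicts entries, so a long-running process would need some expiry policy on top of it.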
Recommended: use the ServiceConnect.Telemetry package
For most applications, builder.AddTelemetry() is the right choice. It wires the built-in publish, send, and consume middleware and handles W3C traceparent/tracestate propagation automatically:

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(/* ... */);
    builder.AddTelemetry(opts => { /* optional enrichment */ });
});

See the Tracing section of Observability for full wiring details, enrichment options, and how to register the activity sources with your OTel pipeline.
If you need a custom middleware
When the built-in telemetry isn't enough (for example, to route spans to a non-OTel sink or apply bespoke sampling logic), implement IMessageProcessingMiddleware (consume side) or ISendMessageMiddleware (send side) directly.
Consume side
// BCL imports only; the ServiceConnect namespace imports are omitted here.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public sealed class OpenTelemetryConsumeMiddleware : IMessageProcessingMiddleware
{
    private static readonly ActivitySource Source = new("ServiceConnect");

    public async Task<ConsumeEventResult> ProcessAsync(
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object message,
        IDictionary<string, object> headers,
        Envelope envelope,
        MessageProcessingDelegate next,
        CancellationToken cancellationToken)
    {
        // Shape the consume-event args for downstream collectors before we
        // start the span: the same bag of data a telemetry exporter would
        // emit on the bus's ConsumeEvent callback.
        var consumeArgs = new ConsumeEventArgs
        {
            Message = envelope.Body.ToArray(),
            Type = envelope.Headers.TryGetValue("MessageType", out var t)
                ? t?.ToString() ?? string.Empty
                : string.Empty,
            // Headers is typed IReadOnlyDictionary<string, object>; a
            // Dictionary<string, object> can be assigned directly because it
            // implements that interface. The copy constructor does not carry
            // over the source comparer. The default comparer for string keys is
            // already ordinal (case-sensitive); passing StringComparer.Ordinal
            // just makes the documented case-sensitivity explicit.
            Headers = new Dictionary<string, object>(envelope.Headers, StringComparer.Ordinal),
        };

        using var activity = Source.StartActivity(
            $"consume {consumeArgs.Type}",
            ActivityKind.Consumer);

        activity?.SetTag("servicebus.message.type", consumeArgs.Type);
        activity?.SetTag("servicebus.message.size", consumeArgs.Message.Length);

        try
        {
            var result = await next(messageBytes, messageType, message, headers, envelope, cancellationToken);
            RecordResult(activity, result);
            return result;
        }
        catch (Exception ex)
        {
            // Record the failure on the span, then let the exception propagate.
            RecordResult(activity, new ConsumeEventResult
            {
                Success = false,
                Exception = ex,
            });
            throw;
        }
    }

    private static void RecordResult(Activity? activity, ConsumeEventResult result)
    {
        activity?.SetTag("servicebus.consume.success", result.Success);
        activity?.SetTag("servicebus.consume.not_handled", result.NotHandled);
        if (!result.Success)
        {
            activity?.SetStatus(ActivityStatusCode.Error, result.Exception?.Message);
        }
    }
}

A custom IMessageProcessingMiddleware is the natural host for observing the consume-side transition. The ConsumeEventArgs shape mirrors the data available to telemetry exporters, and ConsumeEventResult captures the success/failure fork so the span status records the real outcome.
Send side
The outgoing counterpart uses ISendMessageMiddleware with the SendContext parameter object, which carries the strongly-typed Message, MessageType, serialized MessageBytes, Headers, EndPoint, RoutingKey, and Operation:
// BCL imports only; the ServiceConnect namespace imports are omitted here.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public sealed class OpenTelemetrySendMiddleware : ISendMessageMiddleware
{
    private static readonly ActivitySource Source = new("ServiceConnect");

    public async Task ProcessAsync(
        SendContext context,
        SendMessageDelegate next,
        CancellationToken cancellationToken)
    {
        using var activity = Source.StartActivity(
            $"publish {context.MessageType.Name}",
            ActivityKind.Producer);

        activity?.SetTag("servicebus.message.type", context.MessageType.FullName);
        activity?.SetTag("servicebus.message.size", context.MessageBytes.Length);
        if (context.RoutingKey is not null)
        {
            activity?.SetTag("servicebus.routing_key", context.RoutingKey);
        }
        if (context.EndPoint is not null)
        {
            activity?.SetTag("servicebus.endpoint", context.EndPoint);
        }

        try
        {
            await next(context, cancellationToken);
        }
        catch (Exception ex)
        {
            // Mark the span as failed, then let the exception propagate.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}

See also
- Observability (concept)
- IMessageProcessingMiddleware (related reference)
- ISendMessageMiddleware (related reference)