Observability
Observability in a messaging system is three overlapping views of the same events: logs tell you what the bus did, the audit queue gives you a replayable copy of every message it handled, and headers carry enough metadata to correlate each processed message back to a specific run. ServiceConnect offers all three. This page covers what each one is for and how to wire them together, so an on-call engineer can answer “what happened to message X?”.
The bus logs through Microsoft.Extensions.Logging. No special setup is needed — the ILogger<T> instances in the container are what the bus writes to, so whatever sink you’ve configured (console, file, Seq, Application Insights) receives its logs automatically.
Key log events you’ll see in a healthy system:
- Startup: "Bus starting to consume on queue {QueueName} for {Count} message types." Fired once during StartConsumingAsync. If the bus has started, this has been logged.
- Shutdown: "Bus stopping message consumption." Fired from StopConsumingAsync.
- Unregistered type (warning): "Unregistered message type '{TypeName}'. Rejecting". Fired for an incoming message whose type name isn’t in the handler registry. Usually a topology mistake (wrong queue binding) or a forgotten handler registration.
Key log events for failures:
- Dispatch error (error): "Error dispatching message of type {MessageType}". Fired every time a handler (or a filter, or the deserializer) throws. Always paired with an exception.
- Max retries exceeded (error): "Max retries exceeded for MessageId {MessageId}". Fired when a retried message reaches MaxRetries and is about to land in the error queue.
- Terminal rejection (error): "Rejecting permanently invalid inbound message with MessageId {MessageId}". Fired for messages that are unparsable on the wire (missing headers, oversized payload) and go straight to the error queue without retries.
Structure log queries around MessageId. Every published message has one (the bus stamps it); it follows the message through retries and into the error queue via the MessageId header. Searching for a single MessageId in your log aggregator gets you the full processing history of that message.
The audit queue
When AuditingEnabled = true, every successfully processed message is republished to AuditQueueName (default "audit") with its headers preserved. Setup:

```csharp
builder.ConfigureQueues(q =>
{
    q.QueueName = "orders-service";
    q.AuditingEnabled = true;
    q.AuditQueueName = "orders-service.audit";
});
```

What audit gives you that logs don’t:
- Full message body. Logs have the exception and the MessageId; the audit queue has the actual bytes the handler saw. When a bug surfaces days later, you have the payload to reproduce it.
- Replay. The audit queue is just a queue. Nothing stops you from draining it into the main queue and re-running the workflow against a fixed handler. Treat that as a manual operator action, not an automated retry.
- Downstream analytics. Because audit is a copy of the real traffic, you can point a stream processor (Kafka mirror, Logstash, whatever) at it without being in the critical path.
Two important caveats:
- Byte-stream messages are not audited. The stream packets (Streaming) would flood the audit queue with raw frames; the audit publisher skips anything tagged as a stream.
- Failed messages are not audited. Audit is the success path. Failed messages go to the error queue (Error Handling), and the two queues are deliberately separate.
Size the audit queue. Left undrained, it grows linearly with traffic: either consume it into archive storage on a schedule, or set a queue-level TTL or length limit in RabbitMQ.
Standard headers
Every message the bus sends carries a consistent set of headers. These are the primitives you stitch telemetry from:
| Header | What it is | Typical use |
|---|---|---|
| MessageId | Unique per send | Log correlation, idempotency key |
| CorrelationId | The conversation id | Stitch request/reply or a saga’s full trace together |
| SourceAddress | Sending queue name | “Who sent this?”; populated automatically |
| DestinationAddress | Receiving queue | “Where is this going?” |
| TimeSent | UTC timestamp at send | Compare to TimeReceived to observe transit time |
| TimeReceived | UTC timestamp at consume | End-to-end latency signal |
| TimeProcessed | UTC timestamp after handler | Handler duration if you want to split transit from processing |
| RetryCount | Attempts so far | Diagnose retry storms; populated by the retry handler |
| Exception | JSON-serialised error | Present in error-queue entries only |
All header names live in ServiceConnect.Interfaces.HeaderKeys so you don’t have to spell them. The IConsumeContext on a handler exposes the whole header dictionary — see Handlers.
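As a sketch of how the three timestamp headers combine, the snippet below computes transit time (TimeSent to TimeReceived) and handler time (TimeReceived to TimeProcessed) from a plain header dictionary. It assumes the values are ISO 8601 UTC strings and spells the header names literally; in real code, read them through IConsumeContext and the HeaderKeys constants, and check the value format your version actually writes. Because TimeProcessed is stamped after the handler, an audit-queue consumer is a natural place to run this kind of calculation (a suggestion, not a documented pattern). Transit time compares two machines' clocks, so clock skew shows up in it.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Sketch: derive latency signals from the standard timestamp headers.
// Assumption: values are ISO 8601 UTC strings; real code should use the
// ServiceConnect.Interfaces.HeaderKeys constants instead of literal names.
static (TimeSpan Transit, TimeSpan Handler) Latency(IReadOnlyDictionary<string, string> headers)
{
    DateTime Read(string key) => DateTime.Parse(
        headers[key], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);

    DateTime sent = Read("TimeSent");
    DateTime received = Read("TimeReceived");
    DateTime processed = Read("TimeProcessed");

    // Transit spans the sender's and receiver's clocks, so skew lands here.
    return (received - sent, processed - received);
}

var headers = new Dictionary<string, string>
{
    ["TimeSent"] = "2024-05-01T10:00:00.000Z",
    ["TimeReceived"] = "2024-05-01T10:00:00.250Z",
    ["TimeProcessed"] = "2024-05-01T10:00:01.000Z",
};

var (transit, handler) = Latency(headers);
Console.WriteLine($"transit={transit.TotalMilliseconds}ms handler={handler.TotalMilliseconds}ms");
```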
There are also two opt-in headers:
- SourceMachine and DestinationMachine: sender and receiver hostnames. They are off by default; setting IncludeMachineNameInHeaders = true on the bus config turns them on. They are useful on multi-tenant brokers where you need to identify the sending host, but be aware that exposing internal hostnames to every audit consumer is an information-disclosure risk.
Tracing (OpenTelemetry)
The ServiceConnect.Telemetry package provides the ActivitySource plumbing for distributed tracing. It exposes a single source (ServiceConnectActivitySource.ActivitySourceName) that emits publish, send, and consume spans, each tagged with the OTel messaging semantic conventions (messaging.system, messaging.destination.name, messaging.rabbitmq.destination.routing_key, messaging.message.id, messaging.message.conversation_id).

For RabbitMQ, messaging.destination.name carries the broker-side destination (the exchange name for publishes, the queue name for sends), and messaging.rabbitmq.destination.routing_key carries the publish routing key when present. Operation classification follows the OTel messaging semconv 1.x attributes: messaging.operation.type is publish on the producer side (sends and publishes alike) and process on the consumer side, and messaging.operation.name mirrors it with publish / process. Backends can aggregate each shape separately by filtering on messaging.operation.type.

Each span also carries server.address (the broker hostname, or the first entry of a cluster list) and server.port (the broker TCP port from ITransportConfiguration.ClientSettings["Port"], when present).
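To see how the operation-type tag separates the two span shapes, here is an in-process sketch using the BCL ActivityListener. It deliberately does not reference ServiceConnect. A stand-in ActivitySource (the hypothetical name Demo.Bus) emits one producer span and one consumer span tagged as described above. The listener then groups the completed spans by messaging.operation.type, which is the same filter an OTel backend would apply. Against the real bus, the listener would match ServiceConnectActivitySource.ActivitySourceName instead.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

// Hypothetical stand-in for the ServiceConnect source; a real listener would
// match ServiceConnectActivitySource.ActivitySourceName instead.
const string SourceName = "Demo.Bus";
var source = new ActivitySource(SourceName);
var completed = new List<Activity>();

using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == SourceName,
    // Record everything so spans are created and their tags are kept.
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = a => completed.Add(a),
};
ActivitySource.AddActivityListener(listener);

// Emulate a span tagged with the messaging conventions.
void Emit(string name, ActivityKind kind, string operationType)
{
    using var span = source.StartActivity(name, kind);
    span?.SetTag("messaging.system", "rabbitmq");
    span?.SetTag("messaging.operation.type", operationType);
}

Emit("orders publish", ActivityKind.Producer, "publish");
Emit("orders-service process", ActivityKind.Consumer, "process");

// Aggregate by operation type, as a tracing backend would.
var byType = completed
    .GroupBy(a => a.GetTagItem("messaging.operation.type") as string)
    .ToDictionary(g => g.Key ?? "unknown", g => g.Count());

foreach (var (type, count) in byType)
    Console.WriteLine($"{type}: {count}");
```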
```shell
dotnet add package ServiceConnect.Telemetry
```

Wiring

Call builder.AddTelemetry() inside AddServiceConnect, then register the single activity source with your OTel tracing builder:

```csharp
using ServiceConnect.Telemetry;

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(/* ... */);
    builder.AddTelemetry(opts => { /* optional enrichment */ });
});

services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddServiceConnectInstrumentation()
        .AddOtlpExporter());
```

To disable a specific direction without unregistering the source, use the per-direction flags in ServiceConnectInstrumentationOptions:

```csharp
builder.AddTelemetry(opts =>
{
    opts.EnablePublishTelemetry = false; // suppress publish spans
    opts.EnableSendTelemetry = false;    // suppress send spans
    // opts.EnableConsumeTelemetry is true by default
});
```

W3C traceparent/tracestate propagation is unaffected by the enable flags: headers are injected and extracted regardless of whether ServiceConnect emits its own span for that direction.
Pipeline ordering — user outgoing filters run before the telemetry span starts
TelemetrySendMiddleware is registered at the outermost position on the send-message middleware pipeline, so its activity brackets every other middleware on the way out and on the way back. User outgoing filters, however, are a separate pipeline stage that runs before the send-message middleware on the outgoing path. The order is:

```text
user code → outgoing filters → send-message middleware (telemetry span starts here) → producer
```

Two consequences worth knowing:
- Outgoing filters do not see the new publish/send span’s traceparent. The telemetry span is started inside the send-message middleware, after the filters have already run. Filters that need a parent trace context observe Activity.Current from the caller’s ambient context (typically the inbound consume span for handlers that publish, or the application’s root span for app code), not the publish/send span ServiceConnect is about to create.
- A filter that returns FilterAction.Stop (or throws) aborts the call before the telemetry middleware runs, so no publish/send span is created for the blocked delivery. The OutgoingFiltersBlockedException thrown to the caller carries the user-visible signal; observability backends will see the absence of a span where one was expected.
If you want a span that brackets the entire IBus call — including the filter stage — start one in your application code around the PublishAsync / SendAsync invocation. The framework’s own publish/send span is scoped narrowly to the wire path.
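The sketch below illustrates that advice using only System.Diagnostics. The application starts its own span (the hypothetical name place-order on a source called MyApp) and makes the bus call inside it. The bus call is a stand-in local function that starts a child span the way the framework's publish span would, so the parent-child relationship is visible. In real code, the body of the application span would be `await bus.PublishAsync(...)`, and the outgoing filters would then also run inside the application span.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

var appSource = new ActivitySource("MyApp");     // hypothetical application source
var busSource = new ActivitySource("Demo.Bus");  // stand-in for the framework's source

using var listener = new ActivityListener
{
    ShouldListenTo = _ => true,
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
};
ActivitySource.AddActivityListener(listener);

ActivitySpanId? publishParent = null;

// Stand-in for IBus.PublishAsync: filters would run first (no span), then the
// framework's narrowly scoped publish span wraps only the wire path.
async Task FakePublishAsync()
{
    using var publish = busSource.StartActivity("publish", ActivityKind.Producer);
    publishParent = publish?.ParentSpanId;
    await Task.Yield();
}

ActivitySpanId appSpanId;
using (var appSpan = appSource.StartActivity("place-order"))
{
    appSpanId = appSpan!.SpanId;
    await FakePublishAsync(); // filter stage and publish span both fall inside this span
}

Console.WriteLine(publishParent == appSpanId
    ? "publish span is a child of the app span"
    : "unexpected parent");
```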
Enrichment
Use opts.EnrichWithMessage to attach application-specific tags to a span from the decoded message:

```csharp
builder.AddTelemetry(opts =>
{
    opts.EnrichWithMessage = (activity, message) =>
        activity.SetTag("app.correlation_id", message.CorrelationId);
});
```

Security note. Do not attach raw payload fields (or raw bytes) as span tags without review. Messages may contain PII or regulated data that would then flow to every observability backend.
Propagation
The framework injects W3C traceparent/tracestate headers onto every outgoing publish and send, and extracts them on the consume side. A span recorded by a subscriber is a direct child of the publisher’s span: a single TraceId threads the full journey across the broker, even across independent processes. See the examples/Telemetry sample for a three-process demonstration.
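The mechanism is plain W3C Trace Context, which System.Diagnostics can express directly. The sketch below is not ServiceConnect's implementation. On the producer side, it injects a publish span's Id into a dictionary standing in for the message headers; with the W3C id format (the .NET 5+ default), that Id is the traceparent string. On the consume side, it extracts the header with ActivityContext.TryParse and starts the consume span as its child. The source name Demo.Bus is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

var source = new ActivitySource("Demo.Bus"); // hypothetical source name
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Bus",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllDataAndRecorded,
};
ActivitySource.AddActivityListener(listener);

var headers = new Dictionary<string, string>(); // stands in for message headers
ActivityTraceId producerTraceId;

// Producer side: start the publish span and inject its context.
using (var publish = source.StartActivity("publish", ActivityKind.Producer)!)
{
    producerTraceId = publish.TraceId;
    headers["traceparent"] = publish.Id!; // "00-<trace-id>-<span-id>-<flags>"
    if (publish.TraceStateString is { } state)
        headers["tracestate"] = state;
}

// Consumer side (possibly another process): extract, then parent the consume span.
headers.TryGetValue("tracestate", out var traceState);
if (!ActivityContext.TryParse(headers["traceparent"], traceState, out var parentContext))
    throw new InvalidOperationException("malformed traceparent");
using var consume = source.StartActivity("process", ActivityKind.Consumer, parentContext)!;

Console.WriteLine(consume.TraceId == producerTraceId
    ? $"same trace: {consume.TraceId}"
    : "trace broken");
```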
For bespoke headers that aren’t tracing — redaction, a tenant stamp — the Filters API remains the right hook. Filters and telemetry compose; they don’t compete.
Metrics
ServiceConnect emits operator-grade metrics via System.Diagnostics.Metrics. The Meter is named ServiceConnect.Bus and is always on; instruments are zero-cost when no listener subscribes (the BCL pattern, same as HttpClient).
Wiring
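For OpenTelemetry users:

```csharp
services.AddOpenTelemetry()
    .WithMetrics(b => b
        .AddServiceConnectInstrumentation()
        .AddOtlpExporter()); // or any other metrics exporter
```

Without OpenTelemetry, attach a MeterListener directly. Enabling an instrument is not enough on its own; measurements only reach you through a registered callback:

```csharp
using System.Diagnostics.Metrics;
var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "ServiceConnect.Bus")
        l.EnableMeasurementEvents(instrument);
};
// One callback per value type; long and double are assumed here.
listener.SetMeasurementEventCallback<long>((inst, value, tags, state) =>
    Console.WriteLine($"{inst.Name} {value}"));
listener.SetMeasurementEventCallback<double>((inst, value, tags, state) =>
    Console.WriteLine($"{inst.Name} {value}"));
listener.Start();
```

Catalogue

Tags follow the OpenTelemetry messaging-metrics conventions. Unless the table notes otherwise, metrics carry messaging.system="rabbitmq" and (where applicable) messaging.destination.name. ServiceConnect-specific metrics live under the messaging.serviceconnect.* sub-namespace.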
| Metric | Type | Unit | What it counts |
|---|---|---|---|
| messaging.publish.duration | Histogram | s | Wall time of a publish, from start to broker ack |
| messaging.process.duration | Histogram | s | Wall time of consumer-side handler dispatch |
| messaging.client.published.messages | Counter | {message} | Messages successfully published |
| messaging.client.consumed.messages | Counter | {message} | Messages consumed; tagged messaging.outcome = success, error, or retry |
| messaging.serviceconnect.retry.attempts | Counter | {attempt} | Header-counter retry increments |
| messaging.serviceconnect.retry.drops | Counter | {drop} | Messages dropped because retry publishing failed |
| messaging.serviceconnect.publish.confirm_timeouts | Counter | {timeout} | Publishes that exceeded the configured publish timeout. Tagged with messaging.system, messaging.operation.type=publish, and messaging.destination.name (the exchange name, or the routing key when the exchange is empty; SendAsync routes through the default exchange, so the routing key carries the real per-queue destination). |
| messaging.serviceconnect.audit.drops | Counter | {drop} | Audit messages that failed to publish |
| messaging.serviceconnect.outgoing_filters.blocked | Counter | {message} | Outgoing operations aborted because an outgoing filter returned FilterAction.Stop. No publish/send span is emitted for blocked operations, so this counter is the operator-visible signal for filter-suppressed deliveries. Tagged with messaging.system=serviceconnect and (when known) messaging.message.type. |
| messaging.serviceconnect.process.messages.inflight | UpDownCounter | {message} | Currently dispatched, not-yet-acked messages |
| messaging.serviceconnect.aggregator.snapshot_remove_failed_after_dispatch | Counter | {failure} | Aggregator snapshot-remove failures after a successful handler dispatch; indicates the at-least-once duplicate window has been entered for the affected rows |
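For a quick in-process view of one of these instruments, the sketch below uses MeterListener to total messaging.client.consumed.messages by its messaging.outcome tag. To keep the example runnable on its own, a stand-in Meter with the hypothetical name Demo.ServiceConnect.Bus emits a few measurements. Pointed at the real bus, the listener would match the ServiceConnect.Bus meter instead. The sketch also assumes the counter is a Counter<long>; if your version uses a different value type, register the callback for that type.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

const string MeterName = "Demo.ServiceConnect.Bus"; // the real meter is "ServiceConnect.Bus"
var totals = new ConcurrentDictionary<string, long>();

using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == MeterName && instrument.Name == "messaging.client.consumed.messages")
        l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
    // Tags are a span that is only valid inside this callback; copy what you need.
    string outcome = "unknown";
    foreach (var tag in tags)
        if (tag.Key == "messaging.outcome") outcome = tag.Value as string ?? outcome;
    totals.AddOrUpdate(outcome, value, (_, current) => current + value);
});
listener.Start();

// Stand-in emitter; the bus itself records these as it consumes messages.
using var meter = new Meter(MeterName);
var consumed = meter.CreateCounter<long>("messaging.client.consumed.messages", "{message}");
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "success"));
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "success"));
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "retry"));

foreach (var (name, count) in totals)
    Console.WriteLine($"{name}: {count}");
```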
The error.type tag is added on failure paths via an allow-list mapper. Stable mapped values are: cancelled (OperationCanceledException), timeout (TimeoutException), channel_closed (AlreadyClosedException), broker_unreachable (BrokerUnreachableException), publish_nacked (PublishException), broker_interrupted (OperationInterruptedException and its subclasses). Everything else falls back to the exception’s Type.Name. Exception messages are never used as tags — they’re unbounded cardinality.
Carve-out for messaging.publish.duration: TimeoutException does NOT add error.type=timeout on this metric. A publish-confirm timeout means the broker acknowledgement did not arrive within the configured window, but delivery may still have succeeded — adding error.type here would produce misleading “publish error” signals in duration-based dashboards. Use the messaging.serviceconnect.publish.confirm_timeouts counter as the authoritative signal for confirm-timeouts. All other failure types carry error.type as normal on this metric.
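The allow-list mapping and the publish-duration carve-out can be sketched as a small pure function. This is an illustration, not ServiceConnect's code. It walks the exception's type hierarchy from most to least derived and matches by simple type name, so it runs without a RabbitMQ.Client reference. As a side effect, subclasses such as TaskCanceledException map to their base type's value. The real mapper may match types differently.

```csharp
using System;
using System.Collections.Generic;

// Allow-list from the text above. Matching by simple type name is an
// assumption made so the sketch needs no RabbitMQ.Client reference.
var allowList = new Dictionary<string, string>
{
    ["OperationCanceledException"] = "cancelled",
    ["TimeoutException"] = "timeout",
    ["AlreadyClosedException"] = "channel_closed",
    ["BrokerUnreachableException"] = "broker_unreachable",
    ["PublishException"] = "publish_nacked",
    ["OperationInterruptedException"] = "broker_interrupted",
};

// Returns the error.type tag value, or null when no tag should be added.
string? ErrorType(Exception ex, bool isPublishDuration = false)
{
    // Carve-out: a confirm timeout on messaging.publish.duration gets no error.type.
    if (isPublishDuration && ex is TimeoutException) return null;

    // Most-derived first, so AlreadyClosedException (a subclass of
    // OperationInterruptedException in RabbitMQ.Client) keeps its own value.
    for (Type? t = ex.GetType(); t is not null; t = t.BaseType)
        if (allowList.TryGetValue(t.Name, out var mapped)) return mapped;

    // Fall back to the type name; never the message (unbounded cardinality).
    return ex.GetType().Name;
}

Console.WriteLine(ErrorType(new TimeoutException()));                                        // timeout
Console.WriteLine(ErrorType(new TimeoutException(), isPublishDuration: true) ?? "(no tag)"); // (no tag)
Console.WriteLine(ErrorType(new System.Threading.Tasks.TaskCanceledException()));            // cancelled
Console.WriteLine(ErrorType(new InvalidOperationException()));                               // InvalidOperationException
```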
Cardinality
Typical deployments (5-20 queues, 5-20 exchanges, ~5 outcome/error categories) yield an upper bound of roughly 400-500 active series per metric. Operators with very high queue counts (1000+) should factor this into TSDB sizing.
The messaging.outcome tag is a closed three-value set (success | error | retry) at the consumer-host emit site. The “drop” outcome (retry-publish-failure path) is captured separately on messaging.serviceconnect.retry.drops rather than as a fourth outcome value: when the retry publish fails, InboundMessageProcessor returns processed=true so the broker stops redelivering the poison message, and that signal collapses with normal success at the host emit site — there’s no fourth value to surface without changing the contract.
Note that messaging.serviceconnect.audit.drops is the one metric that doesn’t carry messaging.destination.name — the audit queue is a single global destination, so per-queue cardinality doesn’t apply there.
Per-message tags (messaging.message.id, routing keys, conversation IDs) are deliberately NOT included on metrics — they belong on traces, where one span per message matches the data model.
Tracing vs metrics
Tracing is opt-in via AddTelemetry() (see the Wiring subsection under Tracing above) because span creation has a per-message allocation cost that’s only worth paying when you’ll actually export the spans. Metrics are always on because instrument emission is free without a listener, the same pattern used by .NET libraries such as HttpClient, Sockets, and EF Core.
Connection-lifecycle logs
Beyond metrics, ServiceConnect emits structured logs for connection state transitions at Information level, plus Warning-level events for failed acks and nacks. Filter on the ServiceConnect.Client.RabbitMQ category:
| Event ID | Event name | Level | Emitted when |
|---|---|---|---|
| 2 | ConnectionOpened | Information | A consumer connection establishes |
| 3 | ProducerConnectionOpened | Information | A producer connection establishes |
| 4 | ConnectionRecovered | Information | RabbitMQ.Client’s auto-recovery restores the connection and replays the topology (exchanges, queues, bindings) on the new channel — TopologyRecoveryEnabled = true so consumer bindings survive cluster failover to a fresh broker node |
| 5 | ConnectionLost | Information | The broker initiates ConnectionShutdown (e.g. broker restart, cluster failover) — NOT escalated to Warning because broker-initiated shutdowns happen for normal operational reasons |
| 6 | AckFailed | Warning | An ack call fails; carries MessageId for correlation |
| 7 | NackFailed | Warning | A nack call fails; carries MessageId for correlation |
Health checks
ServiceConnect.HealthChecks ships three opt-in IHealthCheck classes for Microsoft.Extensions.Diagnostics.HealthChecks:
| Check | Default name | Observes |
|---|---|---|
| BusConsumingHealthCheck | serviceconnect-bus | IBus.IsConsuming |
| ConsumerConnectionHealthCheck | serviceconnect-consumer | IConsumer.IsConnected |
| ProducerConnectionHealthCheck | serviceconnect-producer | IProducer.IsHealthy |
The package is transport-agnostic — it depends only on ServiceConnect.Interfaces. All three checks are O(1) state inspections; they do not open broker channels or perform AMQP round-trips per probe.
```shell
dotnet add package ServiceConnect.HealthChecks
```

Wiring

Pick the calls that match what your host actually does. A consume-only host omits AddServiceConnectProducer. A publish-only host omits both AddServiceConnectConsumer and AddServiceConnectBus (a host that never starts consuming would report IsConsuming permanently false). Hosts that do both call all three.
```csharp
using Microsoft.AspNetCore.Diagnostics.HealthChecks; // HealthCheckOptions
using ServiceConnect.HealthChecks;

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(t => t.Host = "rabbit");
});

services.AddHealthChecks()
    .AddServiceConnectBus(tags: ["live"])
    .AddServiceConnectConsumer(tags: ["ready"])
    .AddServiceConnectProducer(tags: ["ready"]);

app.MapHealthChecks("/health/live", new HealthCheckOptions
{
    Predicate = c => c.Tags.Contains("live")
});
app.MapHealthChecks("/health/ready", new HealthCheckOptions
{
    Predicate = c => c.Tags.Contains("ready")
});
```

Broker-initiated cancellation
When the broker cancels a ServiceConnect consumer (queue deleted, queue policy expired, mirror promoted), the consumer’s deliveries stop. ServiceConnect detects this via AMQP’s basic.cancel event and propagates the signal:

- IBus.IsConsuming returns false.
- BusConsumingHealthCheck reports Unhealthy.
Operator action: investigate the broker-side cause, fix it (re-create the queue with the right arguments, restore the policy, etc.), then restart the host. ServiceConnect does not auto-redeclare the queue — that would defeat an operator’s deliberate deletion.
Steady-state semantics
Each check inspects the last known state from the transport client’s event stream. There is no active probing, because probing would compete with real traffic and amplify failure modes: a 5-second probe interval × N replicas would mean steady channel churn against the broker for what is, in practice, a one-bit signal. Each check also honours the framework-supplied CancellationToken. Probes cancelled by the health-check framework (timeout or shutdown) surface as OperationCanceledException rather than returning a stale result.

Two timing / state footnotes worth knowing:
- The producer connects lazily, and the check knows it. ProducerConnectionHealthCheck distinguishes lazy-not-yet-tried from failure via IProducer.HasAttemptedConnection. Before any publish or send has been issued (HasAttemptedConnection is false), the check returns Healthy so pods don’t crash-loop at startup. Once the first publish attempt completes (success or failure), HasAttemptedConnection becomes true and the check reflects the real IProducer.IsHealthy state. Hosts that publish only in response to inbound traffic should still keep the producer check off the ready tag, because the check can’t detect a connection failure until an outbound attempt is made. For that topology, put only AddServiceConnectConsumer on ready.
- The window between connection drop and event observation. When the broker connection drops, there is a small (millisecond-scale) gap before the client raises its shutdown event and IsConnected/IsHealthy flip to false. A probe firing inside that gap can still see Healthy. The gap is shorter than any Kubernetes probe interval, and every in-process check has the same gap, regardless of implementation.
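The producer check's decision logic is small enough to restate. The sketch below uses plain booleans standing in for IProducer.HasAttemptedConnection and IProducer.IsHealthy. It also shows the cancellation behaviour described under steady-state semantics: a cancelled probe throws rather than returning a stale result. The string results stand in for HealthCheckResult values, so the sketch runs without the health-checks package. It is a restatement of the documented behaviour, not the shipped class.

```csharp
using System;
using System.Threading;

// hasAttempted and isHealthy stand in for IProducer.HasAttemptedConnection
// and IProducer.IsHealthy; the strings stand in for HealthCheckResult values.
static string ProducerStatus(bool hasAttempted, bool isHealthy, CancellationToken ct)
{
    // A probe cancelled by the framework surfaces as OperationCanceledException.
    ct.ThrowIfCancellationRequested();

    // Lazy connection: nothing attempted yet is not a failure.
    if (!hasAttempted) return "Healthy";

    // After the first publish/send attempt, report the real connection state.
    return isHealthy ? "Healthy" : "Unhealthy";
}

Console.WriteLine(ProducerStatus(hasAttempted: false, isHealthy: false, ct: CancellationToken.None)); // Healthy
Console.WriteLine(ProducerStatus(hasAttempted: true, isHealthy: false, ct: CancellationToken.None));  // Unhealthy
```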
Custom checks
If you need more than the three shipped checks (a custom predicate over several pieces of bus state, a different failure-status mapping, or a health-check framework other than Microsoft.Extensions.Diagnostics.HealthChecks), write your own check directly against IBus, IConsumer, or IProducer, as an IHealthCheck or whatever your framework expects. The shipped check classes are sealed; their source is short enough to copy as a starting point.
Putting it together
An on-call engineer is looking at a ticket that says “the welcome-email job didn’t fire for user X.” The stitching looks like this:
- Look up the message by correlation id (probably the user id) in the audit queue and the error queue. Audit holds only successfully processed messages and the error queue holds the failures, so finding it in either tells you the event actually hit the bus.
- Search the logs for that MessageId. A handler exception, or a warning about an unregistered type, falls out here.
- If the log trail ends with “Max retries exceeded,” pull the message from the error queue; its Exception header tells you what threw. Fix the root cause, then replay the message from the error queue back into the main queue.
- If neither queue has it, the problem is most likely upstream of the bus: the publisher didn’t send it. That’s a different investigation, but one this stack doesn’t obscure.
What comes next
Section titled “What comes next”- Error Handling — the error-queue side of the investigation flow above.
- Filters — the hook to add tracing, redaction, or custom header stamping.
- Configuration —
AuditingEnabled,AuditQueueName,IncludeMachineNameInHeaders.