
Observability

Observability in a messaging system is three overlapping views of the same events: logs tell you what the bus did, the audit queue gives you a replayable copy of every message it successfully processed, and headers carry enough metadata to correlate each processed message back to a specific run. ServiceConnect offers all three, plus opt-in tracing, always-on metrics, and health checks. This page covers what each one is for and how to wire them together so an on-call engineer can answer “what happened to message X?”

The bus logs through Microsoft.Extensions.Logging. No special setup is needed: the bus writes to the ILogger<T> instances registered in the container, so whatever sink you’ve configured (console, file, Seq, Application Insights) receives its logs automatically.

Key log events you’ll see in a healthy system:

  • Startup: "Bus starting to consume on queue {QueueName} for {Count} message types." — fired once during StartConsumingAsync. If the bus has started, this line has been logged.
  • Shutdown: "Bus stopping message consumption." — fired from StopConsumingAsync.
  • Unregistered type (warning): "Unregistered message type '{TypeName}'. Rejecting" — fired for an incoming message whose type name isn’t in the handler registry. Usually a topology mistake (wrong queue binding) or a forgotten handler registration.

Key log events for failures:

  • Dispatch error (error): "Error dispatching message of type {MessageType}" — fired every time a handler (or a filter, or the deserializer) throws. Always paired with an exception.
  • Max retries exceeded (error): "Max retries exceeded for MessageId {MessageId}" — fired when a retried message reaches MaxRetries and is about to land in the error queue.
  • Terminal rejection (error): "Rejecting permanently invalid inbound message with MessageId {MessageId}" — fired for messages that are unparsable on the wire (missing headers, oversized payload) and go straight to the error queue without retries.

Structure log queries around MessageId. Every published message has one (the bus stamps it); it follows the message through retries and into the error queue via the MessageId header. Searching for a single MessageId in your log aggregator gets you the full processing history of that message.

When AuditingEnabled = true, every successfully processed message is republished to AuditQueueName (default "audit") with its headers preserved. Setup:

builder.ConfigureQueues(q =>
{
    q.QueueName = "orders-service";
    q.AuditingEnabled = true;
    q.AuditQueueName = "orders-service.audit";
});

What audit gives you that logs don’t:

  • Full message body. Logs have the exception and the MessageId; the audit queue has the actual bytes the handler saw. When a bug surfaces days later, you have the payload to reproduce.
  • Replay. The audit queue is just a queue. Nothing stops you from draining it into the main queue and re-running the workflow against a fixed handler. Treat that as a manual operator action, not an automated retry.
  • Downstream analytics. Because audit is a copy of the real traffic, you can point a stream processor (Kafka mirror, Logstash, whatever) at it without being in the critical path.

Two important caveats:

  • Byte-stream messages are not audited. Stream packets (see Streaming) would flood the audit queue with raw frames, so the audit publisher skips anything tagged as a stream.
  • Failed messages are not audited. Audit is the success path; failed messages go to the error queue (see Error Handling), and the two queues are deliberately separate.

Size the audit queue. If you keep it around indefinitely it grows linearly with traffic; either consume it to archive storage on a schedule, or set a queue-level TTL or length limit in RabbitMQ.

Every message the bus sends carries a consistent set of headers. These are the primitives you stitch telemetry from:

| Header | What it is | Typical use |
| --- | --- | --- |
| MessageId | Unique per send | Log correlation, idempotency key |
| CorrelationId | The conversation id | Stitch request/reply or a saga’s full trace together |
| SourceAddress | Sending queue name | “Who sent this?”; populated automatically |
| DestinationAddress | Receiving queue | “Where is this going?” |
| TimeSent | UTC timestamp at send | Compare to TimeReceived to observe transit time |
| TimeReceived | UTC timestamp at consume | End-to-end latency signal |
| TimeProcessed | UTC timestamp after handler | Handler duration, if you want to split transit from processing |
| RetryCount | Attempts so far | Diagnose retry storms; populated by the retry handler |
| Exception | JSON-serialised error | Present in error-queue entries only |

All header names are defined as constants in ServiceConnect.Interfaces.HeaderKeys, so you don’t have to hard-code the strings. The IConsumeContext passed to a handler exposes the whole header dictionary (see Handlers).

There are also two opt-in headers:

  • SourceMachine and DestinationMachine — sender and receiver hostnames. Off by default: IncludeMachineNameInHeaders = true on the bus config turns them on. Useful for multi-tenant brokers where you need to identify the sender host, but be aware that exposing internal hostnames to every audit consumer is an information-disclosure risk.
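The timestamp headers lend themselves to a quick latency breakdown: TimeReceived minus TimeSent approximates transit time, and TimeProcessed minus TimeReceived approximates handler duration. The sketch below works over a plain string dictionary standing in for the headers IConsumeContext exposes. It assumes the timestamps are ISO-8601 UTC strings and that the literal keys match the header names in the table; check both against ServiceConnect.Interfaces.HeaderKeys and real header values before relying on it.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Sketch: derive transit time and handler duration from the timestamp headers.
// ASSUMPTIONS: values are ISO-8601 UTC strings, and the literal keys below match the
// header names in the table (real code should use ServiceConnect.Interfaces.HeaderKeys).
static TimeSpan? Between(IReadOnlyDictionary<string, string> headers, string fromKey, string toKey)
{
    // A missing header (e.g. no TimeProcessed yet) yields null instead of a bogus duration.
    if (!headers.TryGetValue(fromKey, out var from) || !headers.TryGetValue(toKey, out var to))
        return null;

    var start = DateTime.Parse(from, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
    var end = DateTime.Parse(to, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
    return end - start;
}

var headers = new Dictionary<string, string>
{
    ["TimeSent"] = "2024-05-01T12:00:00.000Z",
    ["TimeReceived"] = "2024-05-01T12:00:00.250Z",
    ["TimeProcessed"] = "2024-05-01T12:00:01.000Z",
};

TimeSpan? transit = Between(headers, "TimeSent", "TimeReceived");
TimeSpan? handler = Between(headers, "TimeReceived", "TimeProcessed");
Console.WriteLine($"transit={transit?.TotalMilliseconds}ms handler={handler?.TotalMilliseconds}ms");
```

TimeSent is stamped on the sender’s clock and TimeReceived on the receiver’s, so cross-host transit figures include any clock skew between the two machines. Handler duration, assuming both of its timestamps are stamped by the consuming host, does not have that problem.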

The ServiceConnect.Telemetry package provides the ActivitySource plumbing for distributed tracing. It exposes a single source (ServiceConnectActivitySource.ActivitySourceName) that emits publish, send, and consume spans, each tagged with the OTel messaging semantic conventions (messaging.system, messaging.destination.name, messaging.rabbitmq.destination.routing_key, messaging.message.id, messaging.message.conversation_id).

For RabbitMQ, messaging.destination.name carries the broker-side destination (the exchange name for publishes, the queue name for sends), and messaging.rabbitmq.destination.routing_key carries the publish routing key when present.

Operation classification follows the OTel messaging semconv 1.x attributes: messaging.operation.type is publish for the producer side (sends and publishes alike) and process for the consumer side, and messaging.operation.name mirrors that with publish / process. Backends can aggregate each shape separately by filtering on messaging.operation.type.

The span also carries server.address (the broker hostname, or the first entry of a cluster list) and server.port (the broker TCP port from ITransportConfiguration.ClientSettings["Port"], when present).
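Because sends and publishes are both classified as publish and consumption as process, splitting producer-side from consumer-side spans only requires reading messaging.operation.type. The sketch below does that in-process with a BCL ActivityListener, counting finished spans per operation type. The source name Demo.Messaging and the simulated spans are stand-ins so the snippet runs on its own; in a real host you would match ServiceConnectActivitySource.ActivitySourceName and let the bus create the spans.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Sketch: count finished spans per messaging.operation.type, the way a backend would
// split producer-side (publish) spans from consumer-side (process) spans.
// ASSUMPTION: "Demo.Messaging" is a stand-in source name; in a real host, match
// ServiceConnectActivitySource.ActivitySourceName and let the bus create the spans.
const string SourceName = "Demo.Messaging";
var countsByOperation = new Dictionary<string, int>();

using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == SourceName,
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
    ActivityStopped = activity =>
    {
        var opType = activity.GetTagItem("messaging.operation.type") as string ?? "unknown";
        countsByOperation[opType] = countsByOperation.GetValueOrDefault(opType) + 1;
    },
};
ActivitySource.AddActivityListener(listener);

// Simulated spans shaped like the ones described above: a publish, a send, a consume.
using var source = new ActivitySource(SourceName);
foreach (var (name, kind, operationType) in new[]
{
    ("orders publish", ActivityKind.Producer, "publish"),
    ("orders-service send", ActivityKind.Producer, "publish"),
    ("orders-service process", ActivityKind.Consumer, "process"),
})
{
    using var span = source.StartActivity(name, kind); // disposed (stopped) at the end of each iteration
    span?.SetTag("messaging.system", "rabbitmq");
    span?.SetTag("messaging.operation.type", operationType);
}

foreach (var entry in countsByOperation)
    Console.WriteLine($"{entry.Key}: {entry.Value}");
```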

dotnet add package ServiceConnect.Telemetry

Call builder.AddTelemetry() inside AddServiceConnect, then register the single activity source with your OTel tracing builder:

using ServiceConnect.Telemetry;
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(/* ... */);
    builder.AddTelemetry(opts => { /* optional enrichment */ });
});
services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddServiceConnectInstrumentation()
        .AddOtlpExporter());

To disable a specific direction without unregistering the source, use the per-direction flags in ServiceConnectInstrumentationOptions:

builder.AddTelemetry(opts =>
{
    opts.EnablePublishTelemetry = false; // suppress publish spans
    opts.EnableSendTelemetry = false;    // suppress send spans
    // opts.EnableConsumeTelemetry is true by default
});

W3C traceparent/tracestate propagation is unaffected by the enable flags — headers are injected and extracted regardless of whether ServiceConnect emits its own span for that direction.

Pipeline ordering — user outgoing filters run before the telemetry span starts


TelemetrySendMiddleware is registered at the outermost position on the send-message middleware pipeline so its activity brackets every other middleware on the way out and on the way back. User outgoing filters, however, are a separate pipeline stage that runs before the send-message middleware on the outgoing path. The order is:

user code → outgoing filters → send-message middleware (telemetry span starts here) → producer

Two consequences worth knowing:

  • Outgoing filters do not see the new publish/send span’s traceparent. The telemetry span is started inside the send-message middleware, after the filters have already run. Filters that need a parent trace context observe Activity.Current from the caller’s ambient context (typically the inbound consume span for handlers that publish, or the application’s root span for app code) — not the publish/send span ServiceConnect is about to create.
  • A filter that returns FilterAction.Stop (or throws) aborts the call before the telemetry middleware runs. No publish/send span is created for the blocked delivery. The OutgoingFiltersBlockedException thrown to the caller carries the user-visible signal; observability backends will see the absence of a span where one was expected.

If you want a span that brackets the entire IBus call — including the filter stage — start one in your application code around the PublishAsync / SendAsync invocation. The framework’s own publish/send span is scoped narrowly to the wire path.
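A minimal sketch of such an application-level span, using only System.Diagnostics. The source name MyApp.Messaging and the TracedBusCallAsync helper are hypothetical; register the source with AddSource("MyApp.Messaging") on your OTel tracing builder so its spans are exported. The busCall delegate stands in for a lambda wrapping your IBus PublishAsync or SendAsync call.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Sketch: an application-owned span around a whole bus call, so the outgoing-filter stage
// (which runs before ServiceConnect's own publish/send span starts) is inside a measured span.
// HYPOTHETICAL: "MyApp.Messaging" and TracedBusCallAsync are illustrative names.
var appSource = new ActivitySource("MyApp.Messaging");

async Task TracedBusCallAsync(string operationName, Func<Task> busCall)
{
    using var activity = appSource.StartActivity(operationName, ActivityKind.Internal);
    try
    {
        await busCall(); // e.g. a lambda wrapping your IBus PublishAsync/SendAsync call
    }
    catch (Exception ex)
    {
        // Covers filter rejections (OutgoingFiltersBlockedException) as well as transport errors.
        activity?.SetStatus(ActivityStatusCode.Error, ex.GetType().Name);
        throw;
    }
}

// Placeholder call: with no listener registered, StartActivity returns null and the call still runs.
await TracedBusCallAsync("publish OrderPlaced", () => Task.CompletedTask);
```

Recording the exception type rather than its message keeps the status description low-cardinality, matching the reasoning the metrics section applies to error.type.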

Use opts.EnrichWithMessage to attach application-specific tags to a span from the decoded message:

builder.AddTelemetry(opts =>
{
    opts.EnrichWithMessage = (activity, message) =>
        activity.SetTag("app.correlation_id", message.CorrelationId);
});

Security note. Do not attach raw payload fields (or raw bytes) as span tags without review — messages may contain PII or regulated data that would then flow to every observability backend.

The framework injects W3C traceparent/tracestate headers onto every outgoing publish and send, and extracts them on the consume side. A span recorded by a subscriber is a direct child of the publisher’s span — a single TraceId threads the full journey across the broker, even across independent processes. See the examples/Telemetry sample for a three-process demonstration.
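The propagation mechanism is plain W3C Trace Context and can be sketched with BCL types alone: the producer writes its span’s traceparent (and tracestate, if any) into the message headers, and the consumer parses them with ActivityContext.TryParse and starts its span with that context as the parent. This illustrates the technique, not ServiceConnect’s internal code. The source name Demo.Messaging is hypothetical, and the ActivityListener exists only so StartActivity returns real activities.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Sketch of W3C trace-context propagation through message headers with BCL types only.
// Illustrates the technique; not ServiceConnect's internal code. "Demo.Messaging" is hypothetical.
Activity.DefaultIdFormat = ActivityIdFormat.W3C;
Activity.ForceDefaultIdFormat = true;

// Without a listener, StartActivity returns null, so sample everything from the demo source.
using var listener = new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo.Messaging",
    Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
};
ActivitySource.AddActivityListener(listener);
using var source = new ActivitySource("Demo.Messaging");

// Producer side: start a publish span and copy its context into the outgoing headers.
var headers = new Dictionary<string, string>();
ActivityTraceId publishTraceId;
ActivitySpanId publishSpanId;
using (var publish = source.StartActivity("publish", ActivityKind.Producer)!)
{
    headers["traceparent"] = publish.Id!; // W3C form: 00-<trace-id>-<span-id>-<flags>
    if (publish.TraceStateString is { } traceStateOut) headers["tracestate"] = traceStateOut;
    publishTraceId = publish.TraceId;
    publishSpanId = publish.SpanId;
}

// Consumer side (normally another process): rebuild the parent context from the headers.
headers.TryGetValue("tracestate", out var traceState);
if (!ActivityContext.TryParse(headers["traceparent"], traceState, out var parentContext))
    throw new InvalidOperationException("Malformed traceparent header");

using var consume = source.StartActivity("process", ActivityKind.Consumer, parentContext)!;
Console.WriteLine($"trace {consume.TraceId}, parent span {consume.ParentSpanId}");
```

The consume span shares the publish span’s TraceId and records the publish span’s SpanId as its parent, which is the “direct child” relationship described above.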

For bespoke headers that aren’t tracing — redaction, a tenant stamp — the Filters API remains the right hook. Filters and telemetry compose; they don’t compete.

ServiceConnect emits operator-grade metrics via System.Diagnostics.Metrics. The Meter is named ServiceConnect.Bus and is always-on — instruments are zero-cost when no listener subscribes (BCL pattern, same as HttpClient).

For OpenTelemetry users:

services.AddOpenTelemetry().WithMetrics(b => b.AddServiceConnectInstrumentation());

Without OpenTelemetry, attach a MeterListener directly:

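using System;
using System.Diagnostics.Metrics;
var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "ServiceConnect.Bus") l.EnableMeasurementEvents(instrument);
};
// Measurements reach only a callback registered for their numeric type (long and double assumed here).
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) => Console.WriteLine($"{instrument.Name}: {value}"));
listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) => Console.WriteLine($"{instrument.Name}: {value}"));
listener.Start();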
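A MeterListener can also aggregate instead of printing. The sketch below tallies messaging.client.consumed.messages by its messaging.outcome tag, which is enough for a quick error or retry ratio in a console tool or integration test. The Demo.Bus meter and the simulated Add calls are stand-ins so the snippet runs on its own; in a real host you would match "ServiceConnect.Bus" and drop the simulation. It also assumes the counter reports long values; a callback registered for a different numeric type would never fire.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Sketch: tally messaging.client.consumed.messages by its messaging.outcome tag in-process.
// ASSUMPTIONS: "Demo.Bus" simulates the bus's meter (match "ServiceConnect.Bus" in a real host),
// and the counter reports long values.
const string MeterName = "Demo.Bus";
var byOutcome = new ConcurrentDictionary<string, long>(); // callbacks may arrive on any thread

using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == MeterName && instrument.Name == "messaging.client.consumed.messages")
        l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
{
    string outcome = "unknown";
    foreach (var tag in tags)
        if (tag.Key == "messaging.outcome") outcome = tag.Value?.ToString() ?? "unknown";
    byOutcome.AddOrUpdate(outcome, value, (_, total) => total + value);
});
listener.Start();

// Simulated emissions; in production the bus records these.
using var meter = new Meter(MeterName);
Counter<long> consumed = meter.CreateCounter<long>("messaging.client.consumed.messages", unit: "{message}");
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "success"));
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "success"));
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "retry"));
consumed.Add(1, new KeyValuePair<string, object?>("messaging.outcome", "error"));

foreach (var entry in byOutcome)
    Console.WriteLine($"{entry.Key}: {entry.Value}");
```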

Tags follow the OpenTelemetry messaging-metrics conventions. Unless a row below notes otherwise, metrics carry messaging.system="rabbitmq" and (where applicable) messaging.destination.name. ServiceConnect-specific metrics live under the messaging.serviceconnect.* sub-namespace.

| Metric | Type | Unit | What it counts |
| --- | --- | --- | --- |
| messaging.publish.duration | Histogram | s | Wall time of a publish, from start to broker ack |
| messaging.process.duration | Histogram | s | Wall time of consumer-side handler dispatch |
| messaging.client.published.messages | Counter | {message} | Messages successfully published |
| messaging.client.consumed.messages | Counter | {message} | Messages consumed; tagged messaging.outcome = success, error, or retry |
| messaging.serviceconnect.retry.attempts | Counter | {attempt} | Header-counter retry increments |
| messaging.serviceconnect.retry.drops | Counter | {drop} | Messages dropped because retry publishing failed |
| messaging.serviceconnect.publish.confirm_timeouts | Counter | {timeout} | Publishes that exceeded the configured publish timeout. Tagged with messaging.system, messaging.operation.type=publish, and messaging.destination.name (the exchange name, or the routing key when the exchange is empty: SendAsync routes through the default exchange, so the routing key carries the real per-queue destination). |
| messaging.serviceconnect.audit.drops | Counter | {drop} | Audit messages that failed to publish |
| messaging.serviceconnect.outgoing_filters.blocked | Counter | {message} | Outgoing operations aborted because an outgoing filter returned FilterAction.Stop. No publish/send span is emitted for blocked operations, so this counter is the operator-visible signal for filter-suppressed deliveries. Tagged with messaging.system=serviceconnect and (when known) messaging.message.type. |
| messaging.serviceconnect.process.messages.inflight | UpDownCounter | {message} | Currently dispatched, not-yet-acked messages |
| messaging.serviceconnect.aggregator.snapshot_remove_failed_after_dispatch | Counter | {failure} | Aggregator snapshot-remove failures after a successful handler dispatch; indicates the at-least-once duplicate window has been entered for the affected rows |

The error.type tag is added on failure paths via an allow-list mapper. Stable mapped values are: cancelled (OperationCanceledException), timeout (TimeoutException), channel_closed (AlreadyClosedException), broker_unreachable (BrokerUnreachableException), publish_nacked (PublishException), broker_interrupted (OperationInterruptedException and its subclasses). Everything else falls back to the exception’s Type.Name. Exception messages are never used as tags — they’re unbounded cardinality.
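The allow-list keeps error.type bounded: a fixed table maps known exception types to stable strings, and everything else falls back to the type name, never the message. A sketch of that mapping follows. It matches on type names (so it needs no RabbitMQ.Client reference) and walks up the inheritance chain from the most-derived type, which is how subclasses of OperationInterruptedException end up as broker_interrupted. Applying the subclass walk to every entry, and name-based matching in general, are assumptions of this sketch rather than documented ServiceConnect behavior.

```csharp
using System;
using System.Collections.Generic;

// Sketch of an allow-list mapper for the error.type tag, mirroring the stable values above.
// ASSUMPTION: matching by type name and walking the inheritance chain for every entry are
// illustrative choices, not documented ServiceConnect behavior.
var allowList = new Dictionary<string, string>
{
    ["OperationCanceledException"] = "cancelled",
    ["TimeoutException"] = "timeout",
    ["AlreadyClosedException"] = "channel_closed",
    ["BrokerUnreachableException"] = "broker_unreachable",
    ["PublishException"] = "publish_nacked",
    ["OperationInterruptedException"] = "broker_interrupted",
};

string MapErrorType(Exception ex)
{
    // Most-derived type first, so a specific entry wins over a base-class entry.
    for (Type? t = ex.GetType(); t is not null && t != typeof(Exception); t = t.BaseType)
    {
        if (allowList.TryGetValue(t.Name, out var mapped))
            return mapped;
    }
    // Fallback: the type name, never ex.Message (messages have unbounded cardinality).
    return ex.GetType().Name;
}

Console.WriteLine(MapErrorType(new TimeoutException()));            // timeout
Console.WriteLine(MapErrorType(new InvalidOperationException("x"))); // InvalidOperationException
```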

Carve-out for messaging.publish.duration: TimeoutException does NOT add error.type=timeout on this metric. A publish-confirm timeout means the broker acknowledgement did not arrive within the configured window, but delivery may still have succeeded — adding error.type here would produce misleading “publish error” signals in duration-based dashboards. Use the messaging.serviceconnect.publish.confirm_timeouts counter as the authoritative signal for confirm-timeouts. All other failure types carry error.type as normal on this metric.

Typical deployments (5-20 queues, 5-20 exchanges, about 5 outcome/error categories) produce at most roughly 400-500 active series per metric. Operators with very high queue counts (1000+) should factor this in when sizing their TSDB.

The messaging.outcome tag is a closed three-value set (success | error | retry) at the consumer-host emit site. The “drop” outcome (retry-publish-failure path) is captured separately on messaging.serviceconnect.retry.drops rather than as a fourth outcome value: when the retry publish fails, InboundMessageProcessor returns processed=true so the broker stops redelivering the poison message, and that signal collapses with normal success at the host emit site — there’s no fourth value to surface without changing the contract.

Note that messaging.serviceconnect.audit.drops is the one metric that doesn’t carry messaging.destination.name — the audit queue is a single global destination, so per-queue cardinality doesn’t apply there.

Per-message tags (messaging.message.id, routing keys, conversation IDs) are deliberately NOT included on metrics — they belong on traces, where one span per message matches the data model.

Tracing is opt-in via AddTelemetry() (see the tracing setup above) because span creation has a per-message allocation cost that’s only worth paying when you’ll actually export the spans. Metrics are always-on because instrument emission is free without a listener; HttpClient, sockets, and EF Core follow the same pattern.

Beyond metrics, ServiceConnect emits structured logs for connection state transitions and ack/nack failures. Filter on the ServiceConnect.Client.RabbitMQ category:

| Event ID | Event name | Level | Emitted when |
| --- | --- | --- | --- |
| 2 | ConnectionOpened | Information | A consumer connection is established |
| 3 | ProducerConnectionOpened | Information | A producer connection is established |
| 4 | ConnectionRecovered | Information | RabbitMQ.Client’s auto-recovery restores the connection and replays the topology (exchanges, queues, bindings) on the new channel. TopologyRecoveryEnabled = true, so consumer bindings survive cluster failover to a fresh broker node. |
| 5 | ConnectionLost | Information | The broker initiates ConnectionShutdown (e.g. broker restart, cluster failover). Not escalated to Warning, because broker-initiated shutdowns happen for normal operational reasons. |
| 6 | AckFailed | Warning | An ack call fails; carries MessageId for correlation |
| 7 | NackFailed | Warning | A nack call fails; carries MessageId for correlation |

ServiceConnect.HealthChecks ships three opt-in IHealthCheck classes for Microsoft.Extensions.Diagnostics.HealthChecks:

| Check | Default name | Observes |
| --- | --- | --- |
| BusConsumingHealthCheck | serviceconnect-bus | IBus.IsConsuming |
| ConsumerConnectionHealthCheck | serviceconnect-consumer | IConsumer.IsConnected |
| ProducerConnectionHealthCheck | serviceconnect-producer | IProducer.IsHealthy |

The package is transport-agnostic — it depends only on ServiceConnect.Interfaces. All three checks are O(1) state inspections; they do not open broker channels or perform AMQP round-trips per probe.

dotnet add package ServiceConnect.HealthChecks

Pick the calls that match what your host actually does. A consume-only host omits AddServiceConnectProducer; a publish-only host omits both AddServiceConnectConsumer and AddServiceConnectBus (a host that never starts consuming would report IsConsuming permanently false); hosts that do both call all three.

using Microsoft.AspNetCore.Diagnostics.HealthChecks; // HealthCheckOptions
using ServiceConnect.HealthChecks;
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(t => t.Host = "rabbit");
});
services.AddHealthChecks()
    .AddServiceConnectBus(tags: ["live"])
    .AddServiceConnectConsumer(tags: ["ready"])
    .AddServiceConnectProducer(tags: ["ready"]);
app.MapHealthChecks("/health/live", new HealthCheckOptions { Predicate = c => c.Tags.Contains("live") });
app.MapHealthChecks("/health/ready", new HealthCheckOptions { Predicate = c => c.Tags.Contains("ready") });

When the broker cancels a ServiceConnect consumer (queue deleted, queue policy expired, mirror promoted), the consumer’s deliveries stop. ServiceConnect detects this via AMQP’s basic.cancel event and propagates the signal:

  • IBus.IsConsuming returns false.
  • BusConsumingHealthCheck reports Unhealthy.

Operator action: investigate the broker-side cause, fix it (re-create the queue with the right arguments, restore the policy, etc.), then restart the host. ServiceConnect does not auto-redeclare the queue — that would defeat an operator’s deliberate deletion.

Each check inspects the last known state from the transport client’s event stream. There is no active probing — that would compete with real traffic and amplify failure modes (a probe interval of 5 seconds × N replicas would mean steady channel churn against the broker for what is, in practice, a one-bit signal). Each check also honours the framework-supplied CancellationToken — probes cancelled by the health-check framework (timeout or shutdown) surface as OperationCanceledException rather than returning a stale result.

Two timing / state footnotes worth knowing:

  • The producer connects lazily, and the check knows it. ProducerConnectionHealthCheck distinguishes lazy-not-yet-tried from failure via IProducer.HasAttemptedConnection. Before any publish or send has been issued (HasAttemptedConnection is false), the check returns Healthy so pods don’t crash-loop at startup. Once the first publish attempt completes (success or failure), HasAttemptedConnection becomes true and the check reflects the real IProducer.IsHealthy state. Hosts that publish only in response to inbound traffic should still keep the producer check off the ready tag: it can’t detect a connection failure until an outbound attempt is made. For that topology, put only the consumer check (AddServiceConnectConsumer) on ready.
  • The window between connection drop and event observation. When the broker connection drops, there is a small (millisecond-scale) gap before the client raises its shutdown event and IsConnected / IsHealthy flip to false. A probe firing inside that gap can still see Healthy. This is shorter than any K8s probe interval and is the same gap any in-process check has, regardless of implementation.
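The producer check’s decision rule, as described above, fits in a few lines. The sketch below is a hypothetical stand-in: two booleans replace IProducer.HasAttemptedConnection and IProducer.IsHealthy, and it returns a status string rather than the HealthCheckResult the shipped check produces. Reporting Unhealthy (rather than Degraded) after a failed attempt is an assumption.

```csharp
using System;
using System.Threading;

// Sketch of the producer check's decision rule.
// HYPOTHETICAL: the two bools stand in for IProducer.HasAttemptedConnection and
// IProducer.IsHealthy, and a status string stands in for HealthCheckResult.
static string ProducerStatus(bool hasAttemptedConnection, bool isHealthy, CancellationToken ct)
{
    // A cancelled probe surfaces as OperationCanceledException rather than a stale result.
    ct.ThrowIfCancellationRequested();

    // Lazy producer: nothing has been attempted yet, so don't fail startup probes.
    if (!hasAttemptedConnection) return "Healthy";

    // After the first publish/send attempt, report the real connection state
    // (Unhealthy on failure is an assumption of this sketch).
    return isHealthy ? "Healthy" : "Unhealthy";
}

Console.WriteLine(ProducerStatus(hasAttemptedConnection: false, isHealthy: false, ct: CancellationToken.None));
```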

If you need anything more than the shipped three checks — a custom predicate over multiple bus state pieces, a different failure-status mapping, integration with a non-Microsoft.Extensions.Diagnostics.HealthChecks framework — implement IHealthCheck directly against IBus, IConsumer, or IProducer. The shipped check classes are sealed; their source is short enough to copy as a starting point.

An on-call engineer is looking at a ticket that says “the welcome-email job didn’t fire for user X.” The stitching looks like this:

  1. Find the message by correlation id (probably the user id) in the audit queue — that tells you whether the event actually hit the bus.
  2. If it’s there, search logs for that MessageId — a handler exception, or a warning about an unregistered type, falls out here.
  3. If the log trail ends with “Max retries exceeded,” pull the message from the error queue — the Exception header tells you what threw. Fix the root cause, then replay from audit or from the error queue back into the main queue.
  4. If the audit queue doesn’t have it, the problem is upstream of the bus — the publisher didn’t send it. That’s a different investigation, but one this stack doesn’t obscure.

See also:
  • Error Handling: the error-queue side of the investigation flow above.
  • Filters: the hook for redaction or custom header stamping (trace-context headers are handled by ServiceConnect.Telemetry).
  • Configuration: AuditingEnabled, AuditQueueName, IncludeMachineNameInHeaders.