Filters
Filters run on the envelope — the bytes-and-headers wrapper that carries a message through the transport — at three points in the pipeline: before a message is published or sent, before an incoming message reaches a handler, and after a handler returns. They are the hook for cross-cutting concerns that should apply to every message passing a certain point, rather than being added to each handler individually.
Use filters for the things every handler would otherwise duplicate: tracing headers, auth checks, logging, message shape normalisation, redaction.
When to use it
Section titled “When to use it”- You have a concern that applies to many or all messages — correlation propagation, auth, telemetry, redaction.
- You want the concern enforced at the transport boundary, not scattered across handler bodies.
- You need the ability to stop a message centrally. Filters can decide the pipeline stops here.
If the concern only applies to one or two handlers, put it in those handlers. Filters earn their place by being uniform.
The contract
Section titled “The contract”One interface, one method:
public interface IFilter{ Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);}The return value tells the pipeline what to do next:
FilterAction.Continue— the next filter runs; eventually the message is sent or handled.FilterAction.Stop— no further filters run on this envelope, and the pipeline stops.
The Envelope gives you two things: mutable Headers and a read-only Body. You modify the bytes only indirectly — by letting serialisation happen upstream of the filter and working with headers here. If a filter needs to change the payload itself, it’s usually a sign the concern belongs in a handler instead.
An outgoing filter
Section titled “An outgoing filter”The canonical example: stamp a trace id on every outgoing message. The handler shouldn’t have to know — the filter puts it on for you:
public sealed class TraceHeaderFilter(ITracingContext tracing) : IFilter{ public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default) { envelope.Headers["X-Trace-Id"] = tracing.CurrentTraceId; return Task.FromResult(FilterAction.Continue); }}Registration is a one-liner on the builder, plus whatever DI setup the filter’s dependencies need:
services.AddSingleton<ITracingContext, TracingContext>();services.AddSingleton<TraceHeaderFilter>();services.AddServiceConnect(builder =>{ builder.UseRabbitMQ(t => { t.Host = "localhost"; }); builder.ConfigureQueues(q => q.QueueName = "notifications-sender"); builder.AddOutgoingFilter<TraceHeaderFilter>();});Every SendAsync and PublishAsync on this bus now runs through TraceHeaderFilter before hitting the wire. Consumers see X-Trace-Id in the message headers without the sending handler knowing the filter exists.
Note the DI registration. Filters are resolved from the container, so they must be registered as services — the builder’s AddOutgoingFilter<T>() call only tells the pipeline which type to resolve; the container is what actually constructs one. Register filters as Scoped or Transient by default; use Singleton only when the filter is stateless and thread-safe.
When an outgoing filter returns FilterAction.Stop, the corresponding PublishAsync / SendAsync / SendToManyAsync / RouteAsync call throws OutgoingFiltersBlockedException. Callers that intentionally use FilterAction.Stop to suppress a message must catch this exception (or rely on a higher-level handler).
Incoming filters
Section titled “Incoming filters”Three hooks on the incoming side, each for a different purpose:
builder.AddBeforeConsumingFilter<AuthFilter>(); // runs before the handlerbuilder.AddAfterConsumingFilter<AuditFilter>(); // runs after the handler returnsbuilder.AddOnConsumedSuccessfullyFilter<DedupeFilter>(); // runs only after a successful handler invocationThe before position is where “should this message even be processed?” goes — auth, deduplication, feature-flag gates. Returning FilterAction.Stop here stops the pipeline cleanly: the handler never runs, and the pipeline treats it as successfully processed (so the message is acked and does not go to the error queue).
The after position is where post-processing goes — audit logging, metrics, cleanup. The handler has already run; a filter returning FilterAction.Stop here has no effect on the handler outcome but does stop any filters further down the after-chain from running. If an AfterConsuming filter throws, the exception is swallowed (logged at Warning) and the message remains acked — the handler’s side effects have already committed.
The on-consumed-successfully position runs only after a successful handler invocation — failures and unhandled messages skip it. Use it for at-most-once side effects that must not fire if the handler threw or left the message unhandled: recording a deduplication key, writing to an outbox, emitting an audit event where partial records are worse than no records.
public sealed class AuthFilter(IAuthChecker auth) : IFilter{ public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default) { if (!envelope.Headers.TryGetValue("X-Principal", out var principal)) return FilterAction.Stop; // drop unauthenticated messages
return await auth.IsAuthorizedAsync((string)principal!, cancellationToken) ? FilterAction.Continue : FilterAction.Stop; }}Order of execution
Section titled “Order of execution”Filters run in registration order. If you register A then B as outgoing filters, A runs first; if A returns FilterAction.Stop, B never sees the envelope. Design filters to be independent where you can — a pipeline where filter order silently matters is a pipeline that breaks the first time someone reorders registrations.
When order must matter (stamp the trace id before an auth check that depends on it), state that coupling in a comment next to the registration, not hidden inside the filter.
The middleware alternative
Section titled “The middleware alternative”ServiceConnect also has two middleware hooks — AddSendMessageMiddleware<T> and AddMessageProcessingMiddleware<T> — which wrap the whole operation with next-delegate semantics, the way ASP.NET Core middleware does. Filters are stateless inspect/stamp/maybe-stop; middleware is wrap-the-operation. Pick by scenario:
| You want to… | Use | Why |
|---|---|---|
| Read or stamp a header | Filter | Stateless inspect/produce; that’s the whole filter contract |
| Short-circuit the pipeline based on header content | Filter | FilterAction.Stop is a first-class outcome |
| Authorise the message based on incoming claims and reject | Filter | Inspect headers, return FilterAction.Stop if rejected |
Wrap the inner pipeline with try/finally (open a tracing scope, close it) | Middleware | Filters can’t observe completion of next |
| Time the whole consume operation | Middleware | Need a Stopwatch that brackets next() |
| Catch exceptions thrown by the inner pipeline | Middleware | Filters return FilterAction; they don’t see exceptions from next |
| Mutate the message body | Middleware | Middleware can hand substitute bytes to next(); filters have no continuation to redirect |
| Open a unit-of-work, commit on success, rollback on exception | Middleware | Needs to observe success-vs-exception from next() to choose commit or rollback |
Rule of thumb: filter when “inspect or stamp, maybe stop” is the whole job; middleware when you need to wrap the operation.
Lifetime constraints
Section titled “Lifetime constraints”ISendMessageMiddleware must be registered as Singleton. AddServiceConnect validates this at host start-up and throws InvalidOperationException if any ISendMessageMiddleware type is registered with a Transient or Scoped lifetime — the send pipeline runs from non-handler call sites (background workers, hosted services, any caller that holds an IBus) where a per-request scope is not always available, and silently capturing a stale scope from the root provider would race against IServiceProvider disposal.
IMessageProcessingMiddleware has no such restriction. Register it as Transient (the default), Scoped (one instance per handler dispatch), or Singleton (one instance reused across dispatches); the consume pipeline runs inside a per-message scope, so any of these is safe.
Reference
Section titled “Reference”IFilter— short-circuit consume-pipeline stageIMessageProcessingMiddleware— wrap the consume pipelineISendMessageMiddleware— wrap the send pipelineIPipelineConfiguration— wiring config