Skip to content

Filters

Filters run on the envelope — the bytes-and-headers wrapper that carries a message through the transport — at three points in the pipeline: before a message is published or sent, before an incoming message reaches a handler, and after a handler returns. They are the hook for cross-cutting concerns that should apply to every message passing a certain point, rather than being added to each handler individually.

Use filters for the things every handler would otherwise duplicate: tracing headers, auth checks, logging, message shape normalisation, redaction.

  • You have a concern that applies to many or all messages — correlation propagation, auth, telemetry, redaction.
  • You want the concern enforced at the transport boundary, not scattered across handler bodies.
  • You need the ability to stop a message centrally. Filters can decide the pipeline stops here.

If the concern only applies to one or two handlers, put it in those handlers. Filters earn their place by being uniform.

One interface, one method:

public interface IFilter
{
Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);
}

The return value tells the pipeline what to do next:

  • FilterAction.Continue — the next filter runs; eventually the message is sent or handled.
  • FilterAction.Stop — no further filters run on this envelope, and the pipeline stops.

The Envelope gives you two things: mutable Headers and a read-only Body. You modify the bytes only indirectly — by letting serialisation happen upstream of the filter and working with headers here. If a filter needs to change the payload itself, it’s usually a sign the concern belongs in a handler instead.

The canonical example: stamp a trace id on every outgoing message. The handler shouldn’t have to know — the filter puts it on for you:

Sender/TraceHeaderFilter.cs
public sealed class TraceHeaderFilter(ITracingContext tracing) : IFilter
{
public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
{
envelope.Headers["X-Trace-Id"] = tracing.CurrentTraceId;
return Task.FromResult(FilterAction.Continue);
}
}

Registration is a one-liner on the builder, plus whatever DI setup the filter’s dependencies need:

Sender/Program.cs
services.AddSingleton<ITracingContext, TracingContext>();
services.AddSingleton<TraceHeaderFilter>();
services.AddServiceConnect(builder =>
{
builder.UseRabbitMQ(t => { t.Host = "localhost"; });
builder.ConfigureQueues(q => q.QueueName = "notifications-sender");
builder.AddOutgoingFilter<TraceHeaderFilter>();
});

Every SendAsync and PublishAsync on this bus now runs through TraceHeaderFilter before hitting the wire. Consumers see X-Trace-Id in the message headers without the sending handler knowing the filter exists.

Note the DI registration. Filters are resolved from the container, so they must be registered as services — the builder’s AddOutgoingFilter<T>() call only tells the pipeline which type to resolve; the container is what actually constructs one. Register filters as Scoped or Transient by default; use Singleton only when the filter is stateless and thread-safe.

When an outgoing filter returns FilterAction.Stop, the corresponding PublishAsync / SendAsync / SendToManyAsync / RouteAsync call throws OutgoingFiltersBlockedException. Callers that intentionally use FilterAction.Stop to suppress a message must catch this exception (or rely on a higher-level handler).

Three hooks on the incoming side, each for a different purpose:

builder.AddBeforeConsumingFilter<AuthFilter>(); // runs before the handler
builder.AddAfterConsumingFilter<AuditFilter>(); // runs after the handler returns
builder.AddOnConsumedSuccessfullyFilter<DedupeFilter>(); // runs only after a successful handler invocation

The before position is where “should this message even be processed?” goes — auth, deduplication, feature-flag gates. Returning FilterAction.Stop here stops the pipeline cleanly: the handler never runs, and the pipeline treats it as successfully processed (so the message is acked and does not go to the error queue).

The after position is where post-processing goes — audit logging, metrics, cleanup. The handler has already run; a filter returning FilterAction.Stop here has no effect on the handler outcome but does stop any filters further down the after-chain from running. If an AfterConsuming filter throws, the exception is swallowed (logged at Warning) and the message remains acked — the handler’s side effects have already committed.

The on-consumed-successfully position runs only after a successful handler invocation — failures and unhandled messages skip it. Use it for at-most-once side effects that must not fire if the handler threw or left the message unhandled: recording a deduplication key, writing to an outbox, emitting an audit event where partial records are worse than no records.

public sealed class AuthFilter(IAuthChecker auth) : IFilter
{
public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
{
if (!envelope.Headers.TryGetValue("X-Principal", out var principal))
return FilterAction.Stop; // drop unauthenticated messages
return await auth.IsAuthorizedAsync((string)principal!, cancellationToken)
? FilterAction.Continue
: FilterAction.Stop;
}
}

Filters run in registration order. If you register A then B as outgoing filters, A runs first; if A returns FilterAction.Stop, B never sees the envelope. Design filters to be independent where you can — a pipeline where filter order silently matters is a pipeline that breaks the first time someone reorders registrations.

When order must matter (stamp the trace id before an auth check that depends on it), state that coupling in a comment next to the registration, not hidden inside the filter.

ServiceConnect also has two middleware hooks — AddSendMessageMiddleware<T> and AddMessageProcessingMiddleware<T> — which wrap the whole operation with next-delegate semantics, the way ASP.NET Core middleware does. Filters are stateless inspect/stamp/maybe-stop; middleware is wrap-the-operation. Pick by scenario:

You want to…UseWhy
Read or stamp a headerFilterStateless inspect/produce; that’s the whole filter contract
Short-circuit the pipeline based on header contentFilterFilterAction.Stop is a first-class outcome
Authorise the message based on incoming claims and rejectFilterInspect headers, return FilterAction.Stop if rejected
Wrap the inner pipeline with try/finally (open a tracing scope, close it)MiddlewareFilters can’t observe completion of next
Time the whole consume operationMiddlewareNeed a Stopwatch that brackets next()
Catch exceptions thrown by the inner pipelineMiddlewareFilters return FilterAction; they don’t see exceptions from next
Mutate the message bodyMiddlewareMiddleware can hand substitute bytes to next(); filters have no continuation to redirect
Open a unit-of-work, commit on success, rollback on exceptionMiddlewareNeeds to observe success-vs-exception from next() to choose commit or rollback

Rule of thumb: filter when “inspect or stamp, maybe stop” is the whole job; middleware when you need to wrap the operation.

ISendMessageMiddleware must be registered as Singleton. AddServiceConnect validates this at host start-up and throws InvalidOperationException if any ISendMessageMiddleware type is registered with a Transient or Scoped lifetime — the send pipeline runs from non-handler call sites (background workers, hosted services, any caller that holds an IBus) where a per-request scope is not always available, and silently capturing a stale scope from the root provider would race against IServiceProvider disposal.

IMessageProcessingMiddleware has no such restriction. Register it as Transient (the default), Scoped (one instance per handler dispatch), or Singleton (one instance reused across dispatches); the consume pipeline runs inside a per-message scope, so any of these is safe.

  • Handlers — where filtered messages end up.
  • Pub/Sub — outgoing filters run for both sends and publishes.
  • Endpoints — filters attach to the bus, not the endpoint; the same filters apply to every queue the bus talks to.