
IFilter

IFilter is a short-circuit stage in the pipeline: each filter returns FilterAction.Continue to let processing proceed or FilterAction.Stop to halt it. Filters are cheap, synchronous-feeling hooks that fit neatly in front of a handler for concerns such as deduplication, tenant checks, or feature-flag gating. IFilterPipeline is the orchestrator that composes the registered filters into stages: an outgoing stage that runs on every publish, send, and reply, plus the before-consuming, after-consuming, and on-consumed-successfully stages that surround every handler invocation.

See Filters for the conceptual model, and reach for IMessageProcessingMiddleware when you need an await next() style wrapper instead of a predicate.

Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);

Inspects or mutates envelope and returns whether the pipeline should continue.

Parameters

  • envelope — the in-flight message envelope; headers may be read or written.
  • cancellationToken — cancels the filter when the consumer loop is stopping.

Returns. FilterAction.Continue to continue pipeline execution; FilterAction.Stop to stop the pipeline and suppress further stages.

Remarks. Filters that need the bus should take IBus as a constructor dependency and be registered in DI. The historical IFilter.Bus property was never populated by the pipeline and returned null at runtime — it has been removed.
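As an illustration of the contract above, here is a minimal sketch of a tenant-check filter that receives its dependency through the constructor. The Envelope, FilterAction, and IFilter declarations are stand-ins so the sketch compiles on its own; the "TenantId" header name, the header dictionary type, and the TenantAllowListFilter class are assumptions made for this example, not part of the library.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in types so the sketch is self-contained; the real library ships its own.
public enum FilterAction { Continue, Stop }

public sealed class Envelope
{
    // Assumption: headers behave like a string-keyed dictionary.
    public IDictionary<string, object> Headers { get; } = new Dictionary<string, object>();
}

public interface IFilter
{
    Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);
}

// Hypothetical filter: stops any message whose "TenantId" header is not on the allow list.
public sealed class TenantAllowListFilter : IFilter
{
    private readonly ISet<string> _allowedTenants;

    // Dependencies arrive through the constructor, as the remarks recommend for IBus.
    public TenantAllowListFilter(IEnumerable<string> allowedTenants)
    {
        _allowedTenants = new HashSet<string>(allowedTenants);
    }

    public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // In this sketch a missing or non-string header counts as "not allowed".
        var allowed = envelope.Headers.TryGetValue("TenantId", out var raw)
            && raw is string tenant
            && _allowedTenants.Contains(tenant);

        return Task.FromResult(allowed ? FilterAction.Continue : FilterAction.Stop);
    }
}
```

Because the filter does no I/O, it returns a completed task rather than using async/await; a filter that calls a cache or database would await that call instead.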


Task<FilterAction> ExecuteOutgoingFiltersAsync(
    Envelope envelope,
    CancellationToken cancellationToken = default);

Runs every outgoing filter in registration order against envelope.

Parameters

  • envelope — the envelope being published or sent.
  • cancellationToken — cancels the pipeline when the caller abandons the send.

Returns. FilterAction.Stop if any filter stopped the pipeline; FilterAction.Continue when the pipeline ran to completion.

Remarks. Invoked for every publish, send, and reply. Filters run sequentially and the first FilterAction.Stop short-circuits the rest.
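The sequential, short-circuiting behaviour described above can be pictured with the following sketch. It is not the library's implementation: FilterStageSketch and DelegateFilter are hypothetical names, and the Envelope, FilterAction, and IFilter declarations are stand-ins so the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-in types so the sketch is self-contained; the real library ships its own.
public enum FilterAction { Continue, Stop }

public sealed class Envelope
{
    public IDictionary<string, object> Headers { get; } = new Dictionary<string, object>();
}

public interface IFilter
{
    Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);
}

// Hypothetical helper that wraps a delegate as a filter, to keep examples short.
public sealed class DelegateFilter : IFilter
{
    private readonly Func<Envelope, FilterAction> _body;

    public DelegateFilter(Func<Envelope, FilterAction> body) { _body = body; }

    public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
        => Task.FromResult(_body(envelope));
}

public static class FilterStageSketch
{
    // Runs filters one at a time, in list order, and returns at the first Stop.
    public static async Task<FilterAction> RunAsync(
        IReadOnlyList<IFilter> filters,
        Envelope envelope,
        CancellationToken cancellationToken = default)
    {
        foreach (var filter in filters)
        {
            var action = await filter.ProcessAsync(envelope, cancellationToken);
            if (action == FilterAction.Stop)
            {
                // Filters after this one are never invoked.
                return FilterAction.Stop;
            }
        }

        return FilterAction.Continue;
    }
}
```

With three filters where the second returns FilterAction.Stop, only the first two run and the stage as a whole reports FilterAction.Stop.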


Task<FilterAction> ExecuteBeforeConsumingFiltersAsync(
    Envelope envelope,
    CancellationToken cancellationToken = default);

Runs the before-consuming filters that fence the handler call.

Parameters

  • envelope — the envelope about to be dispatched to a handler.
  • cancellationToken — cancels the pipeline with the consumer loop.

Returns. FilterAction.Stop if any filter stopped the pipeline; FilterAction.Continue when the pipeline ran to completion.


Task<FilterAction> ExecuteAfterConsumingFiltersAsync(
    Envelope envelope,
    CancellationToken cancellationToken = default);

Runs the after-consuming filters once the handler has returned.

Parameters

  • envelope — the envelope whose handler just completed.
  • cancellationToken — cancels the pipeline with the consumer loop.

Returns. FilterAction.Stop if any filter stopped the pipeline; FilterAction.Continue when the pipeline ran to completion. Stopping here suppresses remaining post-processing stages but does not un-handle the message.

Remarks. Useful for post-processing concerns such as metrics emission, audit trails, or cleanup that is cheaper to run as a predicate than a middleware. Exceptions thrown from an after-consuming filter are caught by the dispatcher and logged at Warning; they do not flip the dispatch outcome to Success = false or trigger redelivery. Reserve AfterConsuming for best-effort teardown that must not block acknowledgement, and use OnConsumedSuccessfully (below) for side-effects whose failure must redeliver the message.


Task<FilterAction> ExecuteOnConsumedSuccessfullyFiltersAsync(
    Envelope envelope,
    CancellationToken cancellationToken = default);

Runs only after a successful handler invocation — the dispatcher’s chain returned Success = true and NotHandled = false. Failures and unhandled messages skip this stage.

A filter throwing here propagates as a dispatch failure: the dispatcher’s catch block sets result.Success = false and the broker redelivers. Use this to record at-most-once side effects (deduplication keys, audit events, outbox rows) that depend on the handler having actually completed.

FilterAction.Stop halts further on-success filters but does not flip result.Success to false — consumption already succeeded.
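To make the difference between the after-consuming and on-consumed-successfully stages concrete, here is a rough sketch of how a dispatcher could sequence them. It models each stage as a single delegate and is not the library's dispatcher: DispatchSketch, DispatchResult, and logWarning are hypothetical names, the ordering of after-consuming ahead of on-success is an assumption, and so is the outcome reported when a before-consuming filter stops the message.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in types so the sketch is self-contained.
public enum FilterAction { Continue, Stop }
public sealed class Envelope { }

public sealed class DispatchResult
{
    public bool Success { get; set; }
    public bool NotHandled { get; set; }
}

public static class DispatchSketch
{
    public static async Task<DispatchResult> DispatchAsync(
        Envelope envelope,
        Func<Envelope, Task<FilterAction>> beforeConsuming,
        Func<Envelope, Task<bool>> handler, // returns false when no handler matched
        Func<Envelope, Task<FilterAction>> afterConsuming,
        Func<Envelope, Task<FilterAction>> onConsumedSuccessfully,
        Action<string> logWarning)
    {
        var result = new DispatchResult();

        if (await beforeConsuming(envelope) == FilterAction.Stop)
        {
            // Assumption: a stopped message skips the handler and is reported as not handled.
            result.Success = true;
            result.NotHandled = true;
            return result;
        }

        try
        {
            result.NotHandled = !await handler(envelope);
            result.Success = true;
        }
        catch (Exception)
        {
            result.Success = false;
        }

        // After-consuming runs on success and failure; its exceptions are logged, not propagated.
        try
        {
            await afterConsuming(envelope);
        }
        catch (Exception ex)
        {
            logWarning("after-consuming filter failed: " + ex.Message);
        }

        // On-success runs only for handled, successful messages; a throw here fails the dispatch.
        if (result.Success && !result.NotHandled)
        {
            try
            {
                // A Stop return value is ignored on purpose: consumption already succeeded.
                await onConsumedSuccessfully(envelope);
            }
            catch (Exception)
            {
                result.Success = false;
            }
        }

        return result;
    }
}
```

In this sketch an exception from afterConsuming leaves Success at true and only produces a warning, whereas an exception from onConsumedSuccessfully flips Success to false, which is the signal the text above ties to broker redelivery.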

Dropping duplicate messages with a distributed cache

using Microsoft.Extensions.Caching.Distributed; // IDistributedCache, DistributedCacheEntryOptions

// Before-consuming stage: stops the dispatch when the MessageId has already been recorded.
public sealed class DuplicateCheckFilter : IFilter
{
    private const string CachePrefix = "msg:";
    private readonly IDistributedCache _cache;

    public DuplicateCheckFilter(IDistributedCache cache) { _cache = cache; }

    public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        // Without a MessageId header there is nothing to deduplicate on, so let the message through.
        if (!envelope.Headers.TryGetValue("MessageId", out var raw)) return FilterAction.Continue;

        var cacheKey = CachePrefix + raw;
        var existing = await _cache.GetAsync(cacheKey, cancellationToken);

        // A cache hit means this id was already handled successfully.
        return existing is null ? FilterAction.Continue : FilterAction.Stop;
    }
}

// On-consumed-successfully stage: records the MessageId only after the handler has completed.
public sealed class DuplicateRecordFilter : IFilter
{
    private const string CachePrefix = "msg:";
    private static readonly TimeSpan DedupeWindow = TimeSpan.FromHours(24);
    private readonly IDistributedCache _cache;

    public DuplicateRecordFilter(IDistributedCache cache) { _cache = cache; }

    public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        if (!envelope.Headers.TryGetValue("MessageId", out var raw)) return FilterAction.Continue;

        var cacheKey = CachePrefix + raw;

        // The stored value is a marker byte; only the key's presence matters.
        await _cache.SetAsync(
            cacheKey,
            new byte[] { 1 },
            new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = DedupeWindow },
            cancellationToken);

        return FilterAction.Continue;
    }
}

// Registration during startup.
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    builder.ConfigureQueues(queues => queues.QueueName = "order-service");
    builder.AddBeforeConsumingFilter<DuplicateCheckFilter>();
    builder.AddOnConsumedSuccessfullyFilter<DuplicateRecordFilter>();
});

The pair of filters runs in two stages: the before-consuming stage short-circuits the dispatch when the MessageId is already recorded; the on-success stage records the id only after the handler has completed. Recording in the before-consuming stage (or in AfterConsuming, which runs on both success and failure) silently drops legitimate broker redeliveries after a handler crash — the canonical reason this redesign uses two filters rather than one.