IFilter
Overview
IFilter is a short-circuit stage in the pipeline: each filter returns FilterAction.Continue to proceed or FilterAction.Stop to halt the pipeline. Filters are cheap, predicate-style hooks that fit neatly in front of a handler for concerns such as deduplication, tenant checks, or feature-flag gating. IFilterPipeline is the orchestrator that composes the registered filters into the outgoing stage and the before-consuming, after-consuming, and on-consumed-successfully stages around each handler invocation.
See Filters for the conceptual model, and reach for IMessageProcessingMiddleware when you need an await next() style wrapper instead of a predicate.
IFilter
ProcessAsync

    Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);

Inspects or mutates envelope and returns whether the pipeline should continue.
Parameters
- envelope: the in-flight message envelope; headers may be read or written.
- cancellationToken: cancels the filter when the consumer loop is stopping.
Returns. FilterAction.Continue to continue pipeline execution; FilterAction.Stop to stop the pipeline and suppress further stages.
Remarks. Filters that need the bus should take IBus as a constructor dependency and be registered in DI. The historical IFilter.Bus property was never populated by the pipeline and returned null at runtime — it has been removed.
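As a rough illustration of the ProcessAsync contract, the sketch below gates messages on a tenant header. The TenantCheckFilter class, the "TenantId" header name, and the allow-list are hypothetical. Envelope, FilterAction, and IFilter are declared locally as minimal stand-ins so the snippet compiles on its own, and the header dictionary type is an assumption; a real project would reference the library's types instead.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Minimal stand-ins for the library types so the sketch compiles on its own.
// The Headers type is an assumption; use the real Envelope in production code.
public enum FilterAction { Continue, Stop }

public sealed class Envelope
{
    public IDictionary<string, object> Headers { get; } = new Dictionary<string, object>();
}

public interface IFilter
{
    Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);
}

// Hypothetical filter: stops the pipeline unless the message carries an allowed tenant id.
public sealed class TenantCheckFilter : IFilter
{
    private readonly HashSet<string> _allowedTenants;

    // Dependencies arrive through the constructor, as the remarks above recommend.
    public TenantCheckFilter(IEnumerable<string> allowedTenants)
    {
        _allowedTenants = new HashSet<string>(allowedTenants, StringComparer.Ordinal);
    }

    public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        cancellationToken.ThrowIfCancellationRequested();

        // "TenantId" is an illustrative header name, not one defined by the library.
        var allowed = envelope.Headers.TryGetValue("TenantId", out var raw)
            && raw is string tenant
            && _allowedTenants.Contains(tenant);

        return Task.FromResult(allowed ? FilterAction.Continue : FilterAction.Stop);
    }
}
```

A message with no tenant header, or with a tenant outside the allow-list, yields FilterAction.Stop; everything else continues to the next stage.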
IFilterPipeline
ExecuteOutgoingFiltersAsync

    Task<FilterAction> ExecuteOutgoingFiltersAsync(
        Envelope envelope,
        CancellationToken cancellationToken = default);

Runs every outgoing filter in registration order against envelope.
Parameters
- envelope: the envelope being published or sent.
- cancellationToken: cancels the pipeline when the caller abandons the send.
Returns. FilterAction.Stop if any filter stopped the pipeline; FilterAction.Continue when the pipeline ran to completion.
Remarks. Invoked for every publish, send, and reply. Filters run sequentially and the first FilterAction.Stop short-circuits the rest.
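The sequential, short-circuiting behaviour described in the remarks can be pictured with the sketch below. It is not the library's implementation: SequentialFilterRunner and RecordingFilter are hypothetical names, and Envelope, FilterAction, and IFilter are local stand-ins so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local stand-ins for the library types (assumptions, for illustration only).
public enum FilterAction { Continue, Stop }

public sealed class Envelope
{
    public IDictionary<string, object> Headers { get; } = new Dictionary<string, object>();
}

public interface IFilter
{
    Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default);
}

// Hypothetical runner: invokes filters one at a time, in list order,
// and returns as soon as one of them asks to stop.
public static class SequentialFilterRunner
{
    public static async Task<FilterAction> RunAsync(
        IReadOnlyList<IFilter> filters,
        Envelope envelope,
        CancellationToken cancellationToken = default)
    {
        foreach (var filter in filters)
        {
            cancellationToken.ThrowIfCancellationRequested();

            var action = await filter.ProcessAsync(envelope, cancellationToken).ConfigureAwait(false);
            if (action == FilterAction.Stop)
            {
                return FilterAction.Stop; // later filters never run
            }
        }

        return FilterAction.Continue;
    }
}

// Hypothetical test filter: appends its name to a shared log and returns a fixed action.
public sealed class RecordingFilter : IFilter
{
    private readonly string _name;
    private readonly FilterAction _result;
    private readonly List<string> _log;

    public RecordingFilter(string name, FilterAction result, List<string> log)
    {
        _name = name;
        _result = result;
        _log = log;
    }

    public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        _log.Add(_name);
        return Task.FromResult(_result);
    }
}
```

With three RecordingFilter instances A (Continue), B (Stop), and C (Continue) passed in that order, the runner returns FilterAction.Stop and the log contains only A and B.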
ExecuteBeforeConsumingFiltersAsync

    Task<FilterAction> ExecuteBeforeConsumingFiltersAsync(
        Envelope envelope,
        CancellationToken cancellationToken = default);

Runs the before-consuming filters that fence the handler call.
Parameters
- envelope: the envelope about to be dispatched to a handler.
- cancellationToken: cancels the pipeline with the consumer loop.
Returns. FilterAction.Stop if any filter stopped the pipeline; FilterAction.Continue when the pipeline ran to completion.
ExecuteAfterConsumingFiltersAsync

    Task<FilterAction> ExecuteAfterConsumingFiltersAsync(
        Envelope envelope,
        CancellationToken cancellationToken = default);

Runs the after-consuming filters once the handler has returned.
Parameters
- envelope: the envelope whose handler just completed.
- cancellationToken: cancels the pipeline with the consumer loop.
Returns. FilterAction.Stop if any filter stopped the pipeline; FilterAction.Continue when the pipeline ran to completion. Stopping here suppresses remaining post-processing stages but does not un-handle the message.
Remarks. Useful for post-processing concerns such as metrics emission, audit trails, or cleanup that is cheaper to run as a predicate than a middleware. Exceptions thrown from an after-consuming filter are caught by the dispatcher and logged at Warning; they do not flip the dispatch outcome to Success = false or trigger redelivery. Reserve AfterConsuming for best-effort teardown that must not block acknowledgement, and use OnConsumedSuccessfully (below) for side-effects whose failure must redeliver the message.
ExecuteOnConsumedSuccessfullyFiltersAsync

    Task<FilterAction> ExecuteOnConsumedSuccessfullyFiltersAsync(
        Envelope envelope,
        CancellationToken cancellationToken = default);

Runs only after a successful handler invocation, that is, when the dispatcher’s chain returned Success = true and NotHandled = false. Failures and unhandled messages skip this stage.
A filter throwing here propagates as a dispatch failure: the dispatcher’s
catch block sets result.Success = false and the broker redelivers. Use
this to record at-most-once side effects (deduplication keys, audit events,
outbox rows) that depend on the handler having actually completed.
FilterAction.Stop halts further on-success filters but does not flip
result.Success to false — consumption already succeeded.
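To make the contrast between the two post-handler stages concrete, here is a simplified model of the exception handling described above. It is a sketch, not the library's dispatcher: DispatchSketch and DispatchResult are hypothetical, the stages are reduced to plain delegates, and running the on-success stage before the after-consuming stage is an assumption made only for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical result type mirroring the Success / NotHandled flags mentioned above.
public sealed class DispatchResult
{
    public bool Success { get; set; }
    public bool NotHandled { get; set; }
}

public static class DispatchSketch
{
    // handler returns false when no handler matched the message.
    public static async Task<DispatchResult> DispatchAsync(
        Func<Task<bool>> handler,
        Func<Task> onConsumedSuccessfully,
        Func<Task> afterConsuming,
        Action<string> logWarning)
    {
        var result = new DispatchResult();

        try
        {
            var handled = await handler();
            result.NotHandled = !handled;
            result.Success = true;

            // Runs only for handled, successful messages. An exception here
            // falls into the catch block and marks the dispatch as failed.
            if (handled)
            {
                await onConsumedSuccessfully();
            }
        }
        catch (Exception)
        {
            result.Success = false; // the broker would redeliver
        }

        try
        {
            // Best-effort stage: runs after success and failure alike.
            await afterConsuming();
        }
        catch (Exception ex)
        {
            // Logged, but the outcome decided above is left untouched.
            logWarning(ex.Message);
        }

        return result;
    }
}
```

In this model, an exception from onConsumedSuccessfully leaves Success set to false, while an exception from afterConsuming only produces a warning and the earlier outcome stands.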
Dropping duplicate messages with a distributed cache

    using Microsoft.Extensions.Caching.Distributed;

    // Before-consuming stage: stop the dispatch if this MessageId was already recorded.
    public sealed class DuplicateCheckFilter : IFilter
    {
        private readonly IDistributedCache _cache;
        private const string CachePrefix = "msg:";

        public DuplicateCheckFilter(IDistributedCache cache)
        {
            _cache = cache;
        }

        public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
        {
            // Messages without an id cannot be deduplicated; let them through.
            if (!envelope.Headers.TryGetValue("MessageId", out var raw))
                return FilterAction.Continue;

            var cacheKey = CachePrefix + raw;
            var existing = await _cache.GetAsync(cacheKey, cancellationToken);
            return existing is null ? FilterAction.Continue : FilterAction.Stop;
        }
    }

    // On-consumed-successfully stage: record the MessageId once the handler has completed.
    public sealed class DuplicateRecordFilter : IFilter
    {
        private static readonly TimeSpan DedupeWindow = TimeSpan.FromHours(24);
        private readonly IDistributedCache _cache;
        private const string CachePrefix = "msg:";

        public DuplicateRecordFilter(IDistributedCache cache)
        {
            _cache = cache;
        }

        public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
        {
            if (!envelope.Headers.TryGetValue("MessageId", out var raw))
                return FilterAction.Continue;

            var cacheKey = CachePrefix + raw;
            // The stored value is a marker; only the key's presence matters.
            await _cache.SetAsync(
                cacheKey,
                new byte[] { 1 },
                new DistributedCacheEntryOptions { AbsoluteExpirationRelativeToNow = DedupeWindow },
                cancellationToken);
            return FilterAction.Continue;
        }
    }

    // Registration during startup.
    services.AddServiceConnect(builder =>
    {
        builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
        builder.ConfigureQueues(queues => queues.QueueName = "order-service");
        builder.AddBeforeConsumingFilter<DuplicateCheckFilter>();
        builder.AddOnConsumedSuccessfullyFilter<DuplicateRecordFilter>();
    });

The two filters run in separate stages: the before-consuming stage short-circuits the dispatch when the MessageId is already recorded, and the on-success stage records the id only after the handler has completed. Recording the id in the before-consuming stage (or in AfterConsuming, which runs on both success and failure) would silently drop legitimate broker redeliveries after a handler crash, which is why this design uses two filters rather than one.
See also
- Filters: concept
- IMessageProcessingMiddleware: related reference
- IPipelineConfiguration: related reference