IPipelineConfiguration
Overview
IPipelineConfiguration is the read-only view of the filter and middleware types registered for the consume and send pipelines. It is populated through the ServiceConnectBuilder helpers: AddBeforeConsumingFilter<T>(), AddOnConsumedSuccessfullyFilter<T>(), AddAfterConsumingFilter<T>(), AddOutgoingFilter<T>(), AddMessageProcessingMiddleware<T>(), AddSendMessageMiddleware<T>(), InsertSendMessageMiddlewareOutermost<T>(), and InsertMessageProcessingMiddlewareOutermost<T>(). It is exposed so that transports and diagnostics can enumerate the configured types without mutating them.
The concrete PipelineConfiguration, like the other concrete *Configuration classes (BusConfiguration, TransportConfiguration, QueueConfiguration, PersistenceConfiguration), is internal. Only the I*Configuration interfaces are public. Consumer code works with them through the builder callbacks (ConfigureTransport, ConfigureQueues, etc.) or by resolving IPipelineConfiguration (or another sub-configuration interface) directly from DI. Calling new PipelineConfiguration() from outside the framework assembly is not supported.
See Filters for the conceptual model and for when to reach for a filter over a middleware.
Reference
BeforeConsumingFilters

    IReadOnlyList<Type> BeforeConsumingFilters { get; }

Gets the filters that run before handler invocation.
Remarks. Each type must implement IFilter. Filters run in registration order; returning FilterAction.Stop short-circuits the pipeline and skips the handler.
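The ordering and short-circuit behaviour described in these remarks can be sketched with a few local stand-in types. Everything named Sketch* below, along with RecordingFilter, is hypothetical and exists only for illustration; it is not the framework's IFilter, FilterAction, or dispatcher, and the real dispatch loop may differ in detail.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Local stand-ins for illustration only; these are NOT the framework's types.
public enum SketchFilterAction { Continue, Stop }

public interface ISketchFilter
{
    Task<SketchFilterAction> ProcessAsync(IDictionary<string, string> headers);
}

public static class SketchFilterChain
{
    // Runs the filters in list (registration) order and reports whether
    // the handler should still be invoked afterwards.
    public static async Task<bool> ShouldInvokeHandlerAsync(
        IReadOnlyList<ISketchFilter> filters,
        IDictionary<string, string> headers)
    {
        foreach (var filter in filters)
        {
            if (await filter.ProcessAsync(headers) == SketchFilterAction.Stop)
            {
                // Short-circuit: later filters and the handler are skipped.
                return false;
            }
        }

        return true;
    }
}

// Records its name so the execution order is observable, then returns a fixed action.
public sealed class RecordingFilter : ISketchFilter
{
    private readonly string _name;
    private readonly SketchFilterAction _action;
    private readonly List<string> _log;

    public RecordingFilter(string name, SketchFilterAction action, List<string> log)
    {
        _name = name;
        _action = action;
        _log = log;
    }

    public Task<SketchFilterAction> ProcessAsync(IDictionary<string, string> headers)
    {
        _log.Add(_name);
        return Task.FromResult(_action);
    }
}
```

With three RecordingFilter instances registered as a (Continue), b (Stop), and c (Continue), the sketch runs a and b, never reaches c, and reports that the handler should not be invoked.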
AfterConsumingFilters

    IReadOnlyList<Type> AfterConsumingFilters { get; }

Gets the filters that run after handler invocation.
Remarks. Each type must implement IFilter. Handy for post-processing concerns such as metrics emission or cleanup. Returning FilterAction.Stop from an after-consuming filter suppresses any remaining post-processing stages but does not un-handle the message — the handler has already run and the message will still be acked.
OnConsumedSuccessfullyFilters

    IReadOnlyList<Type> OnConsumedSuccessfullyFilters { get; }

Gets the filters that run only after a successful handler invocation. The dispatcher invokes this stage when the chain returned ConsumeEventResult.Success = true and NotHandled = false; failures and unhandled messages skip it. Use it for at-most-once side effects (dedup, outbox, audit) that depend on the handler having completed.
See IFilter.ExecuteOnConsumedSuccessfullyFiltersAsync for filter-author semantics.
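The gating condition for this stage amounts to a two-flag check. The sketch below restates it with a hypothetical SketchConsumeResult type that mirrors only the two flags mentioned above; it is not the framework's ConsumeEventResult, and the real dispatcher may evaluate additional state.

```csharp
using System;

// Hypothetical stand-in carrying only the two flags discussed above.
public sealed class SketchConsumeResult
{
    public SketchConsumeResult(bool success, bool notHandled)
    {
        Success = success;
        NotHandled = notHandled;
    }

    public bool Success { get; }
    public bool NotHandled { get; }
}

public static class SuccessStageGate
{
    // The success-only stage runs when the handler succeeded AND the message was handled.
    public static bool ShouldRunOnConsumedSuccessfully(SketchConsumeResult result)
        => result.Success && !result.NotHandled;
}
```

Of the four flag combinations, only Success = true with NotHandled = false passes the gate.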
OutgoingFilters

    IReadOnlyList<Type> OutgoingFilters { get; }

Gets the filters that run on outgoing messages.
Remarks. Each type must implement IFilter. Invoked for every publish, send, and reply.
MessageProcessingMiddleware

    IReadOnlyList<Type> MessageProcessingMiddleware { get; }

Gets the middleware types that wrap message processing.
Remarks. Each type must implement IMessageProcessingMiddleware. Middleware follows an await next() model. Prefer it over filters when you need to surround the handler with a try/finally, a scope, or an activity.
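To illustrate why an await next() model suits try/finally wrapping, here is a minimal sketch built on stand-in types. ISketchMiddleware, BracketingMiddleware, and SketchPipeline are hypothetical names invented for this example; the actual IMessageProcessingMiddleware signature is not shown on this page and may differ. The composition order (first registered is outermost) is also an assumption of the sketch, not a statement about the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Local stand-in for illustration; NOT the framework's IMessageProcessingMiddleware.
public interface ISketchMiddleware
{
    Task InvokeAsync(IDictionary<string, string> headers, Func<Task> next);
}

// Brackets the rest of the pipeline, which a before/after filter pair cannot do
// in a single method.
public sealed class BracketingMiddleware : ISketchMiddleware
{
    private readonly List<string> _events;

    public BracketingMiddleware(List<string> events) => _events = events;

    public async Task InvokeAsync(IDictionary<string, string> headers, Func<Task> next)
    {
        _events.Add("enter");
        try
        {
            await next(); // the rest of the pipeline, ending in the handler
        }
        finally
        {
            _events.Add("exit"); // runs whether the handler succeeded or threw
        }
    }
}

public static class SketchPipeline
{
    // Composes middleware around the handler; in this sketch the first
    // entry in the list ends up outermost.
    public static Func<Task> Build(
        IReadOnlyList<ISketchMiddleware> middleware,
        IDictionary<string, string> headers,
        Func<Task> handler)
    {
        var next = handler;
        for (var i = middleware.Count - 1; i >= 0; i--)
        {
            var current = middleware[i];
            var inner = next;
            next = () => current.InvokeAsync(headers, inner);
        }

        return next;
    }
}
```

Because the code after await next() sits in a finally block, the middleware observes both the successful and the failing path of the handler, which is what makes this model a natural fit for scopes and activities.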
SendMessageMiddleware

    IReadOnlyList<Type> SendMessageMiddleware { get; }

Gets the middleware types that wrap outgoing send and publish operations.
Remarks. Each type must implement ISendMessageMiddleware.
Adding a logging filter and an outgoing retry filter

    services.AddServiceConnect(builder =>
    {
        builder.UseRabbitMQ(transport =>
        {
            transport.Host = "rabbit.internal.example";
            transport.Username = "order-service";
            transport.Password = Environment.GetEnvironmentVariable("ORDER_RABBIT_PASSWORD");
        });

        builder.ConfigureQueues(queues => queues.QueueName = "order-service");

        builder.AddBeforeConsumingFilter<LoggingFilter>();
        builder.AddOutgoingFilter<OutgoingRetryFilter>();
    });

    // Filters wired up above.
    public sealed class LoggingFilter : IFilter
    {
        private readonly ILogger<LoggingFilter> _logger;

        public LoggingFilter(ILogger<LoggingFilter> logger) => _logger = logger;

        public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
        {
            _logger.LogInformation(
                "Consuming {MessageType} {MessageId} from {SourceAddress}",
                envelope.Headers.GetValueOrDefault("MessageType"),
                envelope.Headers.GetValueOrDefault("MessageId"),
                envelope.Headers.GetValueOrDefault("SourceAddress"));
            return Task.FromResult(FilterAction.Continue);
        }
    }

    public sealed class OutgoingRetryFilter : IFilter
    {
        public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
        {
            envelope.Headers["RetryPolicy"] = "exponential";
            envelope.Headers["RetryBudget"] = "5";
            return Task.FromResult(FilterAction.Continue);
        }
    }

LoggingFilter sits on the consume side, writing a structured log line for every message the OrderService processes before it reaches the handler. OutgoingRetryFilter sits on the send side, stamping an explicit retry-policy hint onto every outbound envelope so downstream services can apply a uniform retry strategy. Both types are registered once during startup and resolved per-message from the DI container.
See also
- Filters (concept)
- IFilter (related reference)
- IMessageProcessingMiddleware (related reference)
- ISendMessageMiddleware (related reference)