
IPipelineConfiguration

IPipelineConfiguration is the read-only view of the filter and middleware types registered for the consume and send pipelines. It is populated through the ServiceConnectBuilder helpers — AddBeforeConsumingFilter<T>(), AddOnConsumedSuccessfullyFilter<T>(), AddAfterConsumingFilter<T>(), AddOutgoingFilter<T>(), AddMessageProcessingMiddleware<T>(), AddSendMessageMiddleware<T>(), InsertSendMessageMiddlewareOutermost<T>(), and InsertMessageProcessingMiddlewareOutermost<T>() — and exposed so transports and diagnostics can enumerate the configured types without mutating them.

The concrete PipelineConfiguration, like the rest of the *Configuration concrete classes (BusConfiguration, TransportConfiguration, QueueConfiguration, PersistenceConfiguration), is internal. Only the I*Configuration interfaces are public — consumer code interacts with them via the builder callbacks (ConfigureTransport, ConfigureQueues, etc.) or by resolving IPipelineConfiguration (or other sub-config interfaces) directly from DI. Constructing a new PipelineConfiguration() directly is not supported from outside the framework assembly.
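As a rough illustration of the diagnostics use case, the helper below formats one pipeline stage from the `IReadOnlyList<Type>` that each property returns. `PipelineReport` and `Describe` are hypothetical names introduced here, not part of the framework, and the helper deliberately depends only on BCL types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical diagnostics helper; not part of the framework.
public static class PipelineReport
{
    // Formats one pipeline stage as "Stage: TypeA, TypeB",
    // or "Stage: (none)" when nothing is registered for it.
    public static string Describe(string stage, IReadOnlyList<Type> types)
    {
        var names = types.Count == 0
            ? "(none)"
            : string.Join(", ", types.Select(t => t.Name));
        return $"{stage}: {names}";
    }
}
```

Given an IPipelineConfiguration instance resolved from DI (for example through `GetRequiredService<IPipelineConfiguration>()` on the service provider), a call such as `PipelineReport.Describe("BeforeConsumingFilters", config.BeforeConsumingFilters)` would list the registered filter types without touching the configuration.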

See Filters for the conceptual model and for when to reach for a filter over a middleware.

IReadOnlyList<Type> BeforeConsumingFilters { get; }

Gets the filters that run before handler invocation.

Remarks. Each type must implement IFilter. Filters run in registration order; returning FilterAction.Stop short-circuits the pipeline and skips the handler.
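The ordering and short-circuit rule can be sketched with simplified stand-ins. `SketchFilterAction` and `BeforeConsumingSketch` are assumptions made for illustration and use one delegate per filter in place of the real IFilter interface; this is not the framework's dispatcher.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for FilterAction; only the two outcomes discussed here are modelled.
public enum SketchFilterAction { Continue, Stop }

public static class BeforeConsumingSketch
{
    // Runs the filters in registration order and invokes the handler
    // only if no filter returned Stop. Returns true when the handler ran.
    public static async Task<bool> RunAsync(
        IReadOnlyList<Func<Task<SketchFilterAction>>> filters,
        Func<Task> handler)
    {
        foreach (var filter in filters)
        {
            if (await filter() == SketchFilterAction.Stop)
            {
                // Short-circuit: later filters and the handler are skipped.
                return false;
            }
        }

        await handler();
        return true;
    }
}
```

In this sketch, a `Stop` from the second of three filters means the third filter and the handler never execute.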


IReadOnlyList<Type> AfterConsumingFilters { get; }

Gets the filters that run after handler invocation.

Remarks. Each type must implement IFilter. Useful for post-processing concerns such as metrics emission or cleanup. Returning FilterAction.Stop from an after-consuming filter suppresses any remaining post-processing stages but does not undo handling: the handler has already run, and the message is still acknowledged.


IReadOnlyList<Type> OnConsumedSuccessfullyFilters { get; }

Gets the filters that run only after a successful handler invocation.

Remarks. The dispatcher invokes this stage only when the chain returns a ConsumeEventResult with Success = true and NotHandled = false; failures and unhandled messages skip it. Use it for at-most-once side effects (deduplication, outbox, audit) that depend on the handler having completed.

See IFilter.ExecuteOnConsumedSuccessfullyFiltersAsync for filter-author semantics.


IReadOnlyList<Type> OutgoingFilters { get; }

Gets the filters that run on outgoing messages.

Remarks. Each type must implement IFilter. Invoked for every publish, send, and reply.


IReadOnlyList<Type> MessageProcessingMiddleware { get; }

Gets the middleware types that wrap message processing.

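Remarks. Each type must implement IMessageProcessingMiddleware. Middleware follows the await next() model: each middleware awaits the next step in the chain, so it can run code both before and after the handler. Prefer it over filters when you need to surround the handler with a try/finally, a scope, or an activity.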
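The "surround the handler" shape can be sketched as follows. The IMessageProcessingMiddleware signature is not shown on this page, so the sketch stands in a plain `Func<Task>` for the next step; `SurroundingMiddlewareSketch` and its `log` parameter are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration of the await-next pattern; not the framework's interface.
public static class SurroundingMiddlewareSketch
{
    // Runs "before" work, awaits the next step, and guarantees that the
    // "after" work runs whether the next step completes or throws.
    public static async Task InvokeAsync(Func<Task> next, Action<string> log)
    {
        log("before");
        try
        {
            await next();
        }
        finally
        {
            log("after");
        }
    }
}
```

A filter cannot express this shape in a single type, because it returns before the handler runs; a middleware holds the call to `next` inside its own scope, and any exception from the handler still propagates to the caller after the finally block.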


IReadOnlyList<Type> SendMessageMiddleware { get; }

Gets the middleware types that wrap outgoing send and publish operations.

Remarks. Each type must implement ISendMessageMiddleware.

Adding a logging filter and an outgoing retry filter

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport =>
    {
        transport.Host = "rabbit.internal.example";
        transport.Username = "order-service";
        transport.Password = Environment.GetEnvironmentVariable("ORDER_RABBIT_PASSWORD");
    });
    builder.ConfigureQueues(queues => queues.QueueName = "order-service");
    builder.AddBeforeConsumingFilter<LoggingFilter>();
    builder.AddOutgoingFilter<OutgoingRetryFilter>();
});

// Filters wired up above.
public sealed class LoggingFilter : IFilter
{
    private readonly ILogger<LoggingFilter> _logger;

    public LoggingFilter(ILogger<LoggingFilter> logger) => _logger = logger;

    public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        _logger.LogInformation(
            "Consuming {MessageType} {MessageId} from {SourceAddress}",
            envelope.Headers.GetValueOrDefault("MessageType"),
            envelope.Headers.GetValueOrDefault("MessageId"),
            envelope.Headers.GetValueOrDefault("SourceAddress"));
        return Task.FromResult(FilterAction.Continue);
    }
}

public sealed class OutgoingRetryFilter : IFilter
{
    public Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        envelope.Headers["RetryPolicy"] = "exponential";
        envelope.Headers["RetryBudget"] = "5";
        return Task.FromResult(FilterAction.Continue);
    }
}

LoggingFilter sits on the consume side, writing a structured log line for every message the OrderService processes before it reaches the handler. OutgoingRetryFilter sits on the send side, stamping an explicit retry-policy hint onto every outbound envelope so downstream services can apply a uniform retry strategy. Both types are registered once during startup and resolved per-message from the DI container.