Skip to content

IBusConfiguration

IBusConfiguration is the bus-wide configuration surface. It holds the knobs that govern handler discovery, consumer concurrency, process-manager timeouts, reply validation, and routing-slip processing. It is exposed through the builder delegate passed to AddServiceConnect — you mutate it via builder.ConfigureBus(b => …) and via the sub-configuration helpers that wrap transport, queues, persistence, and pipeline.

See Configuration for the bigger picture.

bool ScanForMessageHandlers { get; set; }

Controls whether AddServiceConnect scans the configured assemblies for message-handler types. When false, handlers must be registered explicitly.

Remarks. Defaults to scanning the assemblies supplied via ServiceConnectBuilder.ScanAssemblies(...), or the current app domain if none were supplied. Explicit registration is preferred for deterministic, testable startup.


bool AutoStartConsuming { get; set; }

Controls whether the hosted service calls IBus.StartConsumingAsync automatically when the host starts. Disable when your process produces messages but does not consume any, or when tests need to start consumption manually.


bool EnableProcessManagerTimeouts { get; set; }

Controls whether the timeout-polling hosted service dispatches scheduled TimeoutMessage deliveries for process managers.

Remarks. Requires an ITimeoutStore registration (for example via UseMongoDbPersistence) to do useful work. With the flag set but no store registered, the polling hosted service logs a Warning at start-up and exits its poll loop — no exception is thrown, but no timeouts will be dispatched.


TimeSpan ProcessManagerTimeoutPollInterval { get; set; }

Sets how often the timeout-polling hosted service scans the timeout store for due timeouts.

Remarks. Shorter intervals reduce latency between scheduled delivery and actual delivery at the cost of more database round-trips. A few seconds is typical.


int ConsumerCount { get; set; }

Sets the number of consumer loops that run in parallel against the configured queue.

Remarks. Higher values improve throughput for handlers that spend time in I/O; they do not improve throughput for CPU-bound handlers and increase memory footprint per consumer.


Func<Exception, CancellationToken, ValueTask>? ExceptionHandler { get; set; }

Sets an async callback invoked when handler processing throws. Use for push-based telemetry or alerting that sits outside the normal logging pipeline.

Remarks. If the callback itself throws, the exception is caught and logged at Error level by the dispatcher; a flaky notification hook cannot block message processing. For structured observability, prefer the pipeline’s middleware and filters. The CancellationToken parameter is the consumer-loop token; observe it if the callback performs async I/O.

Wrap a synchronous side-effect with a completed ValueTask:

cfg.ExceptionHandler = (ex, _) => { Log.Error(ex); return ValueTask.CompletedTask; };

If the callback performs async work, await it and propagate the token:

cfg.ExceptionHandler = async (ex, ct) => await _alerting.PushAsync(ex, ct);

bool IncludeMachineNameInHeaders { get; set; }

When true, stamps Environment.MachineName into outgoing SourceMachine and incoming DestinationMachine headers.

Remarks. Defaults to false. Leaking an internal hostname to broker audit consumers is information disclosure in shared-broker deployments.


bool ValidateReplyDestinations { get; set; }

When true, IConsumeContext.ReplyAsync<TReply> validates that the incoming SourceAddress header points to a queue known from IQueueConfiguration.QueueMappings, QueueName, ErrorQueueName, or AuditQueueName. Set to false to allow replies to arbitrary queue names.

Remarks. Defaults to true. Leaving it enabled prevents a compromised or misbehaving upstream from coercing your service into replying to an attacker-controlled queue.


bool EnableRoutingSlipProcessing { get; set; }

When true, the handler processor forwards messages along the destinations listed in the RoutingSlip header after each hop’s handler completes. When false, routing-slip headers are silently ignored.

Remarks. Defaults to true. Destinations are validated by format only (non-null, non-empty, no embedded commas); cross-service destinations are permitted without any local queue registration.


bool DeadLetterUnhandledMessages { get; set; }

When true, messages that the dispatcher runs to completion on but which no processor claims — see ConsumeEventResult.NotHandled — are published to the error exchange instead of silently acked.

Remarks. Defaults to false, preserving historical behaviour where unhandled messages are logged and acked. Enable when a handler-less message should be treated as a terminal failure for operator visibility.


bool StrictReplyValidation { get; set; }

When true, only locally-tracked request-reply exchanges are trusted as reply destinations; the header-based fallback that allows cross-bus callers to be recognised is disabled. When false (the default), any inbound message whose headers match the cross-bus heuristic — a non-empty RequestMessageId, a SourceAddress, a MessageId, no ResponseMessageId, and DestinationAddress equal to this queue — is also trusted as a request envelope.

Remarks. Defaults to false, preserving backward compatibility with cross-bus request-reply where a request originated on a different bus instance and the local RequestReplyManager has no record of it. The header-based fallback can be spoofed by any external producer that knows the queue name. Set to true when the service does not participate in cross-bus request-reply, or when all upstream callers are verified to route through a tracked RequestReplyManager.


int MaxRoutingSlipHops { get; set; }

Caps the number of destinations the routing-slip processor will forward a message through before rejecting the slip. Defaults to 32.

Remarks. If the inbound slip carries more than this many destinations, the handler throws InvalidOperationException and the message follows the standard error path (retries → error queue). A misbehaving handler that re-prepends destinations could otherwise loop a message indefinitely. The cap is intentionally generous — typical routing slips have 3–6 hops; the limit catches accidents without constraining legitimate patterns.


TimeSpan DisposeTimeout { get; set; }

The maximum time IBus.DisposeAsync waits for the lifecycle semaphore before proceeding with teardown anyway. Guards against a wedged StartConsumingAsync (e.g., broker partition during handshake) that would otherwise block container shutdown. Defaults to 30 seconds.

Remarks. Set via the builder’s ConfigureBus callback (bus.DisposeTimeout = ...); the value is frozen at startup. Must be a strictly positive TimeSpan, or Timeout.InfiniteTimeSpan to wait indefinitely; zero or negative values are rejected at startup.


int MaxInflightRequests { get; set; }

Caps the number of concurrent request-reply exchanges the RequestReplyManager will track before rejecting new SendRequestAsync / SendRequestMultiAsync / PublishRequestAsync calls with InvalidOperationException. Defaults to 10,000.

Remarks. Each in-flight request pins a Timer, a CancellationTokenSource, a TaskCompletionSource, and a cancellation registration closure. The cap defends against unbounded memory growth from Timeout.Infinite callers that never wake up and from hot loops of unawaited requests. Raise for genuine high-concurrency request-fan workloads (parallel saga calls, fan-out aggregations); lower to harden against caller bugs in tight-budget hosts. Startup throws if a non-positive value is configured.


long MaxStreamSizeBytes { get; set; }

Caps the cumulative byte count a single inbound stream may reassemble before MessageBusReadStream.Write throws InvalidOperationException. Defaults to 100 MB (104,857,600 bytes).

Remarks. Every admitted stream pins a ConcurrentDictionary<long, byte[]> and a running total — without a ceiling, a hostile or buggy producer that never closes its stream would drive unbounded heap growth on the receiver. Raise for deployments that legitimately stream large artefacts (file uploads, ML model weights, image batches); lower to harden memory-constrained hosts where 100 MB per concurrent stream is too generous. Startup throws if a non-positive value is configured.


int MaxActiveStreams { get; set; }

Caps the number of concurrently-tracked partial inbound streams StreamProcessor will admit before rejecting new sequences (warning log + drop). Defaults to 1,000.

Remarks. Each admitted stream holds a MessageBusReadStream instance, a packet dictionary, and a per-sequence record in the processor’s active-stream map until completion, eviction, or fault. Without a slot ceiling, a producer (hostile or buggy) that opens streams without ever completing them would drive unbounded growth on the receiver. Raise for high-concurrency file-transfer workloads where 1,000 simultaneous in-flight reassemblies is too tight; lower to harden memory-constrained hosts. Snapshotted into the processor at construction — the configuration is frozen by the time the processor resolves from DI, so changes after AddServiceConnect returns are rejected. Startup throws if a non-positive value is configured.


bool AllowMissingProducer { get; set; }

When false (the default), BusHostedService.StartAsync throws InvalidOperationException at host start if no IProducer has been registered, surfacing a missing transport at host build time rather than at the first publish, send, or CreateStream call.

Remarks. Defaults to false. Set to true only in tests or specialised in-memory scenarios that legitimately operate without a producer — for example, consume-only buses that never publish or send.

Configuring a RabbitMQ bus with a named queue

Section titled “Configuring a RabbitMQ bus with a named queue”
services.AddServiceConnect(builder =>
{
builder.UseRabbitMQ(transport =>
{
transport.Host = "rabbitmq.internal";
transport.Username = "order-service";
transport.Password = "";
});
builder.ConfigureQueues(queues =>
{
queues.QueueName = "order-service";
queues.ErrorQueueName = "order-service.errors";
});
builder.ConfigureBus(bus =>
{
bus.ConsumerCount = 4;
bus.AutoStartConsuming = true;
bus.EnableProcessManagerTimeouts = false;
});
});

This wires the OrderService process up to consume from its own queue on a shared RabbitMQ broker. Four parallel consumer loops give handlers headroom while keeping the memory footprint modest, and timeouts are left disabled because this service has no sagas that schedule them.