Cancellation
ServiceConnect honours the host’s shutdown token end-to-end. When IHostApplicationLifetime signals shutdown, the consumer host stops admitting new deliveries, waits for in-flight handlers to complete, and propagates the cancellation token to every async boundary inside the dispatch pipeline.
What the bus promises
- The handler’s CancellationToken flows from the consumer host’s lifecycle. When shutdown begins, the dispatcher rethrows OperationCanceledException (OCE) from the catch path; the broker leaves the message unacked for redelivery on the next start.
- When a handler throws both a non-OCE failure and an OperationCanceledException for the cancelled dispatch CancellationToken, the framework surfaces the OCE directly; it does not wrap it into AggregateException. Callers that previously caught AggregateException and unwrapped it to detect cancellation must now catch OperationCanceledException first.
- After-consuming filters still run in the dispatcher’s finally block on every path, even when the handler threw OCE during shutdown.
- Audit publishes and telemetry header injection are fire-and-forget: they don’t block shutdown and don’t surface OCE as application errors.
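The first and third promises can be pictured with a small stand-in for the dispatch pipeline. This is only a sketch under stated assumptions: DispatchSketch, DispatchAsync and the Log list are hypothetical names invented for illustration and are not ServiceConnect types, and the "ack" and "unacked" entries merely record where the real dispatcher would acknowledge a message or leave it for redelivery.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the dispatch pipeline; not ServiceConnect's real types.
public static class DispatchSketch
{
    // Records what happened so the behaviour can be inspected afterwards.
    public static readonly List<string> Log = new List<string>();

    public static async Task DispatchAsync(
        Func<CancellationToken, Task> handler,
        CancellationToken cancellationToken)
    {
        try
        {
            await handler(cancellationToken);
            Log.Add("ack");
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown: rethrow without acking so the broker can redeliver.
            Log.Add("unacked");
            throw;
        }
        finally
        {
            // Runs on success, failure, and cancellation alike.
            Log.Add("after-consuming filters");
        }
    }
}
```

In this sketch a caller sees the OperationCanceledException itself, so a plain catch (OperationCanceledException) is enough; nothing needs to be unwrapped from an AggregateException.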
What your code must do
Three rules:
1. Token propagation
Methods that accept a CancellationToken cancellationToken parameter must propagate it to every transitively awaited call inside the method body. Never substitute default for the token you were given, and never call an overload that omits the token.
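"Transitively" means the token has to survive every layer between the handler and the actual I/O. The sketch below shows one way that can look; PropagationSketch and its methods are hypothetical names, and Task.Delay stands in for real I/O purely so the example runs on its own.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical three-layer call chain; all names are illustrative.
public static class PropagationSketch
{
    public static Task HandleAsync(string orderId, CancellationToken cancellationToken)
        => LoadAndStoreAsync(orderId, cancellationToken);

    private static async Task LoadAndStoreAsync(string orderId, CancellationToken cancellationToken)
    {
        // Each helper receives the same token the handler was given.
        var payload = await FetchAsync(orderId, cancellationToken);
        await StoreAsync(payload, cancellationToken);
    }

    private static async Task<string> FetchAsync(string orderId, CancellationToken cancellationToken)
    {
        // Task.Delay stands in for slow I/O; it observes the token.
        await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
        return "payload-for-" + orderId;
    }

    private static Task StoreAsync(string payload, CancellationToken cancellationToken)
        => Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
}
```

If any one of these layers dropped the parameter, the 30-second wait beneath it would run to completion regardless of shutdown.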
    // ❌ Wrong: drops the token on the downstream await.
    public async Task HandleAsync(MyMessage msg, CancellationToken cancellationToken = default)
    {
        await _http.GetAsync("https://api.example.com/data"); // No token!
    }

    // ✅ Correct: forwards the token.
    public async Task HandleAsync(MyMessage msg, CancellationToken cancellationToken = default)
    {
        await _http.GetAsync("https://api.example.com/data", cancellationToken);
    }

2. OCE filter discipline
Catch blocks that wrap a cancellable operation must place an OCE filter ahead of any generic catch (Exception). The filter checks the token, not just the exception type, so only a genuine shutdown is rethrown.
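It can help to see why the filter tests cancellationToken.IsCancellationRequested instead of catching every OperationCanceledException. In the sketch below, an inner timeout raises an OCE while the caller's token is still live, so it falls through to the generic catch and is treated as an application failure. OceFilterSketch, its string results, and TimesOutAsync are hypothetical and exist only to make the example self-contained.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only; returns "continue" or "stop", or throws on shutdown.
public static class OceFilterSketch
{
    public static async Task<string> ProcessAsync(
        Func<CancellationToken, Task> operation,
        CancellationToken cancellationToken)
    {
        try
        {
            await operation(cancellationToken);
            return "continue";
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Our own token fired: this is shutdown, so let it propagate.
            throw;
        }
        catch (Exception)
        {
            // Everything else, including an OCE raised by an unrelated token
            // such as an inner timeout, is an application failure.
            return "stop";
        }
    }

    // Simulates an operation whose own deadline expires while the caller's token is still live.
    public static async Task TimesOutAsync(CancellationToken cancellationToken)
    {
        using (var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            timeout.Cancel(); // Simulate the inner deadline elapsing immediately.
            await Task.Delay(1000, timeout.Token);
        }
    }
}
```

Cancelling the linked source does not cancel the caller's token, which is what lets the filter tell the two cases apart.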
    // ❌ Wrong: turns shutdown into an application error.
    public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        try
        {
            await DoSomethingAsync(cancellationToken);
            return FilterAction.Continue;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Filter failed");
            return FilterAction.Stop;
        }
    }

    // ✅ Correct: shutdown propagates; application errors are caught.
    public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
    {
        try
        {
            await DoSomethingAsync(cancellationToken);
            return FilterAction.Continue;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Filter failed");
            return FilterAction.Stop;
        }
    }

3. Fire-and-forget cleanup whitelist
Dispose and teardown paths and observability publishes (DisposeAsync, CloseAsync on resources being torn down, audit publish, telemetry inject) are deliberately fire-and-forget. They may accept a cancellation token to honour a cooperative-shutdown deadline, so they cannot block shutdown indefinitely, but they do not surface OCE as application errors: an OCE on these paths is swallowed and logged at Debug so the surrounding work can complete its normal flow. Cancelling one of these paths therefore never short-circuits the work that follows it.
OCE handlers in these paths log at Debug, not Error — an OCE is expected in this regime, not an error.
The bus’s whitelist (you don’t extend this in your own code):
- IBus.DisposeAsync and the consumer/producer disposal cascade.
- Audit publish (handler succeeded; audit is observability metadata).
- Telemetry trace-context injection.
If you find yourself wanting a similar fire-and-forget block in your own code, your code probably has a cancellation bug.
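For orientation only, the swallow-and-log behaviour described for the bus's own whitelisted paths might look roughly like the sketch below. It is an assumption about the shape, not the bus's actual implementation: CleanupSketch, BestEffortAsync, CompleteDeliveryAsync and the DebugLog list are hypothetical, and the list stands in for a logger writing at Debug level. Only OCE is swallowed here; how the real paths treat other exceptions is not covered by this page.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration of a whitelisted best-effort path.
public static class CleanupSketch
{
    // Stands in for Debug-level log output.
    public static readonly List<string> DebugLog = new List<string>();

    // Runs a best-effort step; cancellation is recorded at Debug level and swallowed.
    public static async Task BestEffortAsync(
        string name,
        Func<CancellationToken, Task> step,
        CancellationToken cancellationToken)
    {
        try
        {
            // The token bounds how long the step may take during shutdown.
            await step(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            DebugLog.Add("[Debug] " + name + " cancelled during shutdown");
        }
    }

    public static async Task<bool> CompleteDeliveryAsync(CancellationToken cancellationToken)
    {
        await BestEffortAsync("audit publish", ct => Task.Delay(1000, ct), cancellationToken);
        await BestEffortAsync("telemetry inject", ct => Task.Delay(1000, ct), cancellationToken);
        // Cancelling the steps above did not short-circuit the work that follows.
        return true;
    }
}
```

The point of the sketch is the contrast with rule 2: in application code the OCE filter rethrows, whereas on these few internal paths it is deliberately absorbed.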
Worked example: middleware
    public sealed class MyMiddleware : IMessageProcessingMiddleware
    {
        // Assumed to be a Microsoft.Extensions.Logging logger; the original snippet
        // uses _logger without declaring it.
        private readonly ILogger<MyMiddleware> _logger;

        public MyMiddleware(ILogger<MyMiddleware> logger) => _logger = logger;

        public async Task<ConsumeEventResult> ProcessAsync(
            ReadOnlyMemory<byte> messageBytes,
            Type messageType,
            object message,
            IDictionary<string, object> headers,
            Envelope envelope,
            MessageProcessingDelegate next,
            CancellationToken cancellationToken)
        {
            try
            {
                // Pre-handler work uses the token.
                await BeforeAsync(message, cancellationToken);

                // Forward the token to next.
                var result = await next(messageBytes, messageType, message, headers, envelope, cancellationToken);

                // Post-handler work uses the token.
                await AfterAsync(result, cancellationToken);

                return result;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cooperative shutdown: propagate so the dispatcher's outer finally still runs
                // AfterConsumingFilters but the message stays unacked for redelivery.
                throw;
            }
            catch (Exception ex)
            {
                // Application failure: log it, then rethrow.
                _logger.LogError(ex, "Middleware failed");
                throw;
            }
        }
    }

See also
- IFilter: pipeline contract.
- IMessageProcessingMiddleware: handler-wrapping middleware contract.
- Hosting & Lifecycle: how the host coordinates shutdown.