Cancellation

ServiceConnect honours the host’s shutdown token end-to-end. When IHostApplicationLifetime signals shutdown, the consumer host stops admitting new deliveries, waits for in-flight handlers to complete, and propagates the cancellation token to every async boundary inside the dispatch pipeline.

  • The handler’s CancellationToken flows from the consumer host’s lifecycle. When shutdown begins, the dispatcher rethrows OperationCanceledException (OCE) from its catch path, and the broker leaves the message unacked so that it is redelivered on the next start.
  • When a handler throws both a non-OCE failure and an OperationCanceledException for the cancelled dispatch CancellationToken, the framework surfaces the OCE directly rather than wrapping it in an AggregateException. Callers that previously caught AggregateException and unwrapped it to detect cancellation must now catch OperationCanceledException first.
  • After-consuming filters still run in the dispatcher’s finally block on every path — even when the handler threw OCE during shutdown.
  • Audit publishes and telemetry header injection are fire-and-forget — they don’t block shutdown and don’t surface OCE as application errors.
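The catch ordering described above for callers can be sketched as follows. This is a minimal illustration that uses only the .NET base class library; `CatchOrderSketch`, `DispatchAsync`, and `RunAsync` are hypothetical names invented for this example and are not ServiceConnect APIs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CatchOrderSketch
{
    // Hypothetical stand-in for a dispatch call (not a ServiceConnect API).
    // It throws OperationCanceledException if the token is already cancelled.
    public static async Task DispatchAsync(CancellationToken cancellationToken)
    {
        await Task.Yield();
        cancellationToken.ThrowIfCancellationRequested();
    }

    public static async Task<string> RunAsync(CancellationToken cancellationToken)
    {
        try
        {
            await DispatchAsync(cancellationToken);
            return "completed";
        }
        // Cancellation is checked first, because the OCE arrives unwrapped.
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            return "cancelled";
        }
        // Aggregate failures are handled only after cancellation has been ruled out.
        catch (AggregateException ex)
        {
            return $"failed: {ex.InnerExceptions.Count} error(s)";
        }
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        cts.Cancel(); // simulate shutdown having already begun
        Console.WriteLine(await RunAsync(cts.Token)); // prints "cancelled"
    }
}
```

Placing the filtered OCE catch ahead of the AggregateException catch means that cancellation is recognised without any unwrapping logic.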

Three rules:

Methods that accept a CancellationToken cancellationToken parameter must propagate it to every transitively-awaited call inside the method body. No default. No parameterless overloads.

// ❌ Wrong: drops the token on the downstream await.
public async Task HandleAsync(MyMessage msg, CancellationToken cancellationToken = default)
{
    await _http.GetAsync("https://api.example.com/data"); // No token!
}

// ✅ Correct: forwards the token.
public async Task HandleAsync(MyMessage msg, CancellationToken cancellationToken = default)
{
    await _http.GetAsync("https://api.example.com/data", cancellationToken);
}

Catch blocks that wrap a cancellable operation must include an OCE filter ahead of any generic catch (Exception):

// ❌ Wrong: turns shutdown into an application error.
public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
{
    try
    {
        await DoSomethingAsync(cancellationToken);
        return FilterAction.Continue;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Filter failed");
        return FilterAction.Stop;
    }
}

// ✅ Correct: shutdown propagates; application errors are caught.
public async Task<FilterAction> ProcessAsync(Envelope envelope, CancellationToken cancellationToken = default)
{
    try
    {
        await DoSomethingAsync(cancellationToken);
        return FilterAction.Continue;
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        throw;
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "Filter failed");
        return FilterAction.Stop;
    }
}

Dispose / teardown paths and observability publishes (DisposeAsync, CloseAsync on resources being torn down, audit publish, telemetry inject) are deliberately fire-and-forget. They may accept a cancellation token to honour a cooperative-shutdown deadline, so that they can’t block shutdown indefinitely, but they do NOT surface OCE as application errors: an OCE raised on these paths is swallowed (and logged at Debug) so that the surrounding work can complete its normal flow. Cancellation cannot be used to abort one of these paths and short-circuit subsequent work.

OCE handlers in these paths log at Debug, not Error — an OCE is expected in this regime, not an error.
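The swallow-and-continue behaviour described above might look roughly like the sketch below. It illustrates what the bus is described as doing internally and is not a pattern to copy into application code. `TeardownSketch`, `PublishAuditAsync`, `CompleteAsync`, and the in-memory `DebugLog` list are all hypothetical; the list stands in for a Debug-level logger.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class TeardownSketch
{
    // Stand-in for a Debug-level log sink (assumption made for this example).
    public static readonly List<string> DebugLog = new();

    // Hypothetical audit publish that never finishes unless the token is cancelled.
    private static async Task PublishAuditAsync(CancellationToken cancellationToken)
    {
        await Task.Delay(Timeout.Infinite, cancellationToken);
    }

    public static async Task<string> CompleteAsync(CancellationToken cancellationToken)
    {
        try
        {
            // The token bounds how long the publish may take during shutdown.
            await PublishAuditAsync(cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown: record at Debug level and carry on.
            DebugLog.Add("audit publish cancelled during shutdown");
        }

        // Subsequent work is not short-circuited by the cancelled publish.
        return "acked";
    }

    public static async Task Main()
    {
        using var cts = new CancellationTokenSource();
        cts.Cancel(); // simulate the shutdown deadline having passed
        Console.WriteLine(await CompleteAsync(cts.Token)); // prints "acked"
        Console.WriteLine(DebugLog.Count);                 // prints "1"
    }
}
```

The cancelled publish leaves a Debug entry, and the method still returns its normal result, which matches the rule that cancellation cannot abort one of these paths and skip the work that follows.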

The bus’s whitelist (you don’t extend this in your own code):

  • IBus.DisposeAsync and the consumer/producer disposal cascade.
  • Audit publish (handler succeeded; audit is observability metadata).
  • Telemetry trace-context injection.

If you find yourself wanting a similar fire-and-forget block in your own code, your code probably has a cancellation bug.

The following middleware applies the first two rules: it forwards the token to every await, and it rethrows OCE ahead of the generic catch.

public sealed class MyMiddleware : IMessageProcessingMiddleware
{
    public async Task<ConsumeEventResult> ProcessAsync(
        ReadOnlyMemory<byte> messageBytes,
        Type messageType,
        object message,
        IDictionary<string, object> headers,
        Envelope envelope,
        MessageProcessingDelegate next,
        CancellationToken cancellationToken)
    {
        try
        {
            // Pre-handler work uses the token.
            await BeforeAsync(message, cancellationToken);

            // Forward the token to next.
            var result = await next(messageBytes, messageType, message, headers, envelope, cancellationToken);

            // Post-handler work uses the token.
            await AfterAsync(result, cancellationToken);
            return result;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cooperative shutdown: propagate so the dispatcher's outer finally still runs
            // AfterConsumingFilters but the message stays unacked for redelivery.
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Middleware failed");
            throw;
        }
    }
}