IBus

IBus is the runtime surface for moving messages. A handler, service, or controller resolves IBus from DI and uses it to publish domain events, send commands to a known endpoint, issue a request and await a reply, route a message through a list of destinations, stream a large payload, schedule a process-manager timeout, and start or stop message consumption. You configure it via AddServiceConnect; you use it by resolving the interface.

See The Bus for the conceptual tour.

Task PublishAsync<T>(
    T message,
    PublishOptions? options = null,
    CancellationToken cancellationToken = default)
    where T : Message;

Broadcasts a message to every subscriber of the message type. Use for domain events that any number of interested services may consume.

Parameters

  • message — the event instance to publish; must derive from Message.
  • options — optional headers, routing key override, priority, or per-call overrides.
  • cancellationToken — cancels the publish before the broker acknowledges.

Throws

  • ArgumentNullException — message is null.
  • ObjectDisposedException — the bus has been disposed.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop before the message reached the transport.

Remarks. Publish is fire-and-forget from the publisher’s perspective; it completes once the broker accepts the message, not when subscribers process it. There is no built-in guarantee that any subscriber exists.


Task SendAsync<T>(
    T message,
    SendOptions? options = null,
    CancellationToken cancellationToken = default)
    where T : Message;

Sends a message to a specific endpoint or to the queue resolved from the configured queue mappings. Use for commands with a single owner.

Parameters

  • message — the command instance; must derive from Message.
  • options — optional destination override, headers, or priority.
  • cancellationToken — cancels the send before the broker acknowledges.

Throws

  • ArgumentNullException — message is null.
  • ObjectDisposedException — the bus has been disposed.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop before the message reached the transport.

Remarks. If no destination is supplied via options and no queue mapping is registered for the message type, the call throws.
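
As a minimal sketch of the destination override, the snippet below sends a command to an explicitly named queue. ChargeCard, PaymentClient, and the queue name "payments" are hypothetical. The sketch also assumes that SendOptions.EndPoint is a settable string property and that Message has a constructor taking the correlation id, as the examples on this page suggest. The ServiceConnect namespace import is left out because this page does not state it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
// IBus, Message, and SendOptions are the ServiceConnect types documented on this page.

// Hypothetical command type, for illustration only.
public sealed class ChargeCard : Message
{
    // Assumption: Message exposes a constructor that accepts the correlation id.
    public ChargeCard(Guid correlationId) : base(correlationId) { }

    public decimal Amount { get; init; }
}

public sealed class PaymentClient
{
    private readonly IBus _bus;

    public PaymentClient(IBus bus) => _bus = bus;

    public Task ChargeAsync(Guid correlationId, decimal amount, CancellationToken cancellationToken)
    {
        var command = new ChargeCard(correlationId) { Amount = amount };

        // Assumption: EndPoint is a settable string property on SendOptions.
        // "payments" is a made-up queue name.
        var options = new SendOptions { EndPoint = "payments" };

        return _bus.SendAsync(command, options, cancellationToken);
    }
}
```

Passing no options would instead rely on a queue mapping being registered for ChargeCard; per the remark above, the call throws when neither is available.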


Task SendToManyAsync<T>(
    T message,
    IReadOnlyList<string> endPoints,
    SendOptions? options = null,
    CancellationToken cancellationToken = default)
    where T : Message;

Sends message to every endpoint in endPoints as an independent point-to-point delivery. Fan-out is an explicit per-call list rather than a property on SendOptions.

Parameters

  • message — the command instance; must derive from Message.
  • endPoints — the destination queue names to send to. Must be non-null and non-empty; each entry is forwarded as a single-endpoint send. SendOptions.EndPoint (singular) on options is ignored — the explicit list always wins.
  • options — optional headers shared across every delivery in the fan-out. Per-endpoint headers cannot diverge.
  • cancellationToken — cancels the fan-out mid-flight. The cancellation must come from the caller’s token to short-circuit; middleware-internal OperationCanceledException with a different token falls through to the per-endpoint failure path.

Throws

  • ArgumentNullException — message or endPoints is null.
  • ArgumentException — endPoints is empty.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop before the message reached the transport.
  • AggregateException — one or more endpoints failed. Inner exceptions are the per-endpoint failures in the order they occurred. If caller cancellation fired after some endpoints had already failed, the OperationCanceledException appears at slot 0 of the aggregate so callers can distinguish “cancelled with prior failures” from “cancelled clean”.
  • OperationCanceledException — caller cancelled before any endpoint failed.

Remarks. Each delivery emits its own SendEventArgs for telemetry — subscribers correlate fan-out by CorrelationId, which is stable across the deliveries. The per-endpoint loop short-circuits on caller cancellation but otherwise runs every endpoint to completion (success or failure) so a single broken endpoint cannot block the rest.
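
The failure shapes listed under Throws can be told apart with a small helper. The sketch below is one possible way to do it, not part of ServiceConnect: FanOutOutcome and FanOutErrors are hypothetical names, and the helper uses only standard .NET exception types.

```csharp
using System;

// Hypothetical classification of what a SendToManyAsync call threw.
public enum FanOutOutcome
{
    CancelledClean,         // caller cancelled before any endpoint failed
    CancelledAfterFailures, // caller cancelled after some endpoints had failed
    EndpointFailures,       // one or more endpoints failed, no caller cancellation
    Other                   // argument errors, blocked filters, and so on
}

public static class FanOutErrors
{
    public static FanOutOutcome Classify(Exception ex)
    {
        if (ex is OperationCanceledException)
        {
            return FanOutOutcome.CancelledClean;
        }

        if (ex is AggregateException aggregate)
        {
            // Per the Throws list, a cancellation that follows earlier failures
            // sits at slot 0 of the aggregate.
            bool cancelled = aggregate.InnerExceptions.Count > 0
                && aggregate.InnerExceptions[0] is OperationCanceledException;

            return cancelled
                ? FanOutOutcome.CancelledAfterFailures
                : FanOutOutcome.EndpointFailures;
        }

        return FanOutOutcome.Other;
    }
}
```

A caller would typically wrap the awaited SendToManyAsync call in a try/catch and pass the caught exception to Classify to decide whether to retry the failed endpoints or abandon the fan-out.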


Task<TReply> SendRequestAsync<TRequest, TReply>(
    TRequest message,
    RequestOptions? options = null,
    CancellationToken cancellationToken = default)
    where TRequest : Message
    where TReply : Message;

Sends a request to a single endpoint and awaits exactly one reply. Use when the caller needs a synchronous-looking answer from a single respondent.

Parameters

  • message — the request payload; must derive from Message.
  • options — optional timeout, destination override, or correlation hints.
  • cancellationToken — cancels the wait; the request may still have been delivered.

Returns. The single reply returned by the respondent.

Exceptions

  • ArgumentNullException — message is null.
  • ArgumentOutOfRangeException — options.Timeout is zero or negative (other than Timeout.Infinite, the int constant -1).
  • ObjectDisposedException — the bus has been disposed.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop.
  • RequestSendCancelledException — the outbound send pipeline cancelled before the request reached the broker. Distinct from caller-token cancellation (OperationCanceledException).
  • RequestTimeoutException — no reply arrived within options.Timeout.
  • OperationCanceledException — the caller’s cancellation token fired.

Remarks. If the reply does not arrive before the configured timeout, the task faults. The reply type must derive from Message.


Task<IList<TReply>> SendRequestMultiAsync<TRequest, TReply>(
    TRequest message,
    RequestOptions? options = null,
    CancellationToken cancellationToken = default)
    where TRequest : Message
    where TReply : Message;

Sends a request that may fan out to multiple respondents and collects every reply received before the deadline. Use for scatter-gather over a known endpoint list.

Parameters

  • message — the request payload; must derive from Message.
  • options — expected reply count and timeout are typically set here.
  • cancellationToken — cancels the aggregate wait.

Returns. The list of replies that arrived before the timeout or the expected count was reached.

Exceptions

  • ArgumentNullException — message is null.
  • ArgumentOutOfRangeException — options.Timeout is zero or negative (other than Timeout.Infinite, the int constant -1).
  • ObjectDisposedException — the bus has been disposed.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop.
  • RequestSendCancelledException — the outbound send pipeline cancelled before the request reached the broker. Distinct from caller-token cancellation (OperationCanceledException).
  • RequestTimeoutException — no reply arrived within options.Timeout, or fewer than options.ExpectedReplyCount replies arrived (when positive). Partials are exposed on RequestTimeoutException.PartialReplies.
  • OperationCanceledException — the caller’s cancellation token fired.

Remarks. Returns whatever replies arrived — a partial result is a valid outcome. Inspect the list length against RequestOptions.ExpectedReplyCount to decide whether the scatter-gather succeeded.
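
A scatter-gather call might look roughly like the sketch below, which handles both a short reply list and a RequestTimeoutException. PriceCheck, PriceQuote, and PriceCollector are hypothetical. The sketch assumes that RequestOptions.ExpectedReplyCount is a settable int and that Message has a constructor taking the correlation id. The type of RequestTimeoutException.PartialReplies is not stated on this page, so the sketch does not read it. The ServiceConnect namespace import is omitted for the same reason.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical request and reply types, for illustration only.
public sealed class PriceCheck : Message
{
    public PriceCheck(Guid correlationId) : base(correlationId) { }
    public string Sku { get; init; } = "";
}

public sealed class PriceQuote : Message
{
    public PriceQuote(Guid correlationId) : base(correlationId) { }
    public decimal Price { get; init; }
}

public sealed class PriceCollector
{
    private const int ExpectedSuppliers = 3;
    private readonly IBus _bus;

    public PriceCollector(IBus bus) => _bus = bus;

    public async Task<IList<PriceQuote>> CollectAsync(string sku, CancellationToken cancellationToken)
    {
        // Assumption: ExpectedReplyCount is a settable int property.
        var options = new RequestOptions { ExpectedReplyCount = ExpectedSuppliers };

        try
        {
            IList<PriceQuote> quotes = await _bus.SendRequestMultiAsync<PriceCheck, PriceQuote>(
                new PriceCheck(Guid.NewGuid()) { Sku = sku },
                options,
                cancellationToken);

            if (quotes.Count < ExpectedSuppliers)
            {
                Console.Error.WriteLine($"Only {quotes.Count} of {ExpectedSuppliers} suppliers replied.");
            }

            return quotes;
        }
        catch (RequestTimeoutException)
        {
            // Any partial replies are exposed on the exception's PartialReplies member.
            Console.Error.WriteLine("Price check timed out.");
            return new List<PriceQuote>();
        }
    }
}
```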


Task PublishRequestAsync<TRequest, TReply>(
    TRequest message,
    Action<TReply> onReply,
    RequestOptions? options = null,
    CancellationToken cancellationToken = default)
    where TRequest : Message
    where TReply : Message;

Publishes a request to every subscriber of the request type and invokes onReply each time a reply arrives. Use for streaming scatter-gather where replies should be processed as they arrive rather than collected into a list.

Parameters

  • message — the request payload; must derive from Message.
  • onReply — a delegate invoked for each reply, on a thread-pool thread.
  • options — expected reply count and timeout.
  • cancellationToken — stops dispatching replies to the callback.

Exceptions

  • ArgumentNullException — message or onReply is null.
  • ArgumentException — options.EndPoint is non-empty; use SendRequestAsync for single-destination requests.
  • ArgumentOutOfRangeException — options.Timeout is zero or negative (other than Timeout.Infinite, the int constant -1).
  • ObjectDisposedException — the bus has been disposed.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop.
  • RequestTimeoutException — no replies arrived within options.Timeout, or fewer than options.ExpectedReplyCount replies arrived.
  • OperationCanceledException — the caller’s cancellation token fired, or an onReply invocation threw and propagated through the awaited task.

Remarks. The callback is invoked one-at-a-time per request (serialized under an internal lock). Keep it short-lived and non-blocking — heavy work or further bus calls from inside the callback can stall reply processing for the same request. The task completes when the expected reply count is reached or the timeout elapses.
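
One way to keep the callback short, as the remark recommends, is to have it only enqueue each reply and to do the real work after the awaited task completes. The sketch below illustrates that pattern. CheckStock, StockLevel, and StockGatherer are hypothetical, and it assumes a settable int RequestOptions.ExpectedReplyCount and a Message constructor that takes the correlation id. The ServiceConnect namespace import is omitted because this page does not state it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical request and reply types, for illustration only.
public sealed class CheckStock : Message
{
    public CheckStock(Guid correlationId) : base(correlationId) { }
    public string Sku { get; init; } = "";
}

public sealed class StockLevel : Message
{
    public StockLevel(Guid correlationId) : base(correlationId) { }
    public int Quantity { get; init; }
}

public sealed class StockGatherer
{
    private readonly IBus _bus;

    public StockGatherer(IBus bus) => _bus = bus;

    public async Task<IReadOnlyList<StockLevel>> GatherAsync(string sku, CancellationToken cancellationToken)
    {
        // The callback runs on a thread-pool thread, so collect into a thread-safe queue.
        var received = new ConcurrentQueue<StockLevel>();

        // Assumption: ExpectedReplyCount is a settable int property.
        var options = new RequestOptions { ExpectedReplyCount = 2 };

        await _bus.PublishRequestAsync<CheckStock, StockLevel>(
            new CheckStock(Guid.NewGuid()) { Sku = sku },
            reply => received.Enqueue(reply), // no blocking work, no further bus calls
            options,
            cancellationToken);

        // Heavy processing belongs here, after reply dispatch has finished.
        return received.ToArray();
    }
}
```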


Task RouteAsync<T>(
    T message,
    IReadOnlyList<string> destinations,
    CancellationToken cancellationToken = default)
    where T : Message;

Starts a routing slip: sends the message to the first destination with a routing-slip header listing the remaining hops. Each handler forwards to the next destination when it completes.

Parameters

  • message — the payload to route; must derive from Message.
  • destinations — an ordered list of queue names the message should visit.
  • cancellationToken — cancels the initial send.

Throws

  • ArgumentNullException — message or destinations is null.
  • ArgumentException — destinations is empty, or contains an entry that includes a comma (the routing-slip separator) or otherwise fails destination validation.
  • ObjectDisposedException — the bus has been disposed.
  • OutgoingFiltersBlockedException — an outgoing filter returned FilterAction.Stop before the message reached the transport.

Remarks. Routing-slip forwarding is handled on the consumer side and requires IBusConfiguration.EnableRoutingSlipProcessing to be true (the default) on every intermediate service.
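
Starting a routing slip could look like the sketch below. ClaimSubmitted, ClaimIntake, and the three queue names are hypothetical, and the Message constructor taking the correlation id is an assumption based on the examples on this page. The ServiceConnect namespace import is omitted because this page does not state it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical message type, for illustration only.
public sealed class ClaimSubmitted : Message
{
    public ClaimSubmitted(Guid correlationId) : base(correlationId) { }
}

public sealed class ClaimIntake
{
    private readonly IBus _bus;

    public ClaimIntake(IBus bus) => _bus = bus;

    public Task StartAsync(Guid correlationId, CancellationToken cancellationToken)
    {
        // Ordered hops; the names are made up and none of them contains a comma,
        // which is reserved as the routing-slip separator.
        var destinations = new[] { "validate-claim", "assess-claim", "archive-claim" };

        // The message goes to "validate-claim" first; each handler forwards it on.
        return _bus.RouteAsync(new ClaimSubmitted(correlationId), destinations, cancellationToken);
    }
}
```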


IMessageBusWriteStream CreateStream<T>(string endpoint) where T : Message;

Opens a write stream for sending a large payload as a sequence of ordered chunks to a single endpoint.

Parameters

  • endpoint — the queue name to stream to.

Returns. A write stream the caller disposes to signal end-of-stream.

Throws

  • ArgumentException — endpoint is null or whitespace.
  • ObjectDisposedException — the bus has been disposed.
  • InvalidOperationException — no IProducer is registered in the bus’s DI graph.

Remarks. Each stream uses its own sequence number; consumers must use the matching IStreamHandler<T> to reassemble. Dispose the stream even on failure to release broker resources.


Task StartConsumingAsync(CancellationToken cancellationToken = default);

Starts consuming messages from the bus’s configured queue. The hosted service calls this automatically when IBusConfiguration.AutoStartConsuming is true; call it yourself when you have disabled auto-start.

Parameters

  • cancellationToken — cancels the startup handshake.

Remarks. Throws InvalidOperationException if the bus is already consuming or has previously been stopped.


Task StopConsumingAsync(CancellationToken cancellationToken = default);

Stops the consumer loop and disposes the underlying consumer connection.

Parameters

  • cancellationToken — bounds the graceful-shutdown wait.
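
When auto-start is disabled, a guarded manual start might be sketched as follows. ConsumerLifecycle is a hypothetical helper, not part of ServiceConnect. It relies only on the IsConsuming and IsStopped properties and the two lifecycle methods documented on this page. The ServiceConnect namespace import is omitted because this page does not state it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class ConsumerLifecycle
{
    // Returns true if this call started consumption, false if the bus was
    // already consuming or had previously been stopped.
    public static async Task<bool> TryStartAsync(IBus bus, CancellationToken cancellationToken)
    {
        if (bus.IsStopped || bus.IsConsuming)
        {
            return false;
        }

        try
        {
            await bus.StartConsumingAsync(cancellationToken);
            return true;
        }
        catch (InvalidOperationException)
        {
            // Another caller started or stopped the bus between the check and the call.
            return false;
        }
    }
}
```

The property check avoids the common case of the documented InvalidOperationException; the catch covers the window in which another caller changes the state first.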

bool IsConsuming { get; }

Indicates whether the bus is currently consuming messages. Useful in health checks and tests that must wait for the consumer to become ready.

Returns false before StartConsumingAsync has been called and after StopConsumingAsync returns. It also returns false when the broker has cancelled the consumer (queue deleted, policy expired, mirror promoted) — the underlying transport surfaces AMQP’s basic.cancel event and ServiceConnect propagates it through this property. Callers using IsConsuming for purposes other than health-checking should be aware of the broker-cancel case.


bool IsCancelledByBroker { get; }

Indicates whether the broker has cancelled the consumer — typically because the queue was deleted, a policy expired, or a mirror was promoted. This is a permanent broker-side failure distinct from a transient connection flap.

The property mirrors the underlying IConsumer.IsCancelledByBroker flag at the bus level so callers such as BusConsumingHealthCheck can distinguish a broker-initiated cancellation (basic.cancel) from a network reconnect that may self-heal. The default interface-method implementation returns false; framework-supplied buses override it.


bool IsStopped { get; }

Indicates whether the bus has been stopped or is in the process of being disposed.

Distinct from IsConsuming: IsConsuming also flips to false during a transient broker disconnect that the health check’s recovery-grace window may absorb, whereas IsStopped flips to true permanently once StopConsumingAsync or DisposeAsync has run. BusConsumingHealthCheck uses this flag to bypass its grace window and report Unhealthy immediately on intentional shutdown. The default interface-method implementation returns false; framework-supplied buses override it.
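
To show how the three flags relate, the sketch below maps them onto a coarse status. It is a simplified illustration only, not the logic of BusConsumingHealthCheck: it has no recovery-grace window, and the BusHealth and BusStatus names are hypothetical.

```csharp
using System;

// Hypothetical coarse status, for illustration only.
public enum BusHealth
{
    Healthy,   // consuming normally
    Degraded,  // not consuming, but possibly transient (not yet started, or reconnecting)
    Unhealthy  // permanent: intentionally stopped or cancelled by the broker
}

public static class BusStatus
{
    public static BusHealth Classify(bool isConsuming, bool isCancelledByBroker, bool isStopped)
    {
        // Intentional shutdown is permanent, so it is checked first.
        if (isStopped)
        {
            return BusHealth.Unhealthy;
        }

        // A broker-initiated cancel will not self-heal.
        if (isCancelledByBroker)
        {
            return BusHealth.Unhealthy;
        }

        return isConsuming ? BusHealth.Healthy : BusHealth.Degraded;
    }
}
```

A caller would pass bus.IsConsuming, bus.IsCancelledByBroker, and bus.IsStopped in that order.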


Task RequestTimeoutAsync(
    Guid correlationId,
    TimeSpan delay,
    CancellationToken cancellationToken = default);

Schedules a TimeoutMessage to be delivered back to the current bus’s queue after delay. The delivered message’s CorrelationId equals the supplied correlationId, which is the standard key a process manager uses to correlate a timeout with its saga instance.

Parameters

  • correlationId — the correlation id stamped on the delivered TimeoutMessage.
  • delay — how long to wait before delivery.
  • cancellationToken — cancels the scheduling call, not the timeout itself.

Throws

  • ArgumentException — correlationId is Guid.Empty.
  • ArgumentOutOfRangeException — delay is less than or equal to TimeSpan.Zero.
  • InvalidOperationException — no ITimeoutStore is registered (first-party Bus only; see the Aside below for the default-interface-method path).
  • ObjectDisposedException — the bus has been disposed.
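
Scheduling a timeout for a saga instance could be sketched as below. PaymentReminder and the 15-minute delay are hypothetical; the call itself uses only the RequestTimeoutAsync signature shown above. The ServiceConnect namespace import is omitted because this page does not state it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper, for illustration only.
public sealed class PaymentReminder
{
    private static readonly TimeSpan PaymentWindow = TimeSpan.FromMinutes(15);

    private readonly IBus _bus;

    public PaymentReminder(IBus bus) => _bus = bus;

    // sagaCorrelationId must not be Guid.Empty; the delivered TimeoutMessage
    // carries it as its CorrelationId.
    public Task ScheduleAsync(Guid sagaCorrelationId, CancellationToken cancellationToken) =>
        _bus.RequestTimeoutAsync(sagaCorrelationId, PaymentWindow, cancellationToken);
}
```

Cancelling the token after ScheduleAsync has completed does not retract the timeout, because the token covers only the scheduling call.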

public sealed class OrderService
{
    private readonly IBus _bus;

    public OrderService(IBus bus) => _bus = bus;

    public async Task PlaceOrderAsync(PlaceOrder command, CancellationToken cancellationToken)
    {
        // … persist the order, charge the card, etc. …
        await _bus.PublishAsync(
            new OrderPlaced(command.CorrelationId)
            {
                OrderId = command.OrderId,
                CustomerId = command.CustomerId,
                Total = command.Total
            },
            cancellationToken: cancellationToken);
    }
}

OrderPlaced is a domain event; any number of downstream services (shipping, invoicing, analytics) may subscribe. The publisher does not know or care who listens, and does not wait for subscribers to process the message.

public sealed class ShippingSaga : IProcessHandler<ShippingState, OrderPlaced>
{
    public async Task HandleAsync(
        OrderPlaced @event,
        ShippingState data,
        IConsumeContext context,
        CancellationToken cancellationToken = default)
    {
        var quote = await context.Bus.SendRequestAsync<QuoteShipping, ShippingQuote>(
            new QuoteShipping(@event.CorrelationId)
            {
                OrderId = @event.OrderId,
                Destination = @event.ShippingAddress
            },
            cancellationToken: cancellationToken);

        data.QuotedCost = quote.Cost;
        data.CarrierCode = quote.CarrierCode;
    }
}

Use SendRequestAsync when there is exactly one respondent and the caller needs the reply in-line. The await will fault if the reply does not arrive within RequestOptions.Timeout, so set a timeout that matches the respondent’s worst-case latency and let the saga compensate via a timeout handler when it trips.