IBus
Overview
IBus is the runtime surface for moving messages. A handler, service, or controller resolves IBus from DI and uses it to publish domain events, send commands to a known endpoint, issue a request and await a reply, route a message through a list of destinations, stream a large payload, schedule a process-manager timeout, and start or stop message consumption. You configure it via AddServiceConnect; you use it by resolving the interface.
See The Bus for the conceptual tour.
Reference
PublishAsync<T>
    Task PublishAsync<T>(
        T message,
        PublishOptions? options = null,
        CancellationToken cancellationToken = default)
        where T : Message;
Broadcasts a message to every subscriber of the message type. Use for domain events that any number of interested services may consume.
Parameters
- message: the event instance to publish; must derive from Message.
- options: optional headers, routing key override, priority, or per-call overrides.
- cancellationToken: cancels the publish before the broker acknowledges.
Throws
- ArgumentNullException: message is null.
- ObjectDisposedException: the bus has been disposed.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop before the message reached the transport.
Remarks. Publish is fire-and-forget from the publisher’s perspective; it completes once the broker accepts the message, not when subscribers process it. There is no built-in guarantee that any subscriber exists.
SendAsync<T>
    Task SendAsync<T>(
        T message,
        SendOptions? options = null,
        CancellationToken cancellationToken = default)
        where T : Message;
Sends a message to a specific endpoint or to the queue resolved from the configured queue mappings. Use for commands with a single owner.
Parameters
- message: the command instance; must derive from Message.
- options: optional destination override, headers, or priority.
- cancellationToken: cancels the send before the broker acknowledges.
Throws
- ArgumentNullException: message is null.
- ObjectDisposedException: the bus has been disposed.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop before the message reached the transport.
Remarks. If no destination is supplied via options and no queue mapping is registered for the message type, the call throws.
SendToManyAsync<T>
    Task SendToManyAsync<T>(
        T message,
        IReadOnlyList<string> endPoints,
        SendOptions? options = null,
        CancellationToken cancellationToken = default)
        where T : Message;
Sends message to every endpoint in endPoints as an independent point-to-point delivery. Fan-out is an explicit per-call list rather than a property on SendOptions.
Parameters
- message: the command instance; must derive from Message.
- endPoints: the destination queue names to send to. Must be non-null and non-empty; each entry is forwarded as a single-endpoint send. SendOptions.EndPoint (singular) on options is ignored; the explicit list always wins.
- options: optional headers shared across every delivery in the fan-out. Per-endpoint headers cannot diverge.
- cancellationToken: cancels the fan-out mid-flight. The cancellation must come from the caller’s token to short-circuit; a middleware-internal OperationCanceledException with a different token falls through to the per-endpoint failure path.
Throws
- ArgumentNullException: message or endPoints is null.
- ArgumentException: endPoints is empty.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop before the message reached the transport.
- AggregateException: one or more endpoints failed. Inner exceptions are the per-endpoint failures in the order they occurred. If caller cancellation fired after some endpoints had already failed, the OperationCanceledException appears at slot 0 of the aggregate so callers can distinguish “cancelled with prior failures” from “cancelled clean”.
- OperationCanceledException: caller cancelled before any endpoint failed.
Remarks. Each delivery emits its own SendEventArgs for telemetry — subscribers correlate fan-out by CorrelationId, which is stable across the deliveries. The per-endpoint loop short-circuits on caller cancellation but otherwise runs every endpoint to completion (success or failure) so a single broken endpoint cannot block the rest.
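A caller can act on the slot-0 convention described under Throws with a small classifier. The following is a minimal sketch that uses only BCL types; the helper name DescribeFanOutFailure and its return strings are hypothetical and not part of ServiceConnect.

```csharp
using System;

// Hypothetical helper (not part of ServiceConnect): classifies the exception
// surfaced by a SendToManyAsync call according to the documented contract.
static string DescribeFanOutFailure(Exception ex)
{
    switch (ex)
    {
        // Caller cancelled before any endpoint failed.
        case OperationCanceledException _:
            return "cancelled clean";

        // Caller cancelled after some endpoints had failed: the
        // OperationCanceledException sits at slot 0 of the aggregate.
        case AggregateException agg
            when agg.InnerExceptions.Count > 0
              && agg.InnerExceptions[0] is OperationCanceledException:
            return $"cancelled with {agg.InnerExceptions.Count - 1} prior failure(s)";

        // No cancellation: every inner exception is a per-endpoint failure.
        case AggregateException agg:
            return $"{agg.InnerExceptions.Count} endpoint failure(s)";

        default:
            return "other failure";
    }
}
```

In practice this would be called from a catch block around the SendToManyAsync call, for example to choose between retrying the failed endpoints and abandoning the operation.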
SendRequestAsync<TRequest, TReply>
    Task<TReply> SendRequestAsync<TRequest, TReply>(
        TRequest message,
        RequestOptions? options = null,
        CancellationToken cancellationToken = default)
        where TRequest : Message
        where TReply : Message;
Sends a request to a single endpoint and awaits exactly one reply. Use when the caller needs a synchronous-looking answer from a single respondent.
Parameters
- message: the request payload; must derive from Message.
- options: optional timeout, destination override, or correlation hints.
- cancellationToken: cancels the wait; the request may still have been delivered.
Returns. The single reply returned by the respondent.
Exceptions
- ArgumentNullException: message is null.
- ArgumentOutOfRangeException: options.Timeout is negative (other than Timeout.Infinite, the int constant -1) or zero.
- ObjectDisposedException: the bus has been disposed.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop.
- RequestSendCancelledException: the outbound send pipeline cancelled before the request reached the broker. Distinct from caller-token cancellation (OperationCanceledException).
- RequestTimeoutException: no reply arrived within options.Timeout.
- OperationCanceledException: the caller’s cancellation token fired.
Remarks. If the reply does not arrive before the configured timeout, the task faults. The reply type must derive from Message.
SendRequestMultiAsync<TRequest, TReply>
    Task<IList<TReply>> SendRequestMultiAsync<TRequest, TReply>(
        TRequest message,
        RequestOptions? options = null,
        CancellationToken cancellationToken = default)
        where TRequest : Message
        where TReply : Message;
Sends a request that may fan out to multiple respondents and collects every reply received before the deadline. Use for scatter-gather over a known endpoint list.
Parameters
- message: the request payload; must derive from Message.
- options: expected reply count and timeout are typically set here.
- cancellationToken: cancels the aggregate wait.
Returns. The list of replies that arrived before the timeout or the expected count was reached.
Exceptions
- ArgumentNullException: message is null.
- ArgumentOutOfRangeException: options.Timeout is negative (other than Timeout.Infinite, the int constant -1) or zero.
- ObjectDisposedException: the bus has been disposed.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop.
- RequestSendCancelledException: the outbound send pipeline cancelled before the request reached the broker. Distinct from caller-token cancellation (OperationCanceledException).
- RequestTimeoutException: no reply arrived within options.Timeout, or fewer than options.ExpectedReplyCount replies arrived (when positive). Partials are exposed on RequestTimeoutException.PartialReplies.
- OperationCanceledException: the caller’s cancellation token fired.
Remarks. Returns whatever replies arrived — a partial result is a valid outcome. Inspect the list length against RequestOptions.ExpectedReplyCount to decide whether the scatter-gather succeeded.
PublishRequestAsync<TRequest, TReply>
    Task PublishRequestAsync<TRequest, TReply>(
        TRequest message,
        Action<TReply> onReply,
        RequestOptions? options = null,
        CancellationToken cancellationToken = default)
        where TRequest : Message
        where TReply : Message;
Publishes a request to every subscriber of the request type and invokes onReply each time a reply arrives. Use for streaming scatter-gather where replies should be processed as they arrive rather than collected into a list.
Parameters
- message: the request payload; must derive from Message.
- onReply: a delegate invoked for each reply, on a thread-pool thread.
- options: expected reply count and timeout.
- cancellationToken: stops dispatching replies to the callback.
Exceptions
- ArgumentNullException: message or onReply is null.
- ArgumentException: options.EndPoint is non-empty; use SendRequestAsync for single-destination requests.
- ArgumentOutOfRangeException: options.Timeout is negative (other than Timeout.Infinite, the int constant -1) or zero.
- ObjectDisposedException: the bus has been disposed.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop.
- RequestTimeoutException: no replies arrived within options.Timeout, or fewer than options.ExpectedReplyCount replies arrived.
- OperationCanceledException: the caller’s cancellation token fired, or an onReply invocation threw and propagated through the awaited task.
Remarks. The callback is invoked one-at-a-time per request (serialized under an internal lock). Keep it short-lived and non-blocking — heavy work or further bus calls from inside the callback can stall reply processing for the same request. The task completes when the expected reply count is reached or the timeout elapses.
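One way to keep the callback short is to have it enqueue each reply and do the heavy work elsewhere. The sketch below shows that hand-off with System.Threading.Channels; the helper name CreateReplyHandOff and the tuple shape are hypothetical, not part of ServiceConnect.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical helper (not part of ServiceConnect): returns a callback that only
// enqueues each reply, plus the reader a separate consumer drains at its own pace.
static (Action<TReply> OnReply, ChannelReader<TReply> Replies, Action Complete)
    CreateReplyHandOff<TReply>()
{
    var channel = Channel.CreateUnbounded<TReply>(
        new UnboundedChannelOptions { SingleReader = true });

    return (
        // TryWrite on an unbounded channel never blocks, so the callback stays short.
        reply => channel.Writer.TryWrite(reply),
        channel.Reader,
        // Call once the request task has completed so the consumer loop can end.
        () => channel.Writer.TryComplete());
}
```

The OnReply delegate would be passed as the onReply argument, while a separate task drains Replies (for example with ReadAllAsync) and is free to do slow work or make further bus calls. The unbounded channel is a simplification: if replies can arrive faster than they are processed, a bounded channel with an explicit drop or wait policy may be the safer choice.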
RouteAsync<T>
    Task RouteAsync<T>(
        T message,
        IReadOnlyList<string> destinations,
        CancellationToken cancellationToken = default)
        where T : Message;
Starts a routing slip: sends the message to the first destination with a routing-slip header listing the remaining hops. Each handler forwards to the next destination when it completes.
Parameters
- message: the payload to route; must derive from Message.
- destinations: an ordered list of queue names the message should visit.
- cancellationToken: cancels the initial send.
Throws
- ArgumentNullException: message or destinations is null.
- ArgumentException: destinations is empty or contains an entry with a comma (the routing-slip separator) or that otherwise fails destination validation.
- ObjectDisposedException: the bus has been disposed.
- OutgoingFiltersBlockedException: an outgoing filter returned FilterAction.Stop before the message reached the transport.
Remarks. Routing-slip forwarding is handled by the consumer side and requires IBusConfiguration.EnableRoutingSlipProcessing (the default) to be true on every intermediate service.
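When the destination list is assembled from configuration, it can help to reject a bad list at startup rather than at the first RouteAsync call. The sketch below mirrors the documented null, empty, and comma conditions; the helper name EnsureRoutable is hypothetical, and treating blank names as invalid is an assumption about what "destination validation" covers.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical pre-flight check (not part of ServiceConnect) mirroring the
// documented argument conditions for RouteAsync destinations.
static void EnsureRoutable(IReadOnlyList<string> destinations)
{
    if (destinations is null)
        throw new ArgumentNullException(nameof(destinations));
    if (destinations.Count == 0)
        throw new ArgumentException("At least one destination is required.", nameof(destinations));

    for (var i = 0; i < destinations.Count; i++)
    {
        var name = destinations[i];
        // The comma rule is documented (it is the routing-slip separator);
        // rejecting blank names is an assumption.
        if (string.IsNullOrWhiteSpace(name) || name.Contains(','))
            throw new ArgumentException(
                $"Destination at index {i} is blank or contains a comma.", nameof(destinations));
    }
}
```

The bus performs its own validation regardless, so this check only moves the failure earlier; it does not replace the exceptions listed above.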
CreateStream<T>
    IMessageBusWriteStream CreateStream<T>(string endpoint)
        where T : Message;
Opens a write stream for sending a large payload as a sequence of ordered chunks to a single endpoint.
Parameters
- endpoint: the queue name to stream to.
Returns. A write stream the caller disposes to signal end-of-stream.
Throws
- ArgumentException: endpoint is null or whitespace.
- ObjectDisposedException: the bus has been disposed.
- InvalidOperationException: no IProducer is registered in the bus’s DI graph.
Remarks. Each stream uses its own sequence number; consumers must use the matching IStreamHandler<T> to reassemble. Dispose the stream even on failure to release broker resources.
StartConsumingAsync
    Task StartConsumingAsync(CancellationToken cancellationToken = default);
Starts consuming messages from the bus’s configured queue. The hosted service calls this automatically when IBusConfiguration.AutoStartConsuming is true; call it yourself when you have disabled auto-start.
Parameters
- cancellationToken: cancels the startup handshake.
Remarks. Throws InvalidOperationException if the bus is already consuming or has previously been stopped.
StopConsumingAsync
    Task StopConsumingAsync(CancellationToken cancellationToken = default);
Stops the consumer loop and disposes the underlying consumer connection.
Parameters
- cancellationToken: bounds the graceful-shutdown wait.
IsConsuming
    bool IsConsuming { get; }
Indicates whether the bus is currently consuming messages. Useful in health checks and tests that must wait for the consumer to become ready.
Returns false before StartConsumingAsync has been called and after StopConsumingAsync returns. It also returns false when the broker has cancelled the consumer (queue deleted, policy expired, mirror promoted) — the underlying transport surfaces AMQP’s basic.cancel event and ServiceConnect propagates it through this property. Callers using IsConsuming for purposes other than health-checking should be aware of the broker-cancel case.
IsCancelledByBroker
    bool IsCancelledByBroker { get; }
Indicates whether the broker has cancelled the consumer, typically because the queue was deleted, a policy expired, or a mirror was promoted. This is a permanent broker-side failure distinct from a transient connection flap.
The property mirrors the underlying IConsumer.IsCancelledByBroker flag at the bus level so callers such as BusConsumingHealthCheck can distinguish a broker-initiated cancellation (basic.cancel) from a network reconnect that may self-heal. The default interface-method implementation returns false; framework-supplied buses override it.
IsStopped
    bool IsStopped { get; }
Indicates whether the bus has been stopped or is in the process of being disposed.
Distinct from IsConsuming: IsConsuming also flips to false during a transient broker disconnect that the health check’s recovery-grace window may absorb, whereas IsStopped flips to true permanently once StopConsumingAsync or DisposeAsync has run. BusConsumingHealthCheck uses this flag to bypass its grace window and report Unhealthy immediately on intentional shutdown. The default interface-method implementation returns false; framework-supplied buses override it.
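The three state properties can be read together to tell the permanent conditions apart from the possibly transient one. The sketch below is illustrative only: it is not the actual BusConsumingHealthCheck logic, the helper name and status strings are hypothetical, and the precedence (stopped first, then broker-cancelled) is an assumption.

```csharp
using System;

// Hypothetical mapping (not the actual BusConsumingHealthCheck logic): turns the
// three IBus state flags into a coarse status string.
static string DescribeBusState(bool isConsuming, bool isCancelledByBroker, bool isStopped)
{
    if (isStopped) return "stopped";                     // intentional shutdown; permanent
    if (isCancelledByBroker) return "broker-cancelled";  // basic.cancel from the broker; permanent
    if (isConsuming) return "consuming";
    return "not-consuming";                              // not started yet, or a transient disconnect
}
```

A caller would pass bus.IsConsuming, bus.IsCancelledByBroker, and bus.IsStopped. Only the "not-consuming" result is a candidate for a grace window, since the other two non-consuming states do not self-heal.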
RequestTimeoutAsync
    Task RequestTimeoutAsync(
        Guid correlationId,
        TimeSpan delay,
        CancellationToken cancellationToken = default);
Schedules a TimeoutMessage to be delivered back to the current bus’s queue after delay. The delivered message’s CorrelationId equals the supplied correlationId, which is the standard key a process manager uses to correlate a timeout with its saga instance.
Parameters
- correlationId: the correlation id stamped on the delivered TimeoutMessage.
- delay: how long to wait before delivery.
- cancellationToken: cancels the scheduling call, not the timeout itself.
Throws
- ArgumentException: correlationId is Guid.Empty.
- ArgumentOutOfRangeException: delay is less than or equal to TimeSpan.Zero.
- InvalidOperationException: no ITimeoutStore is registered (first-party Bus only; see the Aside below for the default-interface-method path).
- ObjectDisposedException: the bus has been disposed.
Publishing a domain event
    public sealed class OrderService
    {
        private readonly IBus _bus;

        public OrderService(IBus bus) => _bus = bus;

        public async Task PlaceOrderAsync(PlaceOrder command, CancellationToken cancellationToken)
        {
            // … persist the order, charge the card, etc. …

            await _bus.PublishAsync(
                new OrderPlaced(command.CorrelationId)
                {
                    OrderId = command.OrderId,
                    CustomerId = command.CustomerId,
                    Total = command.Total
                },
                cancellationToken: cancellationToken);
        }
    }
OrderPlaced is a domain event; any number of downstream services (shipping, invoicing, analytics) may subscribe. The publisher does not know or care who listens, and does not wait for subscribers to process the message.
Request/reply with a single respondent
    public sealed class ShippingSaga : IProcessHandler<ShippingState, OrderPlaced>
    {
        public async Task HandleAsync(
            OrderPlaced @event,
            ShippingState data,
            IConsumeContext context,
            CancellationToken cancellationToken = default)
        {
            var quote = await context.Bus.SendRequestAsync<QuoteShipping, ShippingQuote>(
                new QuoteShipping(@event.CorrelationId)
                {
                    OrderId = @event.OrderId,
                    Destination = @event.ShippingAddress
                },
                cancellationToken: cancellationToken);

            data.QuotedCost = quote.Cost;
            data.CarrierCode = quote.CarrierCode;
        }
    }
Use SendRequestAsync when there is exactly one respondent and the caller needs the reply in-line. The await will fault if the reply does not arrive within RequestOptions.Timeout, so set a timeout that matches the respondent’s worst-case latency and let the saga compensate via a timeout handler when it trips.
See also
- The Bus (concept)
- Pub/Sub (concept)
- Request/Reply (concept)
- IBusConfiguration (related reference)
- AddServiceConnect (related reference)