Skip to content

The Bus

The bus is the handle your code uses to talk to the rest of the system. Concretely, it is the IBus interface. Once you have one, you can send messages to a specific queue, publish them to subscribers, fire a request and await a reply, route through a chain of services, stream large payloads, or consume messages from your own queue.

public interface IBus : IAsyncDisposable
{
Task SendAsync<T>(T message, SendOptions? options = null, CancellationToken cancellationToken = default) where T : Message;
Task SendToManyAsync<T>(T message, IReadOnlyList<string> endPoints, SendOptions? options = null, CancellationToken cancellationToken = default) where T : Message;
Task PublishAsync<T>(T message, PublishOptions? options = null, CancellationToken cancellationToken = default) where T : Message;
Task<TReply> SendRequestAsync<TRequest, TReply>(TRequest message, RequestOptions? options = null, CancellationToken cancellationToken = default)
where TRequest : Message where TReply : Message;
Task RouteAsync<T>(T message, IReadOnlyList<string> destinations, CancellationToken cancellationToken = default) where T : Message;
IMessageBusWriteStream CreateStream<T>(string endpoint) where T : Message;
Task StartConsumingAsync(CancellationToken cancellationToken = default);
Task StopConsumingAsync(CancellationToken cancellationToken = default);
bool IsConsuming { get; }
// …plus SendRequestMultiAsync, PublishRequestAsync, RequestTimeoutAsync
}

You do not construct IBus directly. You configure it, register it with dependency injection, and resolve it by type.

ServiceConnect integrates with Microsoft.Extensions.DependencyInjection. The entry point is AddServiceConnect:

var services = new ServiceCollection();
services.AddLogging();
services.AddServiceConnect(builder =>
{
builder.UseRabbitMQ(transport =>
{
transport.Host = "localhost";
transport.Username = "guest";
transport.Password = "guest";
});
builder.ConfigureQueues(queues => queues.QueueName = "orders");
});
await using var provider = services.BuildServiceProvider();
var bus = provider.GetRequiredService<IBus>();

The builder composes four concerns:

  • Transport — which broker the bus talks to. UseRabbitMQ is the supported transport.
  • Queues — the name of this service’s queue, plus error and audit queue names, plus any message-to-queue routing for SendAsync. See Endpoints.
  • Bus — runtime behaviour flags: handler scanning, auto-start, consumer count, exception handler. See IBusConfiguration.
  • Persistence — optional. Required for process managers, aggregators, and persisted timeouts. Configured via UseMongoDbPersistence or UseInMemoryPersistence.

In a hosted application (ASP.NET Core, Worker Service, generic host), the bus is a singleton registered by AddServiceConnect. It starts consuming automatically when the host starts, and shuts down cleanly on host stop.

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddServiceConnect(sc =>
{
sc.UseRabbitMQ(t => { /* … */ });
sc.ConfigureQueues(q => q.QueueName = "orders");
});
await builder.Build().RunAsync();

For a sender-only process — something that produces a message and exits, like the Getting Started sender — you can skip StartConsumingAsync and just call SendAsync. The bus still needs a queue name so replies and errors have somewhere to land.

Calling StopConsumingAsync disposes the underlying consumer. After that, StartConsumingAsync throws InvalidOperationException. If you need to resume, dispose the bus and create a new one. This makes lifecycle reasoning explicit: a bus instance is either new, consuming, or stopped — never restarted.

await bus.StopConsumingAsync();
await bus.StartConsumingAsync(); // throws — stop is terminal

All outbound operations take a subtype of Message so ServiceConnect can flow a correlation id and attach headers.

Point-to-point. You know the destination queue. ServiceConnect writes one message to that queue.

await bus.SendAsync(
new WorkSubmitted(Guid.NewGuid()) { WorkId = "work-001" },
new SendOptions { EndPoint = "fulfillment" });

Fan-out. Subscribers of the message type receive a copy. You don’t name destinations; RabbitMQ’s exchange handles the fan-out.

await bus.PublishAsync(new OrderPlaced(correlationId) { OrderId = "order-100" });

A message that expects a single response, or a bounded set of responses.

var quote = await bus.SendRequestAsync<QuoteRequested, QuoteReady>(
new QuoteRequested(Guid.NewGuid()) { Sku = "widget" });

The caller awaits the reply. Under the hood, ServiceConnect correlates the reply using the ResponseMessageId header, which the reply handler sets to the value of the outbound RequestMessageId.

A message flows through an ordered list of destinations. Each service does its work and forwards along the slip.

await bus.RouteAsync(message, new[] { "validate", "enrich", "notify" });

See Routing Slip for the pattern it implements.

For payloads too large for a single message, CreateStream<T> opens a chunked write stream to a named endpoint. The receiving side exposes a matching IMessageBusReadStream.

Process managers use RequestTimeoutAsync to ask the bus to deliver a TimeoutMessage back to this queue after a delay. It is the mechanism behind saga timeouts.

await bus.RequestTimeoutAsync(processManagerId, TimeSpan.FromMinutes(30));

Inside a handler, prefer IConsumeContext.Bus — it is the same IBus instance but makes the dependency explicit at the point of use. Elsewhere, take IBus as a constructor parameter:

public sealed class PaymentsGateway(IBus bus)
{
public Task RefundAsync(Guid orderId) =>
bus.SendAsync(
new RefundRequested(orderId) { OrderId = orderId.ToString() },
new SendOptions { EndPoint = "payments" });
}

The bus is thread-safe. One instance is shared across the process.

  • Messages — the contracts you pass to the bus.
  • Handlers — how incoming messages reach your code.
  • Endpoints — how Send chooses a queue and how subscriptions work.