Scatter-Gather

Scatter-Gather sends the same request to several services at once, waits for their replies, and hands you back the collected results. It is the many-responders cousin of Request/Reply: one call site, one outgoing request, N replies, one return value.

The canonical use is parallel lookup — “ask every catalog for what they have that matches this query” — but the shape fits anywhere you’d otherwise write a Task.WhenAll of HTTP calls.

  • You have several services that can answer the same question independently.
  • You want the answers in parallel, not one at a time.
  • Partial results are acceptable — missing one responder shouldn’t fail the whole operation.

If you need every responder’s answer or none at all, this is not the right pattern: you want a transactional coordination tool, not a messaging one. Scatter-Gather completes when the expected number of replies has arrived or the time budget expires, not when every responder is guaranteed to have replied.

One request type, one reply type. Same shape as request/reply:

Contracts/Search.cs
using ServiceConnect.Interfaces;

public sealed class SearchRequest(Guid correlationId) : Message(correlationId)
{
    public string Query { get; init; } = string.Empty;
}

public sealed class SearchResponse(Guid correlationId) : Message(correlationId)
{
    public string Source { get; init; } = string.Empty; // which responder answered
    public IReadOnlyList<string> Hits { get; init; } = Array.Empty<string>();
}

A Source field on the reply is a convention worth borrowing — the requester gets a bag of replies back and has to attribute them somehow. A string tag is enough.

Scatter-Gather uses PublishRequestAsync<TRequest, TReply>. The request travels through the request type’s pub/sub exchange, so every service subscribed to SearchRequest receives a copy and can reply. A callback fires for each reply that arrives; the call completes when ExpectedReplyCount replies arrive or the timeout elapses:

Requester/Program.cs
using ServiceConnect.Interfaces;
using ServiceConnect.Interfaces.Options;

await bus.StartConsumingAsync(); // required: replies land on our queue

var replies = new List<SearchResponse>();
await bus.PublishRequestAsync<SearchRequest, SearchResponse>(
    new SearchRequest(Guid.NewGuid()) { Query = "widgets" },
    reply => { lock (replies) { replies.Add(reply); } },
    new RequestOptions
    {
        ExpectedReplyCount = 2,
        Timeout = 30_000,
    });

foreach (var reply in replies)
    Console.WriteLine($"{reply.Source}: {reply.Hits.Count} hits");

Three things to notice:

  • No queue mapping required — anybody subscribed to SearchRequest is automatically a responder. The requester doesn’t list responder queues.
  • onReply callback — replies are surfaced one at a time via the callback rather than returned as a list. Aggregate inside the callback if you need a list; the returned Task completes when ExpectedReplyCount arrives or the timeout elapses.
  • Callback exceptions are fatal — an exception thrown from onReply faults the awaited task, closes the request, and silently drops subsequent matching replies. Wrap the body in try/catch if you want log-and-continue per-reply semantics.
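The log-and-continue advice in the last bullet can be packaged as a small wrapper. The sketch below is plain C# with no ServiceConnect dependency: `LogAndContinue` is a hypothetical helper name, and it assumes the reply callback is an `Action<TReply>`-shaped delegate, as the lambda in the requester example suggests. Strings stand in for `SearchResponse` so the snippet runs on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of ServiceConnect): wraps a per-reply callback
// so an exception thrown while handling one reply is reported to onError and
// swallowed, instead of escaping the callback.
static Action<T> LogAndContinue<T>(Action<T> inner, Action<T, Exception> onError) =>
    reply =>
    {
        try
        {
            inner(reply);
        }
        catch (Exception ex)
        {
            onError(reply, ex);
        }
    };

var hits = new List<string>();
var failures = new List<string>();

Action<string> onReply = LogAndContinue<string>(
    reply =>
    {
        if (reply.Length == 0) throw new InvalidOperationException("empty reply");
        hits.Add(reply);
    },
    (reply, ex) => failures.Add(ex.Message));

// Simulate three replies arriving; the second one makes the inner callback throw.
onReply("catalog-a");
onReply("");
onReply("catalog-b");

Console.WriteLine($"{hits.Count} handled, {failures.Count} failed");
```

In the requester above, the wrapped delegate would take the place of the inline lambda passed to `PublishRequestAsync`, with `T` bound to `SearchResponse`. Anything shared that `inner` touches still needs the same locking as before.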

See examples/ScatterGather for this pattern in a runnable project.

Each responder is a normal request/reply handler — identical to what you’d write for a single-responder request/reply:

CatalogA/SearchRequestHandler.cs
public sealed class SearchRequestHandler : IMessageHandler<SearchRequest>
{
    public async Task HandleAsync(SearchRequest message, IConsumeContext context, CancellationToken cancellationToken = default)
    {
        var hits = await SearchAsync(message.Query);
        await context.ReplyAsync(new SearchResponse(message.CorrelationId)
        {
            Source = "catalog-a",
            Hits = hits,
        });
    }

    private Task<IReadOnlyList<string>> SearchAsync(string q) => Task.FromResult<IReadOnlyList<string>>(Array.Empty<string>());
}

The responders don’t know each other. They don’t coordinate. Each one sees a SearchRequest, runs its own search, and replies. The requester is the only place that knows there are multiple responders.

Alternative — SendRequestMultiAsync to a known endpoint list

When the set of responders is a fixed deployment concern (you know the queue names at startup and want the queue mapping to fail fast if a name is misspelled), SendRequestMultiAsync lets you enumerate them explicitly. Register the request type against the responder queues and ServiceConnect fans the request out to all of them on every call. Instead of a per-reply callback, you receive the replies as an IList<TReply>:

services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(t => { /* … */ });
    builder.ConfigureQueues(q =>
    {
        q.QueueName = "search-requester";
        q.AddQueueMapping(typeof(SearchRequest), new[] { "catalog-a", "catalog-b" });
    });
});

var replies = await bus.SendRequestMultiAsync<SearchRequest, SearchResponse>(
    new SearchRequest(Guid.NewGuid()) { Query = "widgets" },
    new RequestOptions
    {
        ExpectedReplyCount = 2,
        Timeout = 30_000,
    });

foreach (var reply in replies)
    Console.WriteLine($"{reply.Source}: {reply.Hits.Count} hits");

Use PublishRequestAsync when responders are discovered dynamically and the requester should not hard-code the list. Use SendRequestMultiAsync when the responder set is stable and you want startup-time validation of queue names. The reply-count semantics described below apply to both methods.

ExpectedReplyCount is the knob that defines “done”:

  • Positive value N — the call completes as soon as N replies have arrived. If fewer than N arrive before the timeout, RequestTimeoutException is thrown with the partials available on PartialReplies.
  • Zero, negative, or null (default) — the call waits the full timeout and returns every reply received. Useful when you don’t know how many responders are listening — say, the set of catalogs is dynamic. You accept a fixed wait in exchange for not having to count responders.

With ExpectedReplyCount unset, a responder that fails to answer just makes the response list come back short. The call returns cleanly, and it is your job to decide what the gap means: “no results from catalog-b”, a health alert, a fallback, or nothing at all. The requester sees exactly the replies that arrived; the ones that didn’t are simply absent.

With a positive ExpectedReplyCount, under-delivery is a typed exception (RequestTimeoutException) carrying the partials, so you can choose to recover them or fail the whole call.
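The two completion modes can be modelled without a broker. The following is an illustrative sketch, not ServiceConnect's implementation: `GatherAsync` and `ThreeReplies` are hypothetical names, a `Channel<string>` stands in for the reply queue, and a `TimedOut` flag stands in for the point at which the library would throw RequestTimeoutException.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative model only: collect replies until `expected` have arrived
// or the time budget runs out, whichever comes first.
static async Task<(List<T> Replies, bool TimedOut)> GatherAsync<T>(
    ChannelReader<T> source, int? expected, TimeSpan timeout)
{
    // Null, zero, or negative means "wait out the whole budget".
    bool waitForBudget = expected is null or <= 0;
    var replies = new List<T>();
    using var cts = new CancellationTokenSource(timeout);
    try
    {
        while (waitForBudget || replies.Count < expected)
        {
            replies.Add(await source.ReadAsync(cts.Token));
        }
    }
    catch (OperationCanceledException)
    {
        // Budget expired. Only a counted request treats this as under-delivery.
        return (replies, !waitForBudget);
    }
    return (replies, false);
}

// Three simulated responders whose replies are already waiting on the channel.
static Channel<string> ThreeReplies()
{
    var channel = Channel.CreateUnbounded<string>();
    foreach (var source in new[] { "catalog-a", "catalog-b", "catalog-c" })
        channel.Writer.TryWrite(source);
    return channel;
}

var budget = TimeSpan.FromMilliseconds(200);

// Counted: returns as soon as two replies are in, well before the budget.
var counted = await GatherAsync(ThreeReplies().Reader, 2, budget);

// Open-ended: waits the full budget, then returns everything that arrived.
var openEnded = await GatherAsync(ThreeReplies().Reader, null, budget);

// Counted but under-delivered: five expected, only three ever arrive.
var underDelivered = await GatherAsync(ThreeReplies().Reader, 5, budget);

Console.WriteLine($"counted: {counted.Replies.Count} replies, timed out: {counted.TimedOut}");
Console.WriteLine($"open-ended: {openEnded.Replies.Count} replies, timed out: {openEnded.TimedOut}");
Console.WriteLine($"under-delivered: {underDelivered.Replies.Count} replies, timed out: {underDelivered.TimedOut}");
```

The trade-off shows up in the timings: the counted call returns early, while the open-ended call always pays the full budget in exchange for not needing to know how many responders exist.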

If you need to log specifically which responders failed to reply, compare the Source tags on the replies to the list of responders you expected, such as the queue-mapping list you configured. ServiceConnect deliberately doesn’t surface this for you: the pattern’s whole appeal is that the call site doesn’t have to reason about per-responder failure.
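That comparison is a set difference. A minimal sketch, assuming each responder sets Source to its own queue name (a convention this page suggests, not something the library enforces); `expectedSources` and `repliedSources` are illustrative names, and plain strings stand in for the Source values read off the replies:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Assumption: the same names that were passed to AddQueueMapping.
string[] expectedSources = { "catalog-a", "catalog-b" };

// Stand-in for replies.Select(r => r.Source) on the replies that arrived.
var repliedSources = new List<string> { "catalog-a" };

// Expected responders with no matching reply.
var missing = expectedSources
    .Except(repliedSources, StringComparer.Ordinal)
    .ToList();

foreach (var source in missing)
    Console.WriteLine($"no reply from {source}");
```

With PublishRequestAsync there is no queue mapping to compare against, so the expected list would have to come from somewhere else, such as configuration or a service registry.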

  • Request/Reply — the single-responder variant this extends.
  • Pub/Sub — when you want fan-out without needing replies.
  • Aggregator — when the collecting happens on the consumer side, not the sender’s.