Skip to content

IRequestReplyManager

IRequestReplyManager is the seam between IBus.SendRequestAsync and the transport’s outgoing pipeline. When a request method is called on IBus, the bus delegates to IRequestReplyManager, which:

  1. Generates a fresh correlation Guid and stamps it onto the outgoing headers as RequestMessageId.
  2. Records a TaskCompletionSource (or callback) keyed by that id in its pending-request table.
  3. Dispatches the typed message through the outgoing pipeline; the pipeline serializes it and writes to the transport.
  4. Awaits the TaskCompletionSource until the matching reply arrives or the timeout fires.

When a reply message arrives, the reply consumer calls ProcessReply, which looks up the correlation id in the pending table, deserializes the payload using the reply type recorded at request time (not the wire-reported type), and resolves the TaskCompletionSource.

The correlation mechanism is entirely in-memory. Pending requests are keyed by Guid in a ConcurrentDictionary; replies received with an unknown id are silently dropped.

using ServiceConnect.Interfaces.Exceptions;
using ServiceConnect.Interfaces.Options;
namespace ServiceConnect.Interfaces;
/// <summary>
/// Coordinates request/reply interactions on top of the transport pipeline.
/// </summary>
public interface IRequestReplyManager
{
/// <summary>
/// Sends a request and waits for a single reply.
/// </summary>
Task<TReply> SendRequestAsync<TRequest, TReply>(
TRequest message,
IDictionary<string, string> headers,
RequestOptions options,
CancellationToken cancellationToken = default)
where TRequest : Message
where TReply : Message;
/// <summary>
/// Sends a request and collects multiple replies.
/// </summary>
Task<IList<TReply>> SendRequestMultiAsync<TRequest, TReply>(
TRequest message,
IDictionary<string, string> headers,
RequestOptions options,
CancellationToken cancellationToken = default)
where TRequest : Message
where TReply : Message;
/// <summary>
/// Publishes a request and invokes a callback for each reply that arrives.
/// </summary>
Task PublishRequestAsync<TRequest, TReply>(
TRequest message,
IDictionary<string, string> headers,
RequestOptions options,
Action<TReply> onReply,
CancellationToken cancellationToken = default)
where TRequest : Message
where TReply : Message;
/// <summary>
/// Attempts to match an incoming reply to a pending request.
/// </summary>
void ProcessReply(string messageId, ReadOnlyMemory<byte> messageBytes, Type type);
}

Sends a typed request and waits for exactly one reply. The method generates a correlation id, stamps RequestMessageId onto headers, hands the message to the outgoing pipeline (which serializes and writes to the transport), then blocks until ProcessReply resolves the TaskCompletionSource. If the reply does not arrive within RequestOptions.Timeout, a RequestTimeoutException is thrown.

Parameters

  • message — the typed request message. The pipeline serializes it and middleware sees the strongly-typed instance via SendContext.Message.
  • headers — outgoing headers; RequestMessageId is written into this dictionary by the implementation.
  • options — routing and timeout options. Set EndPoint to target a specific queue, or leave blank to use the type-mapped default.
  • cancellationToken — cancels the wait; throws RequestSendCancelledException if cancelled during the outbound send, or plain OperationCanceledException if cancelled while awaiting the reply.

Returns the deserialized reply as TReply.

Sends a typed request and collects multiple replies. Works identically to SendRequestAsync for the outgoing side; fan-out to multiple responders comes from registering multiple queues against the request type via IQueueConfiguration.AddQueueMapping. On the incoming side, each ProcessReply call appends to an internal list. With RequestOptions.ExpectedReplyCount set to a positive value, the call returns as soon as that many replies have arrived; if fewer arrive before the timeout, a RequestTimeoutException is thrown with the partials available on PartialReplies. With ExpectedReplyCount left unset (or set to zero/negative), the call waits the full timeout and returns every reply received.

Parameters — same shape as SendRequestAsync. options.ExpectedReplyCount drives the reply-count expectation; cancellationToken throws RequestSendCancelledException on outbound cancellation or plain OperationCanceledException on caller-token cancellation.

Returns the collected replies as IList<TReply>.

Publishes a typed request to all subscribers and invokes onReply for each reply that arrives. Unlike the SendRequest variants, this uses the publish pipeline rather than the send pipeline, and the caller supplies a callback rather than awaiting a return value. The method returns when the reply count is satisfied, or after the timeout. If options.ExpectedReplyCount is positive and fewer replies arrive before the timeout, a RequestTimeoutException is thrown.

Parameters

  • message — the typed request message; same shape as SendRequestAsync.
  • onReply — invoked on the reply-processing thread for each matching reply. The callback must not block; long-running work should be dispatched to a background thread.
  • headers, options — match SendRequestAsync.
  • cancellationToken — throws RequestSendCancelledException if cancelled during the outbound publish, or plain OperationCanceledException if cancelled while awaiting replies.

Called by the reply consumer to match an incoming reply to a tracked request. Looks up messageId in the pending-request table. If a match is found, deserializes messageBytes using the reply type recorded at request time (to prevent deserialization into attacker-controlled types from crafted reply messages), then resolves the TaskCompletionSource or invokes the callback. Unknown or already-completed ids are silently ignored.

Parameters

  • messageId — the RequestMessageId header value copied verbatim from the request into the reply by the replying handler.
  • messageBytes — the serialized reply payload.
  • type — the wire-reported reply type; used for logging but the implementation deserializes to the type stored at request time.

IRequestReplyManager sits between the IBus request methods and the send/publish pipelines:

IBus.SendRequestAsync
└─► IRequestReplyManager.SendRequestAsync
├─► stamps RequestMessageId header
├─► ISendMessagePipeline.ExecuteSendMessagePipelineAsync
└─► awaits TaskCompletionSource
reply consumer calls ProcessReply

The correlation id travels on the RequestMessageId header. Replying handlers read this header and echo it back; the reply consumer strips it and calls ProcessReply.

You rarely need to replace RequestReplyManager. Common reasons to do so:

  • Durable pending-request store — survive process restarts by persisting correlation ids and their reply types to Redis or a database. On startup, reload the table and reconnect reply channels.
  • Custom timeout behaviour — extend the timeout window based on message priority, add retry logic, or surface partial results before the deadline.
  • Telemetry and observability — instrument the full round-trip duration, correlate traces across the request and reply sides, or emit metrics per message type.
  • Rate limiting — cap the number of in-flight requests to protect downstream services.

Replace the default implementation via AddRegistration:

services.AddServiceConnect(builder =>
{
builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
builder.AddRegistration(svc =>
svc.AddSingleton<IRequestReplyManager, MyDurableRequestReplyManager>());
});

Companion contract: IReplyStatusRequestReplyManager

Section titled “Companion contract: IReplyStatusRequestReplyManager”

The in-box dispatcher also depends on an internal interface, IReplyStatusRequestReplyManager (IsTrackedRequest + TryProcessReply), which the reply consumer uses to filter messages and avoid deserializing replies for requests it is not tracking. The default RequestReplyManager implements both IRequestReplyManager and this companion interface.

IReplyStatusRequestReplyManager is internal to the ServiceConnect assembly. Third-party code cannot implement or reference it directly. If you register a custom type as IRequestReplyManager without also satisfying the companion contract, AddServiceConnect throws InvalidOperationException at startup because the required IReplyStatusRequestReplyManager registration is absent.

Supported approaches for custom replacements:

  1. Wrap or decorate the default implementation. Register the default RequestReplyManager normally and forward calls from your outer type. This avoids the internal-contract problem entirely because the inner RequestReplyManager continues to satisfy IReplyStatusRequestReplyManager.

  2. Intercept via filters or middleware. Many customisation goals (telemetry, rate limiting, custom timeout behaviour) can be achieved by adding send or receive middleware rather than replacing the manager itself. Prefer this path when you do not need to change the correlation lifecycle.

  3. Open an issue. If your use case genuinely requires a fully independent replacement, file an issue in the ServiceConnect repository. Making IReplyStatusRequestReplyManager public (or merging it into the main interface) is the right fix; working around an internal contract is fragile.

  • IBus — the runtime bus surface; SendRequestAsync, SendRequestMultiAsync, and PublishRequestAsync delegate to this interface
  • Message optionsRequestOptions including Timeout, EndPoint, and ExpectedReplyCount
  • Request/reply pattern — the messaging pattern this interface implements