IRequestReplyManager
Overview
IRequestReplyManager is the seam between IBus.SendRequestAsync and the transport’s outgoing pipeline. When a request method is called on IBus, the bus delegates to IRequestReplyManager, which:
- Generates a fresh correlation Guid and stamps it onto the outgoing headers as RequestMessageId.
- Records a TaskCompletionSource (or callback) keyed by that id in its pending-request table.
- Dispatches the typed message through the outgoing pipeline; the pipeline serializes it and writes to the transport.
- Awaits the TaskCompletionSource until the matching reply arrives or the timeout fires.
When a reply message arrives, the reply consumer calls ProcessReply, which looks up the correlation id in the pending table, deserializes the payload using the reply type recorded at request time (not the wire-reported type), and resolves the TaskCompletionSource.
The correlation mechanism is entirely in-memory. Pending requests are keyed by Guid in a ConcurrentDictionary; replies received with an unknown id are silently dropped.
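The correlation flow described above can be sketched in a few lines. This is a minimal illustration, not the library's implementation: the class name PendingRequestTable and its members are hypothetical, System.Text.Json stands in for whatever serializer the pipeline actually uses, and a plain TimeoutException stands in for RequestTimeoutException.

```csharp
using System;
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the manager's pending-request table; not the library's code.
public sealed class PendingRequestTable
{
    // Each entry remembers the reply type chosen by the caller and the TCS the caller awaits.
    private sealed record Pending(Type ReplyType, TaskCompletionSource<object> Completion);

    private readonly ConcurrentDictionary<Guid, Pending> _pending = new();

    // Registers a pending request and returns its correlation id plus the task to await.
    public (Guid Id, Task<object> Reply) Track(Type replyType)
    {
        var id = Guid.NewGuid();
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = new Pending(replyType, tcs);
        return (id, tcs.Task);
    }

    // Awaits the reply, throwing TimeoutException when the timeout fires.
    // The entry is removed afterwards whether the wait succeeded or not.
    public async Task<object> WaitAsync(Guid id, Task<object> reply, TimeSpan timeout, CancellationToken ct = default)
    {
        try
        {
            return await reply.WaitAsync(timeout, ct);
        }
        finally
        {
            _pending.TryRemove(id, out _);
        }
    }

    // Returns true when a pending request was completed (with a result or an error).
    // Unknown or already-completed ids return false and are otherwise ignored.
    public bool ProcessReply(string messageId, ReadOnlyMemory<byte> messageBytes)
    {
        if (!Guid.TryParse(messageId, out var id) || !_pending.TryRemove(id, out var pending))
            return false;

        // Deserialize to the type recorded at request time, never a wire-supplied type.
        var reply = JsonSerializer.Deserialize(messageBytes.Span, pending.ReplyType);
        if (reply is null)
            return pending.Completion.TrySetException(new JsonException("Reply payload was null."));

        return pending.Completion.TrySetResult(reply);
    }
}
```

A caller would invoke Track before dispatching the message, place the returned id in the RequestMessageId header, and then await WaitAsync. Because the table lives only in process memory, a restart loses every pending entry, which is why late replies after a restart are dropped.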
Reference
```csharp
using ServiceConnect.Interfaces.Exceptions;
using ServiceConnect.Interfaces.Options;

namespace ServiceConnect.Interfaces;

/// <summary>
/// Coordinates request/reply interactions on top of the transport pipeline.
/// </summary>
public interface IRequestReplyManager
{
    /// <summary>
    /// Sends a request and waits for a single reply.
    /// </summary>
    Task<TReply> SendRequestAsync<TRequest, TReply>(
        TRequest message,
        IDictionary<string, string> headers,
        RequestOptions options,
        CancellationToken cancellationToken = default)
        where TRequest : Message
        where TReply : Message;

    /// <summary>
    /// Sends a request and collects multiple replies.
    /// </summary>
    Task<IList<TReply>> SendRequestMultiAsync<TRequest, TReply>(
        TRequest message,
        IDictionary<string, string> headers,
        RequestOptions options,
        CancellationToken cancellationToken = default)
        where TRequest : Message
        where TReply : Message;

    /// <summary>
    /// Publishes a request and invokes a callback for each reply that arrives.
    /// </summary>
    Task PublishRequestAsync<TRequest, TReply>(
        TRequest message,
        IDictionary<string, string> headers,
        RequestOptions options,
        Action<TReply> onReply,
        CancellationToken cancellationToken = default)
        where TRequest : Message
        where TReply : Message;

    /// <summary>
    /// Attempts to match an incoming reply to a pending request.
    /// </summary>
    void ProcessReply(string messageId, ReadOnlyMemory<byte> messageBytes, Type type);
}
```

SendRequestAsync<TRequest, TReply>
Sends a typed request and waits for exactly one reply. The method generates a correlation id, stamps RequestMessageId onto headers, hands the message to the outgoing pipeline (which serializes and writes to the transport), then awaits the TaskCompletionSource until ProcessReply resolves it. If the reply does not arrive within RequestOptions.Timeout, a RequestTimeoutException is thrown.
Parameters
- message: the typed request message. The pipeline serializes it, and middleware sees the strongly-typed instance via SendContext.Message.
- headers: outgoing headers; RequestMessageId is written into this dictionary by the implementation.
- options: routing and timeout options. Set EndPoint to target a specific queue, or leave it unset to use the type-mapped default.
- cancellationToken: cancels the wait; throws RequestSendCancelledException if cancelled during the outbound send, or plain OperationCanceledException if cancelled while awaiting the reply.
Returns the deserialized reply as TReply.
SendRequestMultiAsync<TRequest, TReply>
Sends a typed request and collects multiple replies. Works identically to SendRequestAsync for the outgoing side; fan-out to multiple responders comes from registering multiple queues against the request type via IQueueConfiguration.AddQueueMapping. On the incoming side, each ProcessReply call appends to an internal list. With RequestOptions.ExpectedReplyCount set to a positive value, the call returns as soon as that many replies have arrived; if fewer arrive before the timeout, a RequestTimeoutException is thrown with the partials available on PartialReplies. With ExpectedReplyCount left unset (or set to zero/negative), the call waits the full timeout and returns every reply received.
Parameters: same shape as SendRequestAsync. options.ExpectedReplyCount drives the reply-count expectation. Cancelling cancellationToken raises RequestSendCancelledException if the outbound send is still in progress, or plain OperationCanceledException if the call is already awaiting replies.
Returns the collected replies as IList<TReply>.
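The two reply-count modes can be illustrated with a small self-contained collector. This is a sketch of the rules as described above, not the library's code: ReplyCollector and its members are hypothetical names, and TimeoutException stands in for RequestTimeoutException (which, per the text above, also exposes PartialReplies).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical collector mirroring the reply-count rules described above.
public sealed class ReplyCollector<TReply>
{
    private readonly object _gate = new();
    private readonly List<TReply> _replies = new();
    private readonly TaskCompletionSource _countReached =
        new(TaskCreationOptions.RunContinuationsAsynchronously);
    private readonly int _expected;

    // A zero or negative count means "wait the full timeout and return everything".
    public ReplyCollector(int expectedReplyCount) => _expected = expectedReplyCount;

    // Called once per incoming reply; signals early completion when the count is met.
    public void Add(TReply reply)
    {
        lock (_gate)
        {
            _replies.Add(reply);
            if (_expected > 0 && _replies.Count >= _expected)
                _countReached.TrySetResult();
        }
    }

    // Returns early once the expected count is met; otherwise waits out the timeout.
    public async Task<IList<TReply>> CollectAsync(TimeSpan timeout)
    {
        await Task.WhenAny(_countReached.Task, Task.Delay(timeout));

        lock (_gate)
        {
            var snapshot = new List<TReply>(_replies);
            if (_expected > 0 && snapshot.Count < _expected)
                throw new TimeoutException(
                    $"Expected {_expected} replies, received {snapshot.Count}.");
            return snapshot;
        }
    }
}
```

With a positive expected count the wait ends as soon as enough replies have been added. With zero, CollectAsync always runs for the full timeout, so the timeout doubles as the collection window.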
PublishRequestAsync<TRequest, TReply>
Publishes a typed request to all subscribers and invokes onReply for each reply that arrives. Unlike the SendRequest variants, this uses the publish pipeline rather than the send pipeline, and the caller supplies a callback rather than awaiting a return value. The method returns when the reply count is satisfied, or after the timeout. If options.ExpectedReplyCount is positive and fewer replies arrive before the timeout, a RequestTimeoutException is thrown.
Parameters
- message: the typed request message; same shape as SendRequestAsync.
- onReply: invoked on the reply-processing thread for each matching reply. The callback must not block; long-running work should be dispatched to a background thread.
- headers, options: match SendRequestAsync.
- cancellationToken: throws RequestSendCancelledException if cancelled during the outbound publish, or plain OperationCanceledException if cancelled while awaiting replies.
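One way to keep onReply from blocking the reply-processing thread is to have the callback enqueue each reply and let a background worker do the slow part. The helper below is a sketch under that assumption; ReplyOffloader is a hypothetical name and is not part of ServiceConnect.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: hands replies from a non-blocking callback to a background worker.
public sealed class ReplyOffloader<TReply>
{
    private readonly Channel<TReply> _queue = Channel.CreateUnbounded<TReply>();
    private readonly Task _worker;

    // handleAsync runs on a thread-pool worker, one reply at a time, in arrival order.
    public ReplyOffloader(Func<TReply, Task> handleAsync)
    {
        _worker = Task.Run(async () =>
        {
            await foreach (var reply in _queue.Reader.ReadAllAsync())
                await handleAsync(reply);
        });
    }

    // Pass this method as onReply; TryWrite on an unbounded channel returns immediately.
    public void OnReply(TReply reply) => _queue.Writer.TryWrite(reply);

    // Signals that no more replies will arrive and waits for queued work to finish.
    public Task CompleteAsync()
    {
        _queue.Writer.TryComplete();
        return _worker;
    }
}
```

The OnReply method group converts to Action<TReply>, so it can be passed directly as the onReply argument. Calling CompleteAsync after PublishRequestAsync returns lets the caller wait for any replies still queued.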
ProcessReply
Called by the reply consumer to match an incoming reply to a tracked request. Looks up messageId in the pending-request table. If a match is found, deserializes messageBytes using the reply type recorded at request time (to prevent deserialization into attacker-controlled types from crafted reply messages), then resolves the TaskCompletionSource or invokes the callback. Unknown or already-completed ids are silently ignored.
Parameters
- messageId: the RequestMessageId header value copied verbatim from the request into the reply by the replying handler.
- messageBytes: the serialized reply payload.
- type: the wire-reported reply type; used for logging, but the implementation deserializes to the type stored at request time.
Where it sits
IRequestReplyManager sits between the IBus request methods and the send/publish pipelines:

```
IBus.SendRequestAsync
  └─► IRequestReplyManager.SendRequestAsync
        ├─► stamps RequestMessageId header
        ├─► ISendMessagePipeline.ExecuteSendMessagePipelineAsync
        └─► awaits TaskCompletionSource
                    ▲
                    reply consumer calls ProcessReply
```

The correlation id travels on the RequestMessageId header. Replying handlers read this header and echo it back; the reply consumer strips it and calls ProcessReply.
When to implement
You rarely need to replace RequestReplyManager. Common reasons to do so:
- Durable pending-request store — survive process restarts by persisting correlation ids and their reply types to Redis or a database. On startup, reload the table and reconnect reply channels.
- Custom timeout behaviour — extend the timeout window based on message priority, add retry logic, or surface partial results before the deadline.
- Telemetry and observability — instrument the full round-trip duration, correlate traces across the request and reply sides, or emit metrics per message type.
- Rate limiting — cap the number of in-flight requests to protect downstream services.
Registration
Replace the default implementation via AddRegistration:

```csharp
services.AddServiceConnect(builder =>
{
    builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
    builder.AddRegistration(svc =>
        svc.AddSingleton<IRequestReplyManager, MyDurableRequestReplyManager>());
});
```

Companion contract: IReplyStatusRequestReplyManager
The in-box dispatcher also depends on an internal interface, IReplyStatusRequestReplyManager (IsTrackedRequest + TryProcessReply), which the reply consumer uses to filter messages and avoid deserializing replies for requests it is not tracking. The default RequestReplyManager implements both IRequestReplyManager and this companion interface.
IReplyStatusRequestReplyManager is internal to the ServiceConnect assembly. Third-party code cannot implement or reference it directly. If you register a custom type as IRequestReplyManager without also satisfying the companion contract, AddServiceConnect throws InvalidOperationException at startup because the required IReplyStatusRequestReplyManager registration is absent.
Supported approaches for custom replacements:
- Wrap or decorate the default implementation. Register the default RequestReplyManager normally and forward calls from your outer type. This avoids the internal-contract problem entirely because the inner RequestReplyManager continues to satisfy IReplyStatusRequestReplyManager.
- Intercept via filters or middleware. Many customisation goals (telemetry, rate limiting, custom timeout behaviour) can be achieved by adding send or receive middleware rather than replacing the manager itself. Prefer this path when you do not need to change the correlation lifecycle.
- Open an issue. If your use case genuinely requires a fully independent replacement, file an issue in the ServiceConnect repository. Making IReplyStatusRequestReplyManager public (or merging it into the main interface) is the right fix; working around an internal contract is fragile.
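The wrap-or-decorate approach might look like the following sketch, which times each round trip and forwards everything else unchanged. Several things here are assumptions rather than documented behaviour: TimingRequestReplyManager and the record callback are hypothetical, Message is assumed to live in one of the imported ServiceConnect namespaces, and how the outer type gets wired into the container alongside the default RequestReplyManager is deliberately left out because the source does not describe it.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;
using ServiceConnect.Interfaces.Exceptions;
using ServiceConnect.Interfaces.Options;

// Hypothetical decorator: measures round-trip time and delegates to an inner manager.
public sealed class TimingRequestReplyManager : IRequestReplyManager
{
    private readonly IRequestReplyManager _inner;
    private readonly Action<string, TimeSpan> _record; // hypothetical metrics sink

    public TimingRequestReplyManager(IRequestReplyManager inner, Action<string, TimeSpan> record)
    {
        _inner = inner;
        _record = record;
    }

    public Task<TReply> SendRequestAsync<TRequest, TReply>(
        TRequest message, IDictionary<string, string> headers, RequestOptions options,
        CancellationToken cancellationToken = default)
        where TRequest : Message where TReply : Message =>
        TimeAsync(typeof(TRequest).Name,
            () => _inner.SendRequestAsync<TRequest, TReply>(message, headers, options, cancellationToken));

    public Task<IList<TReply>> SendRequestMultiAsync<TRequest, TReply>(
        TRequest message, IDictionary<string, string> headers, RequestOptions options,
        CancellationToken cancellationToken = default)
        where TRequest : Message where TReply : Message =>
        TimeAsync(typeof(TRequest).Name,
            () => _inner.SendRequestMultiAsync<TRequest, TReply>(message, headers, options, cancellationToken));

    public Task PublishRequestAsync<TRequest, TReply>(
        TRequest message, IDictionary<string, string> headers, RequestOptions options,
        Action<TReply> onReply, CancellationToken cancellationToken = default)
        where TRequest : Message where TReply : Message =>
        TimeAsync(typeof(TRequest).Name, async () =>
        {
            await _inner.PublishRequestAsync<TRequest, TReply>(
                message, headers, options, onReply, cancellationToken);
            return true; // dummy value so the generic timing helper can be reused
        });

    // Reply matching is untouched; the inner manager owns the pending-request table.
    public void ProcessReply(string messageId, ReadOnlyMemory<byte> messageBytes, Type type) =>
        _inner.ProcessReply(messageId, messageBytes, type);

    // Records the elapsed time even when the inner call throws (timeout, cancellation).
    private async Task<T> TimeAsync<T>(string requestType, Func<Task<T>> call)
    {
        var stopwatch = Stopwatch.StartNew();
        try { return await call(); }
        finally { _record(requestType, stopwatch.Elapsed); }
    }
}
```

Because the decorator never touches the correlation lifecycle, the inner manager keeps matching replies exactly as before. If timing is the only goal, the middleware route described above may be the simpler choice.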
See also
- IBus: the runtime bus surface; SendRequestAsync, SendRequestMultiAsync, and PublishRequestAsync delegate to this interface
- Message options: RequestOptions including Timeout, EndPoint, and ExpectedReplyCount
- Request/reply pattern: the messaging pattern this interface implements