IProducer

IProducer is the contract that plugs a message broker into the ServiceConnect pipeline on the send side. An implementation accepts serialised message bytes and delivers them to the broker using whichever routing strategy the caller requests. One IProducer implementation is active per transport; the framework resolves it as a singleton and holds it for the lifetime of the bus.

The four send-shaped methods cover distinct routing patterns:

  • PublishAsync — pub/sub fan-out. The broker routes the message to every subscriber bound to the message type. Use this when OrderService publishes OrderPlaced and multiple downstream services each receive a copy.
  • SendAsync(Type, ...) — auto-routed point-to-point. The producer receives a Type, looks up IQueueConfiguration.QueueMappings itself to resolve the destination queue or queues, and sends to each. The caller does not need to know the destination — the producer owns the mapping lookup.
  • SendAsync(string, Type, ...) — explicit destination point-to-point. The caller has already resolved the endpoint and supplies it directly. The producer sends to that address without consulting any configuration.
  • SendBytesAsync(string, Type, ReadOnlyMemory<byte>, ...) — raw-bytes delivery to an explicit endpoint. Used by control-plane channels (stream writers, reply-to envelopes) when the caller has already framed the payload. The Type parameter carries the logical message type for header stamping only.

MaximumMessageSize lets the pipeline pre-check payload size before attempting a send, avoiding a roundtrip to the broker for a message that will be rejected.

Note the header asymmetry: outgoing headers are IReadOnlyDictionary<string, string>? (string values only) because transport headers are stringified on the wire. On the receive side, ConsumerEventHandler receives IDictionary<string, object> because some broker clients restore typed values (integers, timestamps) during deserialisation. See IConsumer for the receiving contract.
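The asymmetry matters when an adapter forwards received headers onto an outgoing send, because the object values have to be stringified first. The following is a minimal sketch of that conversion, not ServiceConnect code: HeaderConversionSketch and ToOutgoingHeaders are hypothetical names, and the choice of invariant-culture formatting and of dropping null values is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper, not part of ServiceConnect.
public static class HeaderConversionSketch
{
    // Converts receive-side headers (object values) into the string-valued
    // shape that the IProducer send methods accept.
    public static IReadOnlyDictionary<string, string> ToOutgoingHeaders(
        IDictionary<string, object> received)
    {
        var outgoing = new Dictionary<string, string>(received.Count);
        foreach (var pair in received)
        {
            if (pair.Value is null)
            {
                continue; // assumption: null values are dropped rather than sent empty
            }

            outgoing[pair.Key] = pair.Value switch
            {
                string s => s,
                byte[] bytes => Encoding.UTF8.GetString(bytes),
                // Dates come before the general IFormattable case so they use round-trip format.
                DateTime dt => dt.ToString("O", CultureInfo.InvariantCulture),
                DateTimeOffset dto => dto.ToString("O", CultureInfo.InvariantCulture),
                IFormattable f => f.ToString(null, CultureInfo.InvariantCulture),
                _ => pair.Value.ToString() ?? string.Empty,
            };
        }

        return outgoing;
    }
}
```

Formatting with the invariant culture keeps numeric headers stable regardless of the host machine's locale.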

See The Bus for how the producer fits into the full message pipeline.

Task PublishAsync(
    Type type,
    ReadOnlyMemory<byte> body,
    IReadOnlyDictionary<string, string>? headers = null,
    CancellationToken cancellationToken = default);

Publishes body to all subscribers registered for type. The broker routes the delivery using whatever fan-out mechanism the transport supports (exchange/binding for RabbitMQ, topic for Kafka, topic with multiple subscriptions for Azure Service Bus).

Parameters

  • type — the CLR type of the message; used to derive the routing key, topic name, or exchange binding.
  • body — the serialised payload bytes.
  • headers — optional string-valued transport headers to attach to the delivery (for example, "CorrelationId", "MessageId").
  • cancellationToken — cancels the in-progress send.

Returns. A Task that completes once the broker has confirmed durability (publisher confirm, acks=all, or equivalent). Do not return early on an in-flight send.


Task PublishAsync(
    Type type,
    ReadOnlyMemory<byte> body,
    string? routingKey,
    IReadOnlyDictionary<string, string>? headers = null,
    CancellationToken cancellationToken = default);

Publishes body to subscribers of type, forwarding routingKey to the transport for topic-exchange dispatch. This overload is a default-interface method: implementations that predate it fall back to the no-routing-key PublishAsync overload, silently dropping the key. First-party transports (RabbitMQ) override this to honour the key on the wire.

Parameters

  • type — the CLR type of the message.
  • body — the serialised payload bytes.
  • routingKey — the transport routing key. Pass an empty string for fanout dispatch. Pass null to use the transport’s default routing behaviour (identical to calling the no-routing-key overload).
  • headers — optional string-valued transport headers.
  • cancellationToken — cancels the in-progress send.

Default implementation. Ignores routingKey and delegates to PublishAsync(type, body, headers, cancellationToken). Third-party producers that have not overridden this method silently drop the key — this is the same behaviour as before the overload was added.

Relationship to SupportsRoutingKey. SupportsRoutingKey returns false for any producer that has not overridden this overload. The bus emits a once-per-bus LogWarning when a caller supplies PublishOptions.RoutingKey and SupportsRoutingKey is false, so that topic-exchange dispatch failures are visible without a roundtrip to the broker. Custom producers that honour the routing key must override both this method and SupportsRoutingKey.
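The fallback behaviour can be illustrated with a cut-down stand-in for the interface. This is a sketch only: IPublisherSketch, LegacyPublisher, and KeyAwarePublisher are hypothetical names that mirror the two PublishAsync overloads and SupportsRoutingKey as described above, and they are not the real ServiceConnect types.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the relevant part of IProducer.
public interface IPublisherSketch
{
    Task PublishAsync(Type type, ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default);

    // Default-interface method: ignores routingKey and delegates.
    Task PublishAsync(Type type, ReadOnlyMemory<byte> body, string? routingKey,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
        => PublishAsync(type, body, headers, cancellationToken);

    bool SupportsRoutingKey => false;
}

// Predates the routing-key overload, so it inherits both defaults.
public sealed class LegacyPublisher : IPublisherSketch
{
    public int PublishCount { get; private set; }

    public Task PublishAsync(Type type, ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        PublishCount++;
        return Task.CompletedTask;
    }
}

// Honours the key, so it overrides the overload and the capability flag together.
public sealed class KeyAwarePublisher : IPublisherSketch
{
    public string? LastRoutingKey { get; private set; }

    public bool SupportsRoutingKey => true;

    public Task PublishAsync(Type type, ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
        => PublishAsync(type, body, routingKey: null, headers, cancellationToken);

    public Task PublishAsync(Type type, ReadOnlyMemory<byte> body, string? routingKey,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        LastRoutingKey = routingKey; // a real transport would put this on the wire
        return Task.CompletedTask;
    }
}
```

A caller holding an IPublisherSketch reference can pass a routing key to either implementation. Only KeyAwarePublisher records it, and SupportsRoutingKey is the only way for the caller to tell the two apart before sending.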


Task SendAsync(
    Type type,
    ReadOnlyMemory<byte> body,
    IReadOnlyDictionary<string, string>? headers = null,
    CancellationToken cancellationToken = default);

Sends body to the destination queue (or queues) mapped for type. The transport implementation receives the Type and looks up IQueueConfiguration.QueueMappings itself — keyed on the type’s fully qualified name — to resolve the destination. If no mapping exists, the producer should throw rather than silently drop the message. This is the overload to use when queue-mapping configuration fully controls routing and the caller does not need to name the destination.

Parameters

  • type — the CLR type of the message.
  • body — the serialised payload bytes.
  • headers — optional string-valued transport headers.
  • cancellationToken — cancels the in-progress send.

Failure semantics (multi-mapping fan-out). When the queue mapping for type resolves to multiple endpoints, each endpoint is published to in turn:

  • Per-endpoint failure does not abort the loop. A non-cancellation failure on one endpoint is collected, and the loop continues to the remaining endpoints. After the loop completes, all collected failures surface as a single AggregateException whose inner exceptions are the individual per-endpoint failures.
  • Cancellation aborts the remaining endpoints. If cancellationToken fires (or an awaited operation throws OperationCanceledException) on a given endpoint, the loop does not attempt subsequent endpoints. If any prior endpoint already failed, the cancellation is aggregated with those failures into a single AggregateException (so prior failures are not lost). If no prior endpoint failed, the OperationCanceledException propagates plain — callers’ standard cancellation handlers see the canonical type.
  • Single-endpoint mappings retain plain-exception semantics. When the type maps to exactly one endpoint, this overload behaves identically to the explicit-endpoint overload below: failures and cancellation propagate plain, never wrapped.
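
The three rules above can be sketched as a single loop. This is an illustration under stated assumptions rather than the bundled implementation: FanOutSendSketch is a hypothetical name, sendToEndpoint stands in for the explicit-endpoint SendAsync overload, and throwing InvalidOperationException for an empty mapping is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of ServiceConnect.
public static class FanOutSendSketch
{
    public static async Task SendToAllAsync(
        IReadOnlyList<string> endpoints,
        Func<string, CancellationToken, Task> sendToEndpoint,
        CancellationToken cancellationToken = default)
    {
        if (endpoints.Count == 0)
        {
            throw new InvalidOperationException("No endpoints are mapped for this type.");
        }

        if (endpoints.Count == 1)
        {
            // Single mapping: failures and cancellation propagate plain, never wrapped.
            await sendToEndpoint(endpoints[0], cancellationToken).ConfigureAwait(false);
            return;
        }

        List<Exception>? failures = null;
        foreach (var endpoint in endpoints)
        {
            try
            {
                cancellationToken.ThrowIfCancellationRequested();
                await sendToEndpoint(endpoint, cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException cancelled)
            {
                // Cancellation stops the loop. It propagates plain when nothing failed
                // earlier; otherwise it is aggregated so prior failures are not lost.
                if (failures is null)
                {
                    throw;
                }

                failures.Add(cancelled);
                break;
            }
            catch (Exception ex)
            {
                // A per-endpoint failure is collected and the loop continues.
                (failures ??= new List<Exception>()).Add(ex);
            }
        }

        if (failures is not null)
        {
            throw new AggregateException(failures);
        }
    }
}
```

Callers that need to know which endpoint failed can inspect AggregateException.InnerExceptions; the sketch does not attach endpoint names to the collected exceptions.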

Task SendAsync(
    string endPoint,
    Type type,
    ReadOnlyMemory<byte> body,
    IReadOnlyDictionary<string, string>? headers = null,
    CancellationToken cancellationToken = default);

Sends body to endPoint directly. The producer does not consult queue mappings — it trusts the caller and sends to the given address.

Parameters

  • endPoint — the queue name, topic, or address to send to.
  • type — the CLR type of the message; carried in headers for the receiver to resolve.
  • body — the serialised payload bytes.
  • headers — optional string-valued transport headers.
  • cancellationToken — cancels the in-progress send.

Task SendAsync(
    string endPoint,
    Type type,
    ReadOnlyMemory<byte> body,
    int? routingSlipHopsCompleted,
    IReadOnlyDictionary<string, string>? headers = null,
    CancellationToken cancellationToken = default);

Sends body to endPoint, carrying the framework’s routing-slip hop counter so it can be stamped authoritatively after middleware runs.

Parameters

  • endPoint — the destination queue name.
  • type — the CLR type of the message; carried in headers for the receiver to resolve.
  • body — the serialised payload bytes.
  • routingSlipHopsCompleted — the hop count set by Bus.RouteAsync. When non-null, the value is stamped onto the outgoing headers as HeaderKeys.RoutingSlipHopsCompleted after the send middleware chain, so middleware cannot override the framework value. When null, no hop-count header is written.
  • headers — optional string-valued transport headers.
  • cancellationToken — cancels the in-progress send.

Default implementation. When routingSlipHopsCompleted is non-null, the DIM injects the hop count into a copy of headers as HeaderKeys.RoutingSlipHopsCompleted (using InvariantCulture formatting) and delegates to SendAsync(endPoint, type, body, (IReadOnlyDictionary<string,string>)injected, cancellationToken). When null, it delegates directly. First-party transports override to honour the separate parameter without the header-copy step.

Remarks. This overload exists so the hop counter is stamped authoritatively — after all middleware has run — regardless of whether the producer has been upgraded to handle the parameter natively. Third-party producers that have not overridden this method still receive the correct wire stamp via the DIM’s header-copy fallback.
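
The header-copy fallback described above might look roughly like the following. It is a sketch: RoutingSlipHeaderSketch and WithHopCount are hypothetical names, and the literal key string is an assumption, since this page names the constant HeaderKeys.RoutingSlipHopsCompleted but not its value.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper, not part of ServiceConnect.
public static class RoutingSlipHeaderSketch
{
    // Assumption: stands in for the value of HeaderKeys.RoutingSlipHopsCompleted.
    public const string HopsCompletedKey = "RoutingSlipHopsCompleted";

    public static IReadOnlyDictionary<string, string>? WithHopCount(
        IReadOnlyDictionary<string, string>? headers,
        int? routingSlipHopsCompleted)
    {
        if (routingSlipHopsCompleted is null)
        {
            return headers; // no hop-count header is written
        }

        // Copy so the caller's dictionary is never mutated.
        var injected = headers is null
            ? new Dictionary<string, string>()
            : new Dictionary<string, string>(headers);

        // Written after the copy, so a value set earlier by middleware is replaced.
        injected[HopsCompletedKey] =
            routingSlipHopsCompleted.Value.ToString(CultureInfo.InvariantCulture);
        return injected;
    }
}
```

Because the framework value is written last, a header that middleware set under the same key cannot survive into the outgoing message.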


Task SendBytesAsync(
    string endPoint,
    Type type,
    ReadOnlyMemory<byte> packet,
    IReadOnlyDictionary<string, string>? headers = null,
    CancellationToken cancellationToken = default);

Delivers a pre-framed byte packet to endPoint without any type-routing logic. Used by control-plane channels (for example, stream writers and reply-to envelopes) where the caller has already composed the full payload.

Parameters

  • endPoint — the queue name or address to deliver to.
  • type — the logical message type the packet represents (for example, the element type of a stream). The implementation uses this to stamp the reserved type headers (TypeName, FullTypeName) authoritatively — callers cannot override them via headers.
  • packet — the raw bytes to send; the caller is responsible for framing.
  • headers — optional string-valued transport headers. The producer stamps DestinationAddress, MessageType, SourceAddress, TimeSent, SourceMachine, TypeName, FullTypeName, ConsumerType, and Language authoritatively — caller values for these are overwritten. MessageId is preserved when the caller supplies one; the producer mints a new value only if absent. MessageType is stamped "ByteStream" for this method (see the page-level note above for the full operation-flag semantics).
  • cancellationToken — cancels the in-progress send.
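
The stamping rules for headers can be sketched as follows. This is an illustration, not the bundled producer's code: ByteStreamHeaderSketch and the sourceAddress parameter are hypothetical, the value formats (ISO 8601 timestamp, Type.FullName for FullTypeName, a GUID for MessageId) are assumptions, and ConsumerType and Language are left out because this page does not say what values they carry.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper, not part of ServiceConnect.
public static class ByteStreamHeaderSketch
{
    public static IReadOnlyDictionary<string, string> Stamp(
        string endPoint,
        Type type,
        string sourceAddress, // hypothetical: the sender's own queue name
        IReadOnlyDictionary<string, string>? callerHeaders)
    {
        var stamped = callerHeaders is null
            ? new Dictionary<string, string>()
            : new Dictionary<string, string>(callerHeaders);

        // MessageId is the one caller value that survives; mint one only if absent.
        if (!stamped.TryGetValue("MessageId", out var messageId) || string.IsNullOrEmpty(messageId))
        {
            stamped["MessageId"] = Guid.NewGuid().ToString();
        }

        // Reserved headers are written last, so caller-supplied values are overwritten.
        stamped["DestinationAddress"] = endPoint;
        stamped["MessageType"] = "ByteStream";
        stamped["SourceAddress"] = sourceAddress;
        stamped["TimeSent"] = DateTime.UtcNow.ToString("O", CultureInfo.InvariantCulture);
        stamped["SourceMachine"] = Environment.MachineName;
        stamped["TypeName"] = type.Name;
        stamped["FullTypeName"] = type.FullName ?? type.Name;
        return stamped;
    }
}
```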

long MaximumMessageSize { get; }

The maximum payload size in bytes that this transport accepts per message. The pipeline reads this value before serialising large payloads and can reject or split the message before incurring a network roundtrip to the broker.

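Return the broker’s documented maximum so that callers can enforce it (for example, 262 144 for the 256 KB limit of the Azure Service Bus Standard tier). Return long.MaxValue only if the transport imposes no practical limit. For RabbitMQ, use the broker’s configured maximum message size rather than the AMQP frame size: large bodies are split across frames, so the frame size does not cap the message.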
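
A pre-check against this property can be as small as the sketch below. SizeGuardSketch is a hypothetical name, and throwing InvalidOperationException is an assumption; the pipeline may reject or split oversized payloads differently.

```csharp
using System;

// Hypothetical helper, not part of ServiceConnect.
public static class SizeGuardSketch
{
    // Fails fast, before any network call, when the payload exceeds the transport limit.
    public static void EnsureFits(ReadOnlyMemory<byte> body, long maximumMessageSize)
    {
        if (body.Length > maximumMessageSize)
        {
            throw new InvalidOperationException(
                $"Payload of {body.Length} bytes exceeds the transport limit of {maximumMessageSize} bytes.");
        }
    }
}
```

A caller would pass producer.MaximumMessageSize as the second argument immediately before handing the bytes to a send method.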


bool IsHealthy { get; }

Returns true when the producer is currently connected and ready to publish or send. Returns false before the first publish/send call (the producer connects lazily) and after a connection drop until the next reconnect. Mirrors IConsumer.IsConnected.


bool HasAttemptedConnection { get; }

Returns false for a freshly-constructed producer that has not yet been asked to publish or send. Once a publish/send call begins (whether or not it succeeds), this becomes true and stays true for the producer’s lifetime. The producer health check uses this to distinguish “lazy, not yet tried” (Healthy) from “tried and currently disconnected” (Unhealthy).
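
The way a health check might combine the two flags is sketched below. ProbeResultSketch and ProducerProbeSketch are hypothetical names; the real health check may report richer states than these two.

```csharp
// Hypothetical types, not part of ServiceConnect.
public enum ProbeResultSketch
{
    Healthy,
    Unhealthy,
}

public static class ProducerProbeSketch
{
    public static ProbeResultSketch Evaluate(bool isHealthy, bool hasAttemptedConnection)
    {
        if (isHealthy)
        {
            return ProbeResultSketch.Healthy;
        }

        // Not connected: only a problem if a publish or send has already been tried.
        return hasAttemptedConnection
            ? ProbeResultSketch.Unhealthy
            : ProbeResultSketch.Healthy;
    }
}
```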


ProducerHealthSnapshot GetHealthSnapshot()
    => new(IsHealthy, HasAttemptedConnection);

Default-interface method that returns a single-snapshot read of IsHealthy and HasAttemptedConnection. Health-check probes that need both values must use this method rather than reading the two properties separately: separate reads admit a race where a publish-success transition lands between them, so the probe sees a combination of values that never existed. The default implementation still reads the two properties one after the other; first-party producers (RabbitMQ) override it with a truly atomic snapshot read.
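
One way a custom producer could make the snapshot atomic is to keep both flags in a single integer, as sketched below. This is an assumption about a possible design, not a description of how the RabbitMQ producer does it. HealthStateSketch and HealthSnapshotSketch are hypothetical names, and the code assumes .NET 6 or later (record structs, Interlocked.Or and Interlocked.And).

```csharp
using System.Threading;

// Hypothetical stand-in for ProducerHealthSnapshot.
public readonly record struct HealthSnapshotSketch(bool IsHealthy, bool HasAttemptedConnection);

// Hypothetical state holder, not part of ServiceConnect.
public sealed class HealthStateSketch
{
    private const int AttemptedBit = 1;
    private const int HealthyBit = 2;

    // Both flags live in one int so that a single read observes a consistent pair.
    private int _state;

    public void MarkAttempted() => Interlocked.Or(ref _state, AttemptedBit);

    public void MarkConnected() => Interlocked.Or(ref _state, AttemptedBit | HealthyBit);

    public void MarkDisconnected() => Interlocked.And(ref _state, ~HealthyBit);

    public HealthSnapshotSketch GetHealthSnapshot()
    {
        var state = Volatile.Read(ref _state);
        return new HealthSnapshotSketch(
            IsHealthy: (state & HealthyBit) != 0,
            HasAttemptedConnection: (state & AttemptedBit) != 0);
    }
}
```

The attempted bit is never cleared, which matches the rule that HasAttemptedConnection stays true for the producer's lifetime.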


bool SupportsRoutingKey { get; }

true when the producer honours the routingKey parameter on PublishAsync(Type, ReadOnlyMemory<byte>, string?, ...). The default-interface-method value is false; the RabbitMQ producer overrides it to true. When false, the bus emits a once-per-bus LogWarning if a caller supplies PublishOptions.RoutingKey, because the producer drops the key on the wire. Custom producers that route by key should override this to true and handle the key in their routing-key PublishAsync overload.


IProducer extends IAsyncDisposable. Lifecycle is owned exclusively by DisposeAsync — there is no separate DisconnectAsync method. Implementations should flush any pending in-flight sends before closing the underlying broker connection inside DisposeAsync.


The Task returned by each send method should complete only after the broker has confirmed durability — RabbitMQ publisher confirms, Kafka acks=all, Azure Service Bus settlement. Returning before the broker acknowledgement is received risks silent message loss if the process crashes immediately after the send returns. Avoid fire-and-forget sends inside the implementation.

ServiceConnect does not deduplicate at the transport layer. If the pipeline retries a failed send and the broker already accepted the first copy, the consumer receives duplicates. Transports that expose a deduplication id (for example, Azure Service Bus MessageId) should populate it from the headers dictionary so that the broker or downstream consumers can detect and discard duplicates. Transports whose client manages sequence numbers internally (for example, the Kafka idempotent producer) only suppress duplicates caused by the client’s own retries, so the MessageId header should still be forwarded for consumer-side detection.

The producer should retry on transient broker errors — connection blips, throttling responses, temporary unavailability. It should not retry on payload-level errors such as message-too-large or schema validation failures. Use MaximumMessageSize to short-circuit before sending rather than discovering the rejection from a broker error response.
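
The split between retryable and non-retryable failures can be sketched as a small wrapper. TransientRetrySketch is a hypothetical name; the isTransient predicate, the attempt count, and the doubling back-off are assumptions that a real adapter would tune to its broker client's exception types.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of ServiceConnect.
public static class TransientRetrySketch
{
    public static async Task SendWithRetryAsync(
        Func<CancellationToken, Task> send,
        Func<Exception, bool> isTransient,
        int maxAttempts = 3,
        TimeSpan? baseDelay = null,
        CancellationToken cancellationToken = default)
    {
        var delay = baseDelay ?? TimeSpan.FromMilliseconds(200);
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await send(cancellationToken).ConfigureAwait(false);
                return;
            }
            catch (Exception ex) when (
                attempt < maxAttempts
                && ex is not OperationCanceledException
                && isTransient(ex))
            {
                // Transient failure with attempts remaining: back off, then try again.
                // Payload-level errors do not match the filter and propagate immediately.
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

When the final attempt also fails, the exception filter no longer matches and the last exception reaches the caller unchanged.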

The three point-to-point overloads have different responsibilities:

  • SendAsync(Type, ...) — the implementation must consult IQueueConfiguration.QueueMappings to resolve the destination. Inject IQueueConfiguration via the constructor and call TryGetQueueMapping (or equivalent) inside this method. If no mapping is registered for the type, throw InvalidOperationException rather than silently dropping the message.
  • SendAsync(string endPoint, Type, ...) — the caller has already resolved the endpoint. The implementation should trust the supplied endPoint and send directly without performing any queue-mapping lookup.
  • SendBytesAsync(string endPoint, ...) — same as the explicit-endpoint overload: trust the caller’s endpoint value and send without modification.
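
The lookup step for the auto-routed overload might be sketched as below. The shape of IQueueConfiguration.QueueMappings is not shown on this page, so the dictionary type here is an assumption, and QueueMappingLookupSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of ServiceConnect.
public static class QueueMappingLookupSketch
{
    // Assumption: mappings are keyed on the type's fully qualified name and
    // each entry lists one or more destination queues.
    public static IReadOnlyList<string> Resolve(
        IReadOnlyDictionary<string, IReadOnlyList<string>> queueMappings,
        Type type)
    {
        var key = type.FullName ?? type.Name;
        if (!queueMappings.TryGetValue(key, out var endpoints) || endpoints.Count == 0)
        {
            // Throw rather than silently dropping the message.
            throw new InvalidOperationException($"No queue mapping is registered for '{key}'.");
        }

        return endpoints;
    }
}
```

The auto-routed overload would then hand each resolved endpoint to the explicit-endpoint overload, which performs no lookup of its own.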

PublishAsync(Type, ...) is distinct from all three: it is a pub/sub fan-out. Derive the exchange name, topic, or routing key from the type and publish to every subscriber — no queue mapping is involved.

If your adapter needs to interoperate with the bundled RabbitMQ transport on the same broker (publishers using one adapter, subscribers using the other), call ServiceConnect.Services.MessageTypeExchangeName.From(type) to compute the exchange name. The helper is the single source of truth used by the bundled RabbitMQ producer for publish and by the core bus for binding — its output format is fixed for wire compatibility. Adapters that are self-contained (their own publishers and consumers, no cross-adapter interop) are free to use any naming scheme.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

public sealed class BrokerProducer : IProducer
{
    public long MaximumMessageSize => 1_048_576; // broker limit

    public Task PublishAsync(
        Type type,
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        var topic = DeriveTopicFromType(type);
        return PublishToBrokerAsync(topic, body, headers, cancellationToken);
    }

    public Task SendAsync(
        Type type,
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        // Resolve the destination queue from IQueueConfiguration.QueueMappings.
        // Inject IQueueConfiguration via the constructor and call TryGetQueueMapping here.
        throw new NotImplementedException();
    }

    public Task SendAsync(
        string endPoint,
        Type type,
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        return PublishToBrokerAsync(endPoint, body, headers, cancellationToken);
    }

    public Task SendBytesAsync(
        string endPoint,
        Type type,
        ReadOnlyMemory<byte> packet,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        // Stamp type headers from `type` (server-authoritative) then publish.
        // The stamping step itself is omitted from this skeleton.
        return PublishToBrokerAsync(endPoint, packet, headers, cancellationToken);
    }

    public bool IsHealthy => /* read connection state */ false;

    public bool HasAttemptedConnection => /* set when first send begins */ false;

    public ValueTask DisposeAsync()
    {
        // Flush any pending in-flight sends, then close + dispose the broker client.
        return ValueTask.CompletedTask;
    }

    // Broker-specific helpers: implement using your broker client.
    private string DeriveTopicFromType(Type type) => throw new NotImplementedException();

    private Task PublishToBrokerAsync(string destination, ReadOnlyMemory<byte> payload,
        IReadOnlyDictionary<string, string>? headers, CancellationToken ct) => throw new NotImplementedException();
}

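The following skeleton shows a KafkaProducer built on Confluent.Kafka. Idempotent delivery is switched on through the ProducerConfig passed to the constructor (see the registration below), which stops the client’s own retries from writing duplicates to a topic partition. It does not by itself provide end-to-end exactly-once delivery.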

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using ServiceConnect.Interfaces;

public sealed class KafkaProducer : IProducer
{
    private readonly IProducer<string, byte[]> _inner;
    private volatile bool _isHealthy;
    private volatile bool _hasAttemptedConnection;

    public KafkaProducer(ProducerConfig config)
    {
        _inner = new ProducerBuilder<string, byte[]>(config).Build();
    }

    // Kafka's default per-message limit is about 1 MB; align this with the broker's setting.
    public long MaximumMessageSize => 1_048_576;

    public Task PublishAsync(
        Type type,
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        var topic = type.FullName!.Replace('.', '-').ToLowerInvariant();
        return ProduceAsync(topic, BuildMessage(type.FullName!, body, headers), cancellationToken);
    }

    public Task SendAsync(
        Type type,
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        // Resolve the destination from IQueueConfiguration.QueueMappings, then
        // fall through to the explicit-endpoint overload. Not shown here for brevity.
        throw new NotImplementedException();
    }

    public Task SendAsync(
        string endPoint,
        Type type,
        ReadOnlyMemory<byte> body,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        return ProduceAsync(endPoint, BuildMessage(type.FullName!, body, headers), cancellationToken);
    }

    public Task SendBytesAsync(
        string endPoint,
        Type type,
        ReadOnlyMemory<byte> packet,
        IReadOnlyDictionary<string, string>? headers = null,
        CancellationToken cancellationToken = default)
    {
        // Reserved type headers should be stamped from `type` here; omitted for brevity.
        return ProduceAsync(endPoint, BuildMessage(string.Empty, packet, headers), cancellationToken);
    }

    // IProducer lifecycle is owned exclusively by DisposeAsync. Health probes (IsHealthy +
    // HasAttemptedConnection) are exposed via dedicated properties + GetHealthSnapshot.
    // Simplification: health here reflects whether the most recent produce was acknowledged.
    public bool IsHealthy => _isHealthy;

    public bool HasAttemptedConnection => _hasAttemptedConnection;

    public ValueTask DisposeAsync()
    {
        // Flush any in-flight produces before disposing the underlying client so the broker
        // gets a chance to ack every send the bus has handed off.
        var remaining = _inner.Flush(TimeSpan.FromSeconds(5));
        if (remaining > 0)
        {
            // Caller decides whether unacked messages are an error; here we accept best-effort.
        }
        _inner.Dispose();
        return ValueTask.CompletedTask;
    }

    // Awaits the broker acknowledgement and records the outcome for the health properties.
    private async Task ProduceAsync(
        string topic,
        Message<string, byte[]> message,
        CancellationToken cancellationToken)
    {
        _hasAttemptedConnection = true; // stays true for the producer's lifetime
        try
        {
            await _inner.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);
            _isHealthy = true;
        }
        catch (ProduceException<string, byte[]>)
        {
            _isHealthy = false;
            throw;
        }
    }

    private static Message<string, byte[]> BuildMessage(
        string key,
        ReadOnlyMemory<byte> value,
        IReadOnlyDictionary<string, string>? headers)
    {
        var kafkaHeaders = new Headers();
        if (headers is not null)
        {
            foreach (var (k, v) in headers)
            {
                kafkaHeaders.Add(k, Encoding.UTF8.GetBytes(v));
            }
        }
        return new Message<string, byte[]> { Key = key, Value = value.ToArray(), Headers = kafkaHeaders };
    }
}

Register the KafkaProducer with idempotent writes enabled:

services.AddServiceConnect(builder =>
{
    builder.AddRegistration(svc =>
    {
        svc.AddSingleton(new ProducerConfig
        {
            BootstrapServers = "kafka.internal.example:9092",
            EnableIdempotence = true,
            Acks = Acks.All,
        });
        svc.AddSingleton<IProducer, KafkaProducer>();
    });
});

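When PaymentProcessor publishes a PaymentAuthorised message, the pipeline serialises it, calls PublishAsync, and the KafkaProducer delivers it to the paymentprocessor-messages-paymentauthorised topic (assuming the type’s full name is PaymentProcessor.Messages.PaymentAuthorised). With EnableIdempotence = true and Acks = All, the ProduceAsync call does not complete until all in-sync replicas have written the record, and a retry of a transient network error by the Kafka client does not write a duplicate. Delivery remains at-least-once overall: a retry issued by the pipeline after a failed PublishAsync is a new produce call and can still create a duplicate.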
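
The topic name in this walkthrough follows from the derivation used in the skeleton's PublishAsync. The sketch below reproduces it in isolation; TopicNameSketch is a hypothetical name, and the PaymentProcessor.Messages namespace is an assumption, since the page gives the topic but not the namespace.

```csharp
using System;

namespace PaymentProcessor.Messages
{
    // Hypothetical message type used only to illustrate the naming rule.
    public sealed class PaymentAuthorised
    {
    }
}

// Hypothetical helper, not part of ServiceConnect.
public static class TopicNameSketch
{
    // Same rule as the skeleton: dots become hyphens, then the name is lower-cased.
    public static string From(Type type)
        => (type.FullName ?? type.Name).Replace('.', '-').ToLowerInvariant();
}
```

Nested types would need extra care, because Type.FullName separates a nested type from its container with a plus sign, and this rule leaves that character in place.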

  • The Bus — how the producer fits into the message pipeline
  • IConsumer — the inbound counterpart