IConsumer
Overview
IConsumer is the contract that plugs a message broker into the ServiceConnect pipeline on the receive side. An implementation opens a connection to a broker, subscribes to the queues or topics that correspond to the supplied messageTypes, and dispatches each raw delivery as a ReadOnlyMemory<byte> to the ConsumerEventHandler delegate. The pipeline deserialises, routes, and executes the message from there.
One IConsumer implementation is active per transport. The framework resolves it as a singleton and calls StartConsumingAsync once per consumer host at startup. At shutdown it calls DisposeAsync to close the broker connection cleanly.
The handler’s Task<ConsumeEventResult> return value drives the ack/nack decision: returning Success = true tells the transport to acknowledge the message; Success = false or a non-null Exception tells it to nack, requeue, or dead-letter the delivery according to the transport’s policy. This design preserves at-least-once delivery — the framework never silently discards a message.
See The Bus for how the consumer fits into the full message pipeline.
Reference
IsConnected

    bool IsConnected { get; }

Returns true when the consumer is connected to the broker and able to receive deliveries. The framework uses this property for health checks and to decide whether to attempt a reconnect. Implementations should return false during startup, after an ungraceful disconnection, and after DisposeAsync has been called.
IsCancelledByBroker

    bool IsCancelledByBroker { get; }

Returns true when the broker has explicitly cancelled this consumer, for example because the queue was deleted, the queue policy expired, or a mirror was promoted. This is distinct from a transient disconnection: a broker cancellation is terminal until the consumer is restarted. IBus.IsConsuming returns false whenever this property is true, so health checks surface the unhealthy state.
IsStopped

    bool IsStopped => false;

Default interface member (returns false unless overridden). Distinguishes “intentional shutdown” from “transient disconnect”: once StopConsumingAsync or DisposeAsync has run, IsStopped stays true permanently, because there is no reconnect to wait for. ConsumerConnectionHealthCheck consults this property to bypass the recovery-grace window on intentional shutdown and report Unhealthy immediately.
StartConsumingAsync

    Task StartConsumingAsync(
        string queueName,
        IReadOnlyList<string> messageTypes,
        ConsumerEventHandler eventHandler,
        CancellationToken cancellationToken = default);

Subscribes to queueName and begins delivering messages to eventHandler. The returned Task completes when the consumer loop has terminated, either because cancellationToken was cancelled or because the implementation encountered a fatal broker error.
Parameters
- queueName: the name of the queue, topic, or subscription to consume from.
- messageTypes: the allow-list of wire type names the consumer should receive. Transports that support server-side filtering (Azure Service Bus subscription filters, Kafka topic-per-type routing) should push this filter to the broker. Transports that do not support server-side filtering must filter client-side: discard deliveries whose type header is not in this list.
- eventHandler: the ConsumerEventHandler delegate that the transport calls for each delivery. See ConsumerEventHandler below.
- cancellationToken: signals that the consumer should stop receiving new messages and exit the consume loop cleanly.
StopConsumingAsync

    Task StopConsumingAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;

Default interface method (no-op by default). Issues a graceful stop: instructs the broker to stop delivering messages to this consumer and drains any in-flight handler invocations. Does NOT tear down the underlying channel/connection; that happens in IAsyncDisposable.DisposeAsync. Idempotent.
The default no-op exists so existing third-party IConsumer implementations stay source-compatible. Custom transports that want graceful shutdown semantics should override this; without an override, Bus.StopConsumingAsync only flips the consuming flag and the broker keeps delivering until DI disposal.
ConsumerEventHandler

    public delegate Task<ConsumeEventResult> ConsumerEventHandler(
        ReadOnlyMemory<byte> message,
        string type,
        IDictionary<string, object> headers,
        CancellationToken cancellationToken);

The callback the transport invokes for every delivered message.
Parameters
- message: the raw serialised payload as received from the broker.
- type: the wire type name of the message (for example "OrderService.Messages.OrderPlaced"). The pipeline uses this to resolve the CLR type and dispatch to the correct handler.
- headers: a dictionary of transport headers. Values are typed as object because some broker clients expose typed header values (integers, timestamps) rather than plain strings. Note the asymmetry with IProducer, where outgoing headers are IDictionary<string, string>: the transport layer stringifies header values on the wire, but deserialisation restores the broker-native type.
- cancellationToken: propagated from the consumer’s cancellation token; the handler should respect it for any I/O it performs.
Returns. A ConsumeEventResult describing whether the message was processed successfully. See ConsumeEventResult below.
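Because header values arrive as object, code that reads them usually has to normalise the value first. The following is a minimal sketch, assuming that textual headers may arrive either as string or as UTF-8 encoded byte[] depending on the broker client. HeaderReader and GetString are hypothetical names used for illustration; they are not part of ServiceConnect.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper, not part of ServiceConnect.
public static class HeaderReader
{
    // Returns the header as text, or null when the key is absent or its value is null.
    public static string? GetString(IDictionary<string, object> headers, string key)
    {
        if (!headers.TryGetValue(key, out var value) || value is null)
            return null;

        return value switch
        {
            string s => s,
            // Assumption: byte[] header values carry UTF-8 text.
            byte[] bytes => Encoding.UTF8.GetString(bytes),
            // Numbers and timestamps are formatted culture-independently.
            IFormattable f => f.ToString(null, CultureInfo.InvariantCulture),
            _ => value.ToString(),
        };
    }
}
```

Formatting with the invariant culture keeps numeric headers stable across hosts with different regional settings.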
ConsumeEventResult

    public sealed class ConsumeEventResult
    {
        public bool Success { get; init; }
        public bool NotHandled { get; init; }
        public Exception? Exception { get; init; }
        public bool TerminalFailure { get; init; }
    }

Returned by the ConsumerEventHandler to communicate the processing outcome back to the transport. The properties are init-only: construct the result with an object initialiser at the point of return. The type is produced by the framework and only observed by the consumer.
- Success = true: the message was processed; the transport should acknowledge it to the broker.
- Success = false: processing failed; the transport should nack, requeue, or move the message to the dead-letter destination according to its policy.
- NotHandled = true: the dispatcher ran to completion but no processor claimed the message. Distinct from a failure; the transport should honour IBusConfiguration.DeadLetterUnhandledMessages when deciding whether to ack or route the delivery to the error exchange.
- Exception: if non-null, the exception that caused the failure. The transport may log or attach this to the nack decision.
- TerminalFailure = true: the failure is permanent: the message is structurally malformed (for example, an unparseable wire payload) and retrying will produce the same outcome. Transport implementers must route TerminalFailure = true directly to the error exchange, bypassing the retry queue entirely. Burning the retry budget on a poison payload that no amount of redelivery will fix is wasteful and delays detection. The dispatcher sets this flag on payload-level deserialisation failures (JsonException, NotSupportedException from a converter mismatch); handler-thrown exceptions are non-terminal and follow the normal retry path.
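One way a transport might turn these flags into a single decision is sketched below. The Disposition enum, DispositionPolicy, and the deadLetterUnhandled parameter (standing in for IBusConfiguration.DeadLetterUnhandledMessages) are hypothetical names, and the precedence order (terminal failure first, then unhandled, then success) is an assumption rather than documented behaviour. The ConsumeEventResult class is repeated locally only so the sketch compiles on its own.

```csharp
using System;

// Local stand-in mirroring the documented shape, so the sketch is self-contained.
public sealed class ConsumeEventResult
{
    public bool Success { get; init; }
    public bool NotHandled { get; init; }
    public Exception? Exception { get; init; }
    public bool TerminalFailure { get; init; }
}

// Hypothetical names for illustration; not ServiceConnect API.
public enum Disposition { Ack, Retry, ErrorExchange }

public static class DispositionPolicy
{
    // deadLetterUnhandled stands in for IBusConfiguration.DeadLetterUnhandledMessages.
    public static Disposition Decide(ConsumeEventResult result, bool deadLetterUnhandled)
    {
        // Permanent failures skip the retry queue entirely.
        if (result.TerminalFailure) return Disposition.ErrorExchange;

        // No processor claimed the message: ack or dead-letter, per configuration.
        if (result.NotHandled)
            return deadLetterUnhandled ? Disposition.ErrorExchange : Disposition.Ack;

        // A non-null Exception counts as a failure even if Success was set.
        if (result.Success && result.Exception is null) return Disposition.Ack;

        // Everything else follows the normal retry path.
        return Disposition.Retry;
    }
}
```

Keeping the decision in one pure function makes the policy easy to unit test separately from the broker client.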
Implementing
Delivery guarantees
ServiceConnect assumes at-least-once delivery. If eventHandler returns Success = false or throws, the transport must not acknowledge the message to the broker. If the consumer process crashes while a handler is executing, the broker should redeliver the message on reconnect. Implementations that acknowledge before invoking the handler break this contract and risk silent message loss.
Ack/nack contract
The sequence must be:
- Receive delivery from the broker.
- Invoke eventHandler and await the result.
- If result.Success is true, acknowledge to the broker.
- Otherwise, nack, requeue, or dead-letter according to the transport’s error policy.
Only acknowledge after the returned task has completed. Do not fire-and-forget the handler.
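The sequence above, including the case where the handler throws instead of returning a failed result, can be sketched as follows. DeliveryProcessor and its delegate parameters are hypothetical: handler stands in for the pipeline callback (reduced to a bool result for brevity), while ack and nack stand in for whatever acknowledge and reject calls the broker client provides.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of ServiceConnect.
public static class DeliveryProcessor
{
    public static async Task<bool> ProcessAsync(
        Func<CancellationToken, Task<bool>> handler,
        Func<Task> ack,
        Func<Exception?, Task> nack,
        CancellationToken cancellationToken)
    {
        bool success;
        Exception? failure = null;
        try
        {
            // Await the handler to completion; never fire-and-forget.
            success = await handler(cancellationToken);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Shutdown: leave the delivery unacknowledged so the broker can redeliver it.
            throw;
        }
        catch (Exception ex)
        {
            // A throwing handler is treated like Success = false.
            success = false;
            failure = ex;
        }

        // The acknowledgement happens only after the handler task has completed.
        if (success) await ack();
        else await nack(failure);
        return success;
    }
}
```

Because the ack call sits after the awaited handler, a crash in the middle of handling leaves the delivery unacknowledged, which is what at-least-once delivery relies on.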
Prefetch and flow control
Transports that support prefetch limits (RabbitMQ basicQos, Kafka max.poll.records) should configure a sensible default at the point StartConsumingAsync is called, before the consumer loop begins. Unbounded prefetch can exhaust memory when the pipeline is slower than the broker. A prefetch of 10–50 messages is a reasonable starting point for most workloads; expose it via configuration rather than hard-coding it.
Cancellation
Observe cancellationToken throughout the consume loop. On cancellation:
- Stop requesting new deliveries from the broker.
- Allow any in-flight handler invocation to complete (or time out gracefully).
- Close open broker handles inside DisposeAsync.
Do not swallow OperationCanceledException — let it propagate so the caller knows the loop exited due to cancellation rather than a broker error.
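A loop shape that satisfies these points is sketched below. ConsumeLoop, receive, and handle are hypothetical stand-ins for the broker client and the pipeline callback. As a simplification, the handler is not passed the token here, so an in-flight invocation always runs to completion; a real transport would normally forward the token and apply a timeout.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical loop shape for illustration; not part of ServiceConnect.
public static class ConsumeLoop
{
    public static async Task RunAsync(
        Func<CancellationToken, Task<string>> receive,
        Func<string, Task> handle,
        CancellationToken cancellationToken)
    {
        while (true)
        {
            // Stop requesting new deliveries once cancellation is signalled.
            // The OperationCanceledException is deliberately not caught.
            cancellationToken.ThrowIfCancellationRequested();

            // A cancellable receive may also throw OperationCanceledException.
            var delivery = await receive(cancellationToken);

            // The in-flight handler is awaited, not abandoned mid-message.
            await handle(delivery);
        }
    }
}
```

The caller can then tell a cancelled loop (OperationCanceledException) apart from a broker fault (any other exception).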
DisposeAsync timeout behaviour
If DisposeAsync times out waiting for an in-flight StartConsumingAsync to complete its setup, the consumer’s IsStopped latch is set on entry (which surfaces as IBus.IsConsuming=false to health checks), but the started flag is intentionally left set. A subsequent StartConsumingAsync on the same instance throws InvalidOperationException("Consumer is already consuming. Call DisposeAsync before starting again.") rather than deadlocking against the lifecycle lock the wedged start still holds. Operators should resolve the wedge (typically a process restart) before consumption resumes.
Message-type filtering
The messageTypes parameter is the allow-list of wire type names for this consumer. How to apply it depends on the broker:
- Topic-per-type (Kafka): subscribe only to the topics named after each entry in messageTypes.
- Server-side filter (Azure Service Bus): create a subscription rule matching the type header against messageTypes.
- Fan-out with client filtering (broad fan-out topics): subscribe once and discard deliveries whose type header is not in messageTypes before invoking the handler.
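For the client-filtering case, the allow-list can be copied into a set once at startup so that each delivery costs a single hash lookup. This is a minimal sketch; MessageTypeFilter is a hypothetical helper, and the case-sensitive (ordinal) comparison is an assumption about how wire type names are matched.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of ServiceConnect.
public sealed class MessageTypeFilter
{
    private readonly HashSet<string> _allowed;

    public MessageTypeFilter(IEnumerable<string> messageTypes)
    {
        // Assumption: wire type names are compared case-sensitively.
        _allowed = new HashSet<string>(messageTypes, StringComparer.Ordinal);
    }

    // A delivery without a type header is rejected rather than passed through.
    public bool Accepts(string? type) => type is not null && _allowed.Contains(type);
}
```

A transport would build the filter from messageTypes inside StartConsumingAsync and call Accepts before invoking the handler.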
Skeletal consumer loop

    using System.Linq;
    using ServiceConnect.Interfaces;

    public sealed class BrokerConsumer : IConsumer
    {
        private bool _connected;

        public bool IsConnected => _connected;

        // Broker-cancellation is a hard signal (queue deleted, policy expired, mirror promoted).
        // Stub `false` here; real implementations should flip true on the broker's cancel callback.
        public bool IsCancelledByBroker => false;

        public async Task StartConsumingAsync(
            string queueName,
            IReadOnlyList<string> messageTypes,
            ConsumerEventHandler eventHandler,
            CancellationToken cancellationToken = default)
        {
            // 1. Open connection / subscribe to queueName.
            _connected = true;

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    // 2. Receive next delivery from the broker.
                    var delivery = await ReceiveNextAsync(cancellationToken);
                    if (delivery is null) continue;

                    // 3. Extract type name and headers from the delivery.
                    var type = ExtractTypeName(delivery);
                    var headers = ExtractHeaders(delivery);

                    // 4. Client-side filter (only needed for fan-out transports).
                    //    On brokers that track acknowledgements, a discarded delivery
                    //    typically still needs to be acknowledged so it is not redelivered.
                    if (!messageTypes.Contains(type)) continue;

                    // 5. Invoke the handler and await it before deciding.
                    var result = await eventHandler(delivery.Body, type, headers, cancellationToken);

                    // 6. Ack or nack based on the result. A non-null Exception counts as a failure.
                    if (result.Success && result.Exception is null)
                        await AcknowledgeAsync(delivery, cancellationToken);
                    else
                        await NackAsync(delivery, result.Exception, cancellationToken);
                }
            }
            finally
            {
                _connected = false;
            }
        }

        public ValueTask DisposeAsync()
        {
            // Close broker connection / channel.
            _connected = false;
            return ValueTask.CompletedTask;
        }

        // Broker-specific helpers: implement using your broker client.
        private Task<Delivery?> ReceiveNextAsync(CancellationToken ct) => throw new NotImplementedException();
        private string ExtractTypeName(Delivery d) => throw new NotImplementedException();
        private IDictionary<string, object> ExtractHeaders(Delivery d) => throw new NotImplementedException();
        private Task AcknowledgeAsync(Delivery d, CancellationToken ct) => throw new NotImplementedException();
        private Task NackAsync(Delivery d, Exception? ex, CancellationToken ct) => throw new NotImplementedException();

        private sealed record Delivery(ReadOnlyMemory<byte> Body);
    }

Kafka consumer using Confluent.Kafka
The following skeleton shows a KafkaConsumer that subscribes to topics named after each message type, delivers raw payloads to the pipeline, and commits offsets only after the handler returns Success = true.

    using Confluent.Kafka;
    using ServiceConnect.Interfaces;

    public sealed class KafkaConsumer : IConsumer
    {
        private readonly ConsumerConfig _config;
        private IConsumer<string, byte[]>? _inner;

        public KafkaConsumer(ConsumerConfig config)
        {
            _config = config;
        }

        public bool IsConnected => _inner is not null;

        // Kafka has no broker-side consumer-cancel callback equivalent to AMQP `basic.cancel`.
        // Stub false; a real implementation could surface `ErrorCode.UnknownTopicOrPart` here.
        public bool IsCancelledByBroker => false;

        public async Task StartConsumingAsync(
            string queueName,
            IReadOnlyList<string> messageTypes,
            ConsumerEventHandler eventHandler,
            CancellationToken cancellationToken = default)
        {
            _inner = new ConsumerBuilder<string, byte[]>(_config).Build();

            // Subscribe to one topic per message type (topic-per-type routing).
            _inner.Subscribe(messageTypes);

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    ConsumeResult<string, byte[]> result;
                    try
                    {
                        // Consume blocks the calling thread until a message arrives or the
                        // token is cancelled; a real implementation may want a dedicated thread.
                        result = _inner.Consume(cancellationToken);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }

                    // The message key carries the wire type name.
                    var type = result.Message.Key;
                    var headers = ExtractHeaders(result.Message.Headers);
                    var payload = new ReadOnlyMemory<byte>(result.Message.Value);

                    var outcome = await eventHandler(payload, type, headers, cancellationToken);

                    if (outcome.Success)
                    {
                        // Commit offset only on successful processing.
                        _inner.Commit(result);
                    }
                    // On failure, do not commit. Note that a later successful commit on the same
                    // partition moves the offset past this message, so a real implementation
                    // should seek back or route the failure to a retry topic.
                }
            }
            finally
            {
                _inner.Close();
            }
        }

        public ValueTask DisposeAsync()
        {
            _inner?.Dispose();
            _inner = null;
            return ValueTask.CompletedTask;
        }

        private static IDictionary<string, object> ExtractHeaders(Headers headers)
        {
            // Kafka header values are raw bytes; they are passed through undecoded.
            var dict = new Dictionary<string, object>();
            foreach (var header in headers)
                dict[header.Key] = header.GetValueBytes();
            return dict;
        }
    }

Register the KafkaConsumer during bus startup so the framework resolves it in place of the default RabbitMQ consumer:

    services.AddServiceConnect(builder =>
    {
        builder.AddRegistration(svc =>
        {
            svc.AddSingleton(new ConsumerConfig
            {
                BootstrapServers = "kafka.internal.example:9092",
                GroupId = "order-service",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnableAutoCommit = false,
            });
            svc.AddSingleton<IConsumer, KafkaConsumer>();
        });
    });

When OrderService starts, the framework calls StartConsumingAsync with the queue name and the list of message types registered for that service. The KafkaConsumer subscribes to the corresponding topics and begins delivering OrderPlaced payloads to the pipeline.