IConsumer

IConsumer is the contract that plugs a message broker into the ServiceConnect pipeline on the receive side. An implementation opens a connection to a broker, subscribes to the queues or topics that correspond to the supplied messageTypes, and dispatches each raw delivery as a ReadOnlyMemory<byte> to the ConsumerEventHandler delegate. The pipeline deserialises, routes, and executes the message from there.

One IConsumer implementation is active per transport. The framework resolves it as a singleton and calls StartConsumingAsync once per consumer host at startup. At shutdown it calls DisposeAsync to close the broker connection cleanly.

The handler’s Task<ConsumeEventResult> return value drives the ack/nack decision: returning Success = true tells the transport to acknowledge the message; Success = false or a non-null Exception tells it to nack, requeue, or dead-letter the delivery according to the transport’s policy. This design preserves at-least-once delivery — the framework never silently discards a message.

See The Bus for how the consumer fits into the full message pipeline.

bool IsConnected { get; }

Returns true when the consumer is connected to the broker and able to receive deliveries. The framework uses this property for health checks and to decide whether to attempt a reconnect. Implementations should return false during startup, after an ungraceful disconnection, and after DisposeAsync has been called.


bool IsCancelledByBroker { get; }

Returns true when the broker has explicitly cancelled this consumer — for example, the queue was deleted, the queue policy expired, or a mirror was promoted. Distinct from a transient disconnection: a broker-cancellation is terminal until the consumer is restarted. IBus.IsConsuming returns false whenever this is true so health checks surface the unhealthy state.


bool IsStopped => false;

Default interface member (returns false unless overridden). Distinguishes “intentional shutdown” from “transient disconnect”: once StopConsumingAsync or DisposeAsync has run, IsStopped flips to true permanently, because there is no reconnect to wait for. ConsumerConnectionHealthCheck consults this to bypass the recovery-grace window on intentional shutdown and report Unhealthy immediately.
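As an illustration of how the three flags relate, here is a minimal sketch of a health decision. It is not the framework's ConsumerConnectionHealthCheck: the ConsumerHealth enum, the Evaluate helper, and the way the grace window is passed in are all assumptions made for this example, and treating broker cancellation like an intentional stop is an interpretation of the text above.

```csharp
using System;

// Hypothetical health states used only in this sketch.
public enum ConsumerHealth { Healthy, Recovering, Unhealthy }

public static class ConsumerHealthSketch
{
    // Maps the three IConsumer flags to a health state (assumed logic).
    public static ConsumerHealth Evaluate(
        bool isConnected,
        bool isCancelledByBroker,
        bool isStopped,
        TimeSpan disconnectedFor,
        TimeSpan recoveryGrace)
    {
        // Intentional shutdown and broker cancellation are terminal,
        // so the grace window does not apply.
        if (isStopped || isCancelledByBroker) return ConsumerHealth.Unhealthy;

        if (isConnected) return ConsumerHealth.Healthy;

        // Transient disconnect: tolerate it until the grace window has elapsed.
        return disconnectedFor < recoveryGrace
            ? ConsumerHealth.Recovering
            : ConsumerHealth.Unhealthy;
    }
}
```

The point of the ordering is that IsStopped and IsCancelledByBroker are checked before IsConnected, so a stale "connected" flag cannot mask a terminal state.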


Task StartConsumingAsync(
    string queueName,
    IReadOnlyList<string> messageTypes,
    ConsumerEventHandler eventHandler,
    CancellationToken cancellationToken = default);

Subscribes to queueName and begins delivering messages to eventHandler. The returned Task completes when the consumer loop has terminated — either because cancellationToken was cancelled or because the implementation encountered a fatal broker error.

Parameters

  • queueName — the name of the queue, topic, or subscription to consume from.
  • messageTypes — the allow-list of wire type names the consumer should receive. Transports that support server-side filtering (Azure Service Bus subscription filters, Kafka topic-per-type routing) should push this filter to the broker. Transports that do not support server-side filtering must filter client-side: discard deliveries whose type header is not in this list.
  • eventHandler — the ConsumerEventHandler delegate that the transport calls for each delivery. See ConsumerEventHandler below.
  • cancellationToken — signals that the consumer should stop receiving new messages and exit the consume loop cleanly.

Task StopConsumingAsync(CancellationToken cancellationToken = default) => Task.CompletedTask;

Default-interface method (no-op by default). Issues a graceful stop: instructs the broker to stop delivering messages to this consumer and drains any in-flight handler invocations. Does NOT tear down the underlying channel/connection — that happens on IAsyncDisposable.DisposeAsync. Idempotent.

The default no-op exists so existing third-party IConsumer implementations stay source-compatible. Custom transports that want graceful shutdown semantics should override this; without an override, Bus.StopConsumingAsync only flips the consuming flag and the broker keeps delivering until DI disposal.
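One way a custom transport might make its override idempotent is a one-shot latch. The sketch below is an assumption-laden illustration, not framework code: StopLatch and the cancelAndDrain callback are hypothetical names, and the callback stands in for whatever broker call stops deliveries and drains in-flight handlers.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: runs the broker-side stop at most once.
public sealed class StopLatch
{
    private readonly Func<CancellationToken, Task> _cancelAndDrain;
    private int _stopped; // 0 = running, 1 = stop requested

    public StopLatch(Func<CancellationToken, Task> cancelAndDrain)
        => _cancelAndDrain = cancelAndDrain;

    // Flips to true on the first stop request and never flips back.
    public bool IsStopped => Volatile.Read(ref _stopped) == 1;

    public Task StopConsumingAsync(CancellationToken cancellationToken = default)
    {
        // Only the first caller sees 0 and performs the stop; later calls are no-ops.
        if (Interlocked.Exchange(ref _stopped, 1) == 1)
            return Task.CompletedTask;

        return _cancelAndDrain(cancellationToken);
    }
}
```

Note that in this sketch a second caller returns immediately even if the first caller's drain is still running. A transport that needs every caller to wait for the drain could cache and return the first Task instead.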


public delegate Task<ConsumeEventResult> ConsumerEventHandler(
    ReadOnlyMemory<byte> message,
    string type,
    IDictionary<string, object> headers,
    CancellationToken cancellationToken);

The callback the transport invokes for every delivered message.

Parameters

  • message — the raw serialised payload as received from the broker.
  • type — the wire type name of the message (for example "OrderService.Messages.OrderPlaced"). The pipeline uses this to resolve the CLR type and dispatch to the correct handler.
  • headers — a dictionary of transport headers. Values are typed as object because some broker clients expose typed header values (integers, timestamps) rather than plain strings. Note the asymmetry with IProducer, where outgoing headers are IDictionary<string, string> — the transport layer stringifies header values on the wire, but deserialisation restores the broker-native type.
  • cancellationToken — propagated from the consumer’s cancellation token; the handler should respect it for any I/O it performs.

Returns. A ConsumeEventResult describing whether the message was processed successfully. See ConsumeEventResult below.
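Because header values arrive as object, code that reads them usually has to normalise a few shapes. The following is a minimal sketch of one way to do that. HeaderReader and GetString are hypothetical names, and the set of value types handled (string, byte[], ReadOnlyMemory<byte>, formattable primitives) and the UTF-8 decoding are assumptions about what a broker client might hand over.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper for reading object-typed transport headers as text.
public static class HeaderReader
{
    public static string? GetString(IDictionary<string, object> headers, string key)
    {
        if (!headers.TryGetValue(key, out var value) || value is null)
            return null;

        return value switch
        {
            string s => s,
            // Many broker clients expose header values as raw bytes.
            byte[] bytes => Encoding.UTF8.GetString(bytes),
            ReadOnlyMemory<byte> memory => Encoding.UTF8.GetString(memory.Span),
            // Integers, timestamps and similar: format culture-independently.
            IFormattable formattable => formattable.ToString(null, CultureInfo.InvariantCulture),
            _ => value.ToString(),
        };
    }
}
```

For example, a header stored as the integer 3 would read back as "3", and one stored as UTF-8 bytes would read back as the decoded string.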


public sealed class ConsumeEventResult
{
    public bool Success { get; init; }
    public bool NotHandled { get; init; }
    public Exception? Exception { get; init; }
    public TerminalFailure bool placeholder
}

Returned by the ConsumerEventHandler to communicate the processing outcome back to the transport. The properties are init-only — construct via object initialiser at the point of return; the type is framework-produced and consumer-observed only.

  • Success = true — the message was processed; the transport should acknowledge it to the broker.
  • Success = false — processing failed; the transport should nack, requeue, or move the message to the dead-letter destination according to its policy.
  • NotHandled = true — the dispatcher ran to completion but no processor claimed the message. Distinct from a failure; the transport should honour IBusConfiguration.DeadLetterUnhandledMessages when deciding whether to ack or route the delivery to the error exchange.
  • Exception — if non-null, the exception that caused the failure. The transport may log or attach this to the nack decision.
  • TerminalFailure = true — the failure is permanent: the message is structurally malformed (for example, an unparseable wire payload) and retrying will produce the same outcome. Transport implementers must route TerminalFailure = true directly to the error exchange, bypassing the retry queue entirely. Burning the retry budget on a poison payload that no amount of redelivery will fix is wasteful and delays detection. The dispatcher sets this flag on payload-level deserialisation failures (JsonException, NotSupportedException from a converter mismatch); handler-thrown exceptions are non-terminal and follow the normal retry path.
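The property rules above can be sketched as a single decision function. This is an illustration only: ResultStandIn is a local stand-in that mirrors the shape of ConsumeEventResult so the example compiles without the ServiceConnect package, Settlement and SettlementPolicy are hypothetical names, and the precedence order (TerminalFailure, then NotHandled, then Success) is an assumption rather than documented behaviour.

```csharp
using System;

// Local stand-in mirroring the ConsumeEventResult shape shown above.
public sealed class ResultStandIn
{
    public bool Success { get; init; }
    public bool NotHandled { get; init; }
    public Exception? Exception { get; init; }
    public bool TerminalFailure { get; init; }
}

// Hypothetical set of actions a transport can take on a delivery.
public enum Settlement { Ack, Retry, DeadLetter }

public static class SettlementPolicy
{
    // deadLetterUnhandled plays the role of IBusConfiguration.DeadLetterUnhandledMessages.
    public static Settlement Decide(ResultStandIn result, bool deadLetterUnhandled)
    {
        // Poison payloads skip the retry queue entirely.
        if (result.TerminalFailure) return Settlement.DeadLetter;

        // Unhandled is not a failure; configuration decides ack vs error exchange.
        if (result.NotHandled)
            return deadLetterUnhandled ? Settlement.DeadLetter : Settlement.Ack;

        // Acknowledge only a clean success.
        if (result.Success && result.Exception is null) return Settlement.Ack;

        // Everything else follows the normal retry path.
        return Settlement.Retry;
    }
}
```

Keeping the decision in one pure function makes it straightforward to unit test separately from the broker client.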

ServiceConnect assumes at-least-once delivery. If eventHandler returns Success = false or throws, the transport must not acknowledge the message to the broker. If the consumer process crashes while a handler is executing, the broker should redeliver the message on reconnect. Implementations that acknowledge before invoking the handler break this contract and risk silent message loss.

The sequence must be:

  1. Receive delivery from the broker.
  2. Invoke eventHandler and await the result.
  3. If result.Success is true, acknowledge to the broker.
  4. Otherwise, nack, requeue, or dead-letter according to the transport’s error policy.

Only acknowledge after the returned task has completed. Do not fire-and-forget the handler.
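A minimal sketch of that ordering, including the case where the handler throws, might look like the following. The names are hypothetical: handler stands in for the awaited ConsumerEventHandler call reduced to a bool, and ack and nack stand in for the broker client's settlement calls.

```csharp
using System;
using System.Threading.Tasks;

public static class AckAfterHandler
{
    // Settles a delivery only after the handler task has completed.
    public static async Task ProcessAsync(
        Func<Task<bool>> handler,
        Func<Task> ack,
        Func<Exception?, Task> nack)
    {
        bool success;
        try
        {
            // Await the handler; never fire-and-forget.
            success = await handler();
        }
        catch (OperationCanceledException)
        {
            // Shutdown in progress: leave the delivery unsettled so the
            // broker can redeliver it, and let the cancellation propagate.
            throw;
        }
        catch (Exception ex)
        {
            // A throwing handler counts as a failure, never as an ack.
            await nack(ex);
            return;
        }

        if (success)
            await ack();
        else
            await nack(null);
    }
}
```

The acknowledgement is reachable only from the path where the handler has returned true, which is the property the sequence above is meant to guarantee.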

Transports that support prefetch limits (RabbitMQ basicQos, Kafka max.poll.records) should configure a sensible default at the point StartConsumingAsync is called, before the consumer loop begins. Unbounded prefetch can exhaust memory when the pipeline is slower than the broker. A prefetch of 10–50 messages is a reasonable starting point for most workloads; expose it via configuration rather than hard-coding it.
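As a rough sketch of exposing prefetch through configuration, a transport could carry a small options type like the one below. ConsumerPrefetchOptions, its default of 20, and the 1 to 65535 bound are assumptions for illustration (the upper bound reflects that some brokers encode prefetch as an unsigned 16-bit value), not part of ServiceConnect.

```csharp
using System;

// Hypothetical options type a transport could bind from configuration.
public sealed class ConsumerPrefetchOptions
{
    public const int DefaultPrefetchCount = 20;

    public int PrefetchCount { get; init; } = DefaultPrefetchCount;

    // Call once, before the consume loop starts.
    public ushort GetValidatedPrefetch()
    {
        if (PrefetchCount < 1 || PrefetchCount > ushort.MaxValue)
        {
            throw new ArgumentOutOfRangeException(
                nameof(PrefetchCount),
                PrefetchCount,
                "Prefetch must be between 1 and 65535; zero would mean unbounded on some brokers.");
        }

        return (ushort)PrefetchCount;
    }
}
```

Validating at startup rather than inside the loop means a bad setting fails fast instead of surfacing later as memory pressure.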

Observe cancellationToken throughout the consume loop. On cancellation:

  1. Stop requesting new deliveries from the broker.
  2. Allow any in-flight handler invocation to complete (or time out gracefully).
  3. Close open broker handles inside DisposeAsync.

Do not swallow OperationCanceledException — let it propagate so the caller knows the loop exited due to cancellation rather than a broker error.
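The "complete or time out" step can be sketched with Task.WaitAsync, which is available from .NET 6. Drain and WaitForInFlightAsync are hypothetical names, and how the transport tracks its in-flight handler task is left out.

```csharp
using System;
using System.Threading.Tasks;

public static class Drain
{
    // Returns true if the in-flight handler finished within the timeout,
    // false if the wait gave up first. The handler task itself keeps running.
    public static async Task<bool> WaitForInFlightAsync(Task inFlight, TimeSpan timeout)
    {
        try
        {
            await inFlight.WaitAsync(timeout);
            return true;
        }
        catch (TimeoutException)
        {
            return false;
        }
    }
}
```

If the handler task faults, its exception propagates out of this helper. A transport would typically log that and leave the delivery unacknowledged so the broker can redeliver it.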

If DisposeAsync times out waiting for an in-flight StartConsumingAsync to complete its setup, the consumer’s IsStopped latch is set on entry (which surfaces as IBus.IsConsuming=false to health checks), but the started flag is intentionally left set. A subsequent StartConsumingAsync on the same instance throws InvalidOperationException("Consumer is already consuming. Call DisposeAsync before starting again.") rather than deadlocking against the lifecycle lock the wedged start still holds. Operators should resolve the wedge (typically a process restart) before consumption resumes.

The messageTypes parameter is the allow-list of wire type names for this consumer. How to apply it depends on the broker:

  • Topic-per-type (Kafka): subscribe only to the topics named after each entry in messageTypes.
  • Server-side filter (Azure Service Bus): create a subscription rule matching the type header against messageTypes.
  • Fan-out with client filtering (broad fan-out topics): subscribe once and discard deliveries whose type header is not in messageTypes before invoking the handler.
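
For the client-filtering case, a minimal sketch is to copy messageTypes into a set once at startup and test each delivery against it. MessageTypeFilter is a hypothetical name, and the ordinal (case-sensitive) comparison is an assumption about how wire type names should be matched.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical allow-list built once from the messageTypes argument.
public sealed class MessageTypeFilter
{
    private readonly HashSet<string> _allowed;

    public MessageTypeFilter(IReadOnlyList<string> messageTypes)
        => _allowed = new HashSet<string>(messageTypes, StringComparer.Ordinal);

    // A delivery with no type header is never accepted.
    public bool Accepts(string? type)
        => type is not null && _allowed.Contains(type);
}
```

A set lookup keeps the per-delivery cost constant, whereas scanning the list on every delivery grows with the number of registered types.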

The following skeleton shows the minimum shape of an implementation:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using ServiceConnect.Interfaces;

public sealed class BrokerConsumer : IConsumer
{
    // Written by the consume loop, read by health checks on other threads.
    private volatile bool _connected;

    public bool IsConnected => _connected;

    // Broker-cancellation is a hard signal (queue deleted, policy expired, mirror promoted).
    // Stub `false` here; real implementations should flip true on the broker's cancel callback.
    public bool IsCancelledByBroker => false;

    public async Task StartConsumingAsync(
        string queueName,
        IReadOnlyList<string> messageTypes,
        ConsumerEventHandler eventHandler,
        CancellationToken cancellationToken = default)
    {
        // 1. Open connection / subscribe to queueName.
        _connected = true;
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // 2. Receive next delivery from the broker.
                var delivery = await ReceiveNextAsync(cancellationToken);
                if (delivery is null) continue;

                // 3. Extract type name and headers from the delivery.
                var type = ExtractTypeName(delivery);
                var headers = ExtractHeaders(delivery);

                // 4. Client-side filter (only needed for fan-out transports).
                //    On brokers that require explicit settlement, acknowledge the
                //    discarded delivery so it is not redelivered.
                if (!messageTypes.Contains(type))
                {
                    await AcknowledgeAsync(delivery, cancellationToken);
                    continue;
                }

                // 5. Invoke the handler and await it before settling the delivery.
                var result = await eventHandler(delivery.Body, type, headers, cancellationToken);

                // 6. Ack or nack based on the result.
                //    A full transport would also honour NotHandled and TerminalFailure here.
                if (result.Success)
                    await AcknowledgeAsync(delivery, cancellationToken);
                else
                    await NackAsync(delivery, result.Exception, cancellationToken);
            }
        }
        finally
        {
            _connected = false;
        }
    }

    public ValueTask DisposeAsync()
    {
        // Close broker connection / channel.
        _connected = false;
        return ValueTask.CompletedTask;
    }

    // Broker-specific helpers: implement using your broker client.
    private Task<Delivery?> ReceiveNextAsync(CancellationToken ct) => throw new NotImplementedException();
    private string ExtractTypeName(Delivery d) => throw new NotImplementedException();
    private IDictionary<string, object> ExtractHeaders(Delivery d) => throw new NotImplementedException();
    private Task AcknowledgeAsync(Delivery d, CancellationToken ct) => throw new NotImplementedException();
    private Task NackAsync(Delivery d, Exception? ex, CancellationToken ct) => throw new NotImplementedException();

    private sealed record Delivery(ReadOnlyMemory<byte> Body);
}

The following skeleton shows a KafkaConsumer that subscribes to topics named after each message type, delivers raw payloads to the pipeline, and commits offsets only after the handler returns Success = true.

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using ServiceConnect.Interfaces;

public sealed class KafkaConsumer : IConsumer
{
    private readonly ConsumerConfig _config;
    private IConsumer<string, byte[]>? _inner;

    public KafkaConsumer(ConsumerConfig config)
    {
        _config = config;
    }

    public bool IsConnected => _inner is not null;

    // Kafka has no broker-side consumer-cancel callback equivalent to AMQP `basic.cancel`.
    // Stub false; a real implementation could surface `ErrorCode.UnknownTopicOrPart` here.
    public bool IsCancelledByBroker => false;

    public async Task StartConsumingAsync(
        string queueName,
        IReadOnlyList<string> messageTypes,
        ConsumerEventHandler eventHandler,
        CancellationToken cancellationToken = default)
    {
        var consumer = new ConsumerBuilder<string, byte[]>(_config).Build();
        _inner = consumer;

        // Subscribe to one topic per message type (topic-per-type routing).
        consumer.Subscribe(messageTypes);
        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                // Consume blocks, so run it on the thread pool to keep this method asynchronous.
                // OperationCanceledException is not caught: it propagates so the caller can
                // tell a cancellation apart from a broker error.
                var result = await Task.Run(() => consumer.Consume(cancellationToken), cancellationToken);

                // The message key carries the wire type name.
                var type = result.Message.Key;
                var headers = ExtractHeaders(result.Message.Headers);
                var payload = new ReadOnlyMemory<byte>(result.Message.Value);

                var outcome = await eventHandler(payload, type, headers, cancellationToken);
                if (outcome.Success)
                {
                    // Commit offset only on successful processing.
                    consumer.Commit(result);
                }
                else
                {
                    // Do not commit. Rewind to this offset so the message is consumed again;
                    // otherwise a later successful commit would silently skip it.
                    // A production transport should bound retries and dead-letter
                    // TerminalFailure results instead of looping on them.
                    consumer.Seek(result.TopicPartitionOffset);
                }
            }
        }
        finally
        {
            consumer.Close();
        }
    }

    public ValueTask DisposeAsync()
    {
        _inner?.Dispose();
        _inner = null;
        return ValueTask.CompletedTask;
    }

    private static IDictionary<string, object> ExtractHeaders(Headers headers)
    {
        var dict = new Dictionary<string, object>();
        foreach (var header in headers)
            dict[header.Key] = header.GetValueBytes();
        return dict;
    }
}

Register the KafkaConsumer during bus startup so the framework resolves it in place of the default RabbitMQ consumer:

services.AddServiceConnect(builder =>
{
    builder.AddRegistration(svc =>
    {
        svc.AddSingleton(new ConsumerConfig
        {
            BootstrapServers = "kafka.internal.example:9092",
            GroupId = "order-service",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false,
        });
        svc.AddSingleton<IConsumer, KafkaConsumer>();
    });
});

When OrderService starts, the framework calls StartConsumingAsync with the queue name and the list of message types registered for that service. The KafkaConsumer subscribes to the corresponding topics and begins delivering OrderPlaced payloads to the pipeline.

  • The Bus — how the consumer fits into the message pipeline
  • IProducer — the outbound counterpart