Skip to content

IStreamHandler

IStreamHandler<TMessage> is the contract for consuming a streaming message sent by a producer via IBus.CreateStream<T>(). The dispatch pipeline reassembles the in-order byte packets into a single IMessageBusReadStream and calls ExecuteAsync exactly once the complete payload has arrived, passing the stream directly as a parameter. Reach for this interface when the payload is too large or too streamy to carry in the body of a normal Message.

See Streaming for the pattern tour.

The interface is declared with a single type parameter constrained to Message:

public interface IStreamHandler<TMessage> where TMessage : Message
Task ExecuteAsync(TMessage message, IMessageBusReadStream stream, CancellationToken cancellationToken = default);

Invoked once the full stream has arrived and been reassembled. The reassembled stream is passed directly as stream, positioned at the start of the payload and ready to read.

Parameters

  • message — the control message carrying metadata about the streamed payload (file name, content type, anything else the producer attached).
  • stream — the read stream over the reassembled payload bytes. Guaranteed non-null when invoked by the dispatcher.
  • cancellationToken — token to observe for cancellation; flows through from the dispatcher.

Remarks. ExecuteAsync is async-first on this interface — await downstream I/O directly and pass the supplied CancellationToken through so long-running work can be cancelled. The stream is not IDisposable — the framework owns its lifetime via StreamProcessor. Handlers must not attempt to dispose it. Reading the stream to completion is the only handler-side action available. The handler still occupies a consumer slot while it runs, so offload heavy work appropriately if the consumer concurrency budget is tight.

Writing an uploaded file to disk in chunks

Section titled “Writing an uploaded file to disk in chunks”
public sealed class FileUploadStreamHandler : IStreamHandler<FileUpload>
{
private readonly IFileStorage _storage;
public FileUploadStreamHandler(IFileStorage storage) => _storage = storage;
public async Task ExecuteAsync(FileUpload message, IMessageBusReadStream stream, CancellationToken cancellationToken = default)
{
var destination = _storage.OpenWrite(message.FileName);
try
{
// IMessageBusReadStream materialises the fully reassembled payload via Read()
// (returns byte[]) or ReadSequence() (returns ReadOnlySequence<byte>). For very
// large payloads, prefer ReadSequence() to walk segment-by-segment without
// forcing a single contiguous allocation.
foreach (var segment in stream.ReadSequence())
{
await destination.WriteAsync(segment, cancellationToken);
}
}
finally
{
destination.Dispose();
// IMessageBusReadStream is not IDisposable — the handler does not own
// the stream's lifetime. Broker resources are released when the dispatch
// completes and the StreamProcessor evicts the per-sequence entry.
}
}
}

Streaming handlers are ideal for large uploads — the file never materialises in memory as a single byte[] if the handler walks ReadSequence() segment-by-segment. The framework owns the stream’s lifetime — handlers must not dispose it.