IStreamHandler
Overview
IStreamHandler<TMessage> is the contract for consuming a streaming message sent by a producer via IBus.CreateStream<T>(). The dispatch pipeline reassembles the in-order byte packets into a single IMessageBusReadStream and calls ExecuteAsync once, after the complete payload has arrived, passing the stream directly as a parameter. Reach for this interface when the payload is too large, or too naturally stream-shaped, to carry in the body of a normal Message.
See Streaming for the pattern tour.
Reference
The interface is declared with a single type parameter constrained to Message:

```csharp
public interface IStreamHandler<TMessage> where TMessage : Message
```

ExecuteAsync

```csharp
Task ExecuteAsync(TMessage message, IMessageBusReadStream stream, CancellationToken cancellationToken = default);
```

Invoked once the full stream has arrived and been reassembled. The reassembled stream is passed directly as stream, positioned at the start of the payload and ready to read.
Parameters
- message: the control message carrying metadata about the streamed payload (file name, content type, anything else the producer attached).
- stream: the read stream over the reassembled payload bytes. Guaranteed non-null when invoked by the dispatcher.
- cancellationToken: token to observe for cancellation; flows through from the dispatcher.
Remarks. ExecuteAsync is asynchronous by design: await downstream I/O directly and pass the supplied CancellationToken through so long-running work can be cancelled. The stream is not IDisposable; the framework owns its lifetime via StreamProcessor, so handlers must not attempt to dispose it. Reading the stream to completion is the only handler-side action available. The handler still occupies a consumer slot while it runs, so offload heavy work elsewhere if the consumer concurrency budget is tight.
Writing an uploaded file to disk in chunks
```csharp
public sealed class FileUploadStreamHandler : IStreamHandler<FileUpload>
{
    private readonly IFileStorage _storage;

    public FileUploadStreamHandler(IFileStorage storage) => _storage = storage;

    public async Task ExecuteAsync(FileUpload message, IMessageBusReadStream stream, CancellationToken cancellationToken = default)
    {
        var destination = _storage.OpenWrite(message.FileName);
        try
        {
            // IMessageBusReadStream materialises the fully reassembled payload via Read()
            // (returns byte[]) or ReadSequence() (returns ReadOnlySequence<byte>). For very
            // large payloads, prefer ReadSequence() to walk segment-by-segment without
            // forcing a single contiguous allocation.
            foreach (var segment in stream.ReadSequence())
            {
                await destination.WriteAsync(segment, cancellationToken);
            }
        }
        finally
        {
            destination.Dispose();
            // IMessageBusReadStream is not IDisposable: the handler does not own
            // the stream's lifetime. Broker resources are released when the dispatch
            // completes and the StreamProcessor evicts the per-sequence entry.
        }
    }
}
```

Streaming handlers are ideal for large uploads: the file never materialises in memory as a single byte[] if the handler walks ReadSequence() segment-by-segment. The framework owns the stream's lifetime, so handlers must not dispose it.
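The segment walk can be tried outside the framework. The following is a minimal sketch using only .NET base class library types, assuming ReadSequence() returns a standard ReadOnlySequence<byte> as the comment in the handler states. Chunk, SegmentCopy and Program are hypothetical names introduced for illustration; they are not part of the message bus API.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: one link in a multi-segment ReadOnlySequence<byte>.
public sealed class Chunk : ReadOnlySequenceSegment<byte>
{
    public Chunk(ReadOnlyMemory<byte> memory) => Memory = memory;

    // Links a new segment after this one and returns it.
    public Chunk Append(ReadOnlyMemory<byte> memory)
    {
        var next = new Chunk(memory) { RunningIndex = RunningIndex + Memory.Length };
        Next = next;
        return next;
    }
}

public static class SegmentCopy
{
    // Hypothetical helper: writes each segment in order and returns the byte count.
    // The payload is never copied into one contiguous array.
    public static async Task<long> CopyToAsync(
        ReadOnlySequence<byte> payload,
        Stream destination,
        CancellationToken cancellationToken = default)
    {
        long written = 0;
        foreach (ReadOnlyMemory<byte> segment in payload)
        {
            await destination.WriteAsync(segment, cancellationToken);
            written += segment.Length;
        }
        return written;
    }
}

public static class Program
{
    public static async Task Main()
    {
        // Build a two-segment payload standing in for a reassembled stream.
        var first = new Chunk(new byte[] { 1, 2, 3 });
        var last = first.Append(new byte[] { 4, 5 });
        var payload = new ReadOnlySequence<byte>(first, 0, last, last.Memory.Length);

        using var destination = new MemoryStream();
        long written = await SegmentCopy.CopyToAsync(payload, destination);

        // prints "5 bytes, single segment: False"
        Console.WriteLine($"{written} bytes, single segment: {payload.IsSingleSegment}");
    }
}
```

Enumerating a ReadOnlySequence<byte> yields one ReadOnlyMemory<byte> per segment, which the Stream.WriteAsync overload for ReadOnlyMemory<byte> accepts directly, so the loop has the same shape as the one in the handler above.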
See also
- Streaming: concept
- IMessageHandler<T>: related reference
- IBus.CreateStream: producer API