Streaming
Streaming is for payloads that won’t fit comfortably in a single message: images, PDFs, logs, CSV exports. The sender splits the payload into packets and writes them through a stream handle; the bus delivers each packet over the transport; the receiver reassembles the bytes and invokes the handler exactly once, when the whole payload has arrived.
This is the pattern for “the message is the bytes.” When the payload is small enough to live in the message body, use normal send or publish. Streaming exists for the cases where that would be impractical.
When to use it
- The payload is too big to carry in a single bus message (broker limits, memory pressure, serialiser cost).
- The receiver wants the whole payload as one blob, not as a stream of independent messages.
- You are willing to accept that the handler waits for the full transfer before starting.
If the receiver can process packets as they arrive — say, a video consumer that renders frames — that is a different shape, and you should look at fan-out patterns instead. If the payload is small, don’t reach for streaming; the overhead isn’t worth it.
The message contract
A streamed payload pairs a message with bytes. The message carries metadata; the bytes live on the stream:
    using ServiceConnect.Interfaces;

    public sealed class DocumentUploaded(Guid correlationId) : Message(correlationId)
    {
        public string FileName { get; init; } = string.Empty;
        public int TotalBytes { get; init; }
    }

The sender decides what goes on the message vs. the stream. The rule of thumb: anything the receiver needs to know about the payload (filename, size, content-type, trace id) goes on the message; the payload itself goes on the stream.
The sender
bus.CreateStream<T>(endpoint) opens a write stream pointed at a receiver queue. Write packets with WriteAsync, and call CloseAsync when done:
    await using var stream = bus.CreateStream<DocumentUploaded>("document-receiver");
    for (int offset = 0; offset < payload.Length; offset += chunkSize)
    {
        var count = Math.Min(chunkSize, payload.Length - offset);
        await stream.WriteAsync(payload.AsMemory(offset, count));
    }
    await stream.CloseAsync();

Four things to notice:
- Chunking is your job. The API takes a ReadOnlyMemory<byte>; slice a byte array with .AsMemory(offset, count) to write a sub-range. You decide how big each packet is. A few kilobytes to a few hundred kilobytes is typical: small enough to fit well inside broker limits, large enough that per-packet overhead doesn’t dominate.
- Close is not optional. Until CloseAsync is called, the receiver does not know the stream is complete and the handler will not fire. The await using ensures disposal, but the explicit CloseAsync is what marks the final packet.
- Each WriteAsync is one bus message. The packet becomes an envelope on the wire. Ordering, at-least-once delivery, and backpressure all apply at the packet level; the bus reassembles them in order on the receiver.
- The generic parameter names the control message type. bus.CreateStream<DocumentUploaded>(endpoint) sends a control DocumentUploaded message that initiates the stream. The handler receives both the control message (the TMessage parameter in IStreamHandler<TMessage>.ExecuteAsync) and an IMessageBusReadStream for the reassembled bytes. They are separate: the control message carries metadata (filename, size, …) and the stream carries the payload.
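The chunking loop shown earlier can be factored into a small helper. The following is a minimal sketch using only the .NET base class library; the PayloadChunker class and its Split method are hypothetical names introduced for illustration and are not part of the bus API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of the bus API): slices a payload into packets.
public static class PayloadChunker
{
    // Lazily yields consecutive slices of at most chunkSize bytes.
    // Slices are views over the original array; no bytes are copied.
    // Because this is an iterator, argument checks run on first enumeration.
    public static IEnumerable<ReadOnlyMemory<byte>> Split(byte[] payload, int chunkSize)
    {
        if (payload is null)
            throw new ArgumentNullException(nameof(payload));
        if (chunkSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(chunkSize));

        int offset = 0;
        while (offset < payload.Length)
        {
            int count = Math.Min(chunkSize, payload.Length - offset);
            yield return payload.AsMemory(offset, count);
            offset += count; // advances by the slice length, so it never passes payload.Length
        }
    }
}
```

Each yielded slice can be passed to WriteAsync in a foreach loop, followed by the explicit CloseAsync. Advancing the offset by the slice length rather than by chunkSize avoids integer overflow when the payload is close to the maximum array size.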
The receiver
A stream handler implements IStreamHandler<TMessage>. It’s slightly different in shape from a normal IMessageHandler<T>:
    public sealed class DocumentUploadedHandler : IStreamHandler<DocumentUploaded>
    {
        public async Task ExecuteAsync(
            DocumentUploaded message,
            IMessageBusReadStream stream,
            CancellationToken cancellationToken = default)
        {
            var bytes = stream.Read();
            await File.WriteAllBytesAsync(message.FileName, bytes, cancellationToken);
        }
    }

- stream is a parameter, not a property. The dispatcher hands the reassembled stream to ExecuteAsync directly. There is no ambient property the framework writes between dispatches, so singleton-registered handlers stay thread-safe.
- ExecuteAsync lets you await the full reassembled stream. By the time it is called, every packet has arrived and been reassembled in memory; the I/O-bound wait for the wire is over. The supplied CancellationToken flows through from the dispatcher, so long writes to disk or downstream services can be cancelled. Don’t make that work slow, though, because it serialises against the next stream’s reassembly.
- stream.Read() returns the whole byte array. The reassembly is complete by the time the handler fires; no packet-by-packet processing is expected. stream.Read() throws InvalidOperationException if the assembled stream has a missing packet: a gap in the sequence indicates incomplete delivery, and the handler should not silently process truncated output. stream.ReadSequence() returns the same assembled data as a ReadOnlySequence<byte>, which avoids the extra MemoryStream + ToArray() copy that Read() produces. It does not reduce peak memory, because all packets are already fully resident before the handler fires. To limit peak memory, configure IBusConfiguration.MaxStreamSizeBytes.
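To show what consuming the ReadOnlySequence<byte> form might look like, here is a minimal sketch that writes a sequence to any destination Stream one segment at a time, without first flattening it into a single array. SequenceWriter and WriteToAsync are hypothetical names; the sketch assumes only that ReadSequence() returns a standard System.Buffers.ReadOnlySequence<byte>, as described above.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not part of the bus API).
public static class SequenceWriter
{
    // Writes each segment of the sequence to the destination in order.
    // No intermediate byte[] covering the whole payload is allocated.
    public static async Task WriteToAsync(
        ReadOnlySequence<byte> data,
        Stream destination,
        CancellationToken cancellationToken = default)
    {
        foreach (ReadOnlyMemory<byte> segment in data)
        {
            await destination.WriteAsync(segment, cancellationToken);
        }
    }
}
```

Inside a handler, this could be called with stream.ReadSequence() and a FileStream opened for the target path, passing the handler's CancellationToken through.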
Register the handler the same way you register any consumer — a HandlerReference and a DI registration — except the interface is IStreamHandler<T>:
    services.AddSingleton<IReadOnlyList<HandlerReference>>(new List<HandlerReference>
    {
        new() { HandlerType = typeof(DocumentUploadedHandler), MessageType = typeof(DocumentUploaded) },
    });
    services.AddTransient<IStreamHandler<DocumentUploaded>, DocumentUploadedHandler>();

    services.AddServiceConnect(builder =>
    {
        builder.UseRabbitMQ(t => { t.Host = "localhost"; });
        builder.ConfigureQueues(q => q.QueueName = "document-receiver");
    });

The cost model
Streaming is not free. Each packet is a transport message: RabbitMQ allocates, routes, and acknowledges it. A 50 MB payload at 64 KB packets is ~800 transport messages. At 1 MB packets it is 50. The broker is happier with fewer larger packets, up to the point where a single message becomes uncomfortable for other reasons (memory pressure on the broker, redelivery cost if a packet fails).
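The packet counts above are a ceiling division of payload size by packet size. A small sketch of that arithmetic, assuming binary units (1 KB = 1024 bytes); StreamCost and PacketCount are illustrative names only:

```csharp
using System;

// Illustrative helper: estimates how many transport messages a stream produces.
public static class StreamCost
{
    // Ceiling division: a final partial packet still costs one transport message.
    public static long PacketCount(long payloadBytes, long packetBytes)
    {
        if (payloadBytes < 0)
            throw new ArgumentOutOfRangeException(nameof(payloadBytes));
        if (packetBytes <= 0)
            throw new ArgumentOutOfRangeException(nameof(packetBytes));

        return (payloadBytes + packetBytes - 1) / packetBytes;
    }
}

// StreamCost.PacketCount(50L * 1024 * 1024, 64 * 1024)   returns 800
// StreamCost.PacketCount(50L * 1024 * 1024, 1024 * 1024) returns 50
```

The count covers payload packets only; any control or close message the bus sends is extra.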
The receiver reassembles in memory. A 5 GB payload will hold 5 GB in the receiver’s heap until ExecuteAsync returns. If that is not acceptable, chunk at the application level instead of streaming: send many small DocumentChunkUploaded messages with sequence numbers, so the receiver can handle each chunk on its own.
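As a rough illustration of the application-level alternative, the sketch below writes each chunk to disk at its byte offset as it arrives, so the receiver holds only one chunk in memory at a time. Everything here is hypothetical: the shape of DocumentChunkUploaded (in a real service it would derive from the bus's Message base class), the ChunkedDocumentWriter class, and the choice to carry an explicit offset alongside the sequence number.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical contract; field names are assumptions made for this sketch.
public sealed record DocumentChunkUploaded(
    Guid DocumentId, int Sequence, int TotalChunks, long Offset, byte[] Data);

// Writes chunks for one document straight to disk. Not thread-safe.
public sealed class ChunkedDocumentWriter
{
    private readonly string _path;
    private readonly HashSet<int> _received = new();

    public ChunkedDocumentWriter(string path) => _path = path;

    // Returns true once every chunk of the document has been written.
    public bool Accept(DocumentChunkUploaded chunk)
    {
        // At-least-once delivery can produce duplicates; write each sequence number once.
        if (_received.Add(chunk.Sequence))
        {
            using var file = new FileStream(_path, FileMode.OpenOrCreate, FileAccess.Write);
            file.Seek(chunk.Offset, SeekOrigin.Begin); // chunks may arrive out of order
            file.Write(chunk.Data, 0, chunk.Data.Length);
        }
        return _received.Count == chunk.TotalChunks;
    }
}
```

The trade-off is that ordering, de-duplication, and completion tracking become the application's responsibility rather than the bus's.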
The receiver caps the cumulative byte count per stream to defend against hostile or buggy producers that never close their stream. The ceiling is configurable via IBusConfiguration.MaxStreamSizeBytes and defaults to 100 MB. A stream that exceeds it fails with InvalidOperationException during packet reassembly on the consumer side, so the error surfaces in the failing handler dispatch; the sender’s IMessageBusWriteStream.WriteAsync returns successfully for each packet and never sees the cap violation directly. Raise the limit for legitimate large-artefact workloads (file uploads, ML model weights); lower it to harden memory-constrained hosts. See the IBusConfiguration reference.
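Because the sender never observes a cap violation, one option is a pre-flight check on the sending side before any packet is written. This is a sketch only: StreamGuard is a hypothetical helper, and the limit is passed in as a parameter on the assumption that sender and receiver agree on the receiver's configured MaxStreamSizeBytes out of band.

```csharp
using System;

// Hypothetical sender-side guard (not part of the bus API).
public static class StreamGuard
{
    // Throws before any packet is sent if the payload cannot fit under the agreed cap.
    public static void EnsureWithinLimit(long payloadBytes, long maxStreamSizeBytes)
    {
        if (payloadBytes < 0)
            throw new ArgumentOutOfRangeException(nameof(payloadBytes));
        if (payloadBytes > maxStreamSizeBytes)
            throw new InvalidOperationException(
                $"Payload of {payloadBytes} bytes exceeds the agreed stream limit of {maxStreamSizeBytes} bytes.");
    }
}
```

Calling it with payload.Length just before opening the stream turns a late consumer-side failure into an early sender-side one.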
Reference
- IStreamHandler: streaming consumer contract
- IBus.CreateStream: streaming producer entry point
What comes next
- Point-to-Point: the normal “one message to one queue” model streaming builds on.
- Messages: the message-vs-body distinction; streaming is where that distinction matters most.
- Handlers: IStreamHandler<T> lives alongside IMessageHandler<T> and shares the same lifetime rules.