Skip to content

Streaming

Streaming is for payloads that won’t fit comfortably in a single message. Images, PDFs, logs, CSV exports. The sender splits the payload into packets and writes them through a stream handle; the bus delivers each packet on the transport; the receiver reassembles the bytes and invokes the handler exactly once, when the whole payload has arrived.

This is the pattern for “the message is the bytes.” When the payload is small enough to live in the message body, use normal send or publish. Streaming exists for the cases where that would be impractical.

  • The payload is too big to carry in a single bus message (broker limits, memory pressure, serialiser cost).
  • The receiver wants the whole payload as one blob, not as a stream of independent messages.
  • You are willing to accept that the handler waits for the full transfer before starting.

If the receiver can process packets as they arrive — say, a video consumer that renders frames — that is a different shape, and you should look at fan-out patterns instead. If the payload is small, don’t reach for streaming; the overhead isn’t worth it.

A streamed payload pairs a message with bytes. The message carries metadata; the bytes live on the stream:

Contracts/DocumentUploaded.cs
using ServiceConnect.Interfaces;
public sealed class DocumentUploaded(Guid correlationId) : Message(correlationId)
{
public string FileName { get; init; } = string.Empty;
public int TotalBytes { get; init; }
}

The sender decides what goes on the message vs. the stream. The rule of thumb: anything the receiver needs to know about the payload — filename, size, content-type, trace id — goes on the message; the payload itself goes on the stream.

bus.CreateStream<T>(endpoint) opens a write stream pointed at a receiver queue. Write packets with WriteAsync, and call CloseAsync when done:

Uploader/Program.cs
await using var stream = bus.CreateStream<DocumentUploaded>("document-receiver");
for (int offset = 0; offset < payload.Length; offset += chunkSize)
{
var count = Math.Min(chunkSize, payload.Length - offset);
await stream.WriteAsync(payload.AsMemory(offset, count));
}
await stream.CloseAsync();

Four things to notice:

  • Chunking is your job. The API takes a ReadOnlyMemory<byte>; slice a byte array with .AsMemory(offset, count) to write a sub-range. You decide how big each packet is. A few kilobytes to a few hundred kilobytes is typical — small enough to fit well inside broker limits, large enough that per-packet overhead doesn’t dominate.
  • Close is not optional. Until CloseAsync is called, the receiver does not know the stream is complete and the handler will not fire. The await using ensures disposal, but the explicit CloseAsync is what marks the final packet.
  • Each WriteAsync is one bus message. The packet becomes an envelope on the wire. Ordering, at-least-once delivery, and backpressure all apply at the packet level; the bus reassembles them in order on the receiver.
  • The generic parameter names the control message type. bus.CreateStream<DocumentUploaded>(endpoint) sends a control DocumentUploaded message that initiates the stream. The handler receives both the control message (the TMessage parameter in IStreamHandler<TMessage>.ExecuteAsync) and an IMessageBusReadStream for the reassembled bytes — they are separate: the control message carries metadata (filename, size, …) and the stream carries the payload.

A stream handler implements IStreamHandler<TMessage>. It’s slightly different in shape from a normal IMessageHandler<T>:

Receiver/DocumentUploadedHandler.cs
public sealed class DocumentUploadedHandler : IStreamHandler<DocumentUploaded>
{
public async Task ExecuteAsync(
DocumentUploaded message,
IMessageBusReadStream stream,
CancellationToken cancellationToken = default)
{
var bytes = stream.Read();
await File.WriteAllBytesAsync(message.FileName, bytes, cancellationToken);
}
}
  • stream is a parameter, not a property. The dispatcher hands the reassembled stream to ExecuteAsync directly — there is no ambient property the framework writes between dispatches, so singleton-registered handlers stay thread-safe.
  • ExecuteAsync lets you await the full reassembled stream. By the time it is called, every packet has arrived and been reassembled in memory; the I/O-bound wait for the wire is over. The supplied CancellationToken flows through from the dispatcher, so long writes to disk or downstream services can be cancelled — but don’t make that work slow, because it serialises against the next stream’s reassembly.
  • stream.Read() returns the whole byte array. The reassembly is complete by the time the handler fires; no packet-by-packet processing is expected. stream.Read() throws InvalidOperationException if the assembled stream has a missing packet — a gap in the sequence indicates incomplete delivery and the handler should not silently process truncated output. stream.ReadSequence() returns the same assembled data as a ReadOnlySequence<byte>, which avoids the extra MemoryStream+ToArray() copy that Read() produces; it does not reduce peak memory, because all packets are already fully resident before the handler fires. To limit peak memory, configure IBusConfiguration.MaxStreamSizeBytes.

Register the handler the same way you register any consumer — a HandlerReference and a DI registration — except the interface is IStreamHandler<T>:

Receiver/Program.cs
services.AddSingleton<IReadOnlyList<HandlerReference>>(new List<HandlerReference>
{
new() { HandlerType = typeof(DocumentUploadedHandler), MessageType = typeof(DocumentUploaded) },
});
services.AddTransient<IStreamHandler<DocumentUploaded>, DocumentUploadedHandler>();
services.AddServiceConnect(builder =>
{
builder.UseRabbitMQ(t => { t.Host = "localhost"; });
builder.ConfigureQueues(q => q.QueueName = "document-receiver");
});

Streaming is not free. Each packet is a transport message — RabbitMQ allocates, routes, and acknowledges it. A 50 MB payload at 64 KB packets is ~800 transport messages. At 1 MB packets it is 50. The broker is happier with fewer larger packets, up to the point where a single message becomes uncomfortable for other reasons (memory pressure on the broker, redelivery cost if a packet fails).

The receiver reassembles in memory. A 5 GB payload will hold 5 GB in the receiver’s heap until ExecuteAsync returns. If that is not acceptable, chunking the semantics (many small DocumentChunkUploaded messages with sequence numbers) is the right move, not streaming.

The receiver caps the cumulative byte count per stream to defend against hostile or buggy producers that never close their stream. The ceiling is configurable via IBusConfiguration.MaxStreamSizeBytes and defaults to 100 MB; a stream that exceeds it terminates with InvalidOperationException on the consumer-side packet reassembly (so the failing handler dispatch surfaces it; the sender’s IMessageBusWriteStream.WriteAsync returns successfully on each frame and does not see the cap violation directly). Raise this for legitimate large-artefact workloads (file uploads, ML model weights); lower it to harden memory-constrained hosts. IBusConfiguration reference.

  • Point-to-Point — the normal “one message to one queue” model streaming builds on.
  • Messages — the message-vs-body distinction; streaming is where that distinction matters most.
  • HandlersIStreamHandler<T> lives alongside IMessageHandler<T> and shares the same lifetime rules.