ITimeoutStore
Overview
ITimeoutStore persists scheduled timeout messages and delivers them in batches when they become due. The timeout manager polls GetTimeoutsBatchAsync on a cadence driven by BusConfiguration.ProcessManagerTimeoutPollInterval, inserts new timeouts via InsertTimeoutAsync, and commits or rolls back each dispatch with RemoveDispatchedTimeoutAsync or ReleaseDispatchedTimeoutAsync.
ITimeoutStore covers both single-instance and multi-instance (active-active) deployments. Remove and release operations accept an optional Guid? lockOwner — when supplied, the operation is lease-checked and a worker that has lost its lease observes ConcurrencyException so the row is not double-dispatched. When lockOwner is null, the operation is unconditional and the lease check is skipped.
See Process Manager for the saga and timeout conceptual model.
Reference
    public interface ITimeoutStore
    {
        Task InsertTimeoutAsync(TimeoutData timeoutData, CancellationToken cancellationToken = default);
        Task<TimeoutsBatch> GetTimeoutsBatchAsync(int? batchSize = null, CancellationToken cancellationToken = default);
        Task RemoveDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default);
        Task ReleaseDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default);
        Task<long> ReapStaleLeasesAsync(CancellationToken cancellationToken = default); // default interface method (DIM); returns 0L
    }

TimeoutData carries the timeout identifier, target destination, scheduled fire time, and an IReadOnlyDictionary<string, object> headers bag. Implementations that construct TimeoutData pass any headers via the init accessor; post-construction mutation is not supported. TimeoutsBatch wraps a list of due TimeoutData records. Callers re-poll on their own cadence (typically BusConfiguration.ProcessManagerTimeoutPollInterval).
InsertTimeoutAsync
    Task InsertTimeoutAsync(TimeoutData timeoutData, CancellationToken cancellationToken = default);

Inserts a timeout into the store.
Parameters
- timeoutData: the timeout to persist.
- cancellationToken: a token that cancels the operation.
GetTimeoutsBatchAsync
    Task<TimeoutsBatch> GetTimeoutsBatchAsync(int? batchSize = null, CancellationToken cancellationToken = default);

Loads the next batch of due timeouts.
Parameters
- batchSize: when supplied, caps the number of timeouts returned in a single poll. Must be greater than zero. When null, the persistor applies its default cap (MongoDB uses MongoDbPersistenceOptions.TimeoutBatchSize; the in-memory store returns all due timeouts).
- cancellationToken: a token that cancels the operation.
Returns a TimeoutsBatch whose DueTimeouts list contains the timeouts ready to fire. Callers re-poll on their own cadence (typically BusConfiguration.ProcessManagerTimeoutPollInterval).
Throws ArgumentOutOfRangeException when batchSize is non-null and not greater than zero.
RemoveDispatchedTimeoutAsync
    Task RemoveDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default);

Removes a timeout after it has been dispatched successfully.
Parameters
- id: the timeout identifier.
- lockOwner: when non-null, the row is removed only if its current lock owner matches; when null, the row is removed unconditionally.
- cancellationToken: a token that cancels the operation.
Throws ConcurrencyException when lockOwner is supplied and the row’s current owner does not match (or the row’s lease has expired and been reclaimed by another worker).
Remarks. Call this after the timeout message has been delivered to the downstream consumer and acknowledged. Pass lockOwner when running multiple bus instances against a shared store, so a worker whose lease was reaped cannot accidentally delete a row that another worker has since claimed.
ReleaseDispatchedTimeoutAsync
    Task ReleaseDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default);

Releases a dispatched timeout so it may be retried later.
Parameters
- id: the timeout identifier.
- lockOwner: when non-null, the row is released only if its current lock owner matches; when null, the row is released unconditionally.
- cancellationToken: a token that cancels the operation.
Throws ConcurrencyException when lockOwner is supplied and the row’s current owner does not match.
Remarks. Call this when dispatch fails — the consumer rejected the message, transport delivery failed, or the timeout is being requeued for retry. After a successful release the timeout is visible on the next GetTimeoutsBatchAsync call.
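The commit-or-rollback flow described for RemoveDispatchedTimeoutAsync and ReleaseDispatchedTimeoutAsync could be sketched roughly as follows. This is an illustration under assumptions, not the library's timeout manager: the ITimeoutStore subset, TimeoutData, TimeoutsBatch and ConcurrencyException below are simplified stand-ins declared only so the snippet compiles on its own, the dispatch callback is hypothetical, and how a worker learns its lockOwner is not covered here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Simplified stand-ins (assumptions); the real types ship with the library and may differ.
public sealed class ConcurrencyException : Exception { }
public sealed class TimeoutData { public Guid Id { get; init; } }
public sealed class TimeoutsBatch { public List<TimeoutData> DueTimeouts { get; init; } = new(); }

public interface ITimeoutStore
{
    Task<TimeoutsBatch> GetTimeoutsBatchAsync(int? batchSize = null, CancellationToken cancellationToken = default);
    Task RemoveDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default);
    Task ReleaseDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default);
}

public static class TimeoutDispatcher
{
    // Polls once and returns how many timeouts were removed after a successful dispatch.
    public static async Task<int> DispatchDueAsync(
        ITimeoutStore store,
        Func<TimeoutData, CancellationToken, Task> dispatch, // hypothetical delivery callback
        Guid? lockOwner,                                      // null selects the unconditional path
        CancellationToken ct = default)
    {
        var removed = 0;
        var batch = await store.GetTimeoutsBatchAsync(cancellationToken: ct);

        foreach (var timeout in batch.DueTimeouts)
        {
            try
            {
                bool delivered;
                try
                {
                    await dispatch(timeout, ct);
                    delivered = true;
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    delivered = false;
                }

                if (delivered)
                {
                    // Commit: the message was delivered, so the row can go.
                    await store.RemoveDispatchedTimeoutAsync(timeout.Id, lockOwner, ct);
                    removed++;
                }
                else
                {
                    // Roll back: make the row visible to the next poll.
                    await store.ReleaseDispatchedTimeoutAsync(timeout.Id, lockOwner, ct);
                }
            }
            catch (ConcurrencyException)
            {
                // Lease lost: another worker owns the row now, so leave it alone.
            }
        }

        return removed;
    }
}
```

Passing null for lockOwner gives the single-instance behaviour; a multi-instance worker would pass the owner it claimed the rows under.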
ReapStaleLeasesAsync
    Task<long> ReapStaleLeasesAsync(CancellationToken cancellationToken = default);

Reclaims rows whose lease has expired but which are still flagged as locked, typically because a worker crashed mid-dispatch or a broker partition outlasted the lease window.
Parameters
- cancellationToken: a token that cancels the operation.
Returns the number of rows whose lease was reclaimed, as a long.
Default implementation. Returns Task.FromResult(0L) — a no-op. Persistors whose GetTimeoutsBatchAsync already reclaims expired leases as a side-effect (the in-memory store) can leave this default in place.
Override guidance. Persistors with explicit lease rows (MongoDB) should override with a single batched update that clears the lock flag for all rows where LockExpiresAt <= UtcNow. Operators running such persistors should call ReapStaleLeasesAsync on a timer at roughly LockLeaseDuration / 4 cadence to reclaim rows held by crashed workers promptly rather than waiting for the natural GetTimeoutsBatchAsync recovery path.
Remarks. The natural-recovery path is the next GetTimeoutsBatchAsync poll, whose filter accepts both unlocked rows and locked-but-expired rows — operators don’t need to call this method for routine recovery. It exists for on-demand cleanup from an admin endpoint or a one-off script when a deployment wants to unstick the queue without waiting for the next poll cycle.
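For persistors that do override ReapStaleLeasesAsync, the timer-driven call at roughly LockLeaseDuration / 4 might be wired up along these lines. This is a minimal sketch under assumptions: the one-member ITimeoutStore is a stand-in so the snippet compiles alone, LeaseReaper and the onReaped hook are hypothetical names, and PeriodicTimer requires .NET 6 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in exposing only the member this sketch needs (assumption; the real interface has more).
public interface ITimeoutStore
{
    Task<long> ReapStaleLeasesAsync(CancellationToken cancellationToken = default);
}

public static class LeaseReaper
{
    // Calls ReapStaleLeasesAsync every leaseDuration / 4 until the token is cancelled.
    public static async Task RunAsync(
        ITimeoutStore store,
        TimeSpan leaseDuration,
        Action<long>? onReaped = null, // hypothetical hook for logging or metrics
        CancellationToken ct = default)
    {
        using var timer = new PeriodicTimer(leaseDuration / 4);
        try
        {
            while (await timer.WaitForNextTickAsync(ct))
            {
                var reclaimed = await store.ReapStaleLeasesAsync(ct);
                if (reclaimed > 0) onReaped?.Invoke(reclaimed);
            }
        }
        catch (OperationCanceledException)
        {
            // Normal shutdown: the caller cancelled the token.
        }
    }
}
```

With the five-minute default lease, this loop would tick every 75 seconds. A hosted service or similar long-running component would be a natural place to start it, though that choice is left to the deployment.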
Implementing
Invariants
- Concurrent callers. GetTimeoutsBatchAsync must not return the same timeout to two concurrent callers without locking. The built-in InMemoryTimeoutStore uses a ReaderWriterLockSlim and a per-record lease; a SQL implementation should use FOR UPDATE SKIP LOCKED (Postgres) or an equivalent. In clustered setups where multiple bus instances share the same persistent store, honour the optional lockOwner on remove/release so lease ownership is verified before mutating the row.
- Lease ownership. When the caller supplies a lockOwner, the persistor must throw ConcurrencyException if the stored owner has changed (or the lease has expired and been reaped). This is what closes the duplicate-dispatch race in active-active deployments.
- Unconditional path. When lockOwner is null, the operation is unconditional: a second call on an already-removed row is a silent no-op, matching idempotent retry semantics on transient storage failures.
Lease semantics
Remove and release operations accept an optional lock owner. When supplied, the operation is lease-checked: a worker that no longer holds a valid lease observes ConcurrencyException so the row is not double-dispatched by a second worker that reclaimed it.
Both the Mongo and InMemory persistors honour identical semantics. The LockLeaseDuration setting — MongoDbPersistenceOptions.TimeoutLockLeaseDuration or InMemoryPersistenceOptions.LockLeaseDuration — governs how long a worker may hold a row before the reaper reclaims it. Both default to five minutes.
Operators running the Mongo persistor should call ReapStaleLeasesAsync on a timer at roughly LockLeaseDuration / 4 cadence to reclaim rows held by crashed workers promptly, rather than waiting for the natural GetTimeoutsBatchAsync recovery path.
Skeletal in-memory implementation
The following example is adapted from the InMemoryTimeoutStore shipped with the library (in ServiceConnect.Persistence.InMemory), which is the reference single-instance implementation. It stores timeouts in a sorted set, uses a write lock for all mutations, and reads its lease duration from InMemoryPersistenceOptions so that a crashed dispatcher does not hold records indefinitely. A minimal third-party implementation that does NOT need lease-aware semantics (single-instance only) can ignore the lockOwner parameter and omit ReapStaleLeasesAsync (the DIM default returns 0L):
    public sealed class InMemoryTimeoutStore : ITimeoutStore
    {
        private readonly ReaderWriterLockSlim _lock = new();
        // TimeoutEntry and TimeoutEntryComparer are not shown in this excerpt.
        private readonly SortedSet<TimeoutEntry> _index = new(TimeoutEntryComparer.Instance);
        private readonly Dictionary<Guid, TimeoutEntry> _byId = new();
        private readonly TimeSpan _lockLeaseDuration;

        public InMemoryTimeoutStore(InMemoryPersistenceOptions options)
        {
            ArgumentNullException.ThrowIfNull(options);
            if (options.LockLeaseDuration <= TimeSpan.Zero)
                throw new ArgumentOutOfRangeException(nameof(options), "LockLeaseDuration must be positive.");
            _lockLeaseDuration = options.LockLeaseDuration;
        }

        public Task InsertTimeoutAsync(TimeoutData timeoutData, CancellationToken cancellationToken = default)
        {
            _lock.EnterWriteLock();
            try
            {
                var entry = new TimeoutEntry(timeoutData.Time, timeoutData.Id, timeoutData);
                _byId[timeoutData.Id] = entry;
                _index.Add(entry);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
            return Task.CompletedTask;
        }

        public Task<TimeoutsBatch> GetTimeoutsBatchAsync(int? batchSize = null, CancellationToken cancellationToken = default)
        {
            if (batchSize is not null && batchSize <= 0)
                throw new ArgumentOutOfRangeException(nameof(batchSize), "batchSize must be greater than zero.");

            var batch = new TimeoutsBatch { DueTimeouts = [] };
            var utcNow = DateTimeOffset.UtcNow;
            var cap = batchSize ?? int.MaxValue;

            // A write lock is needed because claiming a record mutates its lease fields.
            _lock.EnterWriteLock();
            try
            {
                // Entries are visited in sorted order; the scan stops at the first one that is not yet due.
                foreach (var entry in _index)
                {
                    if (batch.DueTimeouts.Count >= cap)
                        break;

                    // Claim records that are due and either unlocked or held under an expired lease.
                    if (entry.Time <= utcNow && (!entry.Data.Locked || entry.Data.LockExpiresAt <= utcNow))
                    {
                        entry.Data.Locked = true;
                        entry.Data.LockExpiresAt = utcNow + _lockLeaseDuration;
                        batch.DueTimeouts.Add(entry.Data);
                    }
                    else if (entry.Time > utcNow)
                    {
                        break;
                    }
                }
            }
            finally
            {
                _lock.ExitWriteLock();
            }

            return Task.FromResult(batch);
        }

        // lockOwner is ignored here: this skeleton is single-instance only, so removal is unconditional.
        public Task RemoveDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default)
        {
            _lock.EnterWriteLock();
            try
            {
                if (_byId.TryGetValue(id, out var entry))
                {
                    _byId.Remove(id);
                    _index.Remove(entry);
                }
            }
            finally
            {
                _lock.ExitWriteLock();
            }
            return Task.CompletedTask;
        }

        // lockOwner is ignored here as well; the record simply becomes claimable again.
        public Task ReleaseDispatchedTimeoutAsync(Guid id, Guid? lockOwner = null, CancellationToken cancellationToken = default)
        {
            _lock.EnterWriteLock();
            try
            {
                if (_byId.TryGetValue(id, out var entry))
                {
                    entry.Data.Locked = false;
                    entry.Data.LockExpiresAt = null;
                }
            }
            finally
            {
                _lock.ExitWriteLock();
            }
            return Task.CompletedTask;
        }
    }

For multi-instance deployments, honour the lockOwner on remove/release: throw ConcurrencyException if the stored owner has changed since the worker claimed the row.
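To illustrate the owner check for multi-instance stores, here is a minimal sketch of a lease-checked remove. Everything in it is an assumption made for illustration: LeaseTable and Lease are hypothetical names, the ConcurrencyException declared here is a stand-in for the library's type, and treating an already-removed row as a lease mismatch when a lockOwner is supplied is one possible reading of the contract rather than documented behaviour.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the library's exception type (assumption).
public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical lease record; the names are illustrative only.
public sealed record Lease(Guid Owner, DateTimeOffset ExpiresAt);

public sealed class LeaseTable
{
    private readonly object _gate = new();
    private readonly Dictionary<Guid, Lease?> _rows = new();

    public void Insert(Guid id)
    {
        lock (_gate) _rows[id] = null; // a null lease means the row is unlocked
    }

    public void Claim(Guid id, Guid owner, DateTimeOffset expiresAt)
    {
        lock (_gate) _rows[id] = new Lease(owner, expiresAt);
    }

    // Lease-checked when lockOwner is supplied, unconditional when it is null.
    public void Remove(Guid id, Guid? lockOwner)
    {
        lock (_gate)
        {
            if (!_rows.TryGetValue(id, out var lease))
            {
                if (lockOwner is null) return; // unconditional path: silent no-op
                throw new ConcurrencyException($"Timeout {id} is no longer held by {lockOwner}.");
            }

            // A reaped (null) lease or a different owner both count as a lost lease.
            if (lockOwner is not null && lease?.Owner != lockOwner)
                throw new ConcurrencyException($"Timeout {id} is no longer held by {lockOwner}.");

            _rows.Remove(id);
        }
    }
}
```

A release operation would follow the same pattern, clearing the lease instead of deleting the row. In a database-backed store the owner comparison would normally be part of the delete or update filter so that the check and the mutation happen atomically.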
Register a custom ITimeoutStore implementation during bus startup:
    services.AddServiceConnect(builder =>
    {
        builder.UseRabbitMQ(transport => transport.Host = "rabbit.internal.example");
        builder.AddRegistration(services =>
        {
            services.AddSingleton<ITimeoutStore, YourTimeoutStore>();
        });
    });

MongoDB timeout-store contract
WriteConcern.Unacknowledged rejected
MongoDbTimeoutStore requires the IMongoClient to be configured with an acknowledged write concern (w:1 or higher). Construction throws InvalidOperationException for WriteConcern.Unacknowledged. Under w:0, lock-aware delete and release operations appear to succeed even when they changed nothing, which turns a stale-lease no-op into an apparent successful delete and admits duplicate timeout dispatch.
See Persistence Configuration → MongoDB provider contract for the full guard set across all three Mongo stores.
See also
- IAggregatorPersistor: related reference
- IProcessManagerFinder: related reference
- Process Manager: concept