10 changes: 10 additions & 0 deletions README.md
@@ -358,6 +358,16 @@ var stateMachine = new StateMachine<State, Trigger>(initialState)

Setting this is vital within a Microsoft Orleans Grain for example, which requires the `SynchronizationContext` in order to make calls to other Grains.

### Thread safety
By default, Stateless is **NOT** thread-safe.
`FiringMode.Serial` makes `Fire()` thread-safe; however, reading the state machine's state from multiple threads may still be unsafe and may require additional locking.

Stateless processes triggers sequentially, and as a result there can only be one thread "driving" the processing at a time.

This is very problematic behavior, especially in an environment with high load on SM. Consider that the first caller starts processing the event, and then 100 more callers enqueue additional events. The first caller will continue processing all these events, which may take a considerable amount of time and, in extreme cases, may be ongoing indefinitely, blocking the first caller forever.
A better solution, in my opinion, would be to start an internal worker thread that pulls tasks to execute from an internal queue (a dedicated queue for Serial mode, not the _eventQueue). Fire methods will add their events to this queue.
However, this introduces another complexity: do callers expect the Fire(Async) method to return immediately after queuing their tasks, or only after their task has been processed? I think both scenarios have valid usages, and both should be supported by the configuration flag. The blocking scenario can be achieved by enqueuing a complex object that, in addition to the trigger, also holds a TaskCompletionSource instance. InternalFireSerial(Async) will wait on its Task, and the worker thread will call SetResult/SetException/SetCanceled when done/failed/canceled. The sync InternalFireSerial method can call Wait on TaskCompletionSource.Task without risking sync-over-async problems, because this Task is not the product of an async method, so a deadlock is not possible even in environments with a SynchronizationContext.
The blocking scenario introduces another problem: the waiting may need to be cancelled, so the Fire methods require support for CancellationToken (both sync and async).
And the final one: there needs to be a way to stop the worker thread gracefully, so the state machine would now have to implement I(Async)Disposable.
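
To make the proposal above concrete, here is a minimal sketch of a dedicated worker that drains its own queue, with one TaskCompletionSource per queued item. It is not Stateless code: `SerialWorker`, `WorkItem` and `Enqueue` are hypothetical names, and a plain `Action` stands in for firing a trigger. In this sketch cancellation is only checked before an item starts running, which is just one possible choice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of Stateless: a single worker thread drains the queue,
// and every queued item carries a TaskCompletionSource so its caller can observe the outcome.
public sealed class SerialWorker : IDisposable
{
    private sealed class WorkItem
    {
        public Action Work;
        public CancellationToken Token;
        public TaskCompletionSource<bool> Completion;
    }

    private readonly BlockingCollection<WorkItem> _queue = new BlockingCollection<WorkItem>();
    private readonly Thread _worker;

    public SerialWorker()
    {
        _worker = new Thread(Run) { IsBackground = true };
        _worker.Start();
    }

    // Returns a task that completes once this particular item has been processed.
    public Task Enqueue(Action work, CancellationToken token = default)
    {
        var item = new WorkItem
        {
            Work = work,
            Token = token,
            // Continuations run asynchronously so awaiting callers never execute on the worker thread.
            Completion = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously)
        };
        _queue.Add(item);
        return item.Completion.Task;
    }

    private void Run()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
        {
            if (item.Token.IsCancellationRequested)
            {
                item.Completion.TrySetCanceled(item.Token);
                continue;
            }

            try
            {
                item.Work();
                item.Completion.TrySetResult(true);
            }
            catch (Exception ex)
            {
                // The failure is reported to the caller that queued this item; the worker keeps going.
                item.Completion.TrySetException(ex);
            }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // the worker finishes whatever is already queued, then exits
        _worker.Join();
        _queue.Dispose();
    }
}
```

A blocking caller would use `worker.Enqueue(...).Wait()`, an async caller `await worker.Enqueue(...)`, and a fire-and-forget caller would simply ignore the returned task.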

@tedchirvasiu tedchirvasiu Dec 19, 2025

Author

> This is very problematic behavior, especially in an environment with high load on SM. Consider that the first caller starts processing the event, and then 100 more callers enqueue additional events. The first caller will continue processing all these events, which may take a considerable amount of time and, in extreme cases, may be ongoing indefinitely, blocking the first caller forever.

In Serial mode, technically no caller is required to await FireAsync. So you can fire and forget if you are worried about having the initial task blocked, but, just as with a separate worker thread for processing, you lose the ability to try-catch errors (unless you use Task.Run and add the error handling in there too).
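
A fire-and-forget call with its own error handling could look roughly like the sketch below. `FireAndForget.Run` and the `onError` callback are hypothetical names for illustration; the delegate passed in would typically be something like `() => stateMachine.FireAsync(trigger)`.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not part of Stateless.
public static class FireAndForget
{
    // Starts the work on the thread pool so the caller is never blocked,
    // and routes any exception to onError instead of losing it.
    public static Task Run(Func<Task> fire, Action<Exception> onError)
    {
        return Task.Run(async () =>
        {
            try
            {
                await fire().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                onError(ex);
            }
        });
    }
}
```

The returned task is only there so that tests or shutdown code can wait for completion; a real fire-and-forget caller would discard it.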

Having a dedicated worker thread for processing events would be great, but I think it would pretty much require a totally different architecture than the one that exists. Think about exception handling. If you queue all work on a Task completely separate from your caller Tasks, exception handling would no longer work with a simple try-catch surrounding the call to FireAsync. You would need a separate OnError event or as you suggested, a dedicated TaskCompletionSource for each queued InternalFireOneAsync.

Also, another question is what happens when processing an event throws an error? Do you keep going with the next event? Do you halt and require a separate function call to resume?

In the current FiringMode.Queued implementation the execution halts and it would only resume on the next FireAsync call (since events don't get dropped).

> However, this introduces another complexity: do callers expect the Fire(Async) method to return immediately after queuing their tasks, or only after their task has been processed?

In the existing Queued mode, callers can't expect FireAsync to return only after THEIR task has been processed because it returns immediately after queueing if _firing is true. So in Serial mode they could in theory always expect to have the task return as soon as the item has been queued and it would still be consistent.

Author

The intention in this PR was to propose the simplest form of thread safety without entirely changing the library's design around supporting multi-threading.

Reading the maintainers' comments on the original issue and on other PRs regarding this issue, I got the feeling they are pretty reluctant to implement Serial mode for fear of overcomplicating the library.

I support the idea of having a dedicated worker thread, but I personally can't think of a simple, elegant and safe enough implementation.

Author

@gao-artur This should be fixed in #640


In `FiringMode.Serial`, if processing a trigger throws an error, any triggers still waiting in the queue stay there by default and are only processed once another Fire() call resumes processing. To keep the state machine consistent after an error, those unprocessed triggers can be dropped instead.
Set `DropUnprocessedEventsOnErrorInSerialMode` to true to drop unprocessed triggers when an error occurs.
Set `DropUnprocessedEventsOnErrorInSerialMode` to false to keep them in the queue (default).
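
As a rough usage sketch, assuming the `FiringMode.Serial` value and the `DropUnprocessedEventsOnErrorInSerialMode` property proposed in this PR are available (they are not part of a released Stateless package); the `State` and `Trigger` enums and the `SerialModeExample` class are made up for the example:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using Stateless;

enum State { Idle }
enum Trigger { Ping }

static class SerialModeExample
{
    public static async Task<int> RunAsync()
    {
        int entries = 0;

        var machine = new StateMachine<State, Trigger>(State.Idle, FiringMode.Serial)
        {
            // Opt in to dropping still-queued triggers when one of them throws.
            DropUnprocessedEventsOnErrorInSerialMode = true
        };

        machine.Configure(State.Idle)
            .OnEntry(() => entries++) // runs on whichever thread is currently draining the queue
            .PermitReentry(Trigger.Ping);

        // Several threads fire concurrently; only one of them processes triggers at any time.
        var callers = Enumerable.Range(0, 4)
            .Select(_ => Task.Run(() =>
            {
                for (int i = 0; i < 1000; i++)
                {
                    machine.Fire(Trigger.Ping);
                }
            }))
            .ToArray();

        await Task.WhenAll(callers);
        return entries;
    }
}
```

Note that `Fire()` may return before the caller's own trigger has been processed, because another thread may already be driving the queue.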

## Building

Stateless runs on .NET runtime version 4+ and practically all modern .NET platforms by targeting .NET Framework 4.6.2, .NET Standard 2.0, and .NET 8.0, 9.0 and 10.0. Visual Studio 2017 or later is required to build the solution.
62 changes: 62 additions & 0 deletions src/Stateless/StateMachine.Async.cs
@@ -162,12 +162,74 @@ async Task InternalFireAsync(TTrigger trigger, params object[] args)
                case FiringMode.Queued:
                    await InternalFireQueuedAsync(trigger, args);
                    break;
                case FiringMode.Serial:
                    await InternalFireSerialAsync(trigger, args);
                    break;
                default:
                    // If something is completely messed up we let the user know ;-)
                    throw new InvalidOperationException("The firing mode has not been configured!");
            }
        }

        /// <summary>
        /// Queue events and then fire in order.
        /// If only one event is queued, this behaves identically to the non-queued version.
        /// </summary>
        /// <param name="trigger"> The trigger. </param>
        /// <param name="args"> A variable-length parameters list containing arguments. </param>
        async Task InternalFireSerialAsync(TTrigger trigger, params object[] args) {

            lock (_serialModeLock)

Using a regular lock in an async method isn't a best practice as it blocks the thread during the wait for the lock. Consider using SemaphoreSlim(1,1) that supports both sync and async waiting instead.
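
For illustration, the pattern suggested here usually looks like the sketch below. `GuardedCounter` is a hypothetical class, not code from this PR; it only shows a SemaphoreSlim(1, 1) being held across an `await`, which a `lock` statement cannot do.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration, not code from this PR.
public sealed class GuardedCounter
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private int _value;

    public int Value => Volatile.Read(ref _value);

    // Async path: waiting for the gate does not block a thread.
    public async Task IncrementAsync()
    {
        await _gate.WaitAsync().ConfigureAwait(false);
        try
        {
            int snapshot = _value;
            await Task.Yield(); // awaiting while holding the gate is allowed
            _value = snapshot + 1;
        }
        finally
        {
            _gate.Release();
        }
    }

    // Sync path: the same semaphore also supports blocking waits.
    public void Increment()
    {
        _gate.Wait();
        try
        {
            _value++;
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

A `lock` statement remains legal in an async method as long as its body contains no `await`.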

@tedchirvasiu tedchirvasiu Dec 19, 2025

Author

Please take a look at SemaphoreSlim's source code.

It also uses a regular lock under the hood. In addition to that, it may cause a thread switch. Considering all the overhead, I really don't think it's best practice or more performant to use a SemaphoreSlim for a very short-lived lock such as flipping a boolean and pushing an element to a Queue, especially since no deadlocking can occur.

            {
                // Add trigger to queue
                _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });

                // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later.
                if (_firing)
                    return;

                _firing = true;
            }

            try
            {

                // Empty queue for triggers
                while (true)
                {

                    QueuedTrigger queuedEvent;

                    lock (_serialModeLock)
                    {

                        if (_eventQueue.Count == 0)
                        {
                            _firing = false;
                            break;
                        }

                        queuedEvent = _eventQueue.Dequeue();
                    }

                    await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext);
                }
            }
            catch
            {

                lock (_serialModeLock)
                {
                    if (DropUnprocessedEventsOnErrorInSerialMode)
                        _eventQueue.Clear();

                    _firing = false;
                }

                throw;
            }
        }

        /// <summary>
        /// Queue events and then fire in order.
        /// If only one event is queued, this behaves identically to the non-queued version.
81 changes: 80 additions & 1 deletion src/Stateless/StateMachine.cs
@@ -14,7 +14,9 @@ public enum FiringMode
         /// <summary> Use immediate mode when the queuing of trigger events are not needed. Care must be taken when using this mode, as there is no run-to-completion guaranteed.</summary>
         Immediate,
         /// <summary> Use the queued <c>Fire</c>ing mode when run-to-completion is required. This is the recommended mode.</summary>
-        Queued
+        Queued,
+        /// <summary> Equivalent to Queued mode, but thread-safe for Fire.</summary>
+        Serial
     }

     /// <summary>
@@ -40,9 +42,20 @@ private class QueuedTrigger
            public object[] Args { get; set; }
        }

        // The lock object is readonly so it can never be swapped out while threads are contending on it.
        private readonly object _serialModeLock = new object();
        private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>();
        private bool _firing;

        /// <summary>
        /// If the main processing thread throws an error, unprocessed triggers should be removed from
        /// the queue in order to ensure consistency. Otherwise the event queue
        /// may still hold unprocessed triggers which would require another
        /// Fire() call to resume processing.
        /// Set this to true if you need consistent behaviour.
        /// Set this to false if you don't want triggers to be dropped.
        /// </summary>
        public bool DropUnprocessedEventsOnErrorInSerialMode { get; set; } = false;

I think this and other configurations should be immutable. They should be set on StateMachine creation, and their change should not be allowed after that.

Author

Thought about it too, but that would require adding yet another constructor or another parameter to the existing ones.

Is it worth it?

Because I think nobody would be crazy enough to change this while the state machine is running, right?

It's not about crazy users. It's about preventing mistakes by less experienced developers. Is it possible to use init instead of set with the supported TFMs?

@tedchirvasiu tedchirvasiu Dec 19, 2025

Author

Hmm, I think it might work with preprocessor symbols, but I haven't done it before on a project that targets this many .NET versions. Do you have a working snippet by any chance?

> It's not about crazy users. It's about preventing mistakes by less experienced developers

But I think even if someone were to flip this while the SM is running, nothing would blow up.
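
On the `init` question, one approach that is commonly used when multi-targeting is sketched below. `init` accessors need C# 9 or later and the marker type `System.Runtime.CompilerServices.IsExternalInit`, which ships with .NET 5 and later but not with .NET Framework 4.6.2 or .NET Standard 2.0; declaring it internally for the older targets is enough for the compiler. `SerialModeOptions` is a hypothetical type used only to show the syntax, and the sketch assumes the project's `LangVersion` is 9 or higher.

```csharp
#if !NET5_0_OR_GREATER
namespace System.Runtime.CompilerServices
{
    // Marker type the compiler requires for init accessors.
    // It is missing from .NET Framework and .NET Standard 2.0, so it is declared here for those targets.
    internal static class IsExternalInit { }
}
#endif

namespace Example
{
    // Hypothetical options holder, used only to show the syntax.
    public sealed class SerialModeOptions
    {
        // Can be set in an object initializer, but not after construction.
        public bool DropUnprocessedEventsOnError { get; init; }
    }
}
```

With this in place, `new SerialModeOptions { DropUnprocessedEventsOnError = true }` compiles on every target, while assigning the property later is a compile-time error.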


        /// <summary>
        /// Construct a state machine with external state storage.
        /// </summary>
@@ -341,12 +354,78 @@ void InternalFire(TTrigger trigger, params object[] args)
                case FiringMode.Queued:
                    InternalFireQueued(trigger, args);
                    break;
                case FiringMode.Serial:
                    InternalFireSerial(trigger, args);
                    break;
                default:
                    // If something is completely messed up we let the user know ;-)
                    throw new InvalidOperationException("The firing mode has not been configured!");
            }
        }

        /// <summary>
        /// Queue events and then fire in order.
        /// If only one event is queued, this behaves identically to the non-queued version.
        /// This method is almost equivalent to InternalFireQueued, but it employs simple locks
        /// in order to ensure thread-safety.
        /// Warning! If processing a trigger throws an unexpected error, unprocessed events will be dropped
        /// if DropUnprocessedEventsOnErrorInSerialMode is set to true.
        /// </summary>
        /// <param name="trigger"> The trigger. </param>
        /// <param name="args"> A variable-length parameters list containing arguments. </param>
        private void InternalFireSerial(TTrigger trigger, params object[] args) {

            lock (_serialModeLock)
            {
                // Add trigger to queue
                _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });

                // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later.
                if (_firing)
                    return;

                _firing = true;
            }

            try
            {

                // Empty queue for triggers
                while (true)
                {

                    QueuedTrigger queuedEvent;

                    lock (_serialModeLock)
                    {

                        if (_eventQueue.Count == 0)
                        {
                            _firing = false;
                            break;
                        }

                        queuedEvent = _eventQueue.Dequeue();
                    }

                    InternalFireOne(queuedEvent.Trigger, queuedEvent.Args);
                }
            }
            catch
            {

                lock (_serialModeLock)
                {
                    if (DropUnprocessedEventsOnErrorInSerialMode)
                        _eventQueue.Clear();

                    _firing = false;
                }

                throw;
            }
        }

        /// <summary>
        /// Queue events and then fire in order.
        /// If only one event is queued, this behaves identically to the non-queued version.
126 changes: 126 additions & 0 deletions test/Stateless.Tests/SerialModeThreadSafetyFixture.cs
@@ -0,0 +1,126 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace Stateless.Tests {
    /// <summary>
    /// We disable parallelization to ensure other threads are not occupied by other tests.
    /// Some tests regarding DropUnprocessedEventsOnErrorInSerialMode need to be added.
    /// </summary>
    [CollectionDefinition("SerialModeThreadSafetyFixture", DisableParallelization = true)]
    public class SerialModeThreadSafetyFixture {

        [Fact]
        public async Task IncrementIsThreadSafeUnderContentionSync() {

            var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial);

            int counter = 0;
            var startGate = new ManualResetEventSlim(false);

            stateMachine.Configure(State.A)
                .OnEntry(() => {
                    counter = counter + 1;
                })
                .PermitReentry(Trigger.X);

            var tasks = Enumerable.Range(0, 16).Select(_ =>
                Task.Run(() => {
                    startGate.Wait();
                    for (int i = 0; i < 10_000; i++) {
                        stateMachine.Fire(Trigger.X);
                    }
                })
            ).ToArray();

            startGate.Set();
            await Task.WhenAll(tasks);

            Assert.Equal(160_000, counter);
        }

        [Fact]
        public async Task IncrementIsThreadSafeUnderContentionAsync() {

            var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial);

            int counter = 0;
            var startGate = new ManualResetEventSlim(false);

            stateMachine.Configure(State.A)
                .OnEntryAsync(async () => {
                    await Task.Yield();
                    counter = counter + 1;
                })
                .PermitReentry(Trigger.X);

            var tasks = Enumerable.Range(0, 16).Select(_ =>
                Task.Run(async () => {
                    startGate.Wait();
                    for (int i = 0; i < 1_000; i++) {
                        await stateMachine.FireAsync(Trigger.X);
                    }
                })
            ).ToArray();

            startGate.Set();
            await Task.WhenAll(tasks);

            Assert.Equal(16_000, counter);
        }

        // Synchronous test: declared void because nothing is awaited.
        [Fact]
        public void RecursiveFireDoesNotDeadlockSync() {

            var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial);

            stateMachine.Configure(State.A)
                .OnEntry(() => {
                    stateMachine.Fire(Trigger.Y);
                })
                .Permit(Trigger.Y, State.B)
                .PermitReentry(Trigger.X);

            stateMachine.Configure(State.B)
                .OnEntry(() => {
                    stateMachine.Fire(Trigger.Y);
                })
                .Permit(Trigger.Y, State.C)
                .PermitReentry(Trigger.X);

            stateMachine.Configure(State.C);

            stateMachine.Fire(Trigger.X);

            Assert.Equal(State.C, stateMachine.State);
        }

        [Fact]
        public async Task RecursiveFireDoesNotDeadlockAsync() {

            var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial);

            stateMachine.Configure(State.A)
                .OnEntryAsync(async () => {
                    await stateMachine.FireAsync(Trigger.Y);
                })
                .Permit(Trigger.Y, State.B)
                .PermitReentry(Trigger.X);

            stateMachine.Configure(State.B)
                .OnEntryAsync(async () => {
                    await stateMachine.FireAsync(Trigger.Y);
                })
                .Permit(Trigger.Y, State.C)
                .PermitReentry(Trigger.X);

            stateMachine.Configure(State.C);

            await stateMachine.FireAsync(Trigger.X);

            Assert.Equal(State.C, stateMachine.State);
        }

    }
}