diff --git a/README.md b/README.md index 0913b092..f987ffc8 100644 --- a/README.md +++ b/README.md @@ -358,6 +358,16 @@ var stateMachine = new StateMachine(initialState) Setting this is vital within a Microsoft Orleans Grain for example, which requires the `SynchronizationContext` in order to make calls to other Grains. +### Thread safety +By default, Stateless is **NOT** thread-safe. +`FiringMode.Serial` ensures thread-safety for Fire(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. + +Stateless processes triggers sequentially, and as a result there can only be one thread "driving" the processing at a time. + +In `FiringMode.Serial`, if the main processing thread throws an error, unprocessed triggers should be removed from the queue in order to ensure consistency. Otherwise the event queue may still hold unprocessed triggers which would require another Fire() call to resume processing. +Set `DropUnprocessedEventsOnErrorInSerialMode` to true if you need consistent behaviour. +Set `DropUnprocessedEventsOnErrorInSerialMode` to false if you don't want triggers to be dropped (default). + ## Building Stateless runs on .NET runtime version 4+ and practically all modern .NET platforms by targeting .NET Framework 4.6.2, .NET Standard 2.0, and .NET 8.0, 9.0 and 10.0. Visual Studio 2017 or later is required to build the solution. diff --git a/src/Stateless/StateMachine.Async.cs b/src/Stateless/StateMachine.Async.cs index 685c751e..ca230cb5 100644 --- a/src/Stateless/StateMachine.Async.cs +++ b/src/Stateless/StateMachine.Async.cs @@ -162,12 +162,74 @@ async Task InternalFireAsync(TTrigger trigger, params object[] args) case FiringMode.Queued: await InternalFireQueuedAsync(trigger, args); break; + case FiringMode.Serial: + await InternalFireSerialAsync(trigger, args); + break; default: // If something is completely messed up we let the user know ;-) throw new InvalidOperationException("The firing mode has not been configured!"); } } + /// + /// Queue events and then fire in order. + /// If only one event is queued, this behaves identically to the non-queued version. + /// + /// The trigger. + /// A variable-length parameters list containing arguments. + async Task InternalFireSerialAsync(TTrigger trigger, params object[] args) { + + lock (_serialModeLock) + { + // Add trigger to queue + _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + + // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. + if (_firing) + return; + + _firing = true; + } + + try + { + + // Empty queue for triggers + while (true) + { + + QueuedTrigger queuedEvent; + + lock (_serialModeLock) + { + + if (_eventQueue.Count == 0) + { + _firing = false; + break; + } + + queuedEvent = _eventQueue.Dequeue(); + } + + await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext); + } + } + catch + { + + lock (_serialModeLock) + { + if (DropUnprocessedEventsOnErrorInSerialMode) + _eventQueue.Clear(); + + _firing = false; + } + + throw; + } + } + /// /// Queue events and then fire in order. /// If only one event is queued, this behaves identically to the non-queued version. diff --git a/src/Stateless/StateMachine.cs b/src/Stateless/StateMachine.cs index 999297e2..855ba711 100644 --- a/src/Stateless/StateMachine.cs +++ b/src/Stateless/StateMachine.cs @@ -14,7 +14,9 @@ public enum FiringMode /// Use immediate mode when the queuing of trigger events are not needed. Care must be taken when using this mode, as there is no run-to-completion guaranteed. Immediate, /// Use the queued Fireing mode when run-to-completion is required. This is the recommended mode. - Queued + Queued, + /// Equivalent to Queued mode, but thread-safe for Fire. + Serial } /// @@ -40,9 +42,20 @@ private class QueuedTrigger public object[] Args { get; set; } } + private object _serialModeLock = new object(); private readonly Queue _eventQueue = new Queue(); private bool _firing; + /// + /// If the main processing thread throws an error, unprocessed triggers should be removed from + /// the queue in order to ensure consistency. Otherwise the event queue + /// may still hold unprocessed triggers which would require another + /// Fire() call to resume processing. + /// Set this to true if you need consistent behaviour. + /// Set this to false if you don't want triggers to be dropped. + /// + public bool DropUnprocessedEventsOnErrorInSerialMode { get; set; } = false; + /// /// Construct a state machine with external state storage. /// @@ -341,12 +354,78 @@ void InternalFire(TTrigger trigger, params object[] args) case FiringMode.Queued: InternalFireQueued(trigger, args); break; + case FiringMode.Serial: + InternalFireSerial(trigger, args); + break; default: // If something is completely messed up we let the user know ;-) throw new InvalidOperationException("The firing mode has not been configured!"); } } + /// + /// Queue events and then fire in order. + /// If only one event is queued, this behaves identically to the non-queued version. + /// This method is almost equivalent to InternalFireQueued, but it employs simple locks + /// in order to ensure thread-safety. + /// Warning! If processing a trigger throws an unexpected error, unprocessed events will be dropped + /// if DropUnprocessedEventsOnErrorInSerialMode is set to true. + /// + /// The trigger. + /// A variable-length parameters list containing arguments. + private void InternalFireSerial(TTrigger trigger, params object[] args) { + + lock (_serialModeLock) + { + // Add trigger to queue + _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + + // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. + if (_firing) + return; + + _firing = true; + } + + try + { + + // Empty queue for triggers + while (true) + { + + QueuedTrigger queuedEvent; + + lock (_serialModeLock) + { + + if (_eventQueue.Count == 0) + { + _firing = false; + break; + } + + queuedEvent = _eventQueue.Dequeue(); + } + + InternalFireOne(queuedEvent.Trigger, queuedEvent.Args); + } + } + catch + { + + lock (_serialModeLock) + { + if (DropUnprocessedEventsOnErrorInSerialMode) + _eventQueue.Clear(); + + _firing = false; + } + + throw; + } + } + /// /// Queue events and then fire in order. /// If only one event is queued, this behaves identically to the non-queued version. diff --git a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs new file mode 100644 index 00000000..60eafaf9 --- /dev/null +++ b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs @@ -0,0 +1,126 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Stateless.Tests { + /// + /// We disable parallelization to ensure other threads are not occupied by other tests. + /// Some tests regarding DropUnprocessedEventsOnErrorInSerialMode need to be added. + /// + [CollectionDefinition("SerialModeThreadSafetyFixture", DisableParallelization = true)] + public class SerialModeThreadSafetyFixture { + + [Fact] + public async Task IncrementIsThreadSafeUnderContentionSync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + int counter = 0; + var startGate = new ManualResetEventSlim(false); + + stateMachine.Configure(State.A) + .OnEntry(() => { + counter = counter + 1; + }) + .PermitReentry(Trigger.X); + + var tasks = Enumerable.Range(0, 16).Select(_ => + Task.Run(() => { + startGate.Wait(); + for (int i = 0; i < 10_000; i++) { + stateMachine.Fire(Trigger.X); + } + }) + ).ToArray(); + + startGate.Set(); + await Task.WhenAll(tasks); + + Assert.Equal(160_000, counter); + } + + [Fact] + public async Task IncrementIsThreadSafeUnderContentionAsync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + int counter = 0; + var startGate = new ManualResetEventSlim(false); + + stateMachine.Configure(State.A) + .OnEntryAsync(async () => { + await Task.Yield(); + counter = counter + 1; + }) + .PermitReentry(Trigger.X); + + var tasks = Enumerable.Range(0, 16).Select(_ => + Task.Run(async () => { + startGate.Wait(); + for (int i = 0; i < 1_000; i++) { + await stateMachine.FireAsync(Trigger.X); + } + }) + ).ToArray(); + + startGate.Set(); + await Task.WhenAll(tasks); + + Assert.Equal(16_000, counter); + } + + [Fact] + public async Task RecursiveFireDoesNotDeadlockSync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + stateMachine.Configure(State.A) + .OnEntry(() => { + stateMachine.Fire(Trigger.Y); + }) + .Permit(Trigger.Y, State.B) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.B) + .OnEntry(() => { + stateMachine.Fire(Trigger.Y); + }) + .Permit(Trigger.Y, State.C) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.C); + + stateMachine.Fire(Trigger.X); + + Assert.Equal(State.C, stateMachine.State); + } + + [Fact] + public async Task RecursiveFireDoesNotDeadlockAsync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + stateMachine.Configure(State.A) + .OnEntryAsync(async () => { + await stateMachine.FireAsync(Trigger.Y); + }) + .Permit(Trigger.Y, State.B) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.B) + .OnEntryAsync(async () => { + await stateMachine.FireAsync(Trigger.Y); + }) + .Permit(Trigger.Y, State.C) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.C); + + await stateMachine.FireAsync(Trigger.X); + + Assert.Equal(State.C, stateMachine.State); + } + + } +}