-
Notifications
You must be signed in to change notification settings - Fork 805
Implemented FiringMode.Serial with standard locks #639
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -358,6 +358,16 @@ var stateMachine = new StateMachine<State, Trigger>(initialState) | |
|
|
||
| Setting this is vital within a Microsoft Orleans Grain for example, which requires the `SynchronizationContext` in order to make calls to other Grains. | ||
|
|
||
| ### Thread safety | ||
| By default, Stateless is **NOT** thread-safe. | ||
| `FiringMode.Sequential` ensures thread-safety for Fire(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. | ||
|
|
||
| Stateless processes triggers sequentially, and as a result there can only be one thread "driving" the processing at a time. | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is very problematic behavior, especially in an environment with high load on SM. Consider that the first caller starts processing the event, and then 100 more callers enqueue additional events. The first caller will continue processing all these events, which may take a considerable amount of time and, in extreme cases, may be ongoing indefinitely, blocking the first caller forever.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
In the Serial mode, technically no caller is required to await on FireAsync. So you can fire and forget if you are worried about having the initial task blocked, but just as in the case of having a separate worker thread for processing, you lose the ability to try-catch errors (unless you use Task.Run and add the error handling in there too) Having a dedicated worker thread for processing events would be great, but I think it would pretty much require a totally different architecture than the one that exists. Think about exception handling. If you queue all work on a Task completely separate from your caller Tasks, exception handling would no longer work with a simple try-catch surrounding the call to FireAsync. You would need a separate OnError event or as you suggested, a dedicated TaskCompletionSource for each queued InternalFireOneAsync. Also, another question is what happens when processing an event throws an error? Do you keep going with the next event? Do you halt and require a separate function call to resume? In the current FiringMode.Queued implementation the execution halts and it would only resume on the next FireAsync call (since events don't get dropped).
In the existing Queued mode, callers can't expect FireAsync to return only after THEIR task has been processed because it returns immediately after queueing if
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The intention in this PR was to propose the simplest form of thread safety without entirely changing the library's design around supporting multi-threading. Reading the maintainers' comments on the original issue and on other PRs regarding this issue, I got the feeling they are pretty reluctant to implementing Serial mode in fear of overcomplicating the library. I support the idea of having a dedicated worker thread, but I personally can't think of a simple, elegant and safe enough implementation.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @gao-artur This should be fixed in #640 |
||
|
|
||
| In `FiringMode.Sequential`, if the main processing thread throws an error, unprocessed triggers should be removed from the queue in order to ensure consistency. Otherwise the event queue may still hold unprocessed triggers which would require another Fire() call to resume processing. | ||
| Set `DropUnprocessedEventsOnErrorInSerialMode` to true if you need consistent behaviour. | ||
| Set `DropUnprocessedEventsOnErrorInSerialMode` to false if you don't want triggers to be dropped (default). | ||
|
|
||
| ## Building | ||
|
|
||
| Stateless runs on .NET runtime version 4+ and practically all modern .NET platforms by targeting .NET Framework 4.6.2, .NET Standard 2.0, and .NET 8.0, 9.0 and 10.0. Visual Studio 2017 or later is required to build the solution. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -162,12 +162,74 @@ async Task InternalFireAsync(TTrigger trigger, params object[] args) | |
| case FiringMode.Queued: | ||
| await InternalFireQueuedAsync(trigger, args); | ||
| break; | ||
| case FiringMode.Serial: | ||
| await InternalFireSerialAsync(trigger, args); | ||
| break; | ||
| default: | ||
| // If something is completely messed up we let the user know ;-) | ||
| throw new InvalidOperationException("The firing mode has not been configured!"); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Queue events and then fire in order. | ||
| /// If only one event is queued, this behaves identically to the non-queued version. | ||
| /// </summary> | ||
| /// <param name="trigger"> The trigger. </param> | ||
| /// <param name="args"> A variable-length parameters list containing arguments. </param> | ||
| async Task InternalFireSerialAsync(TTrigger trigger, params object[] args) { | ||
|
|
||
| lock (_serialModeLock) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using a regular lock in an async method isn't a best practice as it blocks the thread during the wait for the lock. Consider using
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please take a look at SemaphoreSlim's source code. It also uses a regular lock under the hood. In addition to that, it may cause a thread switch. Considering all the overhead, I really don't think it's best practice or more performant to use a SemahporeSlim for a very short lived lock such as flipping a boolean and pushing an element to a Queue, especially since no deadlocking can occur. |
||
| { | ||
| // Add trigger to queue | ||
| _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); | ||
|
|
||
| // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. | ||
| if (_firing) | ||
| return; | ||
|
|
||
| _firing = true; | ||
| } | ||
|
|
||
| try | ||
| { | ||
|
|
||
| // Empty queue for triggers | ||
| while (true) | ||
| { | ||
|
|
||
| QueuedTrigger queuedEvent; | ||
|
|
||
| lock (_serialModeLock) | ||
| { | ||
|
|
||
| if (_eventQueue.Count == 0) | ||
| { | ||
| _firing = false; | ||
| break; | ||
| } | ||
|
|
||
| queuedEvent = _eventQueue.Dequeue(); | ||
| } | ||
|
|
||
| await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext); | ||
| } | ||
| } | ||
| catch | ||
| { | ||
|
|
||
| lock (_serialModeLock) | ||
| { | ||
| if (DropUnprocessedEventsOnErrorInSerialMode) | ||
| _eventQueue.Clear(); | ||
|
|
||
| _firing = false; | ||
| } | ||
|
|
||
| throw; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Queue events and then fire in order. | ||
| /// If only one event is queued, this behaves identically to the non-queued version. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,9 @@ public enum FiringMode | |
| /// <summary> Use immediate mode when the queuing of trigger events are not needed. Care must be taken when using this mode, as there is no run-to-completion guaranteed.</summary> | ||
| Immediate, | ||
| /// <summary> Use the queued <c>Fire</c>ing mode when run-to-completion is required. This is the recommended mode.</summary> | ||
| Queued | ||
| Queued, | ||
| /// <summary> Equivalent to Queued mode, but thread-safe for Fire.</summary> | ||
| Serial | ||
| } | ||
|
|
||
| /// <summary> | ||
|
|
@@ -40,9 +42,20 @@ private class QueuedTrigger | |
| public object[] Args { get; set; } | ||
| } | ||
|
|
||
| private object _serialModeLock = new object(); | ||
| private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>(); | ||
| private bool _firing; | ||
|
|
||
| /// <summary> | ||
| /// If the main processing thread throws an error, unprocessed triggers should be removed from | ||
| /// the queue in order to ensure consistency. Otherwise the event queue | ||
| /// may still hold unprocessed triggers which would require another | ||
| /// Fire() call to resume processing. | ||
| /// Set this to true if you need consistent behaviour. | ||
| /// Set this to false if you don't want triggers to be dropped. | ||
| /// </summary> | ||
| public bool DropUnprocessedEventsOnErrorInSerialMode { get; set; } = false; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this and other configurations should be immutable. They should be set on
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thought about it too, but that would require adding yet another constructor or another parameter to the existing ones. Is it worth it? Because I think nobody would be crazy enough to change this while the state machine is running, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not about crazy users. It's about preventing mistakes by less experienced developers. Is it possible to use
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, I think it might work with preprocessor symbols, but I haven't done it before on a project supporting all .Nets. Do you have a working snippet by any chance?
But I think even if someone were to flip this while the SM is running, nothing would blow up. |
||
|
|
||
| /// <summary> | ||
| /// Construct a state machine with external state storage. | ||
| /// </summary> | ||
|
|
@@ -341,12 +354,78 @@ void InternalFire(TTrigger trigger, params object[] args) | |
| case FiringMode.Queued: | ||
| InternalFireQueued(trigger, args); | ||
| break; | ||
| case FiringMode.Serial: | ||
| InternalFireSerial(trigger, args); | ||
| break; | ||
| default: | ||
| // If something is completely messed up we let the user know ;-) | ||
| throw new InvalidOperationException("The firing mode has not been configured!"); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Queue events and then fire in order. | ||
| /// If only one event is queued, this behaves identically to the non-queued version. | ||
| /// This method is almost equivalent to InternalFireQueued, but it employs simple locks | ||
| /// in order to ensure thread-safety. | ||
| /// Warning! If processing a trigger throws an unexpected error, unprocessed events will be dropped | ||
| /// if DropUnprocessedEventsOnErrorInSerialMode is set to true. | ||
| /// </summary> | ||
| /// <param name="trigger"> The trigger. </param> | ||
| /// <param name="args"> A variable-length parameters list containing arguments. </param> | ||
| private void InternalFireSerial(TTrigger trigger, params object[] args) { | ||
|
|
||
| lock (_serialModeLock) | ||
| { | ||
| // Add trigger to queue | ||
| _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); | ||
|
|
||
| // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. | ||
| if (_firing) | ||
| return; | ||
|
|
||
| _firing = true; | ||
| } | ||
|
|
||
| try | ||
| { | ||
|
|
||
| // Empty queue for triggers | ||
| while (true) | ||
| { | ||
|
|
||
| QueuedTrigger queuedEvent; | ||
|
|
||
| lock (_serialModeLock) | ||
| { | ||
|
|
||
| if (_eventQueue.Count == 0) | ||
| { | ||
| _firing = false; | ||
| break; | ||
| } | ||
|
|
||
| queuedEvent = _eventQueue.Dequeue(); | ||
| } | ||
|
|
||
| InternalFireOne(queuedEvent.Trigger, queuedEvent.Args); | ||
| } | ||
| } | ||
| catch | ||
| { | ||
|
|
||
| lock (_serialModeLock) | ||
| { | ||
| if (DropUnprocessedEventsOnErrorInSerialMode) | ||
| _eventQueue.Clear(); | ||
|
|
||
| _firing = false; | ||
| } | ||
|
|
||
| throw; | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Queue events and then fire in order. | ||
| /// If only one event is queued, this behaves identically to the non-queued version. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,126 @@ | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Xunit; | ||
|
|
||
| namespace Stateless.Tests { | ||
| /// <summary> | ||
| /// We disable parallelization to ensure other threads are not occupied by other tests. | ||
| /// Some tests regarding DropUnprocessedEventsOnErrorInSerialMode need to be added. | ||
| /// </summary> | ||
| [CollectionDefinition("SerialModeThreadSafetyFixture", DisableParallelization = true)] | ||
| public class SerialModeThreadSafetyFixture { | ||
|
|
||
| [Fact] | ||
| public async Task IncrementIsThreadSafeUnderContentionSync() { | ||
|
|
||
| var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial); | ||
|
|
||
| int counter = 0; | ||
| var startGate = new ManualResetEventSlim(false); | ||
|
|
||
| stateMachine.Configure(State.A) | ||
| .OnEntry(() => { | ||
| counter = counter + 1; | ||
| }) | ||
| .PermitReentry(Trigger.X); | ||
|
|
||
| var tasks = Enumerable.Range(0, 16).Select(_ => | ||
| Task.Run(() => { | ||
| startGate.Wait(); | ||
| for (int i = 0; i < 10_000; i++) { | ||
| stateMachine.Fire(Trigger.X); | ||
| } | ||
| }) | ||
| ).ToArray(); | ||
|
|
||
| startGate.Set(); | ||
| await Task.WhenAll(tasks); | ||
|
|
||
| Assert.Equal(160_000, counter); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task IncrementIsThreadSafeUnderContentionAsync() { | ||
|
|
||
| var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial); | ||
|
|
||
| int counter = 0; | ||
| var startGate = new ManualResetEventSlim(false); | ||
|
|
||
| stateMachine.Configure(State.A) | ||
| .OnEntryAsync(async () => { | ||
| await Task.Yield(); | ||
| counter = counter + 1; | ||
| }) | ||
| .PermitReentry(Trigger.X); | ||
|
|
||
| var tasks = Enumerable.Range(0, 16).Select(_ => | ||
| Task.Run(async () => { | ||
| startGate.Wait(); | ||
| for (int i = 0; i < 1_000; i++) { | ||
| await stateMachine.FireAsync(Trigger.X); | ||
| } | ||
| }) | ||
| ).ToArray(); | ||
|
|
||
| startGate.Set(); | ||
| await Task.WhenAll(tasks); | ||
|
|
||
| Assert.Equal(16_000, counter); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task RecursiveFireDoesNotDeadlockSync() { | ||
|
|
||
| var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial); | ||
|
|
||
| stateMachine.Configure(State.A) | ||
| .OnEntry(() => { | ||
| stateMachine.Fire(Trigger.Y); | ||
| }) | ||
| .Permit(Trigger.Y, State.B) | ||
| .PermitReentry(Trigger.X); | ||
|
|
||
| stateMachine.Configure(State.B) | ||
| .OnEntry(() => { | ||
| stateMachine.Fire(Trigger.Y); | ||
| }) | ||
| .Permit(Trigger.Y, State.C) | ||
| .PermitReentry(Trigger.X); | ||
|
|
||
| stateMachine.Configure(State.C); | ||
|
|
||
| stateMachine.Fire(Trigger.X); | ||
|
|
||
| Assert.Equal(State.C, stateMachine.State); | ||
| } | ||
|
|
||
| [Fact] | ||
| public async Task RecursiveFireDoesNotDeadlockAsync() { | ||
|
|
||
| var stateMachine = new StateMachine<State, Trigger>(State.A, FiringMode.Serial); | ||
|
|
||
| stateMachine.Configure(State.A) | ||
| .OnEntryAsync(async () => { | ||
| await stateMachine.FireAsync(Trigger.Y); | ||
| }) | ||
| .Permit(Trigger.Y, State.B) | ||
| .PermitReentry(Trigger.X); | ||
|
|
||
| stateMachine.Configure(State.B) | ||
| .OnEntryAsync(async () => { | ||
| await stateMachine.FireAsync(Trigger.Y); | ||
| }) | ||
| .Permit(Trigger.Y, State.C) | ||
| .PermitReentry(Trigger.X); | ||
|
|
||
| stateMachine.Configure(State.C); | ||
|
|
||
| await stateMachine.FireAsync(Trigger.X); | ||
|
|
||
| Assert.Equal(State.C, stateMachine.State); | ||
| } | ||
|
|
||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.