diff --git a/.github/workflows/dotnet-build.yml b/.github/workflows/dotnet-build.yml index d1794a4..5d148cc 100644 --- a/.github/workflows/dotnet-build.yml +++ b/.github/workflows/dotnet-build.yml @@ -8,7 +8,16 @@ jobs: runs-on: windows-latest steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v6 + - name: Setup .NET + uses: actions/setup-dotnet@v5 + with: + dotnet-version: | + 3.1.x + 6.0.x + 8.0.x + 9.0.x + 10.0.x - name: Build with dotnet run: dotnet build --configuration Release diff --git a/.github/workflows/nuget-tag-publish.yml b/.github/workflows/nuget-tag-publish.yml index 2a0cc6f..5b25e8e 100644 --- a/.github/workflows/nuget-tag-publish.yml +++ b/.github/workflows/nuget-tag-publish.yml @@ -11,7 +11,19 @@ jobs: runs-on: windows-latest steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v6 + - name: Setup .NET + uses: actions/setup-dotnet@v5 + with: + dotnet-version: | + 3.1.x + 6.0.x + 8.0.x + 9.0.x + 10.0.x + + - name: Add .NET global tools to PATH + run: echo "$HOME/.dotnet/tools" >> $GITHUB_PATH - name: Install dotnet tool run: dotnet tool install -g dotnetCampus.TagToVersion @@ -24,16 +36,7 @@ jobs: dotnet build --configuration Release dotnet pack --configuration Release --no-build - - name: Install Nuget - uses: nuget/setup-nuget@v1 - with: - nuget-version: '5.x' - - - name: Add private GitHub registry to NuGet - run: | - nuget sources add -name github -Source https://nuget.pkg.github.com/dotnet-campus/index.json -Username dotnet-campus -Password ${{ secrets.GITHUB_TOKEN }} - - - name: Push generated package to GitHub registry + - name: Publish run: | - nuget push .\bin\Release\*.nupkg -Source github -SkipDuplicate - nuget push .\bin\Release\*.nupkg -Source https://api.nuget.org/v3/index.json -SkipDuplicate -ApiKey ${{ secrets.NugetKey }} + dotnet nuget push ".\bin\Release\*.nupkg" --api-key ${{ secrets.NugetKey }} --source https://api.nuget.org/v3/index.json --skip-duplicate + dotnet nuget push ".\bin\Release\*.nupkg" --api-key ${{ secrets.GITHUB_TOKEN }} --source https://nuget.pkg.github.com/${{ github.repository_owner }} --skip-duplicate \ No newline at end of file diff --git a/Directory.Build.props b/Directory.Build.props index d0c79f5..63c7f8d 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -12,7 +12,8 @@ git MIT - Copyright © 2023 dotnet campus, All Rights Reserved. + $([System.DateTime]::Now.ToString(`yyyy`)) + Copyright © 2023-$(ThisYear) dotnet campus, All Rights Reserved. README.md diff --git a/LightWorkFlowManager.sln b/LightWorkFlowManager.sln index 0dd978a..2983bd3 100644 --- a/LightWorkFlowManager.sln +++ b/LightWorkFlowManager.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 17 -VisualStudioVersion = 17.6.33723.286 +# Visual Studio Version 18 +VisualStudioVersion = 18.3.11520.95 d18.3 MinimumVisualStudioVersion = 15.0.26124.0 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{5D196596-756D-45C2-8A05-C8E4AB8A36E6}" EndProject @@ -9,6 +9,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LightWorkFlowManager", "src EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "LightWorkFlowManager.Tests", "src\LightWorkFlowManager.Tests\LightWorkFlowManager.Tests.csproj", "{0B617CB5-1D30-47FC-B914-066A6D5B1D63}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{D0518ACA-BAE0-4F24-8A0B-89B7913BEBD5}" + ProjectSection(SolutionItems) = preProject + Directory.Build.props = Directory.Build.props + README.md = README.md + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -31,4 +37,7 @@ Global {F7ED61F4-920C-49EB-8DC1-74B2BE6AF272} = {5D196596-756D-45C2-8A05-C8E4AB8A36E6} {0B617CB5-1D30-47FC-B914-066A6D5B1D63} = {5D196596-756D-45C2-8A05-C8E4AB8A36E6} EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {486FE7AF-E46F-4BC4-95A0-D5013F5A2F82} + EndGlobalSection EndGlobal diff --git a/README.en-US.md b/README.en-US.md new file mode 100644 index 0000000..8116625 --- /dev/null +++ b/README.en-US.md @@ -0,0 +1,272 @@ +# LightWorkFlowManager +## Lightweight Work Process Management + +### What are the features of this workflow management library? +- Supports chain calls, allowing you to write the entire workflow serially without additional branch judgment logic +- Automatic input/output parameter passing, supports both manual and explicit assignment modes +- Built-in auxiliary mechanisms: + - Automatic failure retry + - Automatic error propagation + - Built-in error code system +- Observable execution status via the `IWorkerRunMonitor` mechanism +- Highly customizable: + - Overridable custom context management + - Overridable worker execution management + - Overridable exception capture and logging + +To avoid conflicts with existing code, the base Worker type is named `MessageWorker` in this library. + +--- + +## Usage +### 1. Create a MessageWorkerManager instance +```csharp +// Each task is assigned a unique TaskId +string taskId = Guid.NewGuid().ToString(); +// Tasks of the same type share the same name, e.g. for PPT parsing tasks +string taskName = "PPT Parsing"; +// Provide DI service scope +IServiceScope serviceScope = serviceProvider.CreateScope(); + +var workerManager = new MessageWorkerManager(taskId, taskName, serviceScope); +``` + +### 2. Define MessageWorker workers +```csharp +record InputType(); + +record OutputType(); + +class FooWorker : MessageWorker +{ + protected override ValueTask> DoInnerAsync(InputType input) + { + // Your business logic here + } +} +``` + +Each worker can declare its input and output types. The input type will be automatically injected by the framework, and the output type will be automatically stored in the framework context. + +> Please make sure to register your defined workers to the DI container. We recommend using `AddTransient` (transient injection) or `AddScoped` (scoped injection). **Do NOT use `AddSingleton` (singleton injection)** to avoid state confusion across multiple workflow runs. + +### 3. Execute workers +```csharp +var result = await workerManager + .GetWorker() + .RunAsync(); +``` + +The above code can also be abbreviated as: +```csharp +var result = await workerManager.RunWorker(); +``` + +--- + +## Mechanisms & Features +### Worker Parameters +Worker parameters are read from the `IWorkerContext` managed by `MessageWorkerManager`. The return value type of each worker will be automatically set to the `IWorkerContext`, so the output of the previous worker can be automatically used as the input of the next worker. + +You can also set context information explicitly via `SetContext` in any worker. + +You can manually pass input parameters when starting to execute a worker, as shown in the following examples: +```csharp +// Example 1: Get the worker first, then pass the parameter to the execution method +await Manager + .GetWorker() + .RunAsync(new InputType()); + +// Example 2: Set parameters via SetContext before executing the worker +await Manager + .SetContext(new InputType()) + .RunWorker(); +``` + +If you need to convert parameters between the input and output of different workers, you can also pass a conversion delegate to `SetContext`: +```csharp +// The following example converts the Foo1Type in the current context to the Foo2Type required by FooWorker +await Manager + .SetContext((Foo1Type foo1) => ConvertFoo1ToFoo2(foo1)) + .RunWorker(); +``` + +#### Chained Calls +In actual business scenarios, you usually need to call multiple workers continuously, using the output of the previous step as the input of the next step. The framework supports concatenating the entire call process into a clear chain. + +Example: +```csharp +WorkerResult step1Result = await messageWorkerManager + .GetWorker() + .RunAsync(new Info1()); + +WorkerResult step2Result = await messageWorkerManager + .GetWorker() + .RunAsync(step1Result); + +// Assume there is no Worker3 in between, we can convert parameters manually +WorkerResult info4Result = step2Result.Convert((Info3 info3) => new Info4()); + +WorkerResult step3Result = await messageWorkerManager + .GetWorker() + .RunAsync(info4Result); + +// Note: Assume Worker5 returns a failed result +WorkerResult step4Result = await messageWorkerManager + .GetWorker() + .RunAsync(step3Result); + +// Even if Worker5 fails, its result can still be passed to Worker6 as input. This design eliminates a large amount of error judgment branch code, ensuring the entire workflow chain is written serially. When receiving an input with a failure status, `Worker6` will not actually be executed, and the error will be directly propagated further +WorkerResult step5Result = await messageWorkerManager + .GetWorker() + .RunAsync(step4Result); + +// The final result of the chain is consistent with the MessageWorkerStatus +Assert.AreEqual(messageWorkerManager.MessageWorkerStatus.Status, step5Result.ErrorCode); +// MessageWorkerStatus also records the information of the first failed worker +// As shown below, the final result step5Result is output by Worker6, but the recorded failed worker information is still the first failed Worker5 +Assert.AreEqual(nameof(Worker5), messageWorkerManager.MessageWorkerStatus.FailWorker?.WorkerName); +``` + +In the `RunAsync` method, you can either pass an explicit input parameter like `RunAsync(new Info1())`, or directly pass the result of the previous step like `RunAsync(step1Result)`, which makes the code for concatenating multiple workers very natural. + +If the input/output types do not match in a certain step, you can perform a conversion first before continuing execution, just like the `step2Result.Convert((Info3 info3) => new Info4())` in the above example. + +When a worker fails to execute, subsequent links can still get a `WorkerResult` object. For example, after `Worker5` returns a failure, you can still get `WorkerResult step4Result`, but this result already carries a failure status. When it is passed to `Worker6` for execution, `Worker6` will not be actually executed, and the error will be directly propagated further. + +Definition of the sample workers in the above example: +```csharp +class Worker1 : MessageWorker +{ + protected override async ValueTask> DoInnerAsync(Info1 input) + { + await Task.CompletedTask; + return new Info2(); + } +} + +class Worker2 : MessageWorker +{ + protected override async ValueTask> DoInnerAsync(Info2 input) + { + await Task.CompletedTask; + return new Info3(); + } +} + +class Worker4 : MessageWorker +{ + protected override async ValueTask> DoInnerAsync(Info4 input) + { + await Task.CompletedTask; + return new Info5(); + } +} + +class Worker5 : MessageWorker +{ + protected override async ValueTask> DoInnerAsync(Info5 input) + { + await Task.CompletedTask; + return Fail(new WorkFlowErrorCode(123, "The error message"), canRetry: false); + } +} + +class Worker6 : MessageWorker +{ + protected override async ValueTask> DoInnerAsync(Info6 input) + { + await Task.CompletedTask; + return new Info7(); + } +} + +record Info1(); +record Info2(); +record Info3(); +record Info4(); +record Info5(); +record Info6(); +record Info7(); +``` + +--- + +### Exception Interruption and Retry +Each worker can return a `WorkerResult` type value, which tells the framework whether the current worker executed successfully. When execution fails, you can assign an error code for easy debugging and output, and specify whether the framework needs to retry. + +There are two ways to interrupt the execution of subsequent workers: +1. Return a `WorkerResult` with a failed status. Once the worker manager is in `IsFail` state, it will block the execution of all workers that are not marked with `CanRunWhenFail = true`. In other words, except for workers that need to run regardless of success or failure status, all other workers will not be executed, including the delegate conversion in `SetContext`. +2. Throw an exception, which will stop all subsequent logic execution according to .NET's exception mechanism. + +Both methods are recommended by the framework. For performance-sensitive scenarios, method 1 is preferred. For complex business logic with a large amount of code outside the workflow, method 2 allows for more convenient interruption. + +--- + +### Execute Other Workers Inside a Worker +You can execute other workers inside a worker, which allows you to implement branch logic and nested execution flexibly. + +Example: +First define Worker2: +```csharp +class Worker2 : MessageWorker +{ + protected override ValueTask> DoInnerAsync(InputType input) + { + return SuccessTask(new OutputType()); + } +} + +record InputType(); +record OutputType(); +``` + +Then execute Worker2 inside Worker1: +```csharp +class Worker1 : MessageWorkerBase +{ + public override async ValueTask Do(IWorkerContext context) + { + await Manager + .GetWorker() + .RunAsync(new InputType()); + + return WorkerResult.Success(); + } +} +``` + +--- + +### Delegate Worker +For very small and lightweight logic that you want to add to the workflow without defining a separate worker, you can use a delegate worker: +```csharp +var delegateMessageWorker = new DelegateMessageWorker(_ => +{ + // Your business logic here +}); + +var result = await messageWorkerManager.RunWorker(delegateMessageWorker); +``` + +If you don't even want to create a `DelegateMessageWorker` instance, you can directly pass a delegate to the `RunWorker` method of `MessageWorkerManager`: +```csharp +await messageWorkerManager.RunWorker((IWorkerContext context) => +{ + // Your business logic here +}); +``` + +--- + +### Built-in Mechanisms +In addition to standardizing the execution method of workers, `MessageWorker` also has built-in mechanisms that you can use directly when writing workers to reduce boilerplate code: +- Customizable worker name: By default, `WorkerName` uses the current type name. If you want a clearer name in logs or debugging, you can override the `WorkerName` property. +- Configurable retry support: `CanRetry` defaults to `true`. If a worker is not suitable for repeated execution after failure, you can set it to `false`. To specify retry permission precisely for a single failure, you can also return the result via `Fail(errorCode, canRetry)` or `FailTask(errorCode, canRetry)`. +- Configurable execution on upstream failure: `CanRunWhenFail` defaults to `false`. If the worker is used for finalization, logging, resource cleanup and other scenarios that need to run regardless of upstream status, you can set it to `true`. +- Multiple output support: You can write additional results to the context via `SetContext` for subsequent workers to read, no need to rely on only a single output value. +- Simplified success return: If execution succeeds and there is no return value, you can directly call `Success()` to return a success result. If there is no real asynchronous logic in the execution method, you can also call `SuccessTask()` directly to get the return value. +- Simplified failure return: If execution fails, you can call `Fail()` or `FailTask()` to quickly return a failure result. +- Generic worker helper methods: For types inheriting from `MessageWorker`, you can use `Success(TOutput output)` to directly return a success result with output; if you get a failure result from another call, you can convert it to a failure result of the current output type via `WorkerResult Fail(WorkerResult failResult)`. +- Failure result conversion: You can use the `WorkerResult.AsFail` method to convert a failure state to a failure state of another type, while retaining information such as error code and retry permission. +- Dispose registration: You can use the `RegisterOnDispose` series of methods to register logic to be executed when the entire `MessageWorkerManager` is disposed, such as cleaning temporary files, deleting folders, or performing other finalization work. \ No newline at end of file diff --git a/README.md b/README.md index 4c1769c..3d50e29 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -# LightWorkFlowManager +# LightWorkFlowManager + +| [中文文档](./README.md) | [English Document](./README.en-US.md) | 轻量的工作过程管理 @@ -11,6 +13,20 @@ 世界上有许多现有的工作过程管理,那为什么还重新造了一个?没啥原因,自己造的用的顺手 +本工作过程管理库有什么特色? + +- 支持链调用,可全程串行编写工作过程,无需分支判断逻辑 +- 自动输入输出参数传递,可手动可自动 +- 内建各种辅助机制: + - 自动失败重试 + - 自动错误传递 + - 内建错误码机制 +- 可观测执行状态: IWorkerRunMonitor 机制 +- 高定制模式 + - 可重写定制上下文管理 + - 可重写工作器执行管理 + - 可重写异常捕获与记录 + 为了不和旧代码冲突,这里命名 Worker 为 MessageWorker 类型 ## 使用方法 @@ -46,6 +62,8 @@ class FooWorker : MessageWorker 每个工作器都可以声明其输入和输出类型,输入类型将会自动由框架注入,输出类型将会自动存储到框架里面 +请确保定义的工作器注册到容器里面,推荐使用 `AddTransient` 做瞬时注入或 `AddScoped` 做作用域注入;最好**不要**使用 `AddSingleton` 做单例注入,避免多次工作流状态混乱 + 3、 执行 Worker 工作器 ```csharp @@ -92,6 +110,112 @@ class FooWorker : MessageWorker .RunWorker(); ``` +#### 链路调用 + +在实际业务里,通常会连续调用多个 Worker 工作器,将上一步的输出作为下一步的输入。框架支持将整个调用过程串起来,形成一个清晰的链路。 + +如以下例子: + +```csharp + WorkerResult step1Result = await messageWorkerManager + .GetWorker() + .RunAsync(new Info1()); + + WorkerResult step2Result = await messageWorkerManager + .GetWorker() + .RunAsync(step1Result); + + // 假定中间没有 Worker3 ,此时可以手动转换参数 + WorkerResult info4Result = step2Result.Convert((Info3 info3) => new Info4()); + + WorkerResult step3Result = await messageWorkerManager + .GetWorker() + .RunAsync(info4Result); + + // 注: 假定在 Worker5 里面将返回失败结果 + WorkerResult step4Result = await messageWorkerManager + .GetWorker() + .RunAsync(step3Result); + + // 尽管 Worker5 失败了,但是其结果依然可以传递给 Worker6 作为入参。这样的设计可以减少大量的错误判断分支代码,尽量确保整个工作链路串行编写。收到带上失败状态的入参时, 不会真的执行 `Worker6` 而是直接将错误继续往后传递 + WorkerResult step5Result = await messageWorkerManager + .GetWorker() + .RunAsync(step4Result); + + // 链路最后拿到的结果,会和 MessageWorkerStatus 状态保持一致 + Assert.AreEqual(messageWorkerManager.MessageWorkerStatus.Status, step5Result.ErrorCode); + // 同时在 MessageWorkerStatus 也会记录最初失败的 Worker 信息 + // 如下所示,最后一步的结果是由 Worker6 输出的 `step5Result`,但记录的失败工作器信息依然是最初失败的 Worker5 信息 + Assert.AreEqual(nameof(Worker5), messageWorkerManager.MessageWorkerStatus.FailWorker?.WorkerName); +``` + +在 `RunAsync` 方法里面,既可以传入明确的输入参数类型,如 `RunAsync(new Info1())` ,也可以直接传入上一步执行得到的结果类型,如 `RunAsync(step1Result)` 。这样在串联多个 Worker 时,代码会比较自然。 + +如果中间某一步的输入输出类型对不上,可以和上文工作器参数一节一样,先进行一次转换,再继续往下执行。以上例子里的 `step2Result.Convert((Info3 info3) => new Info4())` 就是如此。 + +当某一个 Worker 执行失败时,后续链路依然可以继续拿到一个 `WorkerResult` 对象。例如 `Worker5` 返回失败之后,依然可以拿到 `WorkerResult step4Result` 。但这个结果已经带上失败状态,再继续传给 `Worker6` 执行时,`Worker6` 不会真的被执行,而是直接将错误继续往后传递。 + +因此链路最后拿到的结果,会和 `messageWorkerManager.MessageWorkerStatus.Status` 保持一致。也就是可以通过 `Assert.AreEqual(messageWorkerManager.MessageWorkerStatus.Status, step5Result.ErrorCode);` 这样的方式,确认最终结果就是当前工作流管理器记录下来的状态。 + +同时,在 `messageWorkerManager.MessageWorkerStatus.FailWorker` 里面,还会记录最初失败的是哪一个 Worker 。如以上例子,在 `Worker5` 首次失败之后,即可通过 `Assert.AreEqual(nameof(Worker5), messageWorkerManager.MessageWorkerStatus.FailWorker?.WorkerName);` 确认失败工作器信息。 + +以下是各示例工作器类型的代码定义: + +```csharp + class Worker1 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info1 input) + { + await Task.CompletedTask; + return new Info2(); + } + } + + class Worker2 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info2 input) + { + await Task.CompletedTask; + return new Info3(); + } + } + + class Worker4 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info4 input) + { + await Task.CompletedTask; + return new Info5(); + } + } + + class Worker5 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info5 input) + { + await Task.CompletedTask; + return Fail(new WorkFlowErrorCode(123, "The error message"), canRetry: false); + } + } + + class Worker6 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info6 input) + { + await Task.CompletedTask; + return new Info7(); + } + } + + record Info1(); + record Info2(); + record Info3(); + record Info4(); + record Info5(); + record Info6(); + record Info7(); +``` + ### 异常中断和重试 每个 Worker 都可以返回 WorkerResult 类型的返回值,可以在返回值里面告知框架层是否当前的 Worker 执行成功。在执行失败时,可以赋值错误码,方便定位调试或输出。在执行失败时,可以返回告知框架层是否需要重试 @@ -164,6 +288,22 @@ class FooWorker : MessageWorker }); ``` +### 内建机制 + + + +`MessageWorker` 除了约定工作器的执行方式之外,还内建了一些在自己编写工作器时可直接使用的机制,用来减少样板代码。 + +- 可重写工作器名称。默认情况下,`WorkerName` 使用当前类型名,如果希望在日志或调试时显示更明确的名称,可以重写 `WorkerName` 属性。 +- 可以为工作器设置自己能否被重试。`CanRetry` 默认值为 `true`,如某个工作器失败后不适合重复执行,可以自行设置为 `false`。如果要在某次失败时精确指定是否允许重试,也可以通过 `Fail(errorCode, canRetry)` 或 `FailTask(errorCode, canRetry)` 返回结果。 +- 可设置当前置工作器执行失败时,当前工作器是否依然继续执行。`CanRunWhenFail` 默认值为 `false`,如果当前工作器属于收尾、记录日志、清理现场之类的逻辑,可以将其设置为 `true`。 +- 如果有多个输出内容,可以通过 `SetContext` 将额外结果写入上下文,供后续工作器继续读取,不必只依赖单一输出值。 +- 如果执行成功且没有返回值,可以直接调用 `Success()` 返回成功结果。如果执行方法里面没有真正的异步逻辑,也可以直接调用 `SuccessTask()` 获取返回值。 +- 如果执行失败,可以调用 `Fail` 或 `FailTask` 快速返回失败结果。 +- 对于继承自 `MessageWorker` 的类型,还可以使用 `Success(TOutput output)` 直接返回带输出的成功结果;如果当前拿到的是一个失败结果,也可以通过 `WorkerResult Fail(WorkerResult failResult)` 将其转换成当前输出类型的失败结果。 +- 可以用 `WorkerResult.AsFail` 方法将一个失败状态转换为另一个类型的失败状态,错误码和是否允许重试等信息会被保留。 +- 可以用 `RegisterOnDispose` 系列方法注册当整个 `MessageWorkerManager` 被 `Dispose` 时执行的逻辑,例如清理临时文件、删除文件夹或执行其他收尾工作。 + ## 相似的项目 https://github.com/danielgerlag/workflow-core \ No newline at end of file diff --git a/src/LightWorkFlowManager.Tests/LightWorkFlowManager.Tests.csproj b/src/LightWorkFlowManager.Tests/LightWorkFlowManager.Tests.csproj index 1b41f6a..19c775c 100644 --- a/src/LightWorkFlowManager.Tests/LightWorkFlowManager.Tests.csproj +++ b/src/LightWorkFlowManager.Tests/LightWorkFlowManager.Tests.csproj @@ -1,4 +1,4 @@ - + net6.0 @@ -8,8 +8,6 @@ - - diff --git a/src/LightWorkFlowManager.Tests/MessageWorkerChainTest.cs b/src/LightWorkFlowManager.Tests/MessageWorkerChainTest.cs new file mode 100644 index 0000000..35dcb09 --- /dev/null +++ b/src/LightWorkFlowManager.Tests/MessageWorkerChainTest.cs @@ -0,0 +1,121 @@ +using DC.LightWorkFlowManager; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Threading.Tasks; +using DC.LightWorkFlowManager.Contexts; +using DC.LightWorkFlowManager.Protocols; +using DC.LightWorkFlowManager.Workers; + +namespace LightWorkFlowManager.Tests; + +[TestClass] +public class MessageWorkerChainTest +{ + [TestMethod] + public async Task TestChainCallMessageWorker() + { + var serviceCollection = new ServiceCollection(); + serviceCollection.AddLogging(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + //serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + + ServiceProvider serviceProvider = serviceCollection.BuildServiceProvider(); + IServiceScope serviceScope = serviceProvider.CreateScope(); + + var taskId = Guid.NewGuid().ToString(); + var taskName = nameof(TestChainCallMessageWorker); + await using var messageWorkerManager = new MessageWorkerManager(taskId, taskName, serviceScope); + + WorkerResult step1Result = await messageWorkerManager + .GetWorker() + .RunAsync(new Info1()); + + WorkerResult step2Result = await messageWorkerManager + .GetWorker() + .RunAsync(step1Result); + + // Not exist worker3 + WorkerResult info4Result = step2Result.Convert((Info3 info3) => new Info4()); + + WorkerResult step3Result = await messageWorkerManager + .GetWorker() + .RunAsync(info4Result); + + WorkerResult step4Result = await messageWorkerManager + .GetWorker() + .RunAsync(step3Result); + + WorkerResult step5Result = await messageWorkerManager + .GetWorker() + .RunAsync(step4Result); + + Assert.AreEqual(messageWorkerManager.MessageWorkerStatus.Status, step5Result.ErrorCode); + + Assert.IsNotNull(messageWorkerManager.MessageWorkerStatus.FailWorker); + Assert.AreEqual(nameof(Worker5), messageWorkerManager.MessageWorkerStatus.FailWorker?.WorkerName); + } + + class Worker1 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info1 input) + { + await Task.CompletedTask; + return new Info2(); + } + } + + class Worker2 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info2 input) + { + await Task.CompletedTask; + return new Info3(); + } + } + + class Worker4 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info4 input) + { + await Task.CompletedTask; + return new Info5(); + } + } + + class Worker5 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info5 input) + { + await Task.CompletedTask; + return Fail(new WorkFlowErrorCode(123, "The error message"), canRetry: false); + } + } + + class Worker6 : MessageWorker + { + protected override async ValueTask> DoInnerAsync(Info6 input) + { + await Task.CompletedTask; + return new Info7(); + } + } + + record Info1(); + + record Info2(); + + record Info3(); + + record Info4(); + + record Info5(); + + record Info6(); + + record Info7(); +} \ No newline at end of file diff --git a/src/LightWorkFlowManager.Tests/MessageWorkerManagerTest.cs b/src/LightWorkFlowManager.Tests/MessageWorkerManagerTest.cs index e2ca92a..efe4334 100644 --- a/src/LightWorkFlowManager.Tests/MessageWorkerManagerTest.cs +++ b/src/LightWorkFlowManager.Tests/MessageWorkerManagerTest.cs @@ -12,11 +12,17 @@ namespace LightWorkFlowManager.Tests; +/// +/// `MessageWorkerManager` 相关测试。 +/// [TestClass] public class MessageWorkerManagerTest { private const int UnknownError = 7000; + /// + /// 验证工作器管理器在失败、重试与异常场景下的行为。 + /// [ContractTestCase] public void RunFail() { @@ -120,6 +126,58 @@ await Assert.ThrowsExceptionAsync(async () }); } + [TestMethod] + public async Task RunWorker_WithInputWorkerAndDirectInput_RunsWorkerWithProvidedInput() + { + var messageWorkerManager = GetTestMessageWorkerManager(); + + await messageWorkerManager.RunWorker(new DirectWorkerInput("direct-input")); + + var observedInput = messageWorkerManager.Context.GetEnsureContext(); + Assert.AreEqual("direct-input", observedInput.Value); + } + + [TestMethod] + public async Task RunWorker_WithInputWorkerAndConverter_RunsWorkerWithConvertedInput() + { + var messageWorkerManager = GetTestMessageWorkerManager(); + messageWorkerManager.SetContext(new WorkerArgument("argument")); + + await messageWorkerManager.RunWorker(argument => new ConvertedWorkerInput($"{argument.Value}-converted")); + + var observedInput = messageWorkerManager.Context.GetEnsureContext(); + Assert.AreEqual("argument-converted", observedInput.Value); + } + + [TestMethod] + public async Task RunWorker_WithInputOutputWorkerAndDirectInput_ReturnsWorkerOutput() + { + var messageWorkerManager = GetTestMessageWorkerManager(); + + var result = await messageWorkerManager.RunWorker(new DirectWorkerInput("direct-input")); + + Assert.AreEqual("output:direct-input", result.Result?.Value); + } + + [TestMethod] + public async Task RunWorker_WithInputOutputWorkerAndConverter_ReturnsConvertedWorkerOutput() + { + var messageWorkerManager = GetTestMessageWorkerManager(); + messageWorkerManager.SetContext(new WorkerArgument("argument")); + + var result = await messageWorkerManager.RunWorker(argument => new ConvertedWorkerInput($"{argument.Value}-converted")); + + Assert.AreEqual("output:argument-converted", result.Result?.Value); + } + + /// + /// 创建用于测试的工作器管理器。 + /// + /// 可选的服务提供器。 + /// 可选的任务标识。 + /// 可选的任务名称。 + /// 重试次数。 + /// 测试用工作器管理器实例。 public static MessageWorkerManager GetTestMessageWorkerManager(IServiceProvider? serviceProvider = null, string? taskId = null, string? taskName = null, int retryCount = 3) { @@ -128,19 +186,37 @@ public static MessageWorkerManager GetTestMessageWorkerManager(IServiceProvider? return messageWorkerManager; } + /// + /// 构建测试用服务提供器。 + /// + /// 测试用服务提供器实例。 public static IServiceProvider BuildServiceProvider() { var serviceCollection = new ServiceCollection(); serviceCollection.AddLogging(); serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); + serviceCollection.AddTransient(); var serviceProvider = serviceCollection.BuildServiceProvider(); return serviceProvider; } + private record DirectWorkerInput(string Value); + + private record WorkerArgument(string Value); + + private record ConvertedWorkerInput(string Value); + + private record ObservedWorkerInput(string Value); + + private record WorkerOutput(string Value); + private class RequestInputMessageWorker : MessageWorker { - protected override ValueTask DoAsync(TestInputFoo input) + protected override ValueTask DoInnerAsync(TestInputFoo input) { return ValueTask.FromResult(WorkerResult.Success()); } @@ -150,11 +226,45 @@ private class TestInputFoo { } - private class FailTestMessageWorker : MessageWorkerBase + private class CaptureProvidedInputWorker : MessageWorker + { + protected override ValueTask DoInnerAsync(DirectWorkerInput input) + { + SetContext(new ObservedWorkerInput(input.Value)); + return ValueTask.FromResult(WorkerResult.Success()); + } + } + + private class CaptureConvertedInputWorker : MessageWorker + { + protected override ValueTask DoInnerAsync(ConvertedWorkerInput input) + { + SetContext(new ObservedWorkerInput(input.Value)); + return ValueTask.FromResult(WorkerResult.Success()); + } + } + + private class ReturnProvidedOutputWorker : MessageWorker + { + protected override ValueTask> DoInnerAsync(DirectWorkerInput input) + { + return ValueTask.FromResult>(new WorkerOutput($"output:{input.Value}")); + } + } + + private class ReturnConvertedOutputWorker : MessageWorker + { + protected override ValueTask> DoInnerAsync(ConvertedWorkerInput input) + { + return ValueTask.FromResult>(new WorkerOutput($"output:{input.Value}")); + } + } + + private class FailTestMessageWorker : MessageWorker { public bool Success { get; set; } - public override ValueTask Do(IWorkerContext context) + protected override ValueTask DoInnerAsync(IWorkerContext context) { var result = Success ? WorkerResult.Success() : WorkerResult.Fail(UnknownError); Success = !Success; diff --git a/src/LightWorkFlowManager.Tests/MessageWorkerTest.cs b/src/LightWorkFlowManager.Tests/MessageWorkerTest.cs index 91df4dd..7527e6d 100644 --- a/src/LightWorkFlowManager.Tests/MessageWorkerTest.cs +++ b/src/LightWorkFlowManager.Tests/MessageWorkerTest.cs @@ -11,9 +11,15 @@ namespace LightWorkFlowManager.Tests; +/// +/// `MessageWorker` 相关测试。 +/// [TestClass] public class MessageWorkerTest { + /// + /// 验证工作器内部可以继续运行其他工作器。 + /// [ContractTestCase] public void RunWorkerOnWorker() { @@ -35,9 +41,9 @@ public void RunWorkerOnWorker() }); } - class Worker1 : MessageWorkerBase + class Worker1 : MessageWorker { - public override async ValueTask Do(IWorkerContext context) + protected override async ValueTask DoInnerAsync(IWorkerContext context) { await Manager .GetWorker() diff --git a/src/LightWorkFlowManager.Tests/ProgressCompositorTest.cs b/src/LightWorkFlowManager.Tests/ProgressCompositorTest.cs index 04ca90f..aa76072 100644 --- a/src/LightWorkFlowManager.Tests/ProgressCompositorTest.cs +++ b/src/LightWorkFlowManager.Tests/ProgressCompositorTest.cs @@ -6,9 +6,15 @@ namespace LightWorkFlowManager.Tests; +/// +/// `ProgressCompositor` 相关测试。 +/// [TestClass] public class ProgressCompositorTest { + /// + /// 验证子进度合成后的进度计算结果。 + /// [ContractTestCase] public void TestSubProgressReport() { diff --git a/src/LightWorkFlowManager.Tests/StructWorkerResultTest.cs b/src/LightWorkFlowManager.Tests/StructWorkerResultTest.cs new file mode 100644 index 0000000..b990ffa --- /dev/null +++ b/src/LightWorkFlowManager.Tests/StructWorkerResultTest.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; +using DC.LightWorkFlowManager; +using DC.LightWorkFlowManager.Contexts; +using DC.LightWorkFlowManager.Protocols; +using DC.LightWorkFlowManager.Workers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.VisualStudio.TestTools.UnitTesting; +using MSTest.Extensions.Contracts; + +namespace LightWorkFlowManager.Tests; + +[TestClass] +public class StructWorkerResultTest +{ + [ContractTestCase] + public void TestStructResult() + { + "当一个 Worker 的返回值是 struct 结构体类型时,返回失败时,应该被记录为失败".Test(async () => + { + var serviceCollection = new ServiceCollection(); + serviceCollection + .AddTransient() + .AddLogging(); + + await using var serviceProvider = serviceCollection.BuildServiceProvider(); + var serviceScope = serviceProvider.CreateScope(); + + await using var messageWorkerManager = new MessageWorkerManager(taskId: Guid.NewGuid().ToString(), taskName: "TestRunWorkerOnWorker", serviceScope); + + var result = await messageWorkerManager.RunWorker(); + Assert.AreEqual(true, result.IsFail); + }); + } + + public class FooWorker : MessageWorker + { + protected override ValueTask> DoInnerAsync(int input) + { + return FailTask(new WorkFlowErrorCode(-1, "Foo"), canRetry: false); + } + } +} \ No newline at end of file diff --git a/src/LightWorkFlowManager/Contexts/IWorkerContext.cs b/src/LightWorkFlowManager/Contexts/IWorkerContext.cs index 52520c8..f9ca996 100644 --- a/src/LightWorkFlowManager/Contexts/IWorkerContext.cs +++ b/src/LightWorkFlowManager/Contexts/IWorkerContext.cs @@ -11,15 +11,15 @@ public interface IWorkerContext /// /// 获取上下文信息 /// - /// - /// 如果获取不到,返回空 + /// 上下文类型。 + /// 如果获取不到则返回空。 T? GetContext(); /// /// 设置上下文信息 /// - /// - /// + /// 上下文类型。 + /// 要保存的上下文对象。 void SetContext(T context); } @@ -31,9 +31,9 @@ public static class MessageContextExtension /// /// 获取一定存在的上下文信息 /// - /// - /// - /// + /// 上下文类型。 + /// 工作器上下文。 + /// 已存在的上下文对象。 /// 如果上下文信息不存在,就抛出异常 public static T GetEnsureContext(this IWorkerContext workerContext) { diff --git a/src/LightWorkFlowManager/Contexts/WorkFlowErrorCode.cs b/src/LightWorkFlowManager/Contexts/WorkFlowErrorCode.cs index df0467b..a827cfd 100644 --- a/src/LightWorkFlowManager/Contexts/WorkFlowErrorCode.cs +++ b/src/LightWorkFlowManager/Contexts/WorkFlowErrorCode.cs @@ -12,6 +12,8 @@ namespace DC.LightWorkFlowManager.Contexts; /// /// 创建错误码 /// + /// 错误码数值。 + /// 错误码对应的可读信息。 public WorkFlowErrorCode(int code, string message) { Code = code; @@ -35,11 +37,16 @@ public WorkFlowErrorCode(int code, string message) /// public static WorkFlowErrorCode Ok => new WorkFlowErrorCode(0, "Ok"); + internal static WorkFlowErrorCode FromException(Exception e) + { + return new WorkFlowErrorCode(-1, e.ToString()); + } + /// /// 追加信息 /// - /// - /// + /// 要追加的描述信息。 + /// 追加描述后的错误码。 public WorkFlowErrorCode AppendMessage(string? appendMessage) { if (appendMessage == null) @@ -55,7 +62,8 @@ public WorkFlowErrorCode AppendMessage(string? appendMessage) /// /// 隐式转换为 int 类型 /// - /// + /// 工作流错误码。 + /// 错误码数值。 public static implicit operator int(WorkFlowErrorCode code) { return code.Code; @@ -64,7 +72,8 @@ public static implicit operator int(WorkFlowErrorCode code) /// /// 从 int 类型隐式转换为错误信息 /// - /// + /// 错误码数值。 + /// 对应的工作流错误码。 public static implicit operator WorkFlowErrorCode(int code) { if (ErrorCodeDictionary.TryGetValue(code, out var value)) @@ -107,9 +116,9 @@ public override int GetHashCode() /// /// 判断相等 /// - /// - /// - /// + /// 左侧错误码。 + /// 右侧错误码。 + /// 两个错误码是否相等。 public static bool operator ==(WorkFlowErrorCode left, WorkFlowErrorCode right) { return left.Equals(right); @@ -118,9 +127,9 @@ public override int GetHashCode() /// /// 判断不相等 /// - /// - /// - /// + /// 左侧错误码。 + /// 右侧错误码。 + /// 两个错误码是否不相等。 public static bool operator !=(WorkFlowErrorCode left, WorkFlowErrorCode right) { return !left.Equals(right); diff --git a/src/LightWorkFlowManager/Contexts/WorkerContext.cs b/src/LightWorkFlowManager/Contexts/WorkerContext.cs index e1f5a23..84f38bc 100644 --- a/src/LightWorkFlowManager/Contexts/WorkerContext.cs +++ b/src/LightWorkFlowManager/Contexts/WorkerContext.cs @@ -3,8 +3,12 @@ namespace DC.LightWorkFlowManager.Contexts; +/// +/// 提供基于类型索引的工作器上下文实现。 +/// public class WorkerContext : IWorkerContext { + /// [System.Diagnostics.DebuggerStepThrough] public T? GetContext() { @@ -16,6 +20,7 @@ public class WorkerContext : IWorkerContext return default; } + /// public void SetContext(T context) { _contextDictionary[typeof(T)] = context; diff --git a/src/LightWorkFlowManager/Exceptions/MessageWorkerException.cs b/src/LightWorkFlowManager/Exceptions/MessageWorkerException.cs index 2114fef..ff8de22 100644 --- a/src/LightWorkFlowManager/Exceptions/MessageWorkerException.cs +++ b/src/LightWorkFlowManager/Exceptions/MessageWorkerException.cs @@ -11,7 +11,7 @@ public class MessageWorkerException : WorkFlowException /// /// 工作过程的异常 /// - /// + /// 工作流错误码。 /// 默认 false 表示不能重试 public MessageWorkerException(WorkFlowErrorCode errorCode, bool canRetryWorker = false) { @@ -22,6 +22,8 @@ public MessageWorkerException(WorkFlowErrorCode errorCode, bool canRetryWorker = /// /// 工作过程的异常 /// + /// 工作流错误码。 + /// 导致当前异常的内部异常。 public MessageWorkerException(WorkFlowErrorCode errorCode, Exception innerException) : base(errorCode.Message, innerException) { ErrorCode = errorCode; @@ -32,7 +34,12 @@ public MessageWorkerException(WorkFlowErrorCode errorCode, Exception innerExcept /// 是否可以重试 /// public bool CanRetryWorker { get; } + + /// + /// 获取当前异常对应的工作流错误码。 + /// public WorkFlowErrorCode ErrorCode { get; } + /// public override string Message => ErrorCode.Message; } diff --git a/src/LightWorkFlowManager/Exceptions/MessageWorkerInputArgumentException.cs b/src/LightWorkFlowManager/Exceptions/MessageWorkerInputArgumentException.cs index c694ad6..bf628bb 100644 --- a/src/LightWorkFlowManager/Exceptions/MessageWorkerInputArgumentException.cs +++ b/src/LightWorkFlowManager/Exceptions/MessageWorkerInputArgumentException.cs @@ -7,6 +7,10 @@ namespace DC.LightWorkFlowManager.Exceptions; /// public class MessageWorkerInputArgumentException : MessageWorkerException { + /// + /// 使用指定错误码初始化输入参数异常。 + /// + /// 输入参数错误对应的工作流错误码。 public MessageWorkerInputArgumentException(WorkFlowErrorCode errorCode) : base(errorCode,canRetryWorker: false) { } diff --git a/src/LightWorkFlowManager/Exceptions/MessageWorkerInputNotFoundException.cs b/src/LightWorkFlowManager/Exceptions/MessageWorkerInputNotFoundException.cs index c8deada..f859232 100644 --- a/src/LightWorkFlowManager/Exceptions/MessageWorkerInputNotFoundException.cs +++ b/src/LightWorkFlowManager/Exceptions/MessageWorkerInputNotFoundException.cs @@ -7,6 +7,10 @@ namespace DC.LightWorkFlowManager.Exceptions; /// public class MessageWorkerInputNotFoundException : InvalidOperationException, IWorkFlowException { + /// + /// 使用指定错误消息初始化异常。 + /// + /// 异常消息。 public MessageWorkerInputNotFoundException(string? message) : base(message) { } diff --git a/src/LightWorkFlowManager/Exceptions/WorkFlowException.cs b/src/LightWorkFlowManager/Exceptions/WorkFlowException.cs index a01c6ac..26a04b4 100644 --- a/src/LightWorkFlowManager/Exceptions/WorkFlowException.cs +++ b/src/LightWorkFlowManager/Exceptions/WorkFlowException.cs @@ -8,18 +8,35 @@ namespace DC.LightWorkFlowManager.Exceptions; /// public abstract class WorkFlowException : Exception, IWorkFlowException { + /// + /// 初始化工作流异常。 + /// protected WorkFlowException() { } + /// + /// 使用序列化信息初始化工作流异常。 + /// + /// 保存序列化对象数据的对象。 + /// 有关源或目标的上下文信息。 protected WorkFlowException(SerializationInfo info, StreamingContext context) : base(info, context) { } + /// + /// 使用指定错误消息初始化工作流异常。 + /// + /// 异常消息。 protected WorkFlowException(string? message) : base(message) { } + /// + /// 使用指定错误消息和内部异常初始化工作流异常。 + /// + /// 异常消息。 + /// 导致当前异常的内部异常。 protected WorkFlowException(string? message, Exception? innerException) : base(message, innerException) { } diff --git a/src/LightWorkFlowManager/Exceptions/WorkerContextNotFoundException.cs b/src/LightWorkFlowManager/Exceptions/WorkerContextNotFoundException.cs index c228b9c..c9a2db4 100644 --- a/src/LightWorkFlowManager/Exceptions/WorkerContextNotFoundException.cs +++ b/src/LightWorkFlowManager/Exceptions/WorkerContextNotFoundException.cs @@ -1,13 +1,24 @@ namespace DC.LightWorkFlowManager.Exceptions; +/// +/// 表示无法从工作器上下文中找到指定类型数据的异常。 +/// public class WorkerContextNotFoundException : WorkFlowException { + /// + /// 使用缺失的上下文键初始化异常。 + /// + /// 缺失的上下文键。 public WorkerContextNotFoundException(string key) { Key = key; } + /// + /// 获取缺失的上下文键。 + /// public string Key { get; } + /// public override string Message => $"Can not find {Key}"; } diff --git a/src/LightWorkFlowManager/LightWorkFlowManager.csproj b/src/LightWorkFlowManager/LightWorkFlowManager.csproj index b40dafb..a42cf15 100644 --- a/src/LightWorkFlowManager/LightWorkFlowManager.csproj +++ b/src/LightWorkFlowManager/LightWorkFlowManager.csproj @@ -1,7 +1,7 @@ - net6.0 + net6.0;net8.0;net9.0;net10.0 true enable @@ -11,6 +11,10 @@ DC.LightWorkFlowManager + + + + diff --git a/src/LightWorkFlowManager/MessageWorkerManager.cs b/src/LightWorkFlowManager/MessageWorkerManager.cs index bb21760..302e8aa 100644 --- a/src/LightWorkFlowManager/MessageWorkerManager.cs +++ b/src/LightWorkFlowManager/MessageWorkerManager.cs @@ -4,11 +4,13 @@ using System.Linq; using System.Runtime.ExceptionServices; using System.Threading.Tasks; + using DC.LightWorkFlowManager.Contexts; using DC.LightWorkFlowManager.Exceptions; using DC.LightWorkFlowManager.Monitors; using DC.LightWorkFlowManager.Protocols; using DC.LightWorkFlowManager.Workers; + using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -22,12 +24,12 @@ public class MessageWorkerManager : IAsyncDisposable /// /// 创建工作器管理 /// - /// + /// 任务标识。 /// 任务名,任务类型,如 PDF 解析或 PPT 解析等 - /// + /// 用于解析工作器与依赖项的服务作用域。 /// 每个工作器的失败重试次数,默认三次 /// 参数上下文信息 - /// + /// 工作器运行监控器。 public MessageWorkerManager(string taskId, string taskName, IServiceScope serviceScope, int retryCount = 3, IWorkerContext? context = null, IWorkerRunMonitor? workerRunMonitor = null) { @@ -96,12 +98,20 @@ public MessageWorkerManager(string taskId, string taskName, IServiceScope servic /// public ILogger Logger { get; protected set; } + /// + /// 是否应该吞掉异常。设置为 true 时,将在执行工作器过程中,工作器所有抛出的异常都不能对外抛出,无法通过异常打断外部流程,框架会当做工作器执行失败来处理。设置为 false 时,将允许工作器抛出异常,外部流程可以通过捕获异常来打断流程。默认值为 false。 + /// + /// + /// 默认为 false 表示不吞掉工作器执行过程的异常,保留原本的 dotnet 异常流程设计,仅对异常做记录 + /// + public bool ShouldSwallowException { get; init; } = false; + /// /// 设置上下文信息。设计上要求一个类型对应一个参数,不允许相同的类型作为不同的参数 /// - /// - /// - /// + /// 上下文类型。 + /// 要保存的上下文对象。 + /// 当前工作器管理器。 public MessageWorkerManager SetContext(T context) { Context.SetContext(context); @@ -113,9 +123,9 @@ public MessageWorkerManager SetContext(T context) /// /// 现有的参数类型 /// 转换后的参数类型 - /// + /// 参数转换委托。 /// 如果前置步骤失败,即 为 IsFail 时,将不执行委托内容 - /// + /// 当前工作器管理器。 public MessageWorkerManager SetContext(Func worker) { if (MessageWorkerStatus.IsFail) @@ -133,8 +143,8 @@ public MessageWorkerManager SetContext(Func wo /// /// 获取工作器,获取到的工作器将会被注入信息 /// - /// - /// + /// 工作器类型。 + /// 已注入管理器信息的工作器实例。 public T GetWorker() where T : IMessageWorker { var messageWorker = ServiceProvider.GetRequiredService(); @@ -145,12 +155,12 @@ public T GetWorker() where T : IMessageWorker /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// - /// - /// + /// 输入参数类型。 + /// 输出参数类型。 + /// 同步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Func messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(messageTask, workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -160,12 +170,12 @@ public ValueTask RunWorker(Func /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// - /// - /// + /// 输入参数类型。 + /// 输出参数类型。 + /// 异步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Func> messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(messageTask, workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -175,11 +185,11 @@ public ValueTask RunWorker(Func /// 执行委托工作器,执行的内容为 参数的内容 /// - /// - /// + /// 输入参数类型。 + /// 异步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Func messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(messageTask, workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -189,10 +199,10 @@ public ValueTask RunWorker(Func message /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// + /// 异步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Func messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(_ => messageTask(), workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -202,11 +212,11 @@ public ValueTask RunWorker(Func messageTask, string? wo /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// - /// + /// 输入参数类型。 + /// 同步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Action messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(input => @@ -220,10 +230,10 @@ public ValueTask RunWorker(Action messageTask, str /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// + /// 同步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Action messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(messageTask, workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -233,10 +243,10 @@ public ValueTask RunWorker(Action messageTask, str /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// + /// 同步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Action messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(_ => messageTask(), workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -246,10 +256,10 @@ public ValueTask RunWorker(Action messageTask, string? workerName /// /// 执行委托工作器,执行的内容为 参数的内容 /// - /// + /// 异步执行委托。 /// 此委托代表的工作器名,用于调试和埋点上报 /// 是否在当前 前置步骤已失败时,依然可以执行。默认为 false 表示在前置步骤失败时,不执行 - /// + /// 工作器执行结果。 public ValueTask RunWorker(Func messageTask, string? workerName = null, bool canRunWhenFail = false) { var worker = new DelegateMessageWorker(messageTask, workerName ?? messageTask.Method.DeclaringType?.FullName, canRunWhenFail); @@ -259,8 +269,8 @@ public ValueTask RunWorker(Func message /// /// 执行工作器 /// - /// - /// + /// 工作器类型。 + /// 工作器执行结果。 public ValueTask RunWorker() where TWorker : IMessageWorker { var worker = ServiceProvider.GetRequiredService(); @@ -270,8 +280,8 @@ public ValueTask RunWorker() where TWorker : IMessageWork /// /// 执行工作器 /// - /// - /// + /// 要执行的工作器。 + /// 工作器执行结果。 public virtual async ValueTask RunWorker(IMessageWorker worker) { SetManager(worker as IMessageWorkerManagerSensitive); @@ -301,8 +311,17 @@ public virtual async ValueTask RunWorker(IMessageWorker worker) OnWorkerRunException(worker, e); MessageWorkerStatus.LastException = e; - // 继续对外抛出 - throw; + if (ShouldSwallowException) + { + // 不再对外抛出异常 + Debug.Assert(MessageWorkerStatus.IsFail); + return WorkerResult.Fail(MessageWorkerStatus.Status, canRetry: false); + } + else + { + // 继续对外抛出 + throw; + } } // 运行工作任务的核心入口 @@ -397,7 +416,7 @@ async ValueTask RunWithRetry() /// protected virtual ValueTask RunWorkerCore(IMessageWorker worker) // 可以在这里打断点,调试被执行的逻辑 - => worker.Do(Context); + => worker.DoAsync(Context); /// /// 在当前是失败的状态下,跳过工作器的执行 @@ -447,7 +466,7 @@ protected virtual void OnWorkerRunException(IMessageWorker worker, Exception e) //} else { - RecordWorkerError(worker, new WorkFlowErrorCode(-1, e.ToString())); + RecordWorkerError(worker, WorkFlowErrorCode.FromException(e)); } } @@ -508,6 +527,10 @@ internal WorkerResult GetFailResult() canRetry: false); } + /// + /// 异步释放当前管理器持有的工作器和服务作用域。 + /// + /// 表示异步释放操作的任务。 public async ValueTask DisposeAsync() { while (_workerStack.TryPop(out var worker)) @@ -541,4 +564,81 @@ public override string ToString() return $"[{name}] {status} WorkerList:{string.Join('-', _workerStack.Reverse().Select(worker => worker.WorkerName))}"; } + + #region RunWorker + + // 双向和单向的参数不好写,参见单元测试里面的用法: + // await messageWorkerManager.RunWorker(new DirectWorkerInput("direct-input")); + // 可以看到泛型参数数量太多,也不好一开始就写出来。完全不如 GetWorker 后 RunAsync 的写法: + // var downloadResult = await messageWorkerManager + // .GetWorker() + // .RunAsync(new FileDownloadInfo() + // { + // DownloadUrl = request.DocumentDownloadFileUrl, + // FileName = request.FileName, + // }); + // 因此决定将以下几个方法都标记对外不公开,这样就不会让上层开发者感觉重载方法太多而不知道该怎么用 + + /// + /// 使用指定输入参数解析并执行单输入工作器。 + /// + /// 工作器类型。 + /// 输入参数类型。 + /// 本次执行所需的输入参数。 + /// 工作器执行结果。 + internal ValueTask RunWorker(TInput input) where TWorker : MessageWorker + { + var worker = GetWorker(); + + return worker.RunAsync(input); + } + + /// + /// 从当前上下文读取参数,经转换后解析并执行单输入工作器。 + /// + /// 工作器类型。 + /// 当前上下文中已有的参数类型。 + /// 工作器输入参数类型。 + /// 将上下文参数转换为工作器输入的委托。 + /// 工作器执行结果。 + internal ValueTask RunWorker(Func converter) + where TWorker : MessageWorker + { + var worker = GetWorker(); + + return worker.RunAsync(converter); + } + + /// + /// 使用指定输入参数解析并执行带输出的工作器。 + /// + /// 工作器类型。 + /// 工作器输入参数类型。 + /// 工作器输出参数类型。 + /// 本次执行所需的输入参数。 + /// 包含输出参数的工作器执行结果。 + internal ValueTask> RunWorker(TInput input) where TWorker : MessageWorker + { + var worker = GetWorker(); + + return worker.RunAsync(input); + } + + /// + /// 从当前上下文读取参数,经转换后解析并执行带输出的工作器。 + /// + /// 工作器类型。 + /// 当前上下文中已有的参数类型。 + /// 工作器输入参数类型。 + /// 工作器输出参数类型。 + /// 将上下文参数转换为工作器输入的委托。 + /// 包含输出参数的工作器执行结果。 + internal ValueTask> RunWorker(Func converter) where TWorker : MessageWorker + { + var worker = GetWorker(); + + return worker.RunAsync(converter); + } + + #endregion } diff --git a/src/LightWorkFlowManager/Monitors/IWorkerRunMonitor.cs b/src/LightWorkFlowManager/Monitors/IWorkerRunMonitor.cs index 6d2f2b2..79a40d0 100644 --- a/src/LightWorkFlowManager/Monitors/IWorkerRunMonitor.cs +++ b/src/LightWorkFlowManager/Monitors/IWorkerRunMonitor.cs @@ -4,9 +4,28 @@ namespace DC.LightWorkFlowManager.Monitors; +/// +/// 提供工作器执行过程的监控回调。 +/// public interface IWorkerRunMonitor { + /// + /// 在工作器开始执行时调用。 + /// + /// 即将执行的工作器。 void OnWorkerStart(IMessageWorker worker); + + /// + /// 在工作器执行完成时调用。 + /// + /// 已执行完成的工作器。 + /// 工作器执行结果。 void OnWorkerFinish(IMessageWorker worker, WorkerResult result); + + /// + /// 在工作器执行出现异常时调用。 + /// + /// 发生异常的工作器。 + /// 执行过程中抛出的异常。 void OnWorkerException(IMessageWorker worker, Exception exception); } diff --git a/src/LightWorkFlowManager/Monitors/Progress/ProgressCompositor.cs b/src/LightWorkFlowManager/Monitors/Progress/ProgressCompositor.cs index bb218ee..beef029 100644 --- a/src/LightWorkFlowManager/Monitors/Progress/ProgressCompositor.cs +++ b/src/LightWorkFlowManager/Monitors/Progress/ProgressCompositor.cs @@ -6,7 +6,7 @@ namespace DC.LightWorkFlowManager.Monitors; /// /// 进度合成器,允许包含多个子进度 /// -/// +/// 随进度一起上报的数据类型。 /// 规则: /// - 可以注册多个子进度,每个子进度都有自己的权值 /// - 子进度的进度贡献到上级进度时,将叠加上子进度自己的权值。如占比一半权值的子进度,就最多只贡献一半的进度 @@ -16,7 +16,7 @@ public class ProgressCompositor /// /// 创建进度合成器 /// - /// + /// 进度名。 public ProgressCompositor(string name) { Name = name; @@ -88,6 +88,11 @@ public IReadOnlyList> RegisterSubProgressCompositors(IRead return subProgressList; } + /// + /// 注册单个子进度合成器。 + /// + /// 子进度注册信息。 + /// 创建出的子进度合成器。 public ProgressCompositor RegisterSubProgressCompositor(SubProgressCompositorInfo subProgressCompositor) { var progressCompositor = new ProgressCompositor(subProgressCompositor.Name); @@ -109,8 +114,8 @@ private void SubProgressCompositor_Reported(object? sender, ProgressReportedEven /// /// 上报进度 /// - /// - /// + /// 当前进度值。 + /// 随进度上报的数据。 public void Report(ProgressPercentage percentage, T? value = default) { _selfProgressPercentage = percentage; @@ -127,8 +132,8 @@ public void Report(ProgressPercentage percentage, T? value = default) /// /// 上报增量进度,将叠加上当前的进度 /// - /// - /// + /// 要增加的进度值。 + /// 随进度上报的数据。 public void ReportIncreased(ProgressPercentage percentage, T? value = default) { var currentPercentageValue = _selfProgressPercentage.Value + percentage.Value; diff --git a/src/LightWorkFlowManager/Monitors/Progress/ProgressPercentage.cs b/src/LightWorkFlowManager/Monitors/Progress/ProgressPercentage.cs index 343e896..1aa4ff5 100644 --- a/src/LightWorkFlowManager/Monitors/Progress/ProgressPercentage.cs +++ b/src/LightWorkFlowManager/Monitors/Progress/ProgressPercentage.cs @@ -10,7 +10,7 @@ public readonly record struct ProgressPercentage /// /// 进度百分比 0-1 范围 /// - /// + /// 进度值,范围在 0 到 1 之间。 public ProgressPercentage(double value) { Value = value; @@ -22,9 +22,18 @@ public ProgressPercentage(double value) } } + /// + /// 获取当前进度值。 + /// public double Value { get; init; } + /// + /// 获取最小进度值。 + /// public static ProgressPercentage MinValue => new ProgressPercentage(0); + /// + /// 获取最大进度值。 + /// public static ProgressPercentage MaxValue => new ProgressPercentage(1); } \ No newline at end of file diff --git a/src/LightWorkFlowManager/Monitors/Progress/ProgressReportedEventArgument.cs b/src/LightWorkFlowManager/Monitors/Progress/ProgressReportedEventArgument.cs index 2864f41..1354d7b 100644 --- a/src/LightWorkFlowManager/Monitors/Progress/ProgressReportedEventArgument.cs +++ b/src/LightWorkFlowManager/Monitors/Progress/ProgressReportedEventArgument.cs @@ -1,3 +1,29 @@ namespace DC.LightWorkFlowManager.Monitors; -public readonly record struct ProgressReportedEventArgument(ProgressPercentage ProgressPercentage, T? Value); \ No newline at end of file +/// +/// 表示进度上报事件参数。 +/// +/// 随进度一同上报的数据类型。 +public readonly record struct ProgressReportedEventArgument +{ + /// + /// 初始化进度上报事件参数。 + /// + /// 当前进度值。 + /// 随进度上报的数据。 + public ProgressReportedEventArgument(ProgressPercentage progressPercentage, T? value) + { + ProgressPercentage = progressPercentage; + Value = value; + } + + /// + /// 获取当前进度值。 + /// + public ProgressPercentage ProgressPercentage { get; init; } + + /// + /// 获取随进度上报的数据。 + /// + public T? Value { get; init; } +} diff --git a/src/LightWorkFlowManager/Monitors/Progress/SubProgressCompositorInfo.cs b/src/LightWorkFlowManager/Monitors/Progress/SubProgressCompositorInfo.cs index 03b177c..fd02f7d 100644 --- a/src/LightWorkFlowManager/Monitors/Progress/SubProgressCompositorInfo.cs +++ b/src/LightWorkFlowManager/Monitors/Progress/SubProgressCompositorInfo.cs @@ -1,3 +1,28 @@ namespace DC.LightWorkFlowManager.Monitors; -public readonly record struct SubProgressCompositorInfo(string Name, double Weight); \ No newline at end of file +/// +/// 表示子进度合成器的注册信息。 +/// +public readonly record struct SubProgressCompositorInfo +{ + /// + /// 初始化子进度合成器信息。 + /// + /// 子进度名称。 + /// 子进度权重。 + public SubProgressCompositorInfo(string name, double weight) + { + Name = name; + Weight = weight; + } + + /// + /// 获取子进度名称。 + /// + public string Name { get; init; } + + /// + /// 获取子进度权重。 + /// + public double Weight { get; init; } +} diff --git a/src/LightWorkFlowManager/Protocols/MessageWorkerStatus.cs b/src/LightWorkFlowManager/Protocols/MessageWorkerStatus.cs index 9ed4063..550e3ae 100644 --- a/src/LightWorkFlowManager/Protocols/MessageWorkerStatus.cs +++ b/src/LightWorkFlowManager/Protocols/MessageWorkerStatus.cs @@ -4,12 +4,24 @@ namespace DC.LightWorkFlowManager.Protocols; +/// +/// 表示工作流管理器的当前执行状态。 +/// public class MessageWorkerStatus { + /// + /// 获取当前状态是否为失败。 + /// public bool IsFail => Status != WorkFlowErrorCode.Ok; + /// + /// 获取当前记录的状态码。 + /// public WorkFlowErrorCode Status { get; private set; } = WorkFlowErrorCode.Ok; + /// + /// 获取或设置最后一次异常。 + /// public Exception? LastException { get; set; } /// @@ -17,8 +29,18 @@ public class MessageWorkerStatus /// public IMessageWorker? FailWorker { get; private set; } + /// + /// 设置当前状态码。 + /// + /// 要设置的状态码。 public void SetErrorCode(WorkFlowErrorCode errorCode) => Status = errorCode; + /// + /// 在当前尚未失败时尝试记录失败状态。 + /// + /// 失败状态码。 + /// 触发失败的工作器。 + /// 如果成功记录失败状态则返回 ;否则返回 public bool TrySetErrorCode(WorkFlowErrorCode errorCode, IMessageWorker failWorker) { if (IsFail) @@ -32,6 +54,7 @@ public bool TrySetErrorCode(WorkFlowErrorCode errorCode, IMessageWorker failWork return true; } + /// public override string ToString() { if (IsFail) diff --git a/src/LightWorkFlowManager/Protocols/WorkerResult.cs b/src/LightWorkFlowManager/Protocols/WorkerResult.cs index 389f85e..1b3a2f1 100644 --- a/src/LightWorkFlowManager/Protocols/WorkerResult.cs +++ b/src/LightWorkFlowManager/Protocols/WorkerResult.cs @@ -1,11 +1,22 @@ using System; +using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using DC.LightWorkFlowManager.Contexts; +using DC.LightWorkFlowManager.Workers; namespace DC.LightWorkFlowManager.Protocols; +/// +/// 表示工作器执行结果。 +/// public class WorkerResult { + /// + /// 创建工作器执行结果。 + /// + /// 是否执行成功。 + /// 执行失败时的错误码。 + /// 执行失败后是否允许重试。 public WorkerResult(bool isSuccess, WorkFlowErrorCode errorCode, bool canRetry) { IsSuccess = isSuccess; @@ -18,10 +29,19 @@ public WorkerResult(bool isSuccess, WorkFlowErrorCode errorCode, bool canRetry) } } + /// + /// 获取当前执行是否成功。 + /// public virtual bool IsSuccess { get; } + /// + /// 获取当前执行是否失败。 + /// public bool IsFail => !IsSuccess; + /// + /// 获取执行失败时的错误码。 + /// public WorkFlowErrorCode ErrorCode { get; } /// @@ -29,9 +49,31 @@ public WorkerResult(bool isSuccess, WorkFlowErrorCode errorCode, bool canRetry) /// public bool CanRetry { get; } + /// + /// 创建一个成功结果。 + /// + /// 表示成功的执行结果。 public static WorkerResult Success() => new WorkerResult(true, WorkFlowErrorCode.Ok, false); + + /// + /// 创建一个失败结果。 + /// + /// 失败时的错误码。 + /// 失败后是否允许重试。 + /// 表示失败的执行结果。 public static WorkerResult Fail(WorkFlowErrorCode errorCode, bool canRetry=true) => new WorkerResult(false, errorCode, canRetry); + public WorkerResult AsFail() + { + if (IsSuccess) + { + throw new InvalidOperationException($"仅当 Result 为失败时,才能作为另一个失败的结果"); + } + + return new WorkerResult(ErrorCode, CanRetry); + } + + /// public override string ToString() { if (IsSuccess) @@ -45,29 +87,94 @@ public override string ToString() } } +/// +/// 表示带返回值的工作器执行结果。 +/// +/// 结果值类型。 public class WorkerResult : WorkerResult { + /// + /// 创建一个成功的执行结果。 + /// + /// 工作器输出结果。 public WorkerResult(T result) : base(isSuccess: true, WorkFlowErrorCode.Ok, canRetry: false) { Result = result; } + /// + /// 创建一个失败的执行结果。 + /// + /// 失败时的错误码。 + /// 失败后是否允许重试。 public WorkerResult(WorkFlowErrorCode errorCode, bool canRetry) : base(isSuccess: false, errorCode, canRetry) { Result = default; } + /// + /// 获取当前执行是否成功。 + /// [MemberNotNullWhen(true, nameof(Result))] - public override bool IsSuccess => Result != null; + public override bool IsSuccess => base.IsSuccess && Result != null; + + /// + /// 获取工作器输出结果。 + /// public T? Result { get; } + // 为什么不做链调用呢? 这是因为代码实际写起来不好看: + // WorkerResult r1 = xx; + // var r2 = r1.RunWorker(); + // 这时可以看到在 RunWorker 里面必须加上 TOutput 参数,导致实际不好写,所以就不做链调用了 + // 决定在带 Input 的 MessageWorker 的 RunAsync 方法提供 WorkerResult 作为入参,这样的结果可能更好 +#if false + private WorkerResult RunWorker() + where TWorker : MessageWorker + { + throw null; + } +#endif + + /// + /// 将此结果转换为另一个结果 + /// + /// + /// 无需提前判断结果是否为成功,可自动当成功时才执行转换。对 IsFail 的结果调用转换,结果也是返回一个失败的结果,且错误码和是否可重试与原结果一致 + /// + /// + /// 转换器,当结果为成功时才会被调用 + /// + public WorkerResult Convert(Func converter) + { + if (IsSuccess) + { + return converter(Result); + } + else + { + return AsFail(); + } + } + + /// + /// 将输出结果隐式转换为成功的执行结果。 + /// + /// 输出结果。 + /// 包装后的执行结果。 public static implicit operator WorkerResult(T workerResult) { return new WorkerResult(workerResult); } + /// + /// 从执行结果中隐式获取输出结果。 + /// + /// 执行结果。 + /// 输出结果。 public static implicit operator T?(WorkerResult workerResult) => workerResult.Result; + /// public override string ToString() { if (IsSuccess) diff --git a/src/LightWorkFlowManager/Workers/DelegateMessageWorker.cs b/src/LightWorkFlowManager/Workers/DelegateMessageWorker.cs index 46e9aab..c588aab 100644 --- a/src/LightWorkFlowManager/Workers/DelegateMessageWorker.cs +++ b/src/LightWorkFlowManager/Workers/DelegateMessageWorker.cs @@ -5,8 +5,18 @@ namespace DC.LightWorkFlowManager.Workers; +/// +/// 表示基于委托实现的单输入工作器。 +/// +/// 输入参数类型。 public class DelegateMessageWorker : MessageWorker { + /// + /// 使用异步委托初始化工作器。 + /// + /// 实际执行逻辑。 + /// 工作器名称。 + /// 当前置步骤失败时是否仍可运行。 public DelegateMessageWorker(Func messageTask, string? workerName = null, bool canRunWhenFail = false) { _messageTask = messageTask; @@ -15,11 +25,12 @@ public DelegateMessageWorker(Func messageTask, string? worker CanRunWhenFail = canRunWhenFail; } + /// public override string WorkerName => _workerName ?? base.WorkerName; private readonly string? _workerName; - protected override async ValueTask DoAsync(TInput input) + protected override async ValueTask DoInnerAsync(TInput input) { await _messageTask(input); return WorkerResult.Success(); @@ -28,8 +39,19 @@ protected override async ValueTask DoAsync(TInput input) private readonly Func _messageTask; } +/// +/// 表示基于委托实现的输入输出工作器。 +/// +/// 输入参数类型。 +/// 输出参数类型。 public class DelegateMessageWorker : MessageWorker { + /// + /// 使用同步委托初始化工作器。 + /// + /// 实际执行逻辑。 + /// 工作器名称。 + /// 当前置步骤失败时是否仍可运行。 public DelegateMessageWorker(Func messageTask, string? workerName = null, bool canRunWhenFail = false) { _workerName = workerName; @@ -42,6 +64,12 @@ public DelegateMessageWorker(Func messageTask, string? workerNa CanRunWhenFail = canRunWhenFail; } + /// + /// 使用异步委托初始化工作器。 + /// + /// 实际执行逻辑。 + /// 工作器名称。 + /// 当前置步骤失败时是否仍可运行。 public DelegateMessageWorker(Func> messageTask, string? workerName = null, bool canRunWhenFail = false) { _messageTask = messageTask; @@ -57,13 +85,23 @@ protected override async ValueTask> DoInnerAsync(TInput in private readonly Func> _messageTask; + /// public override string WorkerName => _workerName ?? base.WorkerName; private readonly string? _workerName; } -public class DelegateMessageWorker : MessageWorkerBase +/// +/// 表示基于委托实现的无固定输入工作器。 +/// +public class DelegateMessageWorker : MessageWorker { + /// + /// 使用同步委托初始化工作器。 + /// + /// 实际执行逻辑。 + /// 工作器名称。 + /// 当前置步骤失败时是否仍可运行。 public DelegateMessageWorker(Action messageAction, string? workerName = null, bool canRunWhenFail=false) { _messageTask = c => @@ -76,6 +114,12 @@ public DelegateMessageWorker(Action messageAction, string? worke CanRunWhenFail = canRunWhenFail; } + /// + /// 使用异步委托初始化工作器。 + /// + /// 实际执行逻辑。 + /// 工作器名称。 + /// 当前置步骤失败时是否仍可运行。 public DelegateMessageWorker(Func messageTask, string? workerName = null, bool canRunWhenFail=false) { _messageTask = async c => @@ -88,6 +132,12 @@ public DelegateMessageWorker(Func messageTask, string CanRunWhenFail = canRunWhenFail; } + /// + /// 使用返回执行结果的异步委托初始化工作器。 + /// + /// 实际执行逻辑。 + /// 工作器名称。 + /// 当前置步骤失败时是否仍可运行。 public DelegateMessageWorker(Func> messageTask, string? workerName = null, bool canRunWhenFail = false) { _messageTask = messageTask; @@ -96,13 +146,15 @@ public DelegateMessageWorker(Func> messa CanRunWhenFail = canRunWhenFail; } + /// public override string WorkerName => _workerName ?? base.WorkerName; private readonly string? _workerName; private readonly Func> _messageTask; - public override ValueTask Do(IWorkerContext context) + /// + protected override ValueTask DoInnerAsync(IWorkerContext context) { return _messageTask(context); } diff --git a/src/LightWorkFlowManager/Workers/IMessageWorker.cs b/src/LightWorkFlowManager/Workers/IMessageWorker.cs index ef2755b..8c54386 100644 --- a/src/LightWorkFlowManager/Workers/IMessageWorker.cs +++ b/src/LightWorkFlowManager/Workers/IMessageWorker.cs @@ -10,6 +10,9 @@ namespace DC.LightWorkFlowManager.Workers; /// public interface IMessageWorker { + /// + /// 获取或设置当前任务标识。 + /// string TaskId { get; set; } /// @@ -36,10 +39,19 @@ public interface IMessageWorker /// /// 遇到失败时,是否能执行 /// - /// bool CanRunWhenFail { get; } - ValueTask Do(IWorkerContext context); + /// + /// 执行当前工作器。 + /// + /// 工作器上下文。 + /// 工作器执行结果。 + ValueTask DoAsync(IWorkerContext context); + /// + /// 在工作器被释放时执行清理逻辑。 + /// + /// 工作器上下文。 + /// 表示异步清理操作的任务。 ValueTask OnDisposeAsync(IWorkerContext context); } diff --git a/src/LightWorkFlowManager/Workers/MessageWorker.cs b/src/LightWorkFlowManager/Workers/MessageWorker.cs index d757911..1a2c334 100644 --- a/src/LightWorkFlowManager/Workers/MessageWorker.cs +++ b/src/LightWorkFlowManager/Workers/MessageWorker.cs @@ -1,97 +1,216 @@ -using System; -using System.Threading.Tasks; -using DC.LightWorkFlowManager.Contexts; +using DC.LightWorkFlowManager.Contexts; using DC.LightWorkFlowManager.Exceptions; using DC.LightWorkFlowManager.Protocols; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +using System; +using System.Threading.Tasks; + namespace DC.LightWorkFlowManager.Workers; -public abstract class MessageWorker : MessageWorkerBase +/// +/// 工作器基类 +/// +public abstract class MessageWorker : IMessageWorker, IMessageWorkerManagerSensitive { - public sealed override ValueTask Do(IWorkerContext context) - { - var input = context.GetContext(); + /// + public string TaskId { get; set; } = null!; - if (input == null) - { - throw new MessageWorkerInputNotFoundException($"Do not find {typeof(TInput)} in {WorkerName} worker. 无法在{WorkerName}找到{typeof(TInput)}输入,请确保前置步骤完成输出或初始化进行输入"); - } + /// + /// 获取当前工作器名称。 + /// + public virtual string WorkerName => GetType().Name; - return DoAsync(input); - } + /// + /// 设置或获取是否可以重试 + /// + public virtual bool CanRetry { protected set; get; } = true; - protected abstract ValueTask DoAsync(TInput input); -} + /// + /// 获取重试前的等待时间。 + /// + public virtual TimeSpan RetryDelayTime => TimeSpan.FromSeconds(1); -public abstract class MessageWorker : MessageWorker -{ - protected sealed override async ValueTask DoAsync(TInput input) + /// + /// 设置当前的 是否在 处于失败状态时能否运行 + /// + public bool CanRunWhenFail { get; protected set; } + // 默认遇到错误不能运行 + = false; + + /// + ValueTask IMessageWorker.DoAsync(IWorkerContext context) { - WorkerResult output = await DoInnerAsync(input); + return DoInnerAsync(context); + } + + protected abstract ValueTask DoInnerAsync(IWorkerContext context); + + protected MessageWorkerStatus Status => GetEnsureContext(); + protected IWorkerContext CurrentContext => Manager.Context; + protected IServiceProvider ServiceProvider => Manager.ServiceProvider; + + /// + /// 获取当前工作器使用的日志器。 + /// + public ILogger Logger { get; private set; } + // 框架注入 + = null!; + protected MessageWorkerManager Manager { get; private set; } + // 框架注入 + = null!; - if (output.IsSuccess) + /// + /// 从 获取服务,如果获取不到,则从 获取,获取到后设置到上下文 + /// + /// + /// + protected T GetScopeWithContext() where T:notnull + { + var context = GetContext(); + if (context is null) { - CurrentContext.SetContext(output.Result); + context = ServiceProvider.GetRequiredService(); + SetContext(context); } - return output; + return context; } - public ValueTask> RunAsync(Func converter) + protected T GetEnsureContext() { - var argument = CurrentContext.GetEnsureContext(); - var input = converter(argument); - return RunAsync(input); + ThrowNotManager(); + return CurrentContext.GetEnsureContext(); } - public ValueTask> RunAsync(TInput input) + protected T? GetContext() { ThrowNotManager(); + return CurrentContext.GetContext(); + } - SetContext(input); - return RunAsync(); + protected void SetContext(T context) + { + ThrowNotManager(); + CurrentContext.SetContext(context); } - public new async ValueTask> RunAsync() + /// + /// 获取输入的上下文 + /// + /// + /// + /// + /// 仅框架内容,给带 Input 的调用 + /// + /// + private protected T GetInputContext() { ThrowNotManager(); + var input = GetContext(); - var manager = Manager; - await manager.RunWorker(this); - if (Status.IsFail) + if (input == null) { - return manager.GetFailResult(); + throw new MessageWorkerInputNotFoundException($"Do not find {typeof(T)} in {WorkerName} worker. 无法在{WorkerName}找到{typeof(T)}输入,请确保前置步骤完成输出或初始化进行输入"); } - else + + return input; + } + + /// + /// 通过当前管理器执行当前工作器。 + /// + /// 工作器执行结果。 + public async ValueTask RunAsync() + { + ThrowNotManager(); + + var result = await Manager.RunWorker(this); + return result; + } + + /// + public async ValueTask OnDisposeAsync(IWorkerContext context) + { + if (_onDispose != null) { - return GetEnsureContext(); + await _onDispose.Invoke(context); } + await OnDisposeInnerAsync(context); } - protected abstract ValueTask> DoInnerAsync(TInput input); + protected virtual ValueTask OnDisposeInnerAsync(IWorkerContext context) + { + return ValueTask.CompletedTask; + } /// - /// 返回且标记失败 + /// 注册释放的执行内容 /// - /// - /// 是否需要重试 - /// - protected WorkerResult Fail(WorkFlowErrorCode errorCode, bool retry = true) + /// + protected void RegisterOnDispose(Action action) + { + _onDispose += _ => + { + action(); + return ValueTask.CompletedTask; + }; + } + + /// + /// 注册释放的执行内容 + /// + /// + protected void RegisterOnDispose(Action action) + { + _onDispose += context => + { + action(context); + return ValueTask.CompletedTask; + }; + } + + protected void RegisterOnDispose(Func onDispose) => _onDispose += onDispose; + + private Func? _onDispose; + + void IMessageWorkerManagerSensitive.SetMessageWorkerManager(MessageWorkerManager manager) + { + if (ReferenceEquals(Manager, manager)) + { + return; + } + + Manager = manager; + + Logger = ServiceProvider.GetRequiredService>(); + } + + protected void ThrowNotManager() + { + if (Manager == null) + { + throw new InvalidOperationException($"MessageWorkerManager is null. 没有注入 MessageWorkerManager 对象,请确保 {GetType().FullName} 在 MessageWorkerManager 里运行。如调用 {nameof(MessageWorkerManager.GetWorker)} 或 {nameof(MessageWorkerManager.RunWorker)} 执行"); + } + } + + protected WorkerResult Fail(WorkFlowErrorCode errorCode, bool canRetry = true) { - return new WorkerResult(errorCode, retry); + return WorkerResult.Fail(errorCode, canRetry); } - protected ValueTask> FailTask(WorkFlowErrorCode errorCode, bool retry = true) - => ValueTask.FromResult(Fail(errorCode, retry)); + protected ValueTask FailTask(WorkFlowErrorCode errorCode, bool canRetry = true) + => ValueTask.FromResult(Fail(errorCode, canRetry)); - protected WorkerResult Success(TOutput output) + protected WorkerResult Success() { - return new WorkerResult(output); + return WorkerResult.Success(); } - protected ValueTask> SuccessTask(TOutput output) + protected ValueTask SuccessTask() { - var result = Success(output); - return ValueTask.FromResult(result); + return ValueTask.FromResult(Success()); } } diff --git a/src/LightWorkFlowManager/Workers/MessageWorkerBase.cs b/src/LightWorkFlowManager/Workers/MessageWorkerBase.cs deleted file mode 100644 index 1223f8c..0000000 --- a/src/LightWorkFlowManager/Workers/MessageWorkerBase.cs +++ /dev/null @@ -1,131 +0,0 @@ -using System; -using System.Threading.Tasks; -using DC.LightWorkFlowManager.Contexts; -using DC.LightWorkFlowManager.Protocols; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -namespace DC.LightWorkFlowManager.Workers; - -/// -/// 工作器基类 -/// -public abstract class MessageWorkerBase : IMessageWorker, IMessageWorkerManagerSensitive -{ - public string TaskId { get; set; } = null!; - public virtual string WorkerName => GetType().Name; - - /// - /// 设置或获取是否可以重试 - /// - public virtual bool CanRetry { protected set; get; } = true; - - public virtual TimeSpan RetryDelayTime => TimeSpan.FromSeconds(1); - - /// - /// 设置当前的 是否在 处于失败状态时能否运行 - /// - public bool CanRunWhenFail { get; protected set; } - // 默认遇到错误不能运行 - = false; - - public abstract ValueTask Do(IWorkerContext context); - - protected MessageWorkerStatus Status => GetEnsureContext(); - protected IWorkerContext CurrentContext => Manager.Context; - protected IServiceProvider ServiceProvider => Manager.ServiceProvider; - public ILogger Logger { get; private set; } - // 框架注入 - = null!; - protected MessageWorkerManager Manager { get; private set; } - // 框架注入 - = null!; - - /// - /// 从 获取服务,如果获取不到,则从 获取,获取到后设置到上下文 - /// - /// - /// - protected T GetScopeWithContext() where T:notnull - { - var context = GetContext(); - if (context is null) - { - context = ServiceProvider.GetRequiredService(); - SetContext(context); - } - - return context; - } - - protected T GetEnsureContext() => CurrentContext.GetEnsureContext(); - protected T? GetContext() => CurrentContext.GetContext(); - protected void SetContext(T context) => CurrentContext.SetContext(context); - - public async ValueTask RunAsync() - { - ThrowNotManager(); - - var result = await Manager.RunWorker(this); - return result; - } - - public async ValueTask OnDisposeAsync(IWorkerContext context) - { - if (_onDispose != null) - { - await _onDispose.Invoke(context); - } - await OnDisposeInnerAsync(context); - } - - protected virtual ValueTask OnDisposeInnerAsync(IWorkerContext context) - { - return ValueTask.CompletedTask; - } - - /// - /// 注册释放的执行内容 - /// - /// - protected void RegisterOnDispose(Action action) - { - _onDispose += _ => - { - action(); - return ValueTask.CompletedTask; - }; - } - - /// - /// 注册释放的执行内容 - /// - /// - protected void RegisterOnDispose(Action action) - { - _onDispose += context => - { - action(context); - return ValueTask.CompletedTask; - }; - } - - protected void RegisterOnDispose(Func onDispose) => _onDispose += onDispose; - - private Func? _onDispose; - - void IMessageWorkerManagerSensitive.SetMessageWorkerManager(MessageWorkerManager manager) - { - Manager = manager; - - Logger = ServiceProvider.GetRequiredService>(); - } - - protected void ThrowNotManager() - { - if (Manager == null) - { - throw new InvalidOperationException($"MessageWorkerManager is null. 没有注入 MessageWorkerManager 对象,请确保 {GetType().FullName} 在 MessageWorkerManager 里运行。如调用 {nameof(MessageWorkerManager.GetWorker)} 或 {nameof(MessageWorkerManager.RunWorker)} 执行"); - } - } -} diff --git a/src/LightWorkFlowManager/Workers/MessageWorker_Input.cs b/src/LightWorkFlowManager/Workers/MessageWorker_Input.cs new file mode 100644 index 0000000..4a1c894 --- /dev/null +++ b/src/LightWorkFlowManager/Workers/MessageWorker_Input.cs @@ -0,0 +1,72 @@ +using DC.LightWorkFlowManager.Contexts; +using DC.LightWorkFlowManager.Exceptions; +using DC.LightWorkFlowManager.Protocols; + +using System; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace DC.LightWorkFlowManager.Workers; + +/// +/// 表示需要单个输入参数的工作器基类。 +/// +/// 输入参数类型。 +public abstract class MessageWorker : MessageWorker +{ + /// + protected sealed override ValueTask DoInnerAsync(IWorkerContext context) + { + var input = GetInputContext(); + + return DoInnerAsync(input); + } + + protected abstract ValueTask DoInnerAsync(TInput input); + + /// + /// 使用指定输入参数运行工作器。 + /// + /// 工作器输入参数。 + /// 带输出结果的工作器执行结果。 + public ValueTask RunAsync(TInput input) + { + ThrowNotManager(); + + SetContext(input); + return RunAsync(); + } + + /// + /// 使用指定输入参数运行工作器。 + /// + /// 工作器输入参数。 + /// 带输出结果的工作器执行结果。 + public ValueTask RunAsync(WorkerResult input) + { + if (input.IsSuccess) + { + return RunAsync(input.Result); + + } + else + { + Debug.Assert(input.IsFail); + WorkerResult result = input; + return ValueTask.FromResult(result); + } + } + + /// + /// 使用当前上下文中的参数转换为输入后运行工作器。 + /// + /// 当前上下文中的参数类型。 + /// 将上下文参数转换为输入参数的委托。 + /// 带输出结果的工作器执行结果。 + public ValueTask RunAsync(Func converter) + { + var argument = GetInputContext(); + var input = converter(argument); + return RunAsync(input); + } +} \ No newline at end of file diff --git a/src/LightWorkFlowManager/Workers/MessageWorker_Input_Output.cs b/src/LightWorkFlowManager/Workers/MessageWorker_Input_Output.cs new file mode 100644 index 0000000..859dc2a --- /dev/null +++ b/src/LightWorkFlowManager/Workers/MessageWorker_Input_Output.cs @@ -0,0 +1,134 @@ +using DC.LightWorkFlowManager.Contexts; +using DC.LightWorkFlowManager.Exceptions; +using DC.LightWorkFlowManager.Protocols; + +using System; +using System.Diagnostics; +using System.Threading.Tasks; + +namespace DC.LightWorkFlowManager.Workers; + +/// +/// 表示带输入与输出参数的工作器基类。 +/// +/// 输入参数类型。 +/// 输出参数类型。 +public abstract class MessageWorker : MessageWorker +{ + /// + protected sealed override async ValueTask DoInnerAsync(IWorkerContext context) + { + var input = GetInputContext(); + + WorkerResult output = await DoInnerAsync(input); + + if (output.IsSuccess) + { + CurrentContext.SetContext(output.Result); + } + + return output; + } + + protected abstract ValueTask> DoInnerAsync(TInput input); + + /// + /// 使用指定输入参数运行工作器。 + /// + /// 工作器输入参数。 + /// 带输出结果的工作器执行结果。 + public ValueTask> RunAsync(TInput input) + { + ThrowNotManager(); + + SetContext(input); + return RunAsync(); + } + + /// + /// 使用指定输入参数运行工作器。 + /// + /// 工作器输入参数。 + /// 带输出结果的工作器执行结果。 + public ValueTask> RunAsync(WorkerResult input) + { + if (input.IsSuccess) + { + return RunAsync(input.Result); + } + else + { + Debug.Assert(input.IsFail); + var result = input.AsFail(); + return ValueTask.FromResult(result); + } + } + + /// + /// 使用当前上下文中的参数转换为输入后运行工作器。 + /// + /// 当前上下文中的参数类型。 + /// 将上下文参数转换为输入参数的委托。 + /// 带输出结果的工作器执行结果。 + public ValueTask> RunAsync(Func converter) + { + var argument = CurrentContext.GetEnsureContext(); + var input = converter(argument); + return RunAsync(input); + } + + /// + /// 使用当前上下文运行工作器并返回输出结果。 + /// + /// 带输出结果的工作器执行结果。 + public new async ValueTask> RunAsync() + { + ThrowNotManager(); + + var manager = Manager; + await manager.RunWorker(this); + if (Status.IsFail) + { + return manager.GetFailResult(); + } + else + { + return GetEnsureContext(); + } + } + + /// + /// 返回且标记失败 + /// + /// + /// 是否需要重试 + /// + protected new WorkerResult Fail(WorkFlowErrorCode errorCode, bool canRetry = true) + { + return new WorkerResult(errorCode, canRetry); + } + + protected WorkerResult Fail(WorkerResult failResult) + { + return failResult.AsFail(); + } + + protected new ValueTask> FailTask(WorkFlowErrorCode errorCode, bool canRetry = true) + => ValueTask.FromResult(Fail(errorCode, canRetry)); + + protected ValueTask> FailTask(WorkerResult failResult) + { + return FailTask(failResult.ErrorCode, failResult.CanRetry); + } + + protected WorkerResult Success(TOutput output) + { + return new WorkerResult(output); + } + + protected ValueTask> SuccessTask(TOutput output) + { + var result = Success(output); + return ValueTask.FromResult(result); + } +} \ No newline at end of file