diff --git a/.gitignore b/.gitignore
index 809f06a76..ed8c08844 100644
--- a/.gitignore
+++ b/.gitignore
@@ -58,3 +58,5 @@ $tf/pendingchanges.tfb
$tf/properties.tf1
project.lock.json
**/.vs/
+
+.claude/
\ No newline at end of file
diff --git a/Sources/.editorconfig b/Sources/.editorconfig
index 1bae2e959..d1110ea7e 100644
--- a/Sources/.editorconfig
+++ b/Sources/.editorconfig
@@ -50,4 +50,4 @@ csharp_new_line_before_catch = true
csharp_new_line_before_finally = true
csharp_indent_case_contents = true
csharp_indent_switch_labels = true
-csharp_preserve_single_line_statements = false
\ No newline at end of file
+csharp_preserve_single_line_statements = false
diff --git a/Sources/Core/Directory.Build.props b/Sources/Core/Directory.Build.props
index 15f391280..fa8e33c0e 100644
--- a/Sources/Core/Directory.Build.props
+++ b/Sources/Core/Directory.Build.props
@@ -3,15 +3,5 @@
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
-
-
- $(EnlistmentRoot)\Sources\Microsoft.StreamProcessing.ruleset
-
diff --git a/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj b/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
index 1d9de4e69..00948d43b 100644
--- a/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
+++ b/Sources/Core/Microsoft.StreamProcessing.Provider/Microsoft.StreamProcessing.Provider.csproj
@@ -1,7 +1,7 @@
- netstandard2.0
+ net10.0
x64;AnyCPU
@@ -11,19 +11,7 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
+
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs
index dcc3e31f0..9073f7692 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctions.cs
@@ -48,8 +48,8 @@ public static IAggregate Wrap aggregate,
Expression> transform)
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return aggregate.TransformInput(transform);
}
@@ -62,8 +62,8 @@ public static IAggregate Wrap aggregate,
Expression> transform) where TAggregateInput : struct
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return aggregate.MakeInputNullableAndSkipNulls().TransformInput(transform);
}
@@ -71,8 +71,8 @@ internal static IAggregate TransformInput aggregate,
Expression> transform)
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
@@ -86,19 +86,19 @@ private static Expression> TransformInput3> func,
Expression> transform)
{
- Contract.Requires(func != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(func);
+ ArgumentNullException.ThrowIfNull(transform);
var result = func.ReplaceParametersInBody(func.Parameters[0], func.Parameters[1], transform.Body);
var transformParam = transform.Parameters[0];
- return Expression.Lambda>(result, new[] { func.Parameters[0], func.Parameters[1], transformParam });
+ return Expression.Lambda>(result, [func.Parameters[0], func.Parameters[1], transformParam]);
}
internal static IAggregate TransformOutput(
this IAggregate aggregate,
Expression> transform)
{
- Contract.Requires(aggregate != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
+ ArgumentNullException.ThrowIfNull(transform);
return GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
@@ -112,8 +112,8 @@ private static Expression> TransformOutput> func,
Expression> transform)
{
- Contract.Requires(func != null);
- Contract.Requires(transform != null);
+ ArgumentNullException.ThrowIfNull(func);
+ ArgumentNullException.ThrowIfNull(transform);
var result = transform.ReplaceParametersInBody(func.Body);
return Expression.Lambda>(result, func.Parameters);
}
@@ -127,7 +127,7 @@ public static IAggregate ApplyFilter aggregate,
Expression> filter)
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
if (filter == null || filter.Body.ExpressionEquals(Expression.Constant(true))) return aggregate;
Expression> newAccumulate = (oldState, timestamp, input) =>
@@ -151,7 +151,7 @@ public static IAggregate ApplyFilter MakeInputNullableAndSkipNulls(
this IAggregate aggregate) where TInput : struct
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
Expression> newAccumulate = (oldState, timestamp, input) =>
input.HasValue ? CallInliner.Call(aggregate.Accumulate(), oldState, timestamp, input.Value) : oldState;
@@ -175,9 +175,9 @@ public static IAggregate ApplyFilter SkipNulls(
this IAggregate aggregate)
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
- var inputType = typeof(TInput).GetTypeInfo();
+ var inputType = typeof(TInput);
return inputType.IsClass
? GeneratedAggregate.Create(
initialState: aggregate.InitialState(),
@@ -229,7 +229,7 @@ private static Expression> AddSkipNullValueLo
public static IAggregate, TResult?> MakeOutputNullableAndOutputNullWhenEmpty(
this IAggregate aggregate) where TResult : struct
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
Expression>> newInitialState =
() => new NullOutputWrapper
@@ -279,7 +279,7 @@ private static Expression> AddSkipNullValueLo
public static IAggregate, TResult> OutputDefaultWhenEmpty(
this IAggregate aggregate)
{
- Contract.Requires(aggregate != null);
+ ArgumentNullException.ThrowIfNull(aggregate);
Expression>> newInitialState =
() => new NullOutputWrapper
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs
index e6435694f..74a7ee508 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.cs
@@ -30,8 +30,8 @@ public static IAggregate, TResult> Combine aggregate1,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[1];
Expression, TState1>> target1 = state => state.Item1;
@@ -93,9 +93,9 @@ public static IAggregate, TResult> Combine
IAggregate aggregate2,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[2];
Expression, TState1>> target1 = state => state.Item1;
@@ -171,10 +171,10 @@ public static IAggregate, TResult
IAggregate aggregate3,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[3];
Expression, TState1>> target1 = state => state.Item1;
@@ -268,11 +268,11 @@ public static IAggregate
IAggregate aggregate4,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[4];
Expression, TState1>> target1 = state => state.Item1;
@@ -388,12 +388,12 @@ public static IAggregate aggregate5,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[5];
Expression, TState1>> target1 = state => state.Item1;
@@ -535,13 +535,13 @@ public static IAggregate aggregate6,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[6];
Expression, TState1>> target1 = state => state.Item1;
@@ -713,14 +713,14 @@ public static IAggregate aggregate7,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[7];
Expression, TState1>> target1 = state => state.Item1;
@@ -926,15 +926,15 @@ public static IAggregate aggregate8,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[8];
Expression, TState1>> target1 = state => state.Item1;
@@ -1178,16 +1178,16 @@ public static IAggregate aggregate9,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[9];
Expression, TState1>> target1 = state => state.Item1;
@@ -1473,17 +1473,17 @@ public static IAggregate aggregate10,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[10];
Expression, TState1>> target1 = state => state.Item1;
@@ -1815,18 +1815,18 @@ public static IAggregate aggregate11,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[11];
Expression, TState1>> target1 = state => state.Item1;
@@ -2208,19 +2208,19 @@ public static IAggregate aggregate12,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[12];
Expression, TState1>> target1 = state => state.Item1;
@@ -2656,20 +2656,20 @@ public static IAggregate aggregate13,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(aggregate13 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(aggregate13);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[13];
Expression, TState1>> target1 = state => state.Item1;
@@ -3163,21 +3163,21 @@ public static IAggregate aggregate14,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(aggregate13 != null);
- Contract.Requires(aggregate14 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(aggregate13);
+ ArgumentNullException.ThrowIfNull(aggregate14);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[14];
Expression, TState1>> target1 = state => state.Item1;
@@ -3733,22 +3733,22 @@ public static IAggregate aggregate15,
Expression> merger)
{
- Contract.Requires(aggregate1 != null);
- Contract.Requires(aggregate2 != null);
- Contract.Requires(aggregate3 != null);
- Contract.Requires(aggregate4 != null);
- Contract.Requires(aggregate5 != null);
- Contract.Requires(aggregate6 != null);
- Contract.Requires(aggregate7 != null);
- Contract.Requires(aggregate8 != null);
- Contract.Requires(aggregate9 != null);
- Contract.Requires(aggregate10 != null);
- Contract.Requires(aggregate11 != null);
- Contract.Requires(aggregate12 != null);
- Contract.Requires(aggregate13 != null);
- Contract.Requires(aggregate14 != null);
- Contract.Requires(aggregate15 != null);
- Contract.Requires(merger != null);
+ ArgumentNullException.ThrowIfNull(aggregate1);
+ ArgumentNullException.ThrowIfNull(aggregate2);
+ ArgumentNullException.ThrowIfNull(aggregate3);
+ ArgumentNullException.ThrowIfNull(aggregate4);
+ ArgumentNullException.ThrowIfNull(aggregate5);
+ ArgumentNullException.ThrowIfNull(aggregate6);
+ ArgumentNullException.ThrowIfNull(aggregate7);
+ ArgumentNullException.ThrowIfNull(aggregate8);
+ ArgumentNullException.ThrowIfNull(aggregate9);
+ ArgumentNullException.ThrowIfNull(aggregate10);
+ ArgumentNullException.ThrowIfNull(aggregate11);
+ ArgumentNullException.ThrowIfNull(aggregate12);
+ ArgumentNullException.ThrowIfNull(aggregate13);
+ ArgumentNullException.ThrowIfNull(aggregate14);
+ ArgumentNullException.ThrowIfNull(aggregate15);
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[15];
Expression, TState1>> target1 = state => state.Item1;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt
index 4e9d03d82..1770cb83e 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/AggregateFunctionsTemplate.tt
@@ -35,8 +35,8 @@ namespace Microsoft.StreamProcessing.Aggregates
<#= IterateCommaLine(" IAggregate aggregate$", count) #>,
Expression, TResult>> merger)
{
-<#= IterateLine(" Contract.Requires(aggregate$ != null);", count) #>
- Contract.Requires(merger != null);
+<#= IterateLine(" ArgumentNullException.ThrowIfNull(aggregate$);", count) #>
+ ArgumentNullException.ThrowIfNull(merger);
var duplicate = new bool[<#= count #>];
<# for (int i = 1; i <= count; i++) { #>
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
index db3e0ab86..d4ab16d42 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/CountAggregate.cs
@@ -21,7 +21,7 @@ internal sealed class CountAggregate : ISummableAggregate> diff = (leftCount, rightCount) => leftCount - rightCount;
public Expression> Difference() => diff;
- private static readonly Expression> sum = (leftCount, rightCount) => leftCount - rightCount;
+ private static readonly Expression> sum = (leftCount, rightCount) => leftCount + rightCount;
public Expression> Sum() => sum;
private static readonly Expression> res = count => count;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs
index 2c85abddc..ad890db45 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/GeneratedAggregate.cs
@@ -17,7 +17,7 @@ public static GeneratedAggregate Create> deaccumulate,
Expression> difference,
Expression> computeResult)
- => new GeneratedAggregate(initialState, accumulate, deaccumulate, difference, computeResult);
+ => new(initialState, accumulate, deaccumulate, difference, computeResult);
}
internal class GeneratedAggregate : IAggregate
@@ -39,11 +39,11 @@ public GeneratedAggregate(
Expression> difference,
Expression> computeResult)
{
- Contract.Requires(initialState != null);
- Contract.Requires(accumulate != null);
- Contract.Requires(deaccumulate != null);
- Contract.Requires(difference != null);
- Contract.Requires(computeResult != null);
+ ArgumentNullException.ThrowIfNull(initialState);
+ ArgumentNullException.ThrowIfNull(accumulate);
+ ArgumentNullException.ThrowIfNull(deaccumulate);
+ ArgumentNullException.ThrowIfNull(difference);
+ ArgumentNullException.ThrowIfNull(computeResult);
this.initialState = initialState;
this.accumulate = accumulate;
this.deaccumulate = deaccumulate;
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
index 0b8a7c037..0177d4a3b 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/MinMaxAggregate.cs
@@ -16,7 +16,7 @@ protected MinMaxAggregateBase(QueryContainer container) : this(ComparerExpressio
protected MinMaxAggregateBase(IComparerExpression comparer, QueryContainer container)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var generator = comparer.CreateSortedDictionaryGenerator(container);
Expression>, MinMaxState>> template
@@ -26,7 +26,7 @@ Expression>, MinMaxState>> template
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
private static readonly Expression, long, T, MinMaxState>> acc
= (set, timestamp, input) => new MinMaxState { savedValues = set.savedValues.Add(input) };
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs
index c92063a7f..f4703205d 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileContinuous.cs
@@ -18,12 +18,12 @@ public PercentileContinuousDoubleAggregate(double percentile, QueryContainer con
public PercentileContinuousDoubleAggregate(double percentile, IComparerExpression rankComparer, QueryContainer container)
: base(rankComparer, container)
{
- Contract.Requires(rankComparer != null);
+ ArgumentNullException.ThrowIfNull(rankComparer);
Contract.Requires(percentile >= 0.0 && percentile <= 1.0);
this.percentile = percentile;
}
- public override Expression, double>> ComputeResult() => set => CalculatePercentile(set);
+ public override Expression, double>> ComputeResult() => set => this.CalculatePercentile(set);
public double CalculatePercentile(SortedMultiSet set)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs
index 2cd3d8771..0e99e6a86 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/PercentileDiscrete.cs
@@ -18,12 +18,12 @@ public PercentileDiscreteDoubleAggregate(double percentile, QueryContainer conta
public PercentileDiscreteDoubleAggregate(double percentile, IComparerExpression rankComparer, QueryContainer container)
: base(rankComparer, container)
{
- Contract.Requires(rankComparer != null);
+ ArgumentNullException.ThrowIfNull(rankComparer);
Contract.Requires(percentile >= 0.0 && percentile <= 1.0);
this.percentile = percentile;
}
- public override Expression, double>> ComputeResult() => set => CalculatePercentile(set);
+ public override Expression, double>> ComputeResult() => set => this.CalculatePercentile(set);
public double CalculatePercentile(SortedMultiSet set)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
index fe31f2630..519484698 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMaxAggregate.cs
@@ -19,7 +19,7 @@ public SlidingMaxAggregate(QueryContainer container) : this(ComparerExpression comparer, QueryContainer container)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
this.comparer = comparer.GetCompareExpr().Compile();
var generator = comparer.CreateSortedDictionaryGenerator(container);
@@ -30,10 +30,10 @@ Expression>, MinMaxState>> template
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
public Expression, long, T, MinMaxState>> Accumulate()
- => (state, timestamp, input) => Accumulate(state, timestamp, input);
+ => (state, timestamp, input) => this.Accumulate(state, timestamp, input);
private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
{
@@ -75,7 +75,7 @@ private static MinMaxState Difference(MinMaxState leftSet, MinMaxState
return leftSet;
}
- public Expression, T>> ComputeResult() => state => ComputeResult(state);
+ public Expression, T>> ComputeResult() => state => this.ComputeResult(state);
private T ComputeResult(MinMaxState state)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
index 9ad193bec..5e6e203bd 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SlidingMinAggregate.cs
@@ -19,7 +19,7 @@ public SlidingMinAggregate(QueryContainer container) : this(ComparerExpression comparer, QueryContainer container)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
this.comparer = comparer.GetCompareExpr().Compile();
var generator = comparer.CreateSortedDictionaryGenerator(container);
@@ -30,10 +30,10 @@ Expression>, MinMaxState>> template
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
public Expression, long, T, MinMaxState>> Accumulate()
- => (state, timestamp, input) => Accumulate(state, timestamp, input);
+ => (state, timestamp, input) => this.Accumulate(state, timestamp, input);
private MinMaxState Accumulate(MinMaxState state, long timestamp, T input)
{
@@ -75,7 +75,7 @@ private static MinMaxState Difference(MinMaxState leftSet, MinMaxState
return leftSet;
}
- public Expression, T>> ComputeResult() => state => ComputeResult(state);
+ public Expression, T>> ComputeResult() => state => this.ComputeResult(state);
private T ComputeResult(MinMaxState state)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
index 645f9f226..17dc5c772 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/SortedMultisetAggregateBase.cs
@@ -17,11 +17,11 @@ protected SortedMultisetAggregateBase(IComparerExpression comparer, QueryCont
Expression>, SortedMultiSet>> template
= (g) => new SortedMultiSet(g);
var replaced = template.ReplaceParametersInBody(generator);
- initialState = Expression.Lambda>>(replaced);
+ this.initialState = Expression.Lambda>>(replaced);
}
private readonly Expression>> initialState;
- public Expression>> InitialState() => initialState;
+ public Expression>> InitialState() => this.initialState;
private static readonly Expression, long, T, SortedMultiSet>> acc
= (set, timestamp, input) => set.Add(input);
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
index 15ebdacff..c9ff82461 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TopKAggregate.cs
@@ -22,8 +22,8 @@ public TopKAggregate(int k, IComparerExpression rankComparer, QueryContainer
public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpression overallComparer, QueryContainer container)
: base(ThenOrderBy(Reverse(rankComparer), overallComparer), container)
{
- Contract.Requires(rankComparer != null);
- Contract.Requires(overallComparer != null);
+ ArgumentNullException.ThrowIfNull(rankComparer);
+ ArgumentNullException.ThrowIfNull(overallComparer);
Contract.Requires(k > 0);
this.compiledRankComparer = Reverse(rankComparer).GetCompareExpr().Compile();
this.k = k;
@@ -31,7 +31,7 @@ public TopKAggregate(int k, IComparerExpression rankComparer, IComparerExpres
private static IComparerExpression Reverse(IComparerExpression comparer)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var expression = comparer.GetCompareExpr();
Expression> template = (left, right) => CallInliner.Call(expression, right, left);
var reversedExpression = template.InlineCalls();
@@ -40,8 +40,8 @@ private static IComparerExpression Reverse(IComparerExpression comparer)
private static IComparerExpression ThenOrderBy(IComparerExpression comparer1, IComparerExpression comparer2)
{
- Contract.Requires(comparer1 != null);
- Contract.Requires(comparer2 != null);
+ ArgumentNullException.ThrowIfNull(comparer1);
+ ArgumentNullException.ThrowIfNull(comparer2);
var primary = comparer1.GetCompareExpr();
var secondary = comparer2.GetCompareExpr();
Expression> template =
@@ -53,7 +53,7 @@ private static IComparerExpression ThenOrderBy(IComparerExpression compare
return new ComparerExpression(newExpression);
}
- public override Expression, List>>> ComputeResult() => set => GetTopK(set);
+ public override Expression, List>>> ComputeResult() => set => this.GetTopK(set);
private List> GetTopK(SortedMultiSet set)
{
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs
index f53686fa5..a76600c71 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMaxAggregate.cs
@@ -19,7 +19,7 @@ public TumblingMaxAggregate() : this(ComparerExpression.Default) { }
public TumblingMaxAggregate(IComparerExpression comparer)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var stateExpression = Expression.Parameter(typeof(MinMaxState), "state");
var timestampExpression = Expression.Parameter(typeof(long), "timestamp");
@@ -32,7 +32,7 @@ public TumblingMaxAggregate(IComparerExpression comparer)
var comparerExpression = comparer.GetCompareExpr().ReplaceParametersInBody(
inputExpression, currentValue.ReplaceParametersInBody(stateExpression));
- var typeInfo = typeof(MinMaxState).GetTypeInfo();
+ var minMaxStateType = typeof(MinMaxState);
this.accumulate = Expression.Lambda, long, T, MinMaxState>>(
Expression.Condition(
Expression.OrElse(
@@ -40,8 +40,8 @@ public TumblingMaxAggregate(IComparerExpression comparer)
Expression.GreaterThan(comparerExpression, Expression.Constant(0))),
Expression.MemberInit(
(NewExpression)constructor.Body,
- Expression.Bind(typeInfo.GetField("currentTimestamp"), timestampExpression),
- Expression.Bind(typeInfo.GetField("currentValue"), inputExpression)),
+ Expression.Bind(minMaxStateType.GetField("currentTimestamp"), timestampExpression),
+ Expression.Bind(minMaxStateType.GetField("currentValue"), inputExpression)),
stateExpression),
stateExpression,
timestampExpression,
diff --git a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs
index 2427e3e86..7e83e473d 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Aggregates/TumblingMinAggregate.cs
@@ -19,7 +19,7 @@ public TumblingMinAggregate() : this(ComparerExpression.Default) { }
public TumblingMinAggregate(IComparerExpression comparer)
{
- Contract.Requires(comparer != null);
+ ArgumentNullException.ThrowIfNull(comparer);
var stateExpression = Expression.Parameter(typeof(MinMaxState), "state");
var timestampExpression = Expression.Parameter(typeof(long), "timestamp");
@@ -32,7 +32,7 @@ public TumblingMinAggregate(IComparerExpression comparer)
var comparerExpression = comparer.GetCompareExpr().ReplaceParametersInBody(
inputExpression, currentValue.ReplaceParametersInBody(stateExpression));
- var typeInfo = typeof(MinMaxState).GetTypeInfo();
+ var minMaxStateType = typeof(MinMaxState);
this.accumulate = Expression.Lambda, long, T, MinMaxState>>(
Expression.Condition(
Expression.OrElse(
@@ -40,8 +40,8 @@ public TumblingMinAggregate(IComparerExpression comparer)
Expression.LessThan(comparerExpression, Expression.Constant(0))),
Expression.MemberInit(
(NewExpression)constructor.Body,
- Expression.Bind(typeInfo.GetField("currentTimestamp"), timestampExpression),
- Expression.Bind(typeInfo.GetField("currentValue"), inputExpression)),
+ Expression.Bind(minMaxStateType.GetField("currentTimestamp"), timestampExpression),
+ Expression.Bind(minMaxStateType.GetField("currentValue"), inputExpression)),
stateExpression),
stateExpression,
timestampExpression,
diff --git a/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs b/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs
index c30e42081..b3ed00143 100644
--- a/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/CacheUtilities/StreamableIO.cs
@@ -149,27 +149,19 @@ private static void InferProperties(
}
}
- internal sealed class QueuedMessageObservable : IObservable>>
+ internal sealed class QueuedMessageObservable(
+ IStreamable stream
+ ) : IObservable>>
{
- private readonly IStreamable streamable;
-
- public QueuedMessageObservable(IStreamable stream) => this.streamable = stream;
-
public IDisposable Subscribe(IObserver>> observer)
- => this.streamable.Subscribe(new QueuedMessageObserver(observer));
+ => stream.Subscribe(new QueuedMessageObserver(observer));
}
- internal sealed class QueuedMessageObserver : IStreamObserver, IDisposable
+ internal sealed class QueuedMessageObserver(
+ IObserver>> observer
+ ) : IStreamObserver, IDisposable
{
- private readonly IObserver>> observer;
-
- public QueuedMessageObserver(IObserver>> observer)
- {
- this.observer = observer;
- this.ClassId = Guid.NewGuid();
- }
-
- public Guid ClassId { get; }
+ public Guid ClassId { get; } = Guid.NewGuid();
public int CurrentlyBufferedOutputCount => 0;
@@ -177,10 +169,10 @@ public QueuedMessageObserver(IObserver throw new NotImplementedException();
- public void OnCompleted() => this.observer.OnCompleted();
+ public void OnCompleted() => observer.OnCompleted();
public void OnError(Exception error) => throw error;
- public void OnFlush() => this.observer.OnNext(new QueuedMessage> { Kind = MessageKind.Flush });
- public void OnNext(StreamMessage value) => this.observer.OnNext(new QueuedMessage> { Kind = MessageKind.DataBatch, Message = value });
+ public void OnFlush() => observer.OnNext(new() { Kind = MessageKind.Flush });
+ public void OnNext(StreamMessage value) => observer.OnNext(new() { Kind = MessageKind.DataBatch, Message = value });
public void ProduceQueryPlan(PlanNode previous) { }
public void Reset() { }
public void Restore(Stream stream) { }
diff --git a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
index 3df34156c..77e8d5a49 100644
--- a/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
+++ b/Sources/Core/Microsoft.StreamProcessing/Collections/CircularBuffer.cs
@@ -134,7 +134,7 @@ public IEnumerable Iterate()
[EditorBrowsable(EditorBrowsableState.Never)]
public sealed class ElasticCircularBuffer : IEnumerable
{
- private readonly LinkedList> buffers = new LinkedList>();
+ private readonly LinkedList> buffers = new();
private LinkedListNode> head;
private LinkedListNode> tail;
@@ -144,7 +144,7 @@ public sealed class ElasticCircularBuffer : IEnumerable
[EditorBrowsable(EditorBrowsableState.Never)]
public ElasticCircularBuffer()
{
- var node = new LinkedListNode