diff --git a/docs/reference/graphql/graphql_API.md b/docs/reference/graphql/graphql_API.md index 8b6dda2019..b0ee335066 100644 --- a/docs/reference/graphql/graphql_API.md +++ b/docs/reference/graphql/graphql_API.md @@ -5971,7 +5971,7 @@ Key of a property. values -[String!]! +[PropertyOutput!]! Return the values of the properties. @@ -5980,7 +5980,7 @@ Return the values of the properties. at -String +PropertyOutput @@ -5990,12 +5990,12 @@ Return the values of the properties. latest -String +PropertyOutput unique -[String!]! +[PropertyOutput!]! @@ -6008,6 +6008,73 @@ Return the values of the properties. Boolean! + +sum +PropertyOutput + + +Sum of all updates. Returns null if the dtype is not additive or the property is empty. + + + + +mean +PropertyOutput + + +Mean of all updates as an F64. Returns null if any value is non-numeric or the property is +empty. + + + + +average +PropertyOutput + + +Alias for `mean`. + + + + +min +PropertyTuple + + +Minimum `(time, value)` pair. Returns null if the dtype is not comparable or the property is +empty. + + + + +max +PropertyTuple + + +Maximum `(time, value)` pair. Returns null if the dtype is not comparable or the property is +empty. + + + + +median +PropertyTuple + + +Median `(time, value)` pair (lower median on even-length inputs). Returns null if the dtype +is not comparable or the property is empty. + + + + +count +Int! + + +Number of updates. 
+ + + diff --git a/python/tests/test_base_install/test_graphql/test_gql_temporal_aggregates.py b/python/tests/test_base_install/test_graphql/test_gql_temporal_aggregates.py new file mode 100644 index 0000000000..7084ec3404 --- /dev/null +++ b/python/tests/test_base_install/test_graphql/test_gql_temporal_aggregates.py @@ -0,0 +1,465 @@ +from utils import run_group_graphql_test +from raphtory import Graph + + +def create_graph() -> Graph: + graph = Graph() + + # Bare node event so "A" exists at t=50 before any score updates + graph.add_node(50, "A") + # Node "A" with a numeric temporal property "score" at 4 timestamps + graph.add_node(100, "A", properties={"score": 10}) + graph.add_node(200, "A", properties={"score": 20}) + graph.add_node(300, "A", properties={"score": 30}) + graph.add_node(400, "A", properties={"score": 40}) + + # Edge "A -> B" with "weight" on two layers + graph.add_edge(100, "A", "B", properties={"weight": 1.0}, layer="layer1") + graph.add_edge(200, "A", "B", properties={"weight": 2.0}, layer="layer1") + graph.add_edge(300, "A", "B", properties={"weight": 3.0}, layer="layer2") + graph.add_edge(400, "A", "B", properties={"weight": 4.0}, layer="layer2") + + return graph + + +def test_node_temporal_aggregates(): + graph = create_graph() + queries_and_expected_outputs = [] + + # full timeline: score = [10, 20, 30, 40] + query = """ + { + graph(path: "g") { + node(name: "A") { + properties { + temporal { + get(key: "score") { + sum + mean + average + count + min { time { timestamp } value } + max { time { timestamp } value } + median { time { timestamp } value } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "node": { + "properties": { + "temporal": { + "get": { + "sum": 100, + "mean": 25.0, + "average": 25.0, + "count": 4, + "min": {"time": {"timestamp": 100}, "value": 10}, + "max": {"time": {"timestamp": 400}, "value": 40}, + # lower median on even-length input: sorted[(4-1)/2] = index 1 + "median": {"time": {"timestamp": 200}, 
"value": 20}, + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + # windowed [150, 350): score = [20, 30] + query = """ + { + graph(path: "g") { + window(start: 150, end: 350) { + node(name: "A") { + properties { + temporal { + get(key: "score") { + sum + mean + count + min { time { timestamp } value } + max { time { timestamp } value } + median { time { timestamp } value } + } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "window": { + "node": { + "properties": { + "temporal": { + "get": { + "sum": 50, + "mean": 25.0, + "count": 2, + "min": {"time": {"timestamp": 200}, "value": 20}, + "max": {"time": {"timestamp": 300}, "value": 30}, + # lower median of [20, 30] => index 0 + "median": {"time": {"timestamp": 200}, "value": 20}, + } + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + # window with no score updates in range: get still resolves; sum/mean are null, count is 0 + query = """ + { + graph(path: "g") { + window(start: 40, end: 60) { + node(name: "A") { + properties { + temporal { + get(key: "score") { + sum + mean + count + } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "window": { + "node": { + "properties": { + "temporal": {"get": {"sum": None, "mean": None, "count": 0}} + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + run_group_graphql_test(queries_and_expected_outputs, graph) + + +def create_non_numeric_graph() -> Graph: + graph = Graph() + # string-valued temporal property (insertion order != sorted order, so + # min/median land on interior timestamps; max is the last update) + graph.add_node(1, "A", properties={"name": "cherry"}) + graph.add_node(2, "A", properties={"name": "apple"}) + graph.add_node(3, "A", properties={"name": "banana"}) + graph.add_node(4, "A", properties={"name": "date"}) + # bool-valued temporal property, mixed so True/False appear multiple times + graph.add_node(1, "A", properties={"flag": True}) + 
graph.add_node(2, "A", properties={"flag": False}) + graph.add_node(3, "A", properties={"flag": True}) + graph.add_node(4, "A", properties={"flag": False}) + graph.add_node(5, "A", properties={"flag": True}) + return graph + + +def test_temporal_aggregates_on_non_numeric(): + """Pin down semantics for aggregates on non-numeric temporal properties. + + - Strings: `sum` concatenates (strings are additive), `min/max/median` work + lexicographically, `mean` is null (not f64-convertible). + - Bools: `sum`/`mean` are null (not additive, not f64-convertible), but + `min/max/median` work (False < True). + """ + graph = create_non_numeric_graph() + queries_and_expected_outputs = [] + + # strings + query = """ + { + graph(path: "g") { + node(name: "A") { + properties { + temporal { + get(key: "name") { + sum + mean + average + count + min { time { timestamp } value } + max { time { timestamp } value } + median { time { timestamp } value } + } + } + } + } + } + } + """ + # timeline: cherry(t=1), apple(t=2), banana(t=3), date(t=4) + # sorted lex: apple(t=2) < banana(t=3) < cherry(t=1) < date(t=4) + expected_output = { + "graph": { + "node": { + "properties": { + "temporal": { + "get": { + # concatenation in insertion order + "sum": "cherryapplebananadate", + "mean": None, + "average": None, + "count": 4, + "min": {"time": {"timestamp": 2}, "value": "apple"}, + "max": {"time": {"timestamp": 4}, "value": "date"}, + # lower median of len=4 is sorted[(4-1)/2] = index 1 + "median": {"time": {"timestamp": 3}, "value": "banana"}, + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + # bools + query = """ + { + graph(path: "g") { + node(name: "A") { + properties { + temporal { + get(key: "flag") { + sum + mean + average + count + min { time { timestamp } value } + max { time { timestamp } value } + median { time { timestamp } value } + } + } + } + } + } + } + """ + # timeline: True(t=1), False(t=2), True(t=3), False(t=4), True(t=5) + # min fold 
keeps first smaller value encountered => first False at t=2 + # max fold keeps first larger-or-equal value encountered => True at t=1 + # median: stable sort by value gives [False@2, False@4, True@1, True@3, True@5] + # lower median of len=5 => sorted[(5-1)/2] = index 2 = True at t=1 + expected_output = { + "graph": { + "node": { + "properties": { + "temporal": { + "get": { + "sum": None, + "mean": None, + "average": None, + "count": 5, + "min": {"time": {"timestamp": 2}, "value": False}, + "max": {"time": {"timestamp": 1}, "value": True}, + "median": {"time": {"timestamp": 1}, "value": True}, + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + run_group_graphql_test(queries_and_expected_outputs, graph) + + +def test_edge_temporal_aggregates_across_layers(): + graph = create_graph() + queries_and_expected_outputs = [] + + # full (both layers): weight = [1.0, 2.0, 3.0, 4.0] + query = """ + { + graph(path: "g") { + edge(src: "A", dst: "B") { + properties { + temporal { + get(key: "weight") { + sum + mean + count + min { time { timestamp } value } + max { time { timestamp } value } + median { time { timestamp } value } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "edge": { + "properties": { + "temporal": { + "get": { + "sum": 10.0, + "mean": 2.5, + "count": 4, + "min": {"time": {"timestamp": 100}, "value": 1.0}, + "max": {"time": {"timestamp": 400}, "value": 4.0}, + "median": {"time": {"timestamp": 200}, "value": 2.0}, + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + # layer1 only: weight = [1.0, 2.0] + query = """ + { + graph(path: "g") { + layer(name: "layer1") { + edge(src: "A", dst: "B") { + properties { + temporal { + get(key: "weight") { + sum + mean + count + min { time { timestamp } value } + max { time { timestamp } value } + } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "layer": { + "edge": { + "properties": { + "temporal": { + 
"get": { + "sum": 3.0, + "mean": 1.5, + "count": 2, + "min": {"time": {"timestamp": 100}, "value": 1.0}, + "max": {"time": {"timestamp": 200}, "value": 2.0}, + } + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + # layer2 only: weight = [3.0, 4.0] + query = """ + { + graph(path: "g") { + layer(name: "layer2") { + edge(src: "A", dst: "B") { + properties { + temporal { + get(key: "weight") { + sum + mean + count + min { time { timestamp } value } + max { time { timestamp } value } + } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "layer": { + "edge": { + "properties": { + "temporal": { + "get": { + "sum": 7.0, + "mean": 3.5, + "count": 2, + "min": {"time": {"timestamp": 300}, "value": 3.0}, + "max": {"time": {"timestamp": 400}, "value": 4.0}, + } + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + # combined: windowed + layer-filtered (layer1, window [150, 400)) + # => weight = [2.0] + query = """ + { + graph(path: "g") { + layer(name: "layer1") { + window(start: 150, end: 400) { + edge(src: "A", dst: "B") { + properties { + temporal { + get(key: "weight") { + sum + mean + count + min { time { timestamp } value } + max { time { timestamp } value } + median { time { timestamp } value } + } + } + } + } + } + } + } + } + """ + expected_output = { + "graph": { + "layer": { + "window": { + "edge": { + "properties": { + "temporal": { + "get": { + "sum": 2.0, + "mean": 2.0, + "count": 1, + "min": {"time": {"timestamp": 200}, "value": 2.0}, + "max": {"time": {"timestamp": 200}, "value": 2.0}, + "median": { + "time": {"timestamp": 200}, + "value": 2.0, + }, + } + } + } + } + } + } + } + } + queries_and_expected_outputs.append((query, expected_output)) + + run_group_graphql_test(queries_and_expected_outputs, graph) diff --git a/raphtory-graphql/schema.graphql b/raphtory-graphql/schema.graphql index 7222921302..2840a39ffb 100644 --- a/raphtory-graphql/schema.graphql +++ 
b/raphtory-graphql/schema.graphql @@ -3280,11 +3280,43 @@ type TemporalProperty { """ Return the values of the properties. """ - values: [String!]! - at(t: TimeInput!): String - latest: String - unique: [String!]! + values: [PropertyOutput!]! + at(t: TimeInput!): PropertyOutput + latest: PropertyOutput + unique: [PropertyOutput!]! orderedDedupe(latestTime: Boolean!): [PropertyTuple!]! + """ + Sum of all updates. Returns null if the dtype is not additive or the property is empty. + """ + sum: PropertyOutput + """ + Mean of all updates as an F64. Returns null if any value is non-numeric or the property is + empty. + """ + mean: PropertyOutput + """ + Alias for `mean`. + """ + average: PropertyOutput + """ + Minimum `(time, value)` pair. Returns null if the dtype is not comparable or the property is + empty. + """ + min: PropertyTuple + """ + Maximum `(time, value)` pair. Returns null if the dtype is not comparable or the property is + empty. + """ + max: PropertyTuple + """ + Median `(time, value)` pair (lower median on even-length inputs). Returns null if the dtype + is not comparable or the property is empty. + """ + median: PropertyTuple + """ + Number of updates. + """ + count: Int! } input TemporalPropertyInput { diff --git a/raphtory-graphql/src/model/graph/property.rs b/raphtory-graphql/src/model/graph/property.rs index d08b3f377c..f658431658 100644 --- a/raphtory-graphql/src/model/graph/property.rs +++ b/raphtory-graphql/src/model/graph/property.rs @@ -339,29 +339,29 @@ impl GqlTemporalProperty { } /// Return the values of the properties. 
- async fn values(&self) -> Vec { + async fn values(&self) -> Vec { let self_clone = self.clone(); - blocking_compute(move || self_clone.prop.values().map(|x| x.to_string()).collect()).await + blocking_compute(move || self_clone.prop.values().map(GqlPropertyOutputVal).collect()).await } - async fn at(&self, t: GqlTimeInput) -> Option { + async fn at(&self, t: GqlTimeInput) -> Option { let self_clone = self.clone(); - blocking_compute(move || self_clone.prop.at(t.into_time()).map(|x| x.to_string())).await + blocking_compute(move || self_clone.prop.at(t.into_time()).map(GqlPropertyOutputVal)).await } - async fn latest(&self) -> Option { + async fn latest(&self) -> Option { let self_clone = self.clone(); - blocking_compute(move || self_clone.prop.latest().map(|x| x.to_string())).await + blocking_compute(move || self_clone.prop.latest().map(GqlPropertyOutputVal)).await } - async fn unique(&self) -> Vec { + async fn unique(&self) -> Vec { let self_clone = self.clone(); blocking_compute(move || { self_clone .prop .unique() .into_iter() - .map(|x| x.to_string()) + .map(GqlPropertyOutputVal) .collect_vec() }) .await @@ -379,6 +379,52 @@ impl GqlTemporalProperty { }) .await } + + /// Sum of all updates. Returns null if the dtype is not additive or the property is empty. + async fn sum(&self) -> Option { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.sum().map(GqlPropertyOutputVal)).await + } + + /// Mean of all updates as an F64. Returns null if any value is non-numeric or the property is + /// empty. + async fn mean(&self) -> Option { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.mean().map(GqlPropertyOutputVal)).await + } + + /// Alias for `mean`. + async fn average(&self) -> Option { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.average().map(GqlPropertyOutputVal)).await + } + + /// Minimum `(time, value)` pair. 
Returns null if the dtype is not comparable or the property is + /// empty. + async fn min(&self) -> Option { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.min().map(GqlPropertyTuple::from)).await + } + + /// Maximum `(time, value)` pair. Returns null if the dtype is not comparable or the property is + /// empty. + async fn max(&self) -> Option { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.max().map(GqlPropertyTuple::from)).await + } + + /// Median `(time, value)` pair (lower median on even-length inputs). Returns null if the dtype + /// is not comparable or the property is empty. + async fn median(&self) -> Option { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.median().map(GqlPropertyTuple::from)).await + } + + /// Number of updates. + async fn count(&self) -> usize { + let self_clone = self.clone(); + blocking_compute(move || self_clone.prop.count()).await + } } #[derive(ResolvedObject, Clone)] diff --git a/raphtory/src/db/api/properties/temporal_props.rs b/raphtory/src/db/api/properties/temporal_props.rs index 4dc8c09cef..ff000a9a5f 100644 --- a/raphtory/src/db/api/properties/temporal_props.rs +++ b/raphtory/src/db/api/properties/temporal_props.rs @@ -115,6 +115,79 @@ impl TemporalPropertyView

{ unique_props.into_iter().collect() } + /// Compute the sum of all property values, or `None` if the dtype is not additive or the + /// property is empty. + pub fn sum(&self) -> Option { + generalised_reduce(self.values(), |a, b| a.add(b), |p| p.dtype().has_add()) + } + + /// Find the minimum `(time, value)` pair, or `None` if the dtype is not comparable or the + /// property is empty. + pub fn min(&self) -> Option<(EventTime, Prop)> { + generalised_reduce( + self.iter(), + |a, b| { + if a.1.partial_cmp(&b.1)?.is_le() { + Some(a) + } else { + Some(b) + } + }, + |(_, v)| v.dtype().has_cmp(), + ) + } + + /// Find the maximum `(time, value)` pair, or `None` if the dtype is not comparable or the + /// property is empty. + pub fn max(&self) -> Option<(EventTime, Prop)> { + generalised_reduce( + self.iter(), + |a, b| { + if a.1.partial_cmp(&b.1)?.is_ge() { + Some(a) + } else { + Some(b) + } + }, + |(_, v)| v.dtype().has_cmp(), + ) + } + + /// Count the number of property updates. + pub fn count(&self) -> usize { + self.iter().count() + } + + /// Compute the mean of all property values as an `F64` Prop, or `None` if any value cannot be + /// converted to `f64` or the property is empty. + pub fn mean(&self) -> Option { + let mut iter = self.values(); + let mut sum = iter.next()?.as_f64()?; + let mut count = 1usize; + for value in iter { + sum += value.as_f64()?; + count += 1; + } + Some(Prop::F64(sum / count as f64)) + } + + /// Alias for `mean`. + pub fn average(&self) -> Option { + self.mean() + } + + /// Compute the median `(time, value)` pair (lower median on even-length inputs), or `None` if + /// the dtype is not comparable or the property is empty. 
+ pub fn median(&self) -> Option<(EventTime, Prop)> { + let mut sorted: Vec<(EventTime, Prop)> = self.iter().collect(); + if !sorted.first()?.1.dtype().has_cmp() { + return None; + } + sorted.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)); + let len = sorted.len(); + Some(sorted.swap_remove((len - 1) / 2)) + } + pub fn ordered_dedupe(&self, latest_time: bool) -> Vec<(EventTime, Prop)> { let mut last_seen_value: Option = None; let mut result: Vec<(EventTime, Prop)> = vec![]; @@ -310,3 +383,16 @@ impl PropArrayUnwrap for TemporalPropertyView< self.latest().into_array() } } + +fn generalised_reduce( + data: impl IntoIterator, + op: impl Fn(V, V) -> Option, + check: impl Fn(&V) -> bool, +) -> Option { + let mut iter = data.into_iter(); + let first = iter.next()?; + if !check(&first) { + return None; + } + iter.try_fold(first, op) +} diff --git a/raphtory/src/python/graph/properties/temporal_props.rs b/raphtory/src/python/graph/properties/temporal_props.rs index d304e99d55..1d2e4251bb 100644 --- a/raphtory/src/python/graph/properties/temporal_props.rs +++ b/raphtory/src/python/graph/properties/temporal_props.rs @@ -303,7 +303,7 @@ impl PyTemporalProp { /// Returns: /// PropValue: The sum of all property values. pub fn sum(&self) -> Option { - compute_generalised_sum(self.prop.values(), |a, b| a.add(b), |d| d.dtype().has_add()) + self.prop.sum() } /// Find the minimum property value and its associated time. @@ -311,17 +311,7 @@ impl PyTemporalProp { /// Returns: /// Tuple[EventTime, PropValue]: A tuple containing the time and the minimum property value. pub fn min(&self) -> Option<(EventTime, Prop)> { - compute_generalised_sum( - self.prop.iter(), - |a, b| { - if a.1.partial_cmp(&b.1)?.is_le() { - Some(a) - } else { - Some(b) - } - }, - |d| d.1.dtype().has_cmp(), - ) + self.prop.min() } /// Find the maximum property value and its associated time. 
@@ -329,17 +319,7 @@ impl PyTemporalProp { /// Returns: /// Tuple[EventTime, PropValue]: A tuple containing the time and the maximum property value. pub fn max(&self) -> Option<(EventTime, Prop)> { - compute_generalised_sum( - self.prop.iter(), - |a, b| { - if a.1.partial_cmp(&b.1)?.is_ge() { - Some(a) - } else { - Some(b) - } - }, - |d| d.1.dtype().has_cmp(), - ) + self.prop.max() } /// Count the number of properties. @@ -347,7 +327,7 @@ impl PyTemporalProp { /// Returns: /// int: The number of properties. pub fn count(&self) -> usize { - self.prop.iter().count() + self.prop.count() } /// Compute the average of all property values. Alias for mean(). @@ -355,7 +335,7 @@ impl PyTemporalProp { /// Returns: /// PropValue: The average of each property values, or None if count is zero. pub fn average(&self) -> Option { - self.mean() + self.prop.average() } /// Compute the mean of all property values. Alias for mean(). @@ -363,7 +343,7 @@ impl PyTemporalProp { /// Returns: /// PropValue: The mean of each property values, or None if count is zero. pub fn mean(&self) -> Option { - compute_mean(self.prop.values()) + self.prop.mean() } /// Compute the median of all property values. @@ -371,17 +351,7 @@ impl PyTemporalProp { /// Returns: /// Tuple[EventTime, PropValue]: A tuple containing the time and the median property value, or None if empty pub fn median(&self) -> Option<(EventTime, Prop)> { - let mut sorted: Vec<(EventTime, Prop)> = self.prop.iter().collect(); - if !sorted.first()?.1.dtype().has_cmp() { - return None; - } - sorted.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)); - let len = sorted.len(); - if len == 0 { - None - } else { - Some(sorted[(len - 1) / 2].clone()) - } + self.prop.median() } pub fn __repr__(&self) -> String {