diff --git a/python/python/raphtory/algorithms/__init__.pyi b/python/python/raphtory/algorithms/__init__.pyi index 0920041416..6a17e58523 100644 --- a/python/python/raphtory/algorithms/__init__.pyi +++ b/python/python/raphtory/algorithms/__init__.pyi @@ -289,6 +289,7 @@ def pagerank( max_diff: Optional[float] = None, use_l2_norm: bool = True, damping_factor: float = 0.85, + personalization: Optional[dict[NodeInput, float]] = None, ) -> NodeStateF64: """ Pagerank -- pagerank centrality value of the nodes in a graph @@ -305,6 +306,9 @@ def pagerank( is less than the max diff value given. use_l2_norm (bool): Flag for choosing the norm to use for convergence checks, True for l2 norm, False for l1 norm. Defaults to True. damping_factor (float): The damping factor for the PageRank calculation. Defaults to 0.85. + personalization (Optional[dict[NodeInput, float]]): A dictionary mapping nodes to personalization values. + When provided, the random walk teleports to nodes proportionally to these values + instead of uniformly. Values are normalized to sum to 1. Defaults to None (uniform). Returns: NodeStateF64: Mapping of nodes to their pagerank value. 
diff --git a/python/tests/test_base_install/test_graphdb/test_algorithms.py b/python/tests/test_base_install/test_graphdb/test_algorithms.py index 44d9524eac..0a3ee7aefb 100644 --- a/python/tests/test_base_install/test_graphdb/test_algorithms.py +++ b/python/tests/test_base_install/test_graphdb/test_algorithms.py @@ -312,6 +312,21 @@ def test_page_rank(): assert actual == expected +def test_personalized_page_rank(): + g = Graph() + edges = [(1, 2), (1, 4), (2, 3), (3, 1), (4, 1)] + for src, dst in edges: + g.add_edge(0, src, dst, {}) + + actual = algorithms.pagerank(g, iter_count=1000, personalization={"1": 1.0, "2": 0.0, "3": 0.0, "4": 0.0}) + for node, expected in [("1", 0.45223), ("2", 0.19220), ("3", 0.16337), ("4", 0.19220)]: + assert abs(actual[node] - expected) < 1e-5, f"node {node}: {actual[node]} != {expected}" + + actual = algorithms.pagerank(g, iter_count=1000, max_diff=1e-10, use_l2_norm=False, personalization={"1": 0.5, "3": 0.5}) + for node, expected in [("1", 0.41832), ("2", 0.17778), ("3", 0.22612), ("4", 0.17778)]: + assert abs(actual[node] - expected) < 1e-5, f"node {node}: {actual[node]} != {expected}" + + def test_temporal_reachability(): g = gen_graph() diff --git a/raphtory-benchmark/benches/algobench.rs b/raphtory-benchmark/benches/algobench.rs index 3465253c76..3436d7ca1c 100644 --- a/raphtory-benchmark/benches/algobench.rs +++ b/raphtory-benchmark/benches/algobench.rs @@ -87,7 +87,7 @@ pub fn graphgen_large_pagerank(c: &mut Criterion) { &graph, |b, graph| { b.iter(|| { - let result = unweighted_page_rank(graph, Some(100), None, None, true, None); + let result = unweighted_page_rank(graph, Some(100), None, None, true, None, None); black_box(result); }); }, diff --git a/raphtory-graphql/src/model/plugins/algorithms.rs b/raphtory-graphql/src/model/plugins/algorithms.rs index 4d16c96a2d..2886ad17b2 100644 --- a/raphtory-graphql/src/model/plugins/algorithms.rs +++ b/raphtory-graphql/src/model/plugins/algorithms.rs @@ -103,6 +103,7 @@ fn 
apply_pagerank<'b>( tol, true, damping_factor, + None, ); let result = binding .into_iter() diff --git a/raphtory/src/algorithms/centrality/pagerank.rs b/raphtory/src/algorithms/centrality/pagerank.rs index cf762530d5..ac3d062e49 100644 --- a/raphtory/src/algorithms/centrality/pagerank.rs +++ b/raphtory/src/algorithms/centrality/pagerank.rs @@ -14,6 +14,56 @@ use crate::{ prelude::GraphViewOps, }; use num_traits::abs; +use raphtory_api::core::entities::VID; +use std::collections::HashMap; +use std::sync::Arc; + +trait Teleport: Clone + Send + Sync + 'static { + fn teleport_value(&self, vid_index: usize, damp: f64) -> f64; + fn sink_contribution(&self, prev_score: f64) -> f64; + fn distribute_sink(&self, total_sink: f64, vid_index: usize, damp: f64) -> f64; +} + +#[derive(Clone)] +struct Uniform { + teleport_prob: f64, + factor: f64, +} + +impl Teleport for Uniform { + #[inline] + fn teleport_value(&self, _vid_index: usize, _damp: f64) -> f64 { + self.teleport_prob + } + #[inline] + fn sink_contribution(&self, prev_score: f64) -> f64 { + self.factor * prev_score + } + #[inline] + fn distribute_sink(&self, total_sink: f64, _vid_index: usize, _damp: f64) -> f64 { + total_sink + } +} + +#[derive(Clone)] +struct Personalized { + weights: Arc<Vec<f64>>, +} + +impl Teleport for Personalized { + #[inline] + fn teleport_value(&self, vid_index: usize, damp: f64) -> f64 { + (1.0 - damp) * self.weights[vid_index] + } + #[inline] + fn sink_contribution(&self, prev_score: f64) -> f64 { + prev_score + } + #[inline] + fn distribute_sink(&self, total_sink: f64, vid_index: usize, damp: f64) -> f64 { + damp * total_sink * self.weights[vid_index] + } +} #[derive(Clone, Debug, Default)] struct PageRankState { @@ -45,6 +95,9 @@ impl PageRankState { /// - `tol`: The tolerance value for convergence /// - `use_l2_norm`: Whether to use L2 norm for convergence /// - `damping_factor`: Probability of likelihood the spread will continue +/// - `personalization`: Optional map from node VID to 
personalization weight. +/// When provided, the random walk teleports proportionally to these weights +/// instead of uniformly. Values are normalized to sum to 1. /// /// # Returns /// @@ -57,16 +110,58 @@ pub fn unweighted_page_rank<G: StaticGraphViewOps>( tol: Option<f64>, use_l2_norm: bool, damping_factor: Option<f64>, + personalization: Option<HashMap<VID, f64>>, +) -> NodeState<'static, f64, G> { + let n = g.count_nodes(); + let damp = damping_factor.unwrap_or(0.85); + + match personalization { + Some(p) => { + let total: f64 = p.values().sum(); + let mut weights = vec![0.0f64; n]; + for (&vid, &value) in &p { + weights[vid.index()] = value / total; + } + run_pagerank( + g, + iter_count, + threads, + tol, + use_l2_norm, + damp, + Personalized { weights: Arc::new(weights) }, + ) + } + None => run_pagerank( + g, + iter_count, + threads, + tol, + use_l2_norm, + damp, + Uniform { + teleport_prob: (1f64 - damp) / n as f64, + factor: damp / n as f64, + }, + ), + } +} + +fn run_pagerank<G: StaticGraphViewOps, T: Teleport>( + g: &G, + iter_count: Option<usize>, + threads: Option<usize>, + tol: Option<f64>, + use_l2_norm: bool, + damp: f64, + teleport: T, ) -> NodeState<'static, f64, G> { let n = g.count_nodes(); let mut ctx: Context<G, ComputeStateVec> = g.into(); let tol: f64 = tol.unwrap_or(0.000001f64); - let damp = damping_factor.unwrap_or(0.85); let iter_count = iter_count.unwrap_or(20); - let teleport_prob = (1f64 - damp) / n as f64; - let factor = damp / n as f64; let max_diff = accumulators::sum::<f64>(2); @@ -83,35 +178,42 @@ pub fn unweighted_page_rank<G: StaticGraphViewOps>( Step::Continue }); - let step2: ATask<G, ComputeStateVec, PageRankState, _> = ATask::new(move |s| { - // reset score - { - let state: &mut PageRankState = s.get_mut(); - state.reset(); - } + let step2: ATask<G, ComputeStateVec, PageRankState, _> = ATask::new({ + let teleport = teleport.clone(); + move |s| { + { + let state: &mut PageRankState = s.get_mut(); + state.reset(); + } - for t in s.in_neighbours() { - let prev = t.prev(); + for t in s.in_neighbours() { + let prev = t.prev(); - s.get_mut().score += prev.score / prev.out_degree as f64; - } + s.get_mut().score += prev.score / prev.out_degree as f64; + } - 
s.get_mut().score *= damp; + s.get_mut().score *= damp; - s.get_mut().score += teleport_prob; - Step::Continue + s.get_mut().score += teleport.teleport_value(s.node.index(), damp); + Step::Continue + } }); - let step3 = ATask::new(move |s| { - let state: &mut PageRankState = s.get_mut(); + let step3 = ATask::new({ + let teleport = teleport.clone(); + move |s| { + let state: &mut PageRankState = s.get_mut(); - if state.out_degree == 0 { - let curr = s.prev().score; + if state.out_degree == 0 { + let curr = s.prev().score; - let ts_contrib = factor * curr; - s.global_update(&total_sink_contribution, ts_contrib); + s.global_update( + &total_sink_contribution, + teleport.sink_contribution(curr), + ); + } + Step::Continue } - Step::Continue }); let step4 = ATask::new(move |s| { @@ -120,8 +222,10 @@ pub fn unweighted_page_rank( .read_global_state(&total_sink_contribution) .unwrap_or_default(); // update local score with total sink contribution + let vid_index = s.node.index(); let state: &mut PageRankState = s.get_mut(); - state.score += total_sink_contribution; + state.score += + teleport.distribute_sink(total_sink_contribution, vid_index, damp); // update global max diff diff --git a/raphtory/src/python/packages/algorithms.rs b/raphtory/src/python/packages/algorithms.rs index 3670dc440b..73a4a45fcb 100644 --- a/raphtory/src/python/packages/algorithms.rs +++ b/raphtory/src/python/packages/algorithms.rs @@ -70,11 +70,12 @@ use crate::{ utils::PyNodeRef, }, }; +use crate::prelude::GraphViewOps; use pyo3::{prelude::*, types::PyList}; use rand::{prelude::StdRng, SeedableRng}; use raphtory_api::core::{entities::LayerIds, storage::timeindex::EventTime, Direction}; use raphtory_storage::core_ops::CoreGraphOps; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; /// Helper function to parse single-vertex or multi-vertex parameters to a Vec of vertices fn process_node_param(param: &Bound) -> PyResult> { @@ -268,18 +269,29 @@ pub fn out_component( /// is 
less than the max diff value given. /// use_l2_norm (bool): Flag for choosing the norm to use for convergence checks, True for l2 norm, False for l1 norm. Defaults to True. /// damping_factor (float): The damping factor for the PageRank calculation. Defaults to 0.85. +/// personalization (Optional[dict[Node, float]]): A dictionary mapping nodes to personalization values. +/// When provided, the random walk teleports to nodes proportionally to these values +/// instead of uniformly. Values are normalized to sum to 1. Defaults to None (uniform). /// /// Returns: /// NodeStateF64: Mapping of nodes to their pagerank value. #[pyfunction] -#[pyo3(signature = (graph, iter_count=20, max_diff=None, use_l2_norm=true, damping_factor=0.85))] +#[pyo3(signature = (graph, iter_count=20, max_diff=None, use_l2_norm=true, damping_factor=0.85, personalization=None))] pub fn pagerank( graph: &PyGraphView, iter_count: usize, max_diff: Option<f64>, use_l2_norm: bool, damping_factor: Option<f64>, + personalization: Option<HashMap<PyNodeRef, f64>>, ) -> NodeState<'static, f64, DynamicGraph> { + let personalization = personalization.map(|p| { + p.into_iter() + .filter_map(|(node_ref, value)| { + graph.graph.node(node_ref).map(|n| (n.node, value)) + }) + .collect() + }); unweighted_page_rank( &graph.graph, Some(iter_count), @@ -287,6 +299,7 @@ max_diff, use_l2_norm, damping_factor, + personalization, ) } diff --git a/raphtory/tests/algo_tests/centrality.rs b/raphtory/tests/algo_tests/centrality.rs index 2d3f5c71e5..d0c8fa10e0 100644 --- a/raphtory/tests/algo_tests/centrality.rs +++ b/raphtory/tests/algo_tests/centrality.rs @@ -144,7 +144,7 @@ fn test_page_rank() { } test_storage!(&graph, |graph| { - let results = unweighted_page_rank(graph, Some(1000), Some(1), None, true, None); + let results = unweighted_page_rank(graph, Some(1000), Some(1), None, true, None, None); assert_eq_f64(results.get_by_node("1"), Some(&0.38694), 5); assert_eq_f64(results.get_by_node("2"), Some(&0.20195), 5); @@ -188,7 +188,7 @@ 
fn motif_page_rank() { } test_storage!(&graph, |graph| { - let results = unweighted_page_rank(graph, Some(1000), Some(4), None, true, None); + let results = unweighted_page_rank(graph, Some(1000), Some(4), None, true, None, None); assert_eq_f64(results.get_by_node("10"), Some(&0.072082), 5); assert_eq_f64(results.get_by_node("8"), Some(&0.136473), 5); @@ -215,7 +215,7 @@ fn two_nodes_page_rank() { } test_storage!(&graph, |graph| { - let results = unweighted_page_rank(graph, Some(1000), Some(4), None, false, None); + let results = unweighted_page_rank(graph, Some(1000), Some(4), None, false, None, None); assert_eq_f64(results.get_by_node("1"), Some(&0.5), 3); assert_eq_f64(results.get_by_node("2"), Some(&0.5), 3); @@ -233,7 +233,7 @@ fn three_nodes_page_rank_one_dangling() { } test_storage!(&graph, |graph| { - let results = unweighted_page_rank(graph, Some(10), Some(4), None, false, None); + let results = unweighted_page_rank(graph, Some(10), Some(4), None, false, None, None); assert_eq_f64(results.get_by_node("1"), Some(&0.303), 3); assert_eq_f64(results.get_by_node("2"), Some(&0.393), 3); @@ -270,7 +270,7 @@ fn dangling_page_rank() { graph.add_edge(t, src, dst, NO_PROPS, None).unwrap(); } test_storage!(&graph, |graph| { - let results = unweighted_page_rank(graph, Some(1000), Some(4), None, true, None); + let results = unweighted_page_rank(graph, Some(1000), Some(4), None, true, None, None); assert_eq_f64(results.get_by_node("1"), Some(&0.055), 3); assert_eq_f64(results.get_by_node("2"), Some(&0.079), 3); @@ -286,6 +286,70 @@ fn dangling_page_rank() { }); } +#[test] +fn test_personalized_page_rank() { + let graph = Graph::new(); + let edges = vec![(1, 2), (1, 4), (2, 3), (3, 1), (4, 1)]; + for (src, dst) in edges { + graph.add_edge(0, src, dst, NO_PROPS, None).unwrap(); + } + + test_storage!(&graph, |graph| { + let mut personalization = HashMap::new(); + personalization.insert(graph.node("1").unwrap().node, 1.0); + 
personalization.insert(graph.node("2").unwrap().node, 0.0); + personalization.insert(graph.node("3").unwrap().node, 0.0); + personalization.insert(graph.node("4").unwrap().node, 0.0); + + let results = unweighted_page_rank( + graph, + Some(1000), + Some(1), + None, + true, + None, + Some(personalization), + ); + + // nx.pagerank(G, alpha=0.85, personalization={1:1.0, 2:0.0, 3:0.0, 4:0.0}) + assert_eq_f64(results.get_by_node("1"), Some(&0.45223), 5); + assert_eq_f64(results.get_by_node("2"), Some(&0.19220), 5); + assert_eq_f64(results.get_by_node("3"), Some(&0.16337), 5); + assert_eq_f64(results.get_by_node("4"), Some(&0.19220), 5); + }); +} + +#[test] +fn test_personalized_page_rank_partial() { + let graph = Graph::new(); + let edges = vec![(1, 2), (1, 4), (2, 3), (3, 1), (4, 1)]; + for (src, dst) in edges { + graph.add_edge(0, src, dst, NO_PROPS, None).unwrap(); + } + + test_storage!(&graph, |graph| { + let mut personalization = HashMap::new(); + personalization.insert(graph.node("1").unwrap().node, 0.5); + personalization.insert(graph.node("3").unwrap().node, 0.5); + + let results = unweighted_page_rank( + graph, + Some(1000), + Some(1), + Some(1e-10), + false, + None, + Some(personalization), + ); + + // nx.pagerank(G, alpha=0.85, personalization={1:0.5, 3:0.5}) + assert_eq_f64(results.get_by_node("1"), Some(&0.41832), 5); + assert_eq_f64(results.get_by_node("2"), Some(&0.17778), 5); + assert_eq_f64(results.get_by_node("3"), Some(&0.22612), 5); + assert_eq_f64(results.get_by_node("4"), Some(&0.17778), 5); + }); +} + pub fn assert_eq_f64 + PartialEq + std::fmt::Debug>( a: Option, b: Option,