diff --git a/graph/src/graph/attribute_store.rs b/graph/src/graph/attribute_store.rs index a9d1a534..4d2fe396 100644 --- a/graph/src/graph/attribute_store.rs +++ b/graph/src/graph/attribute_store.rs @@ -92,6 +92,7 @@ use once_cell::sync::OnceCell; use roaring::RoaringTreemap; use super::attribute_cache::AttributeCache; +use super::graphblas::serialization::{Decode, Encode, Reader, Writer}; use crate::runtime::{ordermap::OrderMap, orderset::OrderSet, value::Value}; /// Create a composite key from entity ID and attribute index. @@ -513,19 +514,6 @@ impl AttributeStore { Ok(nremoved) } - #[must_use] - pub fn get_attr_id( - &self, - attr: &Arc, - ) -> Option { - self.attrs_name.get_index_of(attr) - } - - #[must_use] - pub fn memory_usage(&self) -> usize { - self.cache.memory_usage() - } - /// Bulk import attributes for entities known to be new (no prior state). /// /// Optimized for RDB decode: skips cache/fjall lookups since entities @@ -554,6 +542,19 @@ impl AttributeStore { } } + #[must_use] + pub fn get_attr_id( + &self, + attr: &Arc, + ) -> Option { + self.attrs_name.get_index_of(attr) + } + + #[must_use] + pub fn memory_usage(&self) -> usize { + self.cache.memory_usage() + } + pub fn commit(&mut self) -> Result<(), String> { // Apply pending full entity deletions to fjall. if !self.pending_deletes.is_empty() { @@ -666,6 +667,64 @@ impl AttributeStore { pub const fn cache(&self) -> &Arc { &self.cache } + + /// Encode a range of entities, borrowing the deleted bitmap directly. + pub fn encode_with_range( + &self, + w: &mut dyn Writer, + deleted: &RoaringTreemap, + max_id: u64, + global_attrs: &[Arc], + count: u64, + offset: u64, + ) { + // Build attr remap inline. + let global_index: std::collections::HashMap<&Arc, usize> = global_attrs + .iter() + .enumerate() + .map(|(i, n)| (n, i)) + .collect(); + + let mut remap = vec![u16::MAX; self.attrs_name.len()]; + for (local_id, local_name) in self.attrs_name.iter().enumerate() { + if let Some(&global_id) = global_index.get(local_name) { + remap[local_id] = global_id as u16; + } + } + + let mut skipped = 0u64; + let mut encoded = 0u64; + + for id in 0..=max_id { + if deleted.contains(id) { + continue; + } + if skipped < offset { + skipped += 1; + continue; + } + + w.write_unsigned(id); + + let props = self.get_all_attrs_by_id(id); + w.write_unsigned(props.len() as u64); + + for &(local_attr_id, ref value) in props.iter() { + let global_attr_id = if (local_attr_id as usize) < remap.len() { + remap[local_attr_id as usize] + } else { + local_attr_id + }; + w.write_unsigned(global_attr_id as u64); + value.encode(w); + } + + encoded += 1; + if encoded >= count { + break; + } + } + } } // SAFETY: AttributeStore is Send+Sync because: @@ -675,3 +734,38 @@ impl AttributeStore { // - All other fields (`RoaringTreemap`, `OrderSet`, etc.) are owned and not shared unsafe impl Send for AttributeStore {} unsafe impl Sync for AttributeStore {} + +impl Decode<19> for AttributeStore { + fn decode(_r: &mut dyn Reader) -> Result { + unimplemented!("use decode_with_count for AttributeStore") + } + + fn decode_with_count( + &mut self, + r: &mut dyn Reader, + count: u64, + ) -> Result<(), String> { + for _ in 0..count { + let entity_id = r.read_unsigned()?; + let attr_count = r.read_unsigned()?; + + let mut entries: Vec<(u16, Value)> = Vec::with_capacity(attr_count as usize); + for _ in 0..attr_count { + let attr_id = r.read_unsigned()? as u16; + let value = Value::decode(r)?; + + if (attr_id as usize) < self.attrs_name.len() && !matches!(value, Value::Null) { + entries.push((attr_id, value)); + } + } + + if !entries.is_empty() { + entries.sort_by_key(|(idx, _)| *idx); + self.cache + .insert_entity(entity_id, entries, self.version, true); + self.dirty_entities.insert(entity_id); + } + } + Ok(()) + } +} diff --git a/graph/src/graph/graph.rs b/graph/src/graph/graph.rs index d6b73975..3e52d0b7 100644 --- a/graph/src/graph/graph.rs +++ b/graph/src/graph/graph.rs @@ -73,10 +73,8 @@ use std::{ }; use atomic_refcell::AtomicRefCell; - use itertools::Itertools; use lru::LruCache; - use orx_tree::DynTree; use parking_lot::Mutex; use roaring::RoaringTreemap; @@ -90,6 +88,7 @@ use crate::{ Dup, MaskedElementWiseAdd, MaskedElementWiseMultiply, Matrix, MxM, New, Remove, Set, Size, Transpose, }, + serialization::{Encode, EncodeState, PayloadEntry, Writer}, tensor::Tensor, versioned_matrix::VersionedMatrix, }, @@ -435,6 +434,99 @@ impl Graph { } } + /// Restore a graph from decoded RDB data. + /// + /// Used by the RDB load path to construct a fully-populated graph + /// without going through the mutation API. + #[must_use] + #[allow(clippy::too_many_arguments)] + pub fn restore( + name: &str, + cache_size: usize, + node_count: u64, + relationship_count: u64, + deleted_nodes: RoaringTreemap, + deleted_relationships: RoaringTreemap, + adjacancy_matrix: VersionedMatrix, + node_labels_matrix: VersionedMatrix, + relationship_type_matrix: VersionedMatrix, + all_nodes_matrix: VersionedMatrix, + labels_matices: Vec, + relationship_matrices: Vec, + node_labels: Vec>, + relationship_types: Vec>, + node_attrs: AttributeStore, + relationship_attrs: AttributeStore, + ) -> Self { + let node_cap = node_count + deleted_nodes.len(); + let relationship_cap = relationship_count + deleted_relationships.len(); + let schema_version = (node_labels.len() + relationship_types.len()) as u64; + Self { + name: name.to_string(), + node_cap: node_cap.next_power_of_two().max(64), + relationship_cap: relationship_cap.next_power_of_two().max(64), + reserved_node_count: 0, + reserved_relationship_count: 0, + node_count, + relationship_count, + deleted_nodes, + deleted_relationships, + zero_matrix: VersionedMatrix::new(0, 0), + adjacancy_matrix, + node_labels_matrix, + relationship_type_matrix, + all_nodes_matrix, + labels_matices, + relationship_matrices, + node_attrs, + relationship_attrs, + node_indexer: Indexer::default(), + node_labels, + relationship_types, + cache: Arc::new(Mutex::new(LruCache::new( + NonZeroUsize::new(cache_size.max(1)).expect("cache_size.max(1) is always >= 1"), + ))), + version: 0, + schema_version, + } + } + + /// Rebuild derived matrices after RDB load. + /// + /// - `all_nodes_matrix`: diagonal `(id, id) = true` for all live nodes + /// - `relationship_type_matrix`: `(edge_id, type_index) = true` for all edges + /// - Tensor backward (`mt`): transpose of forward (`m`) + pub fn rebuild_derived_matrices(&mut self) { + // Resize all node-dimension matrices to match the restored graph capacity. + // Decoded matrices may have dimensions from the original graph's node_cap, + // which can differ from the restored node_cap. + self.resize_node_matrices(); + let rc = self.relationship_cap; + self.relationship_type_matrix + .resize(rc, self.relationship_types.len() as u64); + + // Rebuild all_nodes_matrix from per-label matrices + // Each label matrix is diagonal (node_id, node_id), so we just need + // to collect all live node IDs across all labels + for lm in &self.labels_matices { + for (node_id, _) in lm.iter(0, u64::MAX) { + self.all_nodes_matrix.set(node_id, node_id, true); + } + } + + // Rebuild relationship_type_matrix and tensor backward matrices + for (type_idx, tensor) in self.relationship_matrices.iter_mut().enumerate() { + // Rebuild backward (transpose) matrix from forward matrix in one operation + tensor.rebuild_backward(); + + // Iterate edges matrix to rebuild relationship_type_matrix + for (_, _, edge_id) in tensor.iter(0, u64::MAX, false) { + self.relationship_type_matrix + .set(edge_id, type_idx as u64, true); + } + } + } + #[must_use] pub fn new_version(&self) -> Self { debug_assert_eq!(self.reserved_node_count, 0); @@ -478,7 +570,6 @@ impl Graph { &self.name } - #[must_use] pub const fn node_count(&self) -> u64 { self.node_count } @@ -804,6 +895,8 @@ impl Graph { NodeId(self.node_count + self.reserved_node_count - 1) } + /// Increment the reserved node counter without allocating a specific ID. + /// Used by effect replay where the actual ID comes from the primary. pub const fn inc_reserved_node_count(&mut self) { self.reserved_node_count += 1; } @@ -843,6 +936,18 @@ impl Graph { self.reserved_node_count -= nodes.len(); self.deleted_nodes -= nodes; + // Ensure capacity covers the highest node ID (effects replay may + // insert IDs above the current count when applied one-by-one). + if let Some(max_id) = nodes.max() { + let needed = max_id + 1; + if needed > self.node_cap { + while needed > self.node_cap { + self.node_cap *= 2; + } + self.resize_node_matrices(); + } + } + self.resize(); for id in nodes { @@ -858,22 +963,6 @@ impl Graph { self.node_count + self.deleted_nodes.len() - 1 } - /// Bulk import node attributes (for RDB decode, skips indexing/cache lookup). - pub fn import_node_attrs( - &mut self, - attrs: &HashMap, Value>>, - ) { - self.node_attrs.import_attrs(attrs); - } - - /// Bulk import relationship attributes (for RDB decode). - pub fn import_relationship_attrs( - &mut self, - attrs: &HashMap, Value>>, - ) { - self.relationship_attrs.import_attrs(attrs); - } - #[must_use] pub fn max_relationship_id(&self) -> u64 { if self.relationship_count == 0 { @@ -904,6 +993,34 @@ impl Graph { Ok(nremoved) } + pub fn import_node_attrs( + &mut self, + attrs: &HashMap, Value>>, + index_add_docs: &mut HashMap, + ) { + self.node_attrs.import_attrs(attrs); + + if self.node_indexer.has_indices() { + for (id, attrs) in attrs { + for (_, label_id) in self.node_labels_matrix.iter(*id, *id) { + let label = &self.node_labels[label_id as usize]; + for key in attrs.keys() { + if self.node_indexer.has_indexed_attr(label, key) { + index_add_docs.entry(label_id).or_default().insert(*id); + } + } + } + } + } + } + + pub fn import_relationship_attrs( + &mut self, + attrs: &HashMap, Value>>, + ) { + self.relationship_attrs.import_attrs(attrs); + } + pub fn set_nodes_labels( &mut self, nodes_labels: &mut Matrix, @@ -1145,10 +1262,6 @@ impl Graph { self.node_attrs.get_attr_by_idx(id.0, attr_idx) } - pub const fn inc_reserved_relationship_count(&mut self) { - self.reserved_relationship_count += 1; - } - pub fn reserve_relationship(&mut self) -> RelationshipId { if self.reserved_relationship_count < self.deleted_relationships.len() { let id = self @@ -1162,6 +1275,12 @@ impl Graph { RelationshipId(self.relationship_count + self.reserved_relationship_count - 1) } + /// Increment the reserved relationship counter without allocating a specific ID. + /// Used by effect replay where the actual ID comes from the primary. + pub const fn inc_reserved_relationship_count(&mut self) { + self.reserved_relationship_count += 1; + } + pub fn reserve_relationships( &mut self, count: usize, @@ -1203,6 +1322,18 @@ impl Graph { self.deleted_relationships.remove(id.0); } + // Ensure capacity covers the highest relationship ID (effects replay + // may insert IDs above the current count when applied one-by-one). + if let Some(max_id) = relationships.keys().map(|id| id.0).max() { + let needed = max_id + 1; + if needed > self.relationship_cap { + while needed > self.relationship_cap { + self.relationship_cap *= 2; + } + self.resize_relationship_matrices(); + } + } + for ( id, PendingRelationship { @@ -1256,11 +1387,36 @@ impl Graph { self.deleted_nodes.contains(id.0) } + #[must_use] + pub fn deleted_nodes_count(&self) -> u64 { + self.deleted_nodes.len() + } + #[must_use] pub const fn deleted_nodes(&self) -> &RoaringTreemap { &self.deleted_nodes } + #[must_use] + pub fn deleted_relationships_count(&self) -> u64 { + self.deleted_relationships.len() + } + + #[must_use] + pub const fn deleted_relationships(&self) -> &RoaringTreemap { + &self.deleted_relationships + } + + #[must_use] + pub fn label_matrices(&self) -> &[VersionedMatrix] { + &self.labels_matices + } + + #[must_use] + pub fn relationship_tensors(&self) -> &[Tensor] { + &self.relationship_matrices + } + #[must_use] pub fn is_relationship_deleted( &self, @@ -1518,8 +1674,13 @@ impl Graph { pub fn get_node_all_attrs_by_id( &self, id: NodeId, - ) -> Arc> { - self.node_attrs.get_all_attrs_by_id(id.0) + ) -> impl Iterator + '_ { + self.node_attrs + .get_all_attrs_by_id(id.0) + .iter() + .cloned() + .collect::>() + .into_iter() } pub fn get_relationship_attrs( @@ -1540,8 +1701,13 @@ impl Graph { pub fn get_relationship_all_attrs_by_id( &self, id: RelationshipId, - ) -> Arc> { - self.relationship_attrs.get_all_attrs_by_id(id.0) + ) -> impl Iterator + '_ { + self.relationship_attrs + .get_all_attrs_by_id(id.0) + .iter() + .cloned() + .collect::>() + .into_iter() } pub fn create_index( @@ -1564,6 +1730,70 @@ impl Graph { Ok(()) } + /// Create an index and populate it synchronously (for RDB load). + /// Unlike `create_index`, this doesn't spawn async tasks. + pub fn create_index_sync( + &mut self, + index_type: &IndexType, + entity_type: &EntityType, + label: &Arc, + attrs: &Vec>, + options: Option, + ) -> Result<(), String> { + match entity_type { + EntityType::Node => { + let len = self.get_label_matrix_mut(label).nvals(); + self.node_indexer + .create_index(index_type, label, attrs, len, options)?; + // Don't spawn async — caller will populate via populate_index_sync + } + EntityType::Relationship => {} + } + Ok(()) + } + + /// Synchronously populate all pending indexes. + /// Used after RDB load when the graph is fully constructed. + pub fn populate_indexes_sync(&mut self) { + let fields_by_label = self.node_indexer.get_all_pending_fields(); + for (label, attrs) in fields_by_label { + if let Some(lm) = self.get_label_matrix(&label) { + // Pre-resolve attribute indices to avoid string lookups per node + let resolved_attrs: Vec<(u16, Vec<_>)> = attrs + .iter() + .filter_map(|(attr, fields)| { + self.get_node_attribute_id(attr) + .map(|idx| (idx as u16, fields.clone())) + }) + .collect(); + + let mut batch = Vec::new(); + for (n, _) in lm.iter(0, u64::MAX) { + let mut doc = Document::new(n); + let mut has_fields = false; + for (attr_idx, fields) in &resolved_attrs { + let value = self.get_node_attribute_by_idx(NodeId(n), *attr_idx); + if let Some(value) = value { + for field in fields { + doc.set(field, &value); + } + has_fields = true; + } + } + if has_fields { + batch.push(doc); + } + } + if !batch.is_empty() { + let mut add_docs = HashMap::new(); + add_docs.insert(label.clone(), batch); + self.node_indexer.commit(&mut add_docs, &mut HashMap::new()); + } + self.node_indexer.enable(&label); + } + } + } + fn start_populate_index( &self, label: &Arc, @@ -1940,89 +2170,59 @@ impl Graph { sz } - #[must_use] - pub fn deleted_nodes_count(&self) -> u64 { - self.deleted_nodes.len() - } - - #[must_use] - pub fn deleted_relationships_count(&self) -> u64 { - self.deleted_relationships.len() - } - - #[must_use] - pub const fn deleted_relationships(&self) -> &RoaringTreemap { - &self.deleted_relationships - } - - #[must_use] - pub fn label_matrices(&self) -> &[VersionedMatrix] { - &self.labels_matices - } - - #[must_use] - pub fn relationship_tensors(&self) -> &[Tensor] { - &self.relationship_matrices - } - - /// Synchronously create an index (without spawning background population). - pub fn create_index_sync( - &mut self, - index_type: &IndexType, - entity_type: &EntityType, - label: &Arc, - attrs: &Vec>, - options: Option, - ) -> Result<(), String> { - match entity_type { - EntityType::Node => { - let len = self.get_label_matrix_mut(label).nvals(); - self.node_indexer - .create_index(index_type, label, attrs, len, options)?; + /// Encode a single payload entry. + pub fn encode_payload( + &self, + w: &mut dyn Writer, + p: &PayloadEntry, + global_attrs: &[Arc], + ) { + match p.state { + EncodeState::Nodes => { + self.node_attrs.encode_with_range( + w, + &self.deleted_nodes, + self.max_node_id(), + global_attrs, + p.count, + p.offset, + ); } - EntityType::Relationship => {} - } - Ok(()) - } - - /// Synchronously populate all pending indexes. - /// Used after RDB load when the graph is fully constructed. - pub fn populate_indexes_sync(&mut self) { - let fields_by_label = self.node_indexer.get_all_pending_fields(); - for (label, attrs) in fields_by_label { - if let Some(lm) = self.get_label_matrix(&label) { - let resolved_attrs: Vec<(u16, Vec<_>)> = attrs - .iter() - .filter_map(|(attr, fields)| { - self.get_node_attribute_id(attr) - .map(|idx| (idx as u16, fields.clone())) - }) - .collect(); - - let mut batch = Vec::new(); - for (n, _) in lm.iter(0, u64::MAX) { - let mut doc = Document::new(n); - let mut has_fields = false; - for (attr_idx, fields) in &resolved_attrs { - let value = self.get_node_attribute_by_idx(NodeId(n), *attr_idx); - if let Some(value) = value { - for field in fields { - doc.set(field, &value); - } - has_fields = true; - } - } - if has_fields { - batch.push(doc); - } + EncodeState::DeletedNodes => { + self.deleted_nodes.encode_with_range(w, p.count, p.offset); + } + EncodeState::Edges => { + self.relationship_attrs.encode_with_range( + w, + &self.deleted_relationships, + self.max_relationship_id(), + global_attrs, + p.count, + p.offset, + ); + } + EncodeState::DeletedEdges => { + self.deleted_relationships + .encode_with_range(w, p.count, p.offset); + } + EncodeState::LabelsMatrices => { + let label_matrices = self.label_matrices(); + w.write_unsigned(label_matrices.len() as u64); + for (i, lm) in label_matrices.iter().enumerate() { + w.write_unsigned(i as u64); + lm.encode(w); } - if !batch.is_empty() { - let mut add_docs = HashMap::new(); - add_docs.insert(label.clone(), batch); - self.node_indexer.commit(&mut add_docs, &mut HashMap::new()); + } + EncodeState::RelationMatrices => { + let tensors = self.relationship_tensors(); + for (i, tensor) in tensors.iter().enumerate() { + w.write_unsigned(i as u64); + tensor.encode(w); } - self.node_indexer.enable(&label); } + EncodeState::AdjMatrix => self.adjacancy_matrix.encode(w), + EncodeState::LblsMatrix => self.node_labels_matrix.encode(w), + _ => {} } } @@ -2036,7 +2236,8 @@ impl Graph { self.relationship_attrs.attrs_name.iter().cloned().collect() } - /// Register a node attribute name (get-or-create). + /// Register a node attribute name (get-or-create). Used by effect + /// replication to pre-register attribute names on the replica. pub fn add_node_attribute_name( &mut self, name: &str, @@ -2047,7 +2248,8 @@ impl Graph { } } - /// Register a relationship attribute name (get-or-create). + /// Register a relationship attribute name (get-or-create). Used by effect + /// replication to pre-register attribute names on the replica. pub fn add_rel_attribute_name( &mut self, name: &str, diff --git a/src/reply.rs b/src/reply.rs index 45a9e857..f38b5cc7 100644 --- a/src/reply.rs +++ b/src/reply.rs @@ -127,8 +127,8 @@ pub fn reply_compact_value( } raw::reply_with_array(ctx.ctx, i64::from(raw::REDISMODULE_POSTPONED_LEN)); - let attrs = bg.get_node_all_attrs_by_id(*id); - for (key, value) in attrs.iter() { + let attrs: Vec<_> = bg.get_node_all_attrs_by_id(*id).collect(); + for (key, value) in &attrs { raw::reply_with_array(ctx.ctx, 3); raw::reply_with_long_long(ctx.ctx, (*key).into()); reply_compact_value(ctx, runtime, value); @@ -171,8 +171,8 @@ pub fn reply_compact_value( raw::reply_with_long_long(ctx.ctx, u64::from(*rel_dst) as _); let node_attr_offset = bg.node_attribute_count() as i64; raw::reply_with_array(ctx.ctx, i64::from(raw::REDISMODULE_POSTPONED_LEN)); - let attrs = bg.get_relationship_all_attrs_by_id(*rel_id); - for (key, value) in attrs.iter() { + let attrs: Vec<_> = bg.get_relationship_all_attrs_by_id(*rel_id).collect(); + for (key, value) in &attrs { raw::reply_with_array(ctx.ctx, 3); raw::reply_with_long_long(ctx.ctx, i64::from(*key) + node_attr_offset); reply_compact_value(ctx, runtime, value);