-
Notifications
You must be signed in to change notification settings - Fork 3
Implement RDB serialization and deserialization for graph data #359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
65c0416
20171d8
3ce7c6d
788c6f2
f1be7a0
b560f8c
4a81b3a
179e0be
6c29a53
89c6855
6fb14c4
f0a39b0
844c53d
3f4c27c
727eadb
47b6943
e87dc11
ec86753
359c49c
791aca7
99fd3e1
77315ca
d74a121
a2b54e7
890a23a
d1a2377
774635c
fc3707e
76ef617
3b7736d
057c9c4
f33e2bb
a3e65ee
a7a4bb6
8db5fed
103d9aa
c320f66
6c38250
6722aff
d4528ed
a0b7ea3
0cd9ddc
7a1be98
39f2155
f8d7c07
056f466
7c14cc8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -83,6 +83,7 @@ | |||||||||||||||||||||||||||||||
| //! Each attribute is stored as a separate fjall entry: | ||||||||||||||||||||||||||||||||
| //! `entity_id (8 bytes big-endian) + attr_idx (2 bytes big-endian)` | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| use std::cell::RefCell; | ||||||||||||||||||||||||||||||||
| use std::{collections::HashMap, sync::Arc}; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| use fjall::{ | ||||||||||||||||||||||||||||||||
|
|
@@ -92,6 +93,7 @@ use once_cell::sync::OnceCell; | |||||||||||||||||||||||||||||||
| use roaring::RoaringTreemap; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| use super::attribute_cache::AttributeCache; | ||||||||||||||||||||||||||||||||
| use super::graphblas::serialization::{Decode, Encode, Reader, Writer}; | ||||||||||||||||||||||||||||||||
| use crate::runtime::{ordermap::OrderMap, orderset::OrderSet, value::Value}; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /// Create a composite key from entity ID and attribute index. | ||||||||||||||||||||||||||||||||
|
|
@@ -134,6 +136,10 @@ pub struct AttributeStore { | |||||||||||||||||||||||||||||||
| dirty_entities: RoaringTreemap, | ||||||||||||||||||||||||||||||||
| /// Entity IDs pending full deletion (all attributes) — applied on commit, cleared on rollback. | ||||||||||||||||||||||||||||||||
| pending_deletes: RoaringTreemap, | ||||||||||||||||||||||||||||||||
| /// Encoding context: deleted entity IDs (set before serialization). | ||||||||||||||||||||||||||||||||
| encode_deleted: RefCell<Option<RoaringTreemap>>, | ||||||||||||||||||||||||||||||||
| /// Encoding context: maximum entity ID (set before serialization). | ||||||||||||||||||||||||||||||||
| encode_max_id: RefCell<u64>, | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| impl Clone for AttributeStore { | ||||||||||||||||||||||||||||||||
|
|
@@ -148,6 +154,8 @@ impl Clone for AttributeStore { | |||||||||||||||||||||||||||||||
| version: self.version, | ||||||||||||||||||||||||||||||||
| dirty_entities: self.dirty_entities.clone(), | ||||||||||||||||||||||||||||||||
| pending_deletes: self.pending_deletes.clone(), | ||||||||||||||||||||||||||||||||
| encode_deleted: RefCell::new(None), | ||||||||||||||||||||||||||||||||
| encode_max_id: RefCell::new(0), | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
@@ -172,6 +180,8 @@ impl AttributeStore { | |||||||||||||||||||||||||||||||
| version, | ||||||||||||||||||||||||||||||||
| dirty_entities: RoaringTreemap::new(), | ||||||||||||||||||||||||||||||||
| pending_deletes: RoaringTreemap::new(), | ||||||||||||||||||||||||||||||||
| encode_deleted: RefCell::new(None), | ||||||||||||||||||||||||||||||||
| encode_max_id: RefCell::new(0), | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
@@ -235,6 +245,8 @@ impl AttributeStore { | |||||||||||||||||||||||||||||||
| version, | ||||||||||||||||||||||||||||||||
| dirty_entities: RoaringTreemap::new(), | ||||||||||||||||||||||||||||||||
| pending_deletes: RoaringTreemap::new(), | ||||||||||||||||||||||||||||||||
| encode_deleted: RefCell::new(None), | ||||||||||||||||||||||||||||||||
| encode_max_id: RefCell::new(0), | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
|
|
@@ -369,10 +381,10 @@ impl AttributeStore { | |||||||||||||||||||||||||||||||
| pub fn get_all_attrs_by_id( | ||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||
| key: u64, | ||||||||||||||||||||||||||||||||
| ) -> impl Iterator<Item = (u16, Value)> + '_ { | ||||||||||||||||||||||||||||||||
| let cached = self.cache.get_entity(key, self.version); | ||||||||||||||||||||||||||||||||
| let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key)); | ||||||||||||||||||||||||||||||||
| attrs.into_iter() | ||||||||||||||||||||||||||||||||
| ) -> Vec<(u16, Value)> { | ||||||||||||||||||||||||||||||||
| self.cache | ||||||||||||||||||||||||||||||||
| .get_entity(key, self.version) | ||||||||||||||||||||||||||||||||
| .unwrap_or_else(|| self.populate_cache_from_fjall(key)) | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // ---- write path (cache only) ---------------------------------------- | ||||||||||||||||||||||||||||||||
|
|
@@ -619,6 +631,20 @@ impl AttributeStore { | |||||||||||||||||||||||||||||||
| pub const fn cache(&self) -> &Arc<AttributeCache> { | ||||||||||||||||||||||||||||||||
| &self.cache | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| /// Set encoding context needed by `Encode::encode_with_range`. | ||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||
| /// `node_attrs` and `rel_attrs` are the attribute name sets from the node and | ||||||||||||||||||||||||||||||||
| /// relationship stores respectively, used to build the canonical global | ||||||||||||||||||||||||||||||||
| /// attribute ordering (nodes first, then relationships, deduplicated). | ||||||||||||||||||||||||||||||||
| pub fn set_encode_context( | ||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||
| deleted: &RoaringTreemap, | ||||||||||||||||||||||||||||||||
| max_id: u64, | ||||||||||||||||||||||||||||||||
| ) { | ||||||||||||||||||||||||||||||||
| *self.encode_deleted.borrow_mut() = Some(deleted.clone()); | ||||||||||||||||||||||||||||||||
| *self.encode_max_id.borrow_mut() = max_id; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| // SAFETY: AttributeStore is Send+Sync because: | ||||||||||||||||||||||||||||||||
|
|
@@ -628,3 +654,86 @@ impl AttributeStore { | |||||||||||||||||||||||||||||||
| // - All other fields (`RoaringTreemap`, `OrderSet`, etc.) are owned and not shared | ||||||||||||||||||||||||||||||||
| unsafe impl Send for AttributeStore {} | ||||||||||||||||||||||||||||||||
| unsafe impl Sync for AttributeStore {} | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| impl Encode<19> for AttributeStore { | ||||||||||||||||||||||||||||||||
| fn encode( | ||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||
| _w: &mut dyn Writer, | ||||||||||||||||||||||||||||||||
| ) { | ||||||||||||||||||||||||||||||||
| unimplemented!("use encode_with_range for AttributeStore") | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| fn encode_with_range( | ||||||||||||||||||||||||||||||||
| &self, | ||||||||||||||||||||||||||||||||
| w: &mut dyn Writer, | ||||||||||||||||||||||||||||||||
| count: u64, | ||||||||||||||||||||||||||||||||
| offset: u64, | ||||||||||||||||||||||||||||||||
| ) { | ||||||||||||||||||||||||||||||||
| let binding = self.encode_deleted.borrow(); | ||||||||||||||||||||||||||||||||
| let deleted = binding.as_ref().expect("encode context not set"); | ||||||||||||||||||||||||||||||||
| let max_id = *self.encode_max_id.borrow(); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| let mut skipped = 0u64; | ||||||||||||||||||||||||||||||||
| let mut encoded = 0u64; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for id in 0..=max_id { | ||||||||||||||||||||||||||||||||
| if deleted.contains(id) { | ||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| if skipped < offset { | ||||||||||||||||||||||||||||||||
| skipped += 1; | ||||||||||||||||||||||||||||||||
| continue; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| w.write_unsigned(id); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| let props: Vec<(u16, Value)> = self.get_all_attrs_by_id(id); | ||||||||||||||||||||||||||||||||
| w.write_unsigned(props.len() as u64); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| for (attr_id, value) in props { | ||||||||||||||||||||||||||||||||
| w.write_unsigned(attr_id as u64); | ||||||||||||||||||||||||||||||||
| value.encode(w); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| encoded += 1; | ||||||||||||||||||||||||||||||||
| if encoded >= count { | ||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| impl Decode<19> for AttributeStore { | ||||||||||||||||||||||||||||||||
| fn decode(_r: &mut dyn Reader) -> Result<Self, String> { | ||||||||||||||||||||||||||||||||
| unimplemented!("use decode_with_count for AttributeStore") | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| fn decode_with_count( | ||||||||||||||||||||||||||||||||
| &mut self, | ||||||||||||||||||||||||||||||||
| r: &mut dyn Reader, | ||||||||||||||||||||||||||||||||
| count: u64, | ||||||||||||||||||||||||||||||||
| ) -> Result<(), String> { | ||||||||||||||||||||||||||||||||
| for _ in 0..count { | ||||||||||||||||||||||||||||||||
| let entity_id = r.read_unsigned()?; | ||||||||||||||||||||||||||||||||
| let attr_count = r.read_unsigned()?; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| let mut entity_attrs = OrderMap::default(); | ||||||||||||||||||||||||||||||||
| for _ in 0..attr_count { | ||||||||||||||||||||||||||||||||
| let attr_id = r.read_unsigned()? as u16; | ||||||||||||||||||||||||||||||||
| let value = Value::decode(r)?; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if (attr_id as usize) < self.attrs_name.len() { | ||||||||||||||||||||||||||||||||
| let attr_name = self.attrs_name[attr_id as usize].clone(); | ||||||||||||||||||||||||||||||||
| entity_attrs.insert(attr_name, value); | ||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reject oversized attribute ids instead of truncating them.
Suggested fix- let attr_id = r.read_unsigned()? as u16;
+ let attr_id = u16::try_from(r.read_unsigned()?)
+ .map_err(|_| "attribute id exceeds u16".to_string())?;📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| if !entity_attrs.is_empty() { | ||||||||||||||||||||||||||||||||
| let mut batch = HashMap::new(); | ||||||||||||||||||||||||||||||||
| batch.insert(entity_id, entity_attrs); | ||||||||||||||||||||||||||||||||
| self.insert_attrs(&batch)?; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| Ok(()) | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
Uh oh!
There was an error while loading. Please reload this page.