Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 107 additions & 13 deletions graph/src/graph/attribute_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ use once_cell::sync::OnceCell;
use roaring::RoaringTreemap;

use super::attribute_cache::AttributeCache;
use super::graphblas::serialization::{Decode, Encode, Reader, Writer};
use crate::runtime::{ordermap::OrderMap, orderset::OrderSet, value::Value};

/// Create a composite key from entity ID and attribute index.
Expand Down Expand Up @@ -513,19 +514,6 @@ impl AttributeStore {
Ok(nremoved)
}

#[must_use]
pub fn get_attr_id(
&self,
attr: &Arc<String>,
) -> Option<usize> {
self.attrs_name.get_index_of(attr)
}

#[must_use]
pub fn memory_usage(&self) -> usize {
self.cache.memory_usage()
}

/// Bulk import attributes for entities known to be new (no prior state).
///
/// Optimized for RDB decode: skips cache/fjall lookups since entities
Expand Down Expand Up @@ -554,6 +542,19 @@ impl AttributeStore {
}
}

#[must_use]
pub fn get_attr_id(
&self,
attr: &Arc<String>,
) -> Option<usize> {
self.attrs_name.get_index_of(attr)
}

#[must_use]
pub fn memory_usage(&self) -> usize {
self.cache.memory_usage()
}

pub fn commit(&mut self) -> Result<(), String> {
// Apply pending full entity deletions to fjall.
if !self.pending_deletes.is_empty() {
Expand Down Expand Up @@ -666,6 +667,64 @@ impl AttributeStore {
pub const fn cache(&self) -> &Arc<AttributeCache> {
&self.cache
}

/// Encode a range of entities, borrowing the deleted bitmap directly.
pub fn encode_with_range(
&self,
w: &mut dyn Writer,
deleted: &RoaringTreemap,
max_id: u64,
global_attrs: &[Arc<String>],
count: u64,
offset: u64,
) {
// Build attr remap inline.
let global_index: std::collections::HashMap<&Arc<String>, usize> = global_attrs
.iter()
.enumerate()
.map(|(i, n)| (n, i))
.collect();

let mut remap = vec![u16::MAX; self.attrs_name.len()];
for (local_id, local_name) in self.attrs_name.iter().enumerate() {
if let Some(&global_id) = global_index.get(local_name) {
remap[local_id] = global_id as u16;
}
}

let mut skipped = 0u64;
let mut encoded = 0u64;

for id in 0..=max_id {
if deleted.contains(id) {
continue;
}
if skipped < offset {
skipped += 1;
continue;
}

w.write_unsigned(id);

let props = self.get_all_attrs_by_id(id);
w.write_unsigned(props.len() as u64);

for &(local_attr_id, ref value) in props.iter() {
let global_attr_id = if (local_attr_id as usize) < remap.len() {
remap[local_attr_id as usize]
} else {
local_attr_id
};
w.write_unsigned(global_attr_id as u64);
value.encode(w);
}

encoded += 1;
if encoded >= count {
break;
}
}
}
}

// SAFETY: AttributeStore is Send+Sync because:
Expand All @@ -675,3 +734,38 @@ impl AttributeStore {
// - All other fields (`RoaringTreemap`, `OrderSet`, etc.) are owned and not shared
unsafe impl Send for AttributeStore {}
unsafe impl Sync for AttributeStore {}

impl Decode<19> for AttributeStore {
fn decode(_r: &mut dyn Reader) -> Result<Self, String> {
unimplemented!("use decode_with_count for AttributeStore")
}

fn decode_with_count(
&mut self,
r: &mut dyn Reader,
count: u64,
) -> Result<(), String> {
for _ in 0..count {
let entity_id = r.read_unsigned()?;
let attr_count = r.read_unsigned()?;

let mut entries: Vec<(u16, Value)> = Vec::with_capacity(attr_count as usize);
for _ in 0..attr_count {
let attr_id = r.read_unsigned()? as u16;
let value = Value::decode(r)?;

if (attr_id as usize) < self.attrs_name.len() && !matches!(value, Value::Null) {
entries.push((attr_id, value));
}
}

if !entries.is_empty() {
entries.sort_by_key(|(idx, _)| *idx);
self.cache
.insert_entity(entity_id, entries, self.version, true);
self.dirty_entities.insert(entity_id);
}
}
Ok(())
}
}
Loading
Loading