Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b80ad8e
make sure we cancel all tasks when the running server is dropped
ljeub-pometry Apr 17, 2026
c7f41f5
update optd
ljeub-pometry Apr 17, 2026
8bc017b
add domain for NodeOp
ljeub-pometry Apr 17, 2026
f51ea1d
Merge branch 'db_v4' into optimise_node_name_filter
miratepuffin Apr 17, 2026
14ca248
Merge db_v4 into optimise_node_name_filter
ljeub-pometry Apr 20, 2026
be0de7c
avoid unnecessarily re-filtering the domain when it is correct
ljeub-pometry Apr 20, 2026
00192b2
changes to better support Bn edge sized graphs
fabianmurariu Apr 17, 2026
3d60794
remove accidental pyo3 import
ljeub-pometry Apr 20, 2026
13305b5
small import updates
fabianmurariu Apr 20, 2026
7d42637
should call list_filtered in nodes
ljeub-pometry Apr 20, 2026
85a9431
const_value_in_domain should be the same as const_value by default
ljeub-pometry Apr 20, 2026
69e67a5
Merge remote-tracking branch 'origin/optimise_node_name_filter' into …
fabianmurariu Apr 20, 2026
be62c8f
possible improvements to UI for very large graphs
fabianmurariu Apr 21, 2026
f814e71
Merge db_v4 into pre_prod_optim2
ljeub-pometry Apr 21, 2026
73a6762
still need to check that the edge exists in the layer, even if we hav…
ljeub-pometry Apr 22, 2026
2a54bf3
no optimisation in with_debug as they make debugging more annoying
ljeub-pometry Apr 22, 2026
84cd417
filtering by node is really bad for window so change this back
ljeub-pometry Apr 22, 2026
92881b1
fix materialize double-adding temporal edges
ljeub-pometry Apr 22, 2026
b52362c
for a persistent graph the update history and properties for exploded…
ljeub-pometry Apr 22, 2026
1b76bbc
need to look at explode() for history on persistent graphs
ljeub-pometry Apr 22, 2026
c1d71bd
attempt at faster node_valid
ljeub-pometry Apr 22, 2026
5f6140a
include updates from static graph in node_valid check for layers
ljeub-pometry Apr 23, 2026
24f843c
cleanup
ljeub-pometry Apr 23, 2026
1fdccc4
fix search feature
ljeub-pometry Apr 23, 2026
adf1166
make component test easier to debug on failure
ljeub-pometry Apr 23, 2026
8f54fff
add our own union find implementation based on the old connected comp…
ljeub-pometry Apr 23, 2026
0a226ac
clean up dependencies
ljeub-pometry Apr 23, 2026
1c9dab6
storage dependency is definitely used
ljeub-pometry Apr 23, 2026
08a2d40
avoid compiling the vectors feature in benchmarks unless it is actual…
ljeub-pometry Apr 23, 2026
874dff9
implement has_layer_inner directly
ljeub-pometry Apr 23, 2026
c046c72
optimise last for filtered additions
ljeub-pometry Apr 24, 2026
a21d6a6
add fast path for getting edge ref out again
ljeub-pometry Apr 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ debug = true
[profile.with-debug]
inherits = "dev"
debug = true
opt-level = 0

# for fast one-time builds (e.g., docs/CI)
[profile.build-fast]
Expand Down
2 changes: 1 addition & 1 deletion clam-core
7 changes: 6 additions & 1 deletion db4-graph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ where
self.logical_to_physical.flush()
}

pub fn vacuum(&self) -> Result<(), StorageError> {
self.storage.vacuum()?;
Ok(())
}

pub fn disk_storage_path(&self) -> Option<&Path> {
self.graph_dir()
.filter(|_| Extension::disk_storage_enabled())
Expand Down Expand Up @@ -402,7 +407,7 @@ where
WriteLockedGraph::new(self)
}

pub fn update_time(&self, earliest: EventTime) {
pub fn update_time(&self, _earliest: EventTime) {
// self.storage.update_time(earliest);
}
}
Expand Down
12 changes: 8 additions & 4 deletions db4-storage/src/api/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use raphtory_api::core::entities::{
properties::{meta::Meta, prop::Prop, tprop::TPropOps},
};
use raphtory_core::{
entities::{EID, LayerIds, VID},
entities::{EID, LayerIds, VID, edges::edge_ref::EdgeRef},
storage::timeindex::{EventTime, TimeIndexOps},
};
use rayon::iter::ParallelIterator;
Expand All @@ -32,7 +32,7 @@ pub trait EdgeSegmentOps: Send + Sync + std::fmt::Debug + 'static {
fn latest(&self) -> Option<EventTime>;
fn earliest(&self) -> Option<EventTime>;

fn t_len(&self) -> usize;
fn t_len(&self, layer_id: usize) -> usize;
fn num_layers(&self) -> usize;
// Persistent layer count, not used for up-to-date counts
fn layer_count(&self, layer_id: LayerId) -> u32;
Expand Down Expand Up @@ -97,7 +97,7 @@ pub trait EdgeSegmentOps: Send + Sync + std::fmt::Debug + 'static {
locked_head: impl Deref<Target = MemEdgeSegment>,
) -> Option<(VID, VID)>;

fn entry<'a>(&'a self, edge_pos: LocalPOS) -> Self::Entry<'a>;
fn entry<'a>(&'a self, edge_pos: LocalPOS, edge_ref: Option<EdgeRef>) -> Self::Entry<'a>;

fn layer_entry<'a>(
&'a self,
Expand All @@ -124,7 +124,11 @@ pub trait LockedESegment: Send + Sync + std::fmt::Debug {
where
Self: 'a;

fn entry_ref<'a>(&'a self, edge_pos: impl Into<LocalPOS>) -> Self::EntryRef<'a>
fn entry_ref<'a>(
&'a self,
edge_pos: impl Into<LocalPOS>,
edge_ref: Option<EdgeRef>,
) -> Self::EntryRef<'a>
where
Self: 'a;

Expand Down
39 changes: 29 additions & 10 deletions db4-storage/src/pages/edge_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ use crate::{
persist::{config::ConfigOps, strategy::PersistenceStrategy},
segments::edge::segment::MemEdgeSegment,
};
use either::Either;
use parking_lot::{RwLock, RwLockWriteGuard};
use raphtory_api::core::entities::{
EID, LayerId, VID,
properties::meta::{Meta, STATIC_GRAPH_LAYER_ID},
};
use raphtory_core::{
entities::{ELID, LayerIds},
entities::{ELID, LayerIds, edges::edge_ref::EdgeRef},
storage::timeindex::{AsTime, EventTime},
};
use rayon::prelude::*;
Expand Down Expand Up @@ -56,12 +57,12 @@ impl<ES: EdgeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy<ES = ES>>

pub fn edge_ref(
&self,
e_id: impl Into<EID>,
e_id_ref: Either<EID, EdgeRef>,
) -> <<ES as EdgeSegmentOps>::ArcLockedSegment as LockedESegment>::EntryRef<'_> {
let e_id = e_id.into();
let e_id = e_id_ref.either(|eid| eid, |eref| eref.pid());
let (page_id, pos) = self.storage.resolve_pos(e_id);
let locked_page = &self.locked_pages[page_id];
locked_page.entry_ref(pos)
locked_page.entry_ref(pos, e_id_ref.right())
}

pub fn iter<'a, 'b: 'a>(
Expand Down Expand Up @@ -122,7 +123,7 @@ impl<ES: EdgeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy<ES = ES>>
.map(|(row_group_id, iter)| {
(
row_group_id,
iter.filter(|eid| self.edge_ref(*eid).edge(LayerId(0)).is_some()),
iter.filter(|eid| self.edge_ref(Either::Left(*eid)).edge(LayerId(0)).is_some()),
)
})
}
Expand Down Expand Up @@ -223,8 +224,11 @@ impl<ES: EdgeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy<ES = ES>>
Iterator::max(self.segments.iter().filter_map(|(_, page)| page.latest()))
}

pub fn t_len(&self) -> usize {
self.segments.iter().map(|(_, page)| page.t_len()).sum()
pub fn t_len(&self, layer_id: usize) -> usize {
self.segments
.iter()
.map(|(_, page)| page.t_len(layer_id))
.sum()
}

pub fn prop_meta(&self) -> &Arc<Meta> {
Expand Down Expand Up @@ -467,16 +471,16 @@ impl<ES: EdgeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy<ES = ES>>
segment.get_edge(local_edge, layer, segment.head())
}

pub fn edge(&self, e_id: impl Into<EID>) -> ES::Entry<'_> {
let e_id = e_id.into();
pub fn edge(&self, e_id_ref: Either<EID, EdgeRef>) -> ES::Entry<'_> {
let e_id = e_id_ref.either(|eid| eid, |eref| eref.pid());
let (segment_id, local_edge) = resolve_pos(e_id, self.max_page_len());
let segment = self.segments.get(segment_id).unwrap_or_else(|| {
panic!(
"{e_id:?} Not found in seg: {segment_id}, pos: {local_edge:?}, num_segments: {}",
self.segments.count()
)
});
segment.entry(local_edge)
segment.entry(local_edge, e_id_ref.right())
}

pub fn num_edges(&self) -> usize {
Expand All @@ -487,6 +491,21 @@ impl<ES: EdgeSegmentOps<Extension = EXT>, EXT: PersistenceStrategy<ES = ES>>
self.layer_counter.get(layer_id)
}

pub fn num_layers(&self) -> usize {
self.segments
.iter()
.map(|(_, page)| page.num_layers())
.max()
.unwrap_or(0)
}

pub fn num_temporal_edges_layer(&self, layer_id: LayerId) -> usize {
self.segments
.iter()
.map(|(_, page)| page.t_len(layer_id.0))
.sum()
}

pub fn get_writer<'a>(
&'a self,
e_id: EID,
Expand Down
7 changes: 4 additions & 3 deletions db4-storage/src/pages/locked/edges.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ impl<'a, EXT: PersistenceStrategy<ES = ES>, ES: EdgeSegmentOps<Extension = EXT>>
}

pub fn vacuum(&mut self) -> Result<(), StorageError> {
for LockedEdgePage { page, lock, .. } in &mut self.writers {
page.vacuum(lock.deref_mut())?;
}
self.writers.par_iter_mut().try_for_each(|writer| {
let LockedEdgePage { page, lock, .. } = writer;
page.vacuum(lock.deref_mut())
})?;
Ok(())
}

Expand Down
7 changes: 4 additions & 3 deletions db4-storage/src/pages/locked/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ impl<'a, EXT: PersistenceStrategy<NS = NS>, NS: NodeSegmentOps<Extension = EXT>>
}

pub fn vacuum(&mut self) -> Result<(), StorageError> {
for LockedNodePage { page, lock, .. } in &mut self.writers {
page.vacuum(lock.deref_mut())?;
}
self.writers.par_iter_mut().try_for_each(|writer| {
let LockedNodePage { page, lock, .. } = writer;
page.vacuum(lock.deref_mut())
})?;
Ok(())
}
}
13 changes: 10 additions & 3 deletions db4-storage/src/pages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ impl<

let edge_storage = Arc::new(EdgeStorageInner::load(edges_path, ext.clone())?);
let edge_meta = edge_storage.edge_meta().clone();
let node_storage = Arc::new(NodeStorageInner::load(nodes_path, edge_meta, ext.clone())?);
let node_storage: Arc<NodeStorageInner<NS, EXT>> = Arc::new(NodeStorageInner::load(
nodes_path,
edge_meta.clone(),
ext.clone(),
)?);
let node_meta = node_storage.prop_meta();

// Load graph temporal properties and metadata.
Expand All @@ -171,7 +175,10 @@ impl<
node_meta.get_or_create_node_type_id(node_type);
}

let t_len = edge_storage.t_len();
let t_len = edge_meta
.all_layer_iter()
.map(|(layer_id, _)| edge_storage.t_len(layer_id.0))
.sum::<usize>();

Ok(Self {
nodes: node_storage,
Expand Down Expand Up @@ -422,7 +429,7 @@ mod test {
use rayon::iter::ParallelIterator;

#[test]
fn test_iterleave() {
fn test_interleave() {
let chunk_size = 3;
let num_segments = 3;
let max_seg_len = 4;
Expand Down
4 changes: 2 additions & 2 deletions db4-storage/src/persist/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::{iter, path::Path};
use tracing::error;

pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 131_072; // 2^17
pub const DEFAULT_MAX_PAGE_LEN_EDGES: u32 = 1_048_576; // 2^20
pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 600_000; // 2^17
pub const DEFAULT_MAX_PAGE_LEN_EDGES: u32 = 6_000_000; // 2^20
pub const CONFIG_FILE: &str = "config.json";

pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized {
Expand Down
54 changes: 41 additions & 13 deletions db4-storage/src/segments/edge/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use raphtory_api::core::entities::{LayerId, properties::prop::Prop};
use raphtory_core::{
entities::{
EID, Multiple, VID,
edges::edge_ref::EdgeRef,
properties::{tcell::TCell, tprop::TPropCell},
},
storage::timeindex::{EventTime, TimeIndexOps},
Expand All @@ -18,14 +19,16 @@ use raphtory_core::{
pub struct MemEdgeEntry<'a, MES> {
pos: LocalPOS,
es: MES,
edge_ref: Option<EdgeRef>,
__marker: std::marker::PhantomData<&'a ()>,
}

impl<'a, MES: std::ops::Deref<Target = MemEdgeSegment>> MemEdgeEntry<'a, MES> {
pub fn new(pos: LocalPOS, es: MES) -> Self {
pub fn new(pos: LocalPOS, es: MES, edge_ref: Option<EdgeRef>) -> Self {
Self {
pos,
es,
edge_ref,
__marker: std::marker::PhantomData,
}
}
Expand All @@ -47,6 +50,7 @@ impl<'a, MES: std::ops::Deref<Target = MemEdgeSegment> + Send + Sync> EdgeEntryO
MemEdgeRef {
pos: self.pos,
es: &self.es,
edge_ref: self.edge_ref,
}
}
}
Expand All @@ -55,11 +59,12 @@ impl<'a, MES: std::ops::Deref<Target = MemEdgeSegment> + Send + Sync> EdgeEntryO
pub struct MemEdgeRef<'a> {
pos: LocalPOS,
es: &'a MemEdgeSegment,
edge_ref: Option<EdgeRef>,
}

impl<'a> MemEdgeRef<'a> {
pub fn new(pos: LocalPOS, es: &'a MemEdgeSegment) -> Self {
Self { pos, es }
pub fn new(pos: LocalPOS, es: &'a MemEdgeSegment, edge_ref: Option<EdgeRef>) -> Self {
Self { pos, es, edge_ref }
}

pub fn has_layers(&self, layer_ids: &Multiple) -> bool {
Expand Down Expand Up @@ -151,11 +156,20 @@ impl<'a> EdgeRefOps<'a> for MemEdgeRef<'a> {
type TProps = EdgeTProps<'a>;

fn edge(self, layer_id: LayerId) -> Option<(VID, VID)> {
self.es
.as_ref()
.get(layer_id.0)?
.get(self.pos)
.map(|entry| (entry.src, entry.dst))
match self.edge_ref {
None => self
.es
.as_ref()
.get(layer_id.0)?
.get(self.pos)
.map(|entry| (entry.src, entry.dst)),
Some(eref) => self
.es
.as_ref()
.get(layer_id.0)?
.has_item(self.pos)
.then_some((eref.src(), eref.dst())),
}
}

fn layer_additions(self, layer_id: LayerId) -> Self::Additions {
Expand All @@ -175,17 +189,31 @@ impl<'a> EdgeRefOps<'a> for MemEdgeRef<'a> {
}

fn src(&self) -> Option<VID> {
self.es.as_ref()[0].get(self.pos).map(|entry| entry.src)
self.edge_ref.map(|e| e.src()).or_else(|| {
self.es
.as_ref()
.first()
.and_then(|layer| layer.get(self.pos))
.map(|entry| entry.src)
})
}

fn dst(&self) -> Option<VID> {
self.es.as_ref()[0].get(self.pos).map(|entry| entry.dst)
self.edge_ref.map(|e| e.dst()).or_else(|| {
self.es
.as_ref()
.first()
.and_then(|layer| layer.get(self.pos))
.map(|entry| entry.dst)
})
}

fn edge_id(&self) -> EID {
let segment_id = self.es.as_ref()[0].segment_id();
let max_page_len = self.es.as_ref()[0].max_page_len();
self.pos.as_eid(segment_id, max_page_len)
self.edge_ref.map(|e| e.pid()).unwrap_or_else(|| {
let segment_id = self.es.as_ref()[0].segment_id();
let max_page_len = self.es.as_ref()[0].max_page_len();
self.pos.as_eid(segment_id, max_page_len)
})
}

fn internal_num_layers(self) -> usize {
Expand Down
Loading
Loading