Skip to content
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
65c0416
Implement RDB serialization and deserialization for graph data
AviAvni Apr 12, 2026
20171d8
feat: add GRAPH.DEBUG command for RDB load management and testing
AviAvni Apr 12, 2026
3ce7c6d
refactor: improve concurrency and error handling in graph serializati…
AviAvni Apr 12, 2026
788c6f2
style: format code for improved readability in multiple files
AviAvni Apr 12, 2026
f1be7a0
feat: add falkordb-bulk-loader to Python dependencies and update enco…
AviAvni Apr 12, 2026
b560f8c
feat: add schema version management to Graph and MVCCGraph for versio…
AviAvni Apr 12, 2026
4a81b3a
feat: add test for graph versioning and remove from todo list
AviAvni Apr 12, 2026
179e0be
feat: add EFFECTS_THRESHOLD as a runtime-configurable value
AviAvni Apr 12, 2026
6c29a53
Implement effects replication for graph mutations
AviAvni Apr 13, 2026
89c6855
feat: move test_effects.py from todo to done list
AviAvni Apr 13, 2026
6fb14c4
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
f0a39b0
Implement reserved node and relationship counters, enhance index effe…
AviAvni Apr 13, 2026
844c53d
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
3f4c27c
fix: remove unnecessary whitespace in apply_effects function
AviAvni Apr 13, 2026
727eadb
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
47b6943
feat: add replication of changes after committing graph effects
AviAvni Apr 13, 2026
e87dc11
feat: include modified flag in query execution results for replicatio…
AviAvni Apr 13, 2026
ec86753
feat: ensure capacity for highest node and relationship IDs during ef…
AviAvni Apr 13, 2026
359c49c
feat: add UINT64 support for edge IDs in matrix and tensor implementa…
AviAvni Apr 13, 2026
791aca7
feat: add UINT64 support for matrices and tensors, update Redis key h…
AviAvni Apr 13, 2026
99fd3e1
refactor: reorganize import statements for clarity and consistency
AviAvni Apr 13, 2026
77315ca
Merge branch 'main' into rdb
AviAvni Apr 13, 2026
d74a121
feat: implement dynamic resizing for graph node and relationship matr…
AviAvni Apr 13, 2026
a2b54e7
refactor: optimize effects handling in commit operation and add clear…
AviAvni Apr 13, 2026
890a23a
feat: add support for non-deterministic function detection in query p…
AviAvni Apr 13, 2026
d1a2377
feat: enhance attribute handling in Pending for new and existing node…
AviAvni Apr 13, 2026
774635c
feat: add disk space cleanup step in CI workflows for rust-pr and rus…
AviAvni Apr 13, 2026
fc3707e
refactor: replace unwrap() with direct lock() calls for DECODE_STATE …
AviAvni Apr 13, 2026
76ef617
feat: improve disk space cleanup in CI workflows for rust-pr and rust…
AviAvni Apr 13, 2026
3b7736d
refactor: simplify match statement in eval_row and improve function s…
AviAvni Apr 13, 2026
057c9c4
fix: dynamically set parallelism based on available CPU cores
AviAvni Apr 13, 2026
f33e2bb
feat: set parallelism for flow tests in CI workflows
AviAvni Apr 14, 2026
a3e65ee
fix: update flow test parallelism syntax in CI workflows
AviAvni Apr 14, 2026
a7a4bb6
refactor: update matrix resizing logic in rebuild_derived_matrices fo…
AviAvni Apr 14, 2026
8db5fed
feat: add redis configuration support to flow tests and update RLTest…
AviAvni Apr 14, 2026
103d9aa
refactor: remove redundant disk space cleanup steps from CI workflows
AviAvni Apr 14, 2026
c320f66
fix: update flow test execution to include release flag and remove pa…
AviAvni Apr 14, 2026
6c38250
refactor: simplify database handling in AttributeStore and Graph init…
AviAvni Apr 14, 2026
6722aff
refactor: streamline virtual key management by consolidating key dele…
AviAvni Apr 14, 2026
d4528ed
refactor: optimize virtual key management and encoding context handling
AviAvni Apr 14, 2026
a0b7ea3
refactor: optimize attribute cache and store to use Arc for improved …
AviAvni Apr 14, 2026
0cd9ddc
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
7a1be98
refactor: optimize memory calculations and improve index query handling
AviAvni Apr 14, 2026
39f2155
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
f8d7c07
merge: resolve conflicts after merging main into rdb
AviAvni Apr 14, 2026
056f466
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
7c14cc8
Merge branch 'main' into rdb
AviAvni Apr 14, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RUN ln -sf /usr/bin/clang-22 /usr/local/bin/clang && \
RUN python3 -m venv /data/venv

# Install Python test dependencies (setuptools needed for pkg_resources)
RUN /data/venv/bin/pip install setuptools behave falkordb hypothesis pytest pytest-benchmark RLTest
RUN /data/venv/bin/pip install setuptools behave falkordb falkordb-bulk-loader hypothesis pytest pytest-benchmark RLTest

# Install Rust (needed before running scripts)
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ graph = { path = "graph", version = "0.1.0" }
lazy_static = "1.5.0"
parking_lot = "0.12.5"
redis-module = { git = "https://github.com/AviAvni/redismodule-rs", branch = "master" }
roaring = "0.11.3"
ryu = "1.0.23"
thin-vec = "0.2.14"
orx-tree = "2.2.0"

[build-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN apt install -y git make cmake curl zlib1g zlib1g-dev libpolly-22-dev libomp-

RUN python3 -m venv venv

RUN venv/bin/pip install behave falkordb hypothesis pytest pytest-benchmark RLTest
RUN venv/bin/pip install behave falkordb falkordb-bulk-loader hypothesis pytest pytest-benchmark RLTest

RUN git clone --branch dev2 --single-branch https://github.com/DrTimothyAldenDavis/GraphBLAS.git
# RUN git clone --branch v10.3.1 --single-branch https://github.com/DrTimothyAldenDavis/GraphBLAS.git
Expand Down
4 changes: 2 additions & 2 deletions flow.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fi
# TEST="tests/flow/test_function_calls" FAIL_FAST=1 ./flow.sh

if [[ ${#TEST_FILTER[@]} -eq 0 ]]; then
RLTest -f "$TESTS_FILE" --module "$TARGET_DIR/$TARGET" --no-progress $PARALLELISM $STOP_ON_FAILURE --clear-logs --log-dir tests/flow/logs $V
RLTest -f "$TESTS_FILE" --module "$TARGET_DIR/$TARGET" --no-progress $PARALLELISM $STOP_ON_FAILURE --clear-logs --log-dir tests/flow/logs --enable-debug-command --enable-protected-configs $V
else
RLTest "${TEST_FILTER[@]}" --module "$TARGET_DIR/$TARGET" --no-progress $PARALLELISM $STOP_ON_FAILURE --clear-logs --log-dir tests/flow/logs $V
RLTest "${TEST_FILTER[@]}" --module "$TARGET_DIR/$TARGET" --no-progress $PARALLELISM $STOP_ON_FAILURE --clear-logs --log-dir tests/flow/logs --enable-debug-command --enable-protected-configs $V
fi
5 changes: 5 additions & 0 deletions flow_tests_done.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ tests/flow/test_concurrent_query.py
tests/flow/test_config.py
tests/flow/test_create_clause
tests/flow/test_distinct
tests/flow/test_effects.py
tests/flow/test_empty_query
tests/flow/test_encode_decode.py
tests/flow/test_entity_update
tests/flow/test_execution_plan_print.py
tests/flow/test_expand_into
Expand All @@ -24,6 +26,7 @@ tests/flow/test_function_calls
tests/flow/test_graph_create
tests/flow/test_graph_deletion
tests/flow/test_graph_merge
tests/flow/test_graph_versioning.py
tests/flow/test_hashjoin.py
tests/flow/test_imdb
tests/flow/test_index_create
Expand Down Expand Up @@ -52,8 +55,10 @@ tests/flow/test_path_algorithms.py
tests/flow/test_path_filter
tests/flow/test_path_projections.py
tests/flow/test_pending_queries_limit.py
tests/flow/test_persistency.py
tests/flow/test_point
tests/flow/test_query_validation
tests/flow/test_rdb_load.py
tests/flow/test_reduce.py
tests/flow/test_results.py
tests/flow/test_reversed_patterns
Expand Down
7 changes: 0 additions & 7 deletions flow_tests_todo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,10 @@ tests/flow/test_profile.py
tests/flow/test_stress.py

## Persistence & Replication
tests/flow/test_persistency.py
tests/flow/test_encode_decode.py
tests/flow/test_rdb_load.py
tests/flow/test_prev_rdb_decode.py
tests/flow/test_replication.py
tests/flow/test_replication_states.py
tests/flow/test_graph_versioning.py

## Redis Integration & Server Features
tests/flow/test_acl.py
tests/flow/test_bolt.py
tests/flow/test_bulk_insertion.py
tests/flow/test_graph_copy.py
Expand All @@ -47,5 +41,4 @@ tests/flow/test_undo_log.py

## Metadata & Internals
tests/flow/test_multi_label.py
tests/flow/test_effects.py
tests/flow/test_intern_string.py
147 changes: 142 additions & 5 deletions graph/src/graph/attribute_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,13 @@
//! Each attribute is stored as a separate fjall entry:
//! `entity_id (8 bytes big-endian) + attr_idx (2 bytes big-endian)`

use std::{collections::HashMap, sync::Arc};
use std::{
collections::HashMap,
sync::{
Arc, Mutex,
atomic::{AtomicU64, Ordering},
},
};

use fjall::{
Database, Keyspace, KeyspaceCreateOptions, Readable, Snapshot, config::HashRatioPolicy,
Expand All @@ -92,6 +98,7 @@ use once_cell::sync::OnceCell;
use roaring::RoaringTreemap;

use super::attribute_cache::AttributeCache;
use super::graphblas::serialization::{Decode, Encode, Reader, Writer};
use crate::runtime::{ordermap::OrderMap, orderset::OrderSet, value::Value};

/// Create a composite key from entity ID and attribute index.
Expand Down Expand Up @@ -134,6 +141,12 @@ pub struct AttributeStore {
dirty_entities: RoaringTreemap,
/// Entity IDs pending full deletion (all attributes) — applied on commit, cleared on rollback.
pending_deletes: RoaringTreemap,
/// Encoding context: deleted entity IDs (set before serialization).
encode_deleted: Mutex<Option<RoaringTreemap>>,
/// Encoding context: maximum entity ID (set before serialization).
encode_max_id: AtomicU64,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
/// Encoding context: mapping from local attr IDs to global attr IDs (set before serialization).
encode_attr_remap: Mutex<Option<Vec<u16>>>,
}

impl Clone for AttributeStore {
Expand All @@ -148,6 +161,9 @@ impl Clone for AttributeStore {
version: self.version,
dirty_entities: self.dirty_entities.clone(),
pending_deletes: self.pending_deletes.clone(),
encode_deleted: Mutex::new(None),
encode_max_id: AtomicU64::new(0),
encode_attr_remap: Mutex::new(None),
}
}
}
Expand All @@ -172,6 +188,9 @@ impl AttributeStore {
version,
dirty_entities: RoaringTreemap::new(),
pending_deletes: RoaringTreemap::new(),
encode_deleted: Mutex::new(None),
encode_max_id: AtomicU64::new(0),
encode_attr_remap: Mutex::new(None),
}
}

Expand Down Expand Up @@ -235,6 +254,9 @@ impl AttributeStore {
version,
dirty_entities: RoaringTreemap::new(),
pending_deletes: RoaringTreemap::new(),
encode_deleted: Mutex::new(None),
encode_max_id: AtomicU64::new(0),
encode_attr_remap: Mutex::new(None),
}
}

Expand Down Expand Up @@ -369,10 +391,10 @@ impl AttributeStore {
pub fn get_all_attrs_by_id(
&self,
key: u64,
) -> impl Iterator<Item = (u16, Value)> + '_ {
let cached = self.cache.get_entity(key, self.version);
let attrs = cached.unwrap_or_else(|| self.populate_cache_from_fjall(key));
attrs.into_iter()
) -> Vec<(u16, Value)> {
self.cache
.get_entity(key, self.version)
.unwrap_or_else(|| self.populate_cache_from_fjall(key))
}

// ---- write path (cache only) ----------------------------------------
Expand Down Expand Up @@ -619,6 +641,29 @@ impl AttributeStore {
pub const fn cache(&self) -> &Arc<AttributeCache> {
&self.cache
}

/// Set encoding context needed by `Encode::encode_with_range`.
///
/// Builds a mapping from local attribute IDs (indices in this store's attrs_name)
/// to global attribute IDs (indices in the provided global_attrs list).
pub fn set_encode_context(
&self,
deleted: &RoaringTreemap,
max_id: u64,
global_attrs: &[Arc<String>],
) {
*self.encode_deleted.lock().unwrap() = Some(deleted.clone());
self.encode_max_id.store(max_id, Ordering::Relaxed);

// Build mapping from local attr ID to global attr ID
let mut remap = vec![u16::MAX; self.attrs_name.len()];
for (local_id, local_name) in self.attrs_name.iter().enumerate() {
if let Some(global_id) = global_attrs.iter().position(|n| n == local_name) {
remap[local_id] = global_id as u16;
}
}
*self.encode_attr_remap.lock().unwrap() = Some(remap);
}
}

// SAFETY: AttributeStore is Send+Sync because:
Expand All @@ -628,3 +673,95 @@ impl AttributeStore {
// - All other fields (`RoaringTreemap`, `OrderSet`, etc.) are owned and not shared
unsafe impl Send for AttributeStore {}
unsafe impl Sync for AttributeStore {}

impl Encode<19> for AttributeStore {
fn encode(
&self,
_w: &mut dyn Writer,
) {
unimplemented!("use encode_with_range for AttributeStore")
}

fn encode_with_range(
&self,
w: &mut dyn Writer,
count: u64,
offset: u64,
) {
let binding = self.encode_deleted.lock().unwrap();
let deleted = binding.as_ref().expect("encode context not set");
let max_id = self.encode_max_id.load(Ordering::Relaxed);

let remap_binding = self.encode_attr_remap.lock().unwrap();
let remap = remap_binding.as_ref().expect("encode attr remap not set");

let mut skipped = 0u64;
let mut encoded = 0u64;

for id in 0..=max_id {
if deleted.contains(id) {
continue;
}
if skipped < offset {
skipped += 1;
continue;
}

w.write_unsigned(id);

let props: Vec<(u16, Value)> = self.get_all_attrs_by_id(id);
w.write_unsigned(props.len() as u64);

for (local_attr_id, value) in props {
// Remap local attribute ID to global attribute ID
let global_attr_id = if (local_attr_id as usize) < remap.len() {
remap[local_attr_id as usize]
} else {
local_attr_id
};
w.write_unsigned(global_attr_id as u64);
value.encode(w);
}

encoded += 1;
if encoded >= count {
break;
}
}
}
}

impl Decode<19> for AttributeStore {
fn decode(_r: &mut dyn Reader) -> Result<Self, String> {
unimplemented!("use decode_with_count for AttributeStore")
}

fn decode_with_count(
&mut self,
r: &mut dyn Reader,
count: u64,
) -> Result<(), String> {
for _ in 0..count {
let entity_id = r.read_unsigned()?;
let attr_count = r.read_unsigned()?;

let mut entity_attrs = OrderMap::default();
for _ in 0..attr_count {
let attr_id = r.read_unsigned()? as u16;
let value = Value::decode(r)?;

if (attr_id as usize) < self.attrs_name.len() {
let attr_name = self.attrs_name[attr_id as usize].clone();
entity_attrs.insert(attr_name, value);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reject oversized attribute ids instead of truncating them.

r.read_unsigned()? as u16 will silently wrap malformed values above u16::MAX, so a corrupted RDB can land attributes in the wrong column instead of failing the load.

Suggested fix
-                let attr_id = r.read_unsigned()? as u16;
+                let attr_id = u16::try_from(r.read_unsigned()?)
+                    .map_err(|_| "attribute id exceeds u16".to_string())?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
for _ in 0..attr_count {
let attr_id = r.read_unsigned()? as u16;
let value = Value::decode(r)?;
if (attr_id as usize) < self.attrs_name.len() {
let attr_name = self.attrs_name[attr_id as usize].clone();
entity_attrs.insert(attr_name, value);
for _ in 0..attr_count {
let attr_id = u16::try_from(r.read_unsigned()?)
.map_err(|_| "attribute id exceeds u16".to_string())?;
let value = Value::decode(r)?;
if (attr_id as usize) < self.attrs_name.len() {
let attr_name = self.attrs_name[attr_id as usize].clone();
entity_attrs.insert(attr_name, value);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@graph/src/graph/attribute_store.rs` around lines 720 - 726, The loop
currently casts r.read_unsigned()? to u16 which silently wraps oversized values;
instead, after reading the unsigned id via r.read_unsigned(), explicitly
validate that it fits into u16 (or into attrs_name.len()) and return a
decode/load error if it exceeds u16::MAX or the number of known attributes so we
don't silently place attributes in the wrong column; update the attr_id handling
in the loop (where attr_id is computed and used to index self.attrs_name and
insert into entity_attrs) to perform this bounds/overflow check and fail the
load rather than truncating.

}
}

if !entity_attrs.is_empty() {
let mut batch = HashMap::new();
batch.insert(entity_id, entity_attrs);
self.insert_attrs(&batch)?;
}
}
Ok(())
}
}
Loading
Loading