From bc59215045a0bf8c75ebb5dd7d6c69d2d14b02fa Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Tue, 5 May 2026 17:43:02 -0700 Subject: [PATCH] LMDB based `MerkleStore` implementation Adds the `heed` crate to provide access to LMDB (Lightning Memory-Mapped Database). Creates a new `MerkleStore` implementation using LMDB as `LmdbBackend` under the new `core::db::merkle_node::lmdb` package in `liboxen`. Extensive new tests have been added to ensure that the memory layout of LMDB values is consistent and that LMDB operations work as expected. **LMDB Store Design** The `LmdbBackend` uses two tables to store all Merkle tree nodes: 1. `merkle_tree_nodes`: u128 -> ~EMerkleTreeNode 2. `merkle_links`: u128 -> parent(u128) + []children(u128) (1) stores the actual Merkle tree node struct. It has the type and the msgpack serialized bytes for the `EMerkleTreeNode`. To maintain backwards compatibility, the `EMerkleTreeNode`'s serialized representation is used as-is and _not_ modified. (Modification would require a migration). (2) stores the connections that dictate the structure of the Merkle tree. Each node maps to a `LmdbLink`, which is an optional parent connection and a list of the node's children. Each of these is a `MerkleHash`, stored as a 16-byte `u128` value. **`zerocopy` uses** The `zerocopy` dependency has been added as the `LmdbBackend` offers full zero-copy support for read operations. These are implemented using methods on the `LmdbBackend` struct itself. The `MerkleReader` operations require owned data, so these views have to be copied to comply with the trait design. However, this opens the door in the future to iterating on the trait to return borrows on the underlying data. Each internal table has its own zerocopy view: `LmdbNodeRef` for `LmdbNode` and `LmdbLinkRef` for `LmdbLink`. The borrows last as long as the lifetime for the read transaction because they are direct views into LMDB's internal memory-mapped pages. 
**`MerkleReader` implementation** The `LmdbBackend` actually stores `FileNode` and `FileChunkNode` Merkle tree nodes in its store directly. This diverges from the `FileBackend`, where, for better file access patterns and to reduce inode pressure, file nodes are only stored in the `children` file and require parsing the lookup table from the `node` file. To ensure that `LmdbBackend` adheres to the constraints of `MerkleReader`, the `get_node` and `exists` methods treat file nodes as not being present. However, the `LmdbBackend` struct provides `full_exists` & `full_get_node` which work correctly on actually stored file and file chunk nodes. **`MerkleWriter` implementation** LMDB encourages the use of short-lived transactions. Writing into LMDB directly buffers data in memory (via memory-mapped pages). Closing a transaction requires an fsync, which is an expensive syscall. The writer implementation explicitly buffers written nodes and children via a `Cell>`. The enclosing write session's `finish` performs the actual write to LMDB. Note that the node write session _does not_ actually ensure that writes are persisted to LMDB, as this would incur a performance penalty via fsync. 
--- .gitignore | 1 + Cargo.lock | 125 +++- Cargo.toml | 2 + crates/lib/Cargo.toml | 2 + crates/lib/src/core/db/merkle_node.rs | 4 + .../src/core/db/merkle_node/file_backend.rs | 7 +- crates/lib/src/core/db/merkle_node/lmdb.rs | 273 ++++++++ .../core/db/merkle_node/lmdb/lmdb_backend.rs | 228 ++++++ .../src/core/db/merkle_node/lmdb/reader.rs | 250 +++++++ .../core/db/merkle_node/lmdb/value_structs.rs | 662 ++++++++++++++++++ .../src/core/db/merkle_node/lmdb/writer.rs | 485 +++++++++++++ .../src/core/db/merkle_node/merkle_node_db.rs | 11 +- .../core/v_latest/index/commit_merkle_tree.rs | 2 +- crates/lib/src/error.rs | 5 + .../src/model/merkle_tree/node/commit_node.rs | 1 + .../src/model/merkle_tree/node/dir_node.rs | 1 + .../model/merkle_tree/node/file_chunk_node.rs | 1 + .../src/model/merkle_tree/node/file_node.rs | 1 + .../lib/src/model/merkle_tree/node/vnode.rs | 1 + .../src/repositories/commits/commit_writer.rs | 11 +- 20 files changed, 2056 insertions(+), 17 deletions(-) create mode 100644 crates/lib/src/core/db/merkle_node/lmdb.rs create mode 100644 crates/lib/src/core/db/merkle_node/lmdb/lmdb_backend.rs create mode 100644 crates/lib/src/core/db/merkle_node/lmdb/reader.rs create mode 100644 crates/lib/src/core/db/merkle_node/lmdb/value_structs.rs create mode 100644 crates/lib/src/core/db/merkle_node/lmdb/writer.rs diff --git a/.gitignore b/.gitignore index 90da84a17..9b3923b0d 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ cli-test/vendor/ .cursor .cargo/config.toml .claude/settings.local.json +.claude/*.lock .superset data/ diff --git a/Cargo.lock b/Cargo.lock index 65a4c89ce..fcc24854d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2019,7 +2019,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6139a8597ed92cf816dfb33f5dd6cf0bb93a6adc938f11039f371bc5bcd26c3" dependencies = [ "chrono", - "phf", + "phf 0.12.1", ] [[package]] @@ -2941,6 +2941,15 @@ version = "0.15.7" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "doxygen-rs" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9" +dependencies = [ + "phf 0.11.3", +] + [[package]] name = "duckdb" version = "1.10502.0" @@ -3642,6 +3651,44 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "heed" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad82d6598ccf1dac15c8b758a1bd282b755b6776be600429176757190a1b0202" +dependencies = [ + "bitflags 2.11.0", + "byteorder", + "heed-traits", + "heed-types", + "libc", + "lmdb-master-sys", + "once_cell", + "page_size", + "serde", + "synchronoise", + "url", +] + +[[package]] +name = "heed-traits" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff" + +[[package]] +name = "heed-types" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d" +dependencies = [ + "bincode", + "byteorder", + "heed-traits", + "serde", + "serde_json", +] + [[package]] name = "hermit-abi" version = "0.5.2" @@ -4510,6 +4557,7 @@ dependencies = [ "futures-util", "glob", "glob-match", + "heed", "http 1.4.0", "humantime", "hyper 1.8.1", @@ -4579,6 +4627,7 @@ dependencies = [ "uuid", "walkdir", "xxhash-rust", + "zerocopy", "zip 2.4.2", ] @@ -4647,6 +4696,17 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "lmdb-master-sys" +version = 
"0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaeb9bd22e73bd1babffff614994b341e9b2008de7bb73bf1f7e9154f1978f8b" +dependencies = [ + "cc", + "doxygen-rs", + "libc", +] + [[package]] name = "local-channel" version = "0.1.5" @@ -5487,6 +5547,16 @@ dependencies = [ "sha2 0.10.9", ] +[[package]] +name = "page_size" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "par-stream" version = "0.10.2" @@ -5640,13 +5710,55 @@ dependencies = [ "indexmap 2.13.0", ] +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_macros", + "phf_shared 0.11.3", +] + [[package]] name = "phf" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "913273894cec178f401a31ec4b656318d95473527be05c0752cc41cdc32be8b7" dependencies = [ - "phf_shared", + "phf_shared 0.12.1", +] + +[[package]] +name = "phf_generator" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" +dependencies = [ + "phf_shared 0.11.3", + "rand 0.8.5", +] + +[[package]] +name = "phf_macros" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216" +dependencies = [ + "phf_generator", + "phf_shared 0.11.3", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", ] [[package]] @@ 
-8246,6 +8358,15 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synchronoise" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2" +dependencies = [ + "crossbeam-queue", +] + [[package]] name = "synstructure" version = "0.13.2" diff --git a/Cargo.toml b/Cargo.toml index 2e1abc705..f4ee26135 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -74,6 +74,7 @@ futures = "0.3.28" futures-util = "0.3.28" glob = "0.3.1" glob-match = "0.2.1" +heed = "0.22.0" hex = "0.4.3" http = "1.1.0" humantime = "2.1.0" @@ -161,6 +162,7 @@ utoipa-swagger-ui = { version = "9", features = ["actix-web"] } uuid = { version = "1.4.1", features = ["serde", "v4"] } walkdir = "2.5.0" xxhash-rust = { version = "0.8.7", features = ["xxh3"] } +zerocopy = { version = "0.8", features = ["derive"] } zip = { version = "2.4.1", default-features = false, features = ["deflate"] } [profile.dev] diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml index 139338133..c71bf0a4f 100644 --- a/crates/lib/Cargo.toml +++ b/crates/lib/Cargo.toml @@ -74,6 +74,7 @@ futures = { workspace = true } futures-util = { workspace = true } glob = { workspace = true } glob-match = { workspace = true } +heed = { workspace = true } http = { workspace = true } humantime = { workspace = true } ignore = { workspace = true } @@ -137,6 +138,7 @@ utoipa = { workspace = true } uuid = { workspace = true } walkdir = { workspace = true } xxhash-rust = { workspace = true } +zerocopy = { workspace = true } zip = { workspace = true } [dev-dependencies] diff --git a/crates/lib/src/core/db/merkle_node.rs b/crates/lib/src/core/db/merkle_node.rs index f9c4ce10e..d7775e7b3 100644 --- a/crates/lib/src/core/db/merkle_node.rs +++ b/crates/lib/src/core/db/merkle_node.rs @@ -1,4 +1,8 @@ pub mod file_backend; +pub mod lmdb; pub mod merkle_node_db; pub(crate) use merkle_node_db::MerkleNodeDB; + +pub use file_backend::FileBackend; +pub 
use lmdb::LmdbBackend; diff --git a/crates/lib/src/core/db/merkle_node/file_backend.rs b/crates/lib/src/core/db/merkle_node/file_backend.rs index 659c18100..4ab2325a4 100644 --- a/crates/lib/src/core/db/merkle_node/file_backend.rs +++ b/crates/lib/src/core/db/merkle_node/file_backend.rs @@ -354,12 +354,9 @@ fn extract_tar_under( // we have the hex-encoded hash as the `{prefix}/{suffix}` dirs. if let Some(hash) = extract_hash_from_entry_path(&dst_path, oxen_hidden)? { hashes.insert(hash); - } else { - log::warn!( - "Skipping non-merkle entry in tarball: {}", - dst_path.display() - ); } + // If we can't extract a path, it's because we're looking at a node or children file. + // We will have already obtained the hash from the directory, so this is ok! } Ok(hashes) } diff --git a/crates/lib/src/core/db/merkle_node/lmdb.rs b/crates/lib/src/core/db/merkle_node/lmdb.rs new file mode 100644 index 000000000..d774ce29a --- /dev/null +++ b/crates/lib/src/core/db/merkle_node/lmdb.rs @@ -0,0 +1,273 @@ +/// The [`LmdbBackend`] struct. +mod lmdb_backend; +/// The [`MerkleReader`] implementation. +mod reader; +/// The structures stored as values. +mod value_structs; +/// The [`MerkleWriter`] implementation. +mod writer; + +pub use lmdb_backend::LmdbBackend; + +use thiserror::Error; + +use crate::model::merkle_tree::{merkle_hash::HexHash, node_type::InvalidMerkleTreeNodeType}; + +/// Errors that the LMDB backend's operations can encounter. 
+/// +/// Major categories: +/// - deserializing values from LMDB incorrectly (truncated, bad magic, +/// unsupported version, malformed tail) +/// - underlying LMDB library errors (heed) +/// - violations on the integrity of the specific LMDB tables +#[derive(Debug, Error)] +pub enum LmdbError { + // ── LmdbNode value (merkle_tree_nodes) decode errors ───────────────────── + #[error("[LmdbNode] header truncated: only {len} bytes (need at least 12)")] + NodeHeaderTruncated { len: usize }, + + #[error("[LmdbNode] bad magic: got {actual:?}, expected b\"OXNV\"")] + NodeBadMagic { actual: [u8; 4] }, + + #[error("[LmdbNode] unsupported on-disk version: {0}")] + NodeUnsupportedVersion(u8), + + #[error("[LmdbNode] {0}")] + InvalidMerkleTreeNodeType(#[from] InvalidMerkleTreeNodeType), + + // ── LmdbLink value (merkle_links) decode errors ────────────────────────── + #[error("[LmdbLink] header truncated: only {len} bytes (need at least 36)")] + LinkHeaderTruncated { len: usize }, + + #[error("[LmdbLink] bad magic: got {actual:?}, expected b\"OXLN\"")] + LinkBadMagic { actual: [u8; 4] }, + + #[error("[LmdbLink] unsupported on-disk version: {0}")] + LinkUnsupportedVersion(u8), + + #[error("[LmdbLink] invalid has_parent flag: expected 0 or 1, got {0}")] + InvalidIsParent(u8), + + #[error("[LmdbLink] children tail length {tail_len} is not a multiple of 16 bytes")] + ChildrenTailMisaligned { tail_len: usize }, + + #[error("[LmdbLink] header claims {claimed} children but tail has {actual}")] + ChildrenCountMismatch { claimed: usize, actual: usize }, + + // ── LMDB / heed transport ──────────────────────────────────────────────── + #[error("Error retrieving: {0}")] + Retrieve(heed::Error), + + #[error("Error accessing LMDB Merkle store: {0}")] + Access(heed::Error), + + #[error("Error writing LMDB Merkle store: {0}")] + Write(heed::Error), + + // ── Cross-table integrity ──────────────────────────────────────────────── + #[error("Missing node, have link for (hex) hash: {0}")] + 
IntegrityNoNode(HexHash), + + #[error("Missing link, have node for (hex) hash: {0}")] + IntegrityNoLink(HexHash), + + #[error("Stored a child for (hex) hash ({0}) but node for hash does not exist.")] + IntegrityNoHash(HexHash), +} + +#[cfg(test)] +mod tests { + use std::{ + collections::HashMap, + path::{Path, PathBuf}, + }; + + use heed::EnvOpenOptions; + use time::OffsetDateTime; + + use crate::{ + core::db::merkle_node::{LmdbBackend, lmdb::lmdb_backend::lmdb_dir_location}, + error::OxenError, + model::{ + EntryDataType, LocalRepository, MerkleHash, MerkleTreeNodeType, TMerkleTreeNode, + merkle_tree::{ + merkle_writer::MerkleWriter, + node::{ + CommitNode, DirNode, FileChunkNode, FileNode, VNode, + commit_node::CommitNodeOpts, dir_node::DirNodeOpts, file_node::FileNodeOpts, + vnode::VNodeOpts, + }, + }, + }, + }; + + /// Map a test repo path to a per-repo directory under the OS temp dir. + /// + /// The Windows CI runner mounts an ImDisk RAMDisk at `R:\test` and points + /// `OXEN_TEST_RUN_DIR` there (see `.github/workflows/ci_test.yml`). ImDisk is a + /// Win32-level emulation that does not fully implement the NT-level memory-section + /// APIs LMDB depends on: `NtCreateSection` against a file on the ImDisk volume + /// returns `STATUS_INVALID_DEVICE_REQUEST`, which `mdb_nt2win32` converts to + /// `ERROR_INVALID_FUNCTION` (Win32 code 1). It surfaces here as + /// `Lmdb(Access(Io(Os { code: 1, .. })))` and is reported as "Incorrect function.". + /// `LmdbBackend` already documents `DO NOT USE ON A VIRTUAL FILE SYSTEM`; an ImDisk + /// volume is exactly that. + /// + /// Routing the env to the OS temp dir keeps it on the host's real filesystem + /// (NTFS on Windows runners), where the NT memory-section APIs work normally. + /// The mapping is stable per repo path so that callers that re-open with the + /// same `repo_root` (e.g. `test_data_persists_across_env_reopen`) hit the same + /// env on each open. 
The repo's UUID-named leaf keeps env paths unique across + /// concurrent tests. + fn lmdb_test_root(repo_root: &Path) -> PathBuf { + let leaf = repo_root + .file_name() + .expect("test repo_root has a leaf component"); + std::env::temp_dir().join("oxen-lmdb-tests").join(leaf) + } + + /// Build a fresh [`LmdbBackend`] for a test [`LocalRepository`]. + pub(in crate::core::db::merkle_node::lmdb) fn open_lmdb_backend( + repo: &LocalRepository, + ) -> LmdbBackend { + open_lmdb_at(repo.path.clone()) + } + + /// Open the backend keyed by a `repo_root` — used to test persistence across env opens. + /// + /// The on-disk env lives under the OS temp dir, not under `repo_root` itself; see + /// [`lmdb_test_root`] for why. heed (without `NO_SUB_DIR`) treats the env path as a + /// directory it writes `data.mdb` + `lock.mdb` into, so the leaf must exist, and we + /// create it here. + pub(in crate::core::db::merkle_node::lmdb) fn open_lmdb_at(repo_root: PathBuf) -> LmdbBackend { + let test_root = lmdb_test_root(&repo_root); + let env_dir = lmdb_dir_location(&test_root); + std::fs::create_dir_all(&env_dir).expect("env dir"); + + let mut opts = EnvOpenOptions::new(); + opts.map_size(10 * 1024 * 1024); + LmdbBackend::new(test_root, opts).expect("open lmdb backend") + } + + /// Drive a test against a fresh [`LocalRepository`] and an [`LmdbBackend`] + /// rooted at the same path. Mirrors the writer.rs `with_test_backend` helper. + pub(in crate::core::db::merkle_node::lmdb) fn with_test_backend( + test_fn: F, + ) -> Result<(), OxenError> + where + F: FnOnce(&LocalRepository, &LmdbBackend) -> Result<(), OxenError> + std::panic::UnwindSafe, + { + crate::test::run_empty_local_repo_test(|repo| { + let backend = open_lmdb_backend(&repo); + test_fn(&repo, &backend) + }) + } + + /// Helper: parse a hex-char string into a [`MerkleHash`]. Uses the `FromStr` impl. 
+ pub(in crate::core::db::merkle_node::lmdb) fn h(hex: &str) -> MerkleHash { + hex.parse().expect("valid hex hash") + } + + /// Helper: make a commit node. + pub(in crate::core::db::merkle_node::lmdb) fn commit_with_hash( + repo: &LocalRepository, + hash: MerkleHash, + ) -> CommitNode { + CommitNode::new( + repo, + CommitNodeOpts { + hash, + parent_ids: vec![], + email: String::new(), + author: String::new(), + message: String::new(), + timestamp: OffsetDateTime::UNIX_EPOCH, + }, + ) + .expect("CommitNode::new") + } + + /// Helper: make a directory node. + pub(in crate::core::db::merkle_node::lmdb) fn dir_with_hash( + repo: &LocalRepository, + hash: MerkleHash, + ) -> DirNode { + DirNode::new( + repo, + DirNodeOpts { + name: String::new(), + hash, + num_entries: 0, + num_bytes: 0, + last_commit_id: MerkleHash::new(0), + last_modified_seconds: 0, + last_modified_nanoseconds: 0, + data_type_counts: HashMap::new(), + data_type_sizes: HashMap::new(), + }, + ) + .expect("DirNode::new") + } + + /// Helper: make a virtual directory node. + pub(in crate::core::db::merkle_node::lmdb) fn vnode_with_hash( + repo: &LocalRepository, + hash: MerkleHash, + ) -> VNode { + VNode::new( + repo, + VNodeOpts { + hash, + num_entries: 0, + }, + ) + .expect("VNode::new") + } + + /// Helper: make a file node. + pub(in crate::core::db::merkle_node::lmdb) fn file_node_with_hash( + repo: &LocalRepository, + hash: MerkleHash, + ) -> FileNode { + FileNode::new( + repo, + FileNodeOpts { + name: String::new(), + hash, + combined_hash: MerkleHash::new(0), + metadata_hash: None, + num_bytes: 0, + last_modified_seconds: 0, + last_modified_nanoseconds: 0, + data_type: EntryDataType::Binary, + metadata: None, + mime_type: String::new(), + extension: String::new(), + }, + ) + .expect("FileNode::new") + } + + /// Helper: make a file chunk node. 
+ pub(in crate::core::db::merkle_node::lmdb) fn file_chunk_node_with_hash( + hash: MerkleHash, + ) -> FileChunkNode { + FileChunkNode { + data: vec![], + node_type: MerkleTreeNodeType::FileChunk, + hash, + } + } + + /// Helper: write a single node into the backend with the given parent_id. Does not write children. + pub(in crate::core::db::merkle_node::lmdb) fn write_one( + backend: &LmdbBackend, + node: &dyn TMerkleTreeNode, + parent_id: Option, + ) -> Result<(), OxenError> { + let session = backend.begin()?; + let ns = session.create_node(node, parent_id)?; + ns.finish()?; + session.finish() + } +} diff --git a/crates/lib/src/core/db/merkle_node/lmdb/lmdb_backend.rs b/crates/lib/src/core/db/merkle_node/lmdb/lmdb_backend.rs new file mode 100644 index 000000000..e42d44461 --- /dev/null +++ b/crates/lib/src/core/db/merkle_node/lmdb/lmdb_backend.rs @@ -0,0 +1,228 @@ +use std::path::{Path, PathBuf}; + +use heed::byteorder::LE; +use heed::types::{Bytes, DecodeIgnore, U128}; +use heed::{AnyTls, Database, Env, EnvOpenOptions, WithTls}; + +use crate::constants::OXEN_HIDDEN_DIR; +use crate::core::db::merkle_node::lmdb::LmdbError; +use crate::core::db::merkle_node::lmdb::value_structs::{LmdbLink, LmdbNode}; +use crate::error::OxenError; +use crate::model::MerkleHash; + +/// Keys are merkle hashes, which are `u128` values. +/// MUST USE LITTLE ENDIAN (LE) so the byte layout matches +/// `ContentHash::as_bytes` / `LocationHash::as_bytes`. +pub(super) type KeyLmdb = U128; + +/// Values are serialized as bytes. Access requires deserialization into structs. +pub(super) type ValueLmdb = Bytes; + +/// LMDB table name: storing merkle tree node byte +const DB_NODES: &str = "merkle_tree_nodes"; +/// LMDB table name: storing parent & child relationships per-node +const DB_LINKS: &str = "merkle_links"; + +/// Merkle tree node storage that is backed by LMDB. +/// +/// NOTE: DO NOT USE ON A VIRTUAL FILE SYSTEM !! 
+pub struct LmdbBackend { + /// The filesystem location of the local repository. + pub(super) repo_root: PathBuf, + /// The LMDB environment that contains the [`Database`] fields. + pub(super) lmdb_env: Env, // note: WithTls makes this !Send. Use AnyTls if need to send between threads. + + /// Stores every kind of merkle tree node: any concrete [`MerkleTreeNode`]. + /// This includes files nodes! Note that there is no other data but the Merkle + /// tree node type (u8) and the msgpack-serialized bytes of the actual node. + /// + /// These values deserialize into a [`LmdbNode`]. The zerocopy view on these + /// values is the [`LmdbNodeRef`]. + /// + /// In the LMDB environment, this has name of the [DB_NODES] constant. + pub(super) merkle_tree_nodes: Database, + + /// Stores the parent and children connections for each Merkle tree node. + /// + /// These values deserialize into a [`LmdbLink`]. The zerocopy view on these + /// values is the [`LmdbLinkRef`]. + /// + /// In the LMDB environment, this has name of the [DB_LINKS] constant. + pub(super) merkle_links: Database, +} + +impl LmdbBackend { + pub fn new(repo_root: PathBuf, options: EnvOpenOptions) -> Result { + let options = { + let mut options = options; + options.max_dbs(2); + log::debug!("Config for LMDB backend: {options:?}"); + options + }; + + let lmdb_env = { + let db_location = lmdb_dir_location(&repo_root); + log::debug!("Opening LMDB backend at: {}", db_location.display()); + // SAFETY: LMDB uses a memory mapped file. If this file is modified in or out of process, + // it can cause undefined behavior. There is nothing else in the Oxen codebase that + // modifies this mmap'd file. Nothing else access `db_location` except for this code. + // This uses LMDB's own opening code, which does lock the file to check for and + // prevent concurrent access. The fact that it is a "hidden" file helps somewhat: it will + // be unlikely for someone or something to accidentily stumble upon it and mess things up. 
+ unsafe { options.open(db_location).map_err(LmdbError::Access)? } + }; + + // create these two key-value tables + let mut wtxn = lmdb_env.write_txn().map_err(LmdbError::Access)?; + let merkle_tree_nodes = lmdb_env + .create_database::(&mut wtxn, Some(DB_NODES)) + .map_err(LmdbError::Write)?; + let merkle_links = lmdb_env + .create_database::(&mut wtxn, Some(DB_LINKS)) + .map_err(LmdbError::Write)?; + wtxn.commit().map_err(LmdbError::Write)?; + log::debug!( + "Ensure tables '{DB_NODES}' (Merkle nodes) and '{DB_LINKS}' (tree connections) exist" + ); + + Ok(Self { + repo_root, + lmdb_env, + merkle_tree_nodes, + merkle_links, + }) + } + + /// Private helper for creating a new LMDB write transaction. + /// Returns [`LmdbError::Access`] on error. + pub(in crate::core::db::merkle_node::lmdb) fn write_txn<'a>( + lmdb_env: &'a heed::Env, + ) -> Result, LmdbError> { + lmdb_env.write_txn().map_err(LmdbError::Access) + } + + /// Private helper for creating a new LMDB read transaction. + /// Returns [`LmdbError::Access`] on error. + pub(in crate::core::db::merkle_node::lmdb) fn read_txn<'a>( + &'a self, + ) -> Result, LmdbError> { + self.lmdb_env.read_txn().map_err(LmdbError::Access) + } + + /// Retrieve the raw stored value bytes, or `None` if no entry exists for the key. + /// + /// The returned slice borrows directly into the LMDB read transaction's mmap + /// region — no copy, no parse. Callers wrap it in [`super::value_structs::LmdbNodeRef`] + /// or [`super::value_structs::LmdbLinkRef`] to get a typed zero-copy view. + /// + /// Generic over the `TLS` marker so the helper works with whatever thread local storage + /// choice the [`Env`] was opened with. The `Deref` bound is what lets `db.get(rtxn, ..)` + /// accept `&heed::RoTxn<'a, T>` — heed's `Database::get` requires `&RoTxn<'_, AnyTls>` + /// and both `WithTls` and `WithoutTls` deref to it. + /// + /// Returns an [`LmdbError::Retrieve`] on error. 
+ #[inline(always)] + pub(super) fn retrieve_bytes<'a, TLS>( + rtxn: &'a heed::RoTxn<'a, TLS>, + db: &Database, + key: &MerkleHash, + ) -> Result, LmdbError> + where + heed::RoTxn<'a, TLS>: std::ops::Deref>, + { + db.get(rtxn, &key.to_u128()).map_err(LmdbError::Retrieve) + } + + /// True if the given key exists in the database. False otherwise. + /// Does no decoding or other handling of the stored value. + /// + /// Generic over the `TLS` marker — see [`Self::retrieve`] for the rationale on the bound. + /// + /// Returns an [`LmdbError::Retrieve`] on error. + #[inline(always)] + pub(super) fn key_present<'a, TLS>( + rtxn: &heed::RoTxn<'a, TLS>, + db: &Database, + key: &MerkleHash, + ) -> Result + where + heed::RoTxn<'a, TLS>: std::ops::Deref>, + { + Ok(db + .remap_data_type::() + .get(rtxn, &key.to_u128()) + .map_err(LmdbError::Retrieve)? + .is_some()) + } + + /// Serialize the value and put it into the database with the given key. + /// Is an error if serialization fails (heed::Error::Encoding). + /// + /// Returns an [`LmdbError::Write`] on error. + #[inline(always)] + pub(super) fn put_serialized<'a, L, S: Fn(L) -> Vec>( + wtxn: &mut heed::RwTxn<'a>, + db: &Database, + key: &MerkleHash, + serialize: S, + value: L, + ) -> Result<(), LmdbError> { + let as_bytes = serialize(value); + db.put(wtxn, &key.to_u128(), &as_bytes) + .map_err(LmdbError::Write) + } + + /// Checks if a node with the given `hash` exists in the store. + /// + /// Unlike [`MerkleReader::exists`], this method will return `Ok(true)` for file nodes + /// if the file exists in the tree. + pub fn full_exists(&self, hash: &MerkleHash) -> Result { + let exists = Self::key_present(&self.read_txn()?, &self.merkle_tree_nodes, hash)?; + Ok(exists) + } + + /// Retrieves the node with the given `hash` from the store. `None` means no such node exists. + /// + /// Unlike [`MerkleReader::get_node`], this method will return `Ok(Some(.))` for file nodes + /// if the file exists in the tree. 
+ /// + /// Internally reads the value bytes directly from LMDB's mmap and decodes via + /// [`LmdbNode::decode`], which uses the zero-copy [`super::value_structs::LmdbNodeRef`] + /// path to validate the header before copying out the msgpack tail. + pub fn full_get_node(&self, hash: &MerkleHash) -> Result, LmdbError> { + let rtxn = self.read_txn()?; + let Some(bytes) = Self::retrieve_bytes(&rtxn, &self.merkle_tree_nodes, hash)? else { + return Ok(None); + }; + Ok(Some(LmdbNode::decode(bytes)?)) + } + + /// Retreives the node's parent link (if present) and its children. Returns Ok(None) + /// if there is no node for the given hash. + pub fn get_links(&self, hash: &MerkleHash) -> Result, LmdbError> { + let rtxn = self.read_txn()?; + let Some(bytes) = Self::retrieve_bytes(&rtxn, &self.merkle_links, hash)? else { + return Ok(None); + }; + Ok(Some(LmdbLink::decode(bytes)?)) + } +} + +/// The name of the LMDB directory as it exists in the repository's `.oxen/` hidden directory. +/// LMDB's actual physical contents and lock files are stored within this directory. +const OXEN_LMDB_MERKLE_DIR: &str = "lmdb_merkle_tree_store"; + +/// The complete filepath to the LMDB file for the given repository. +pub(crate) fn lmdb_dir_location(repo_root: &Path) -> PathBuf { + repo_root.join(OXEN_HIDDEN_DIR).join(OXEN_LMDB_MERKLE_DIR) +} + +/// Only displays the repository root and LMDB environment settings. 
+impl std::fmt::Debug for LmdbBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LmdbBackend") + .field("repo_root", &self.repo_root) + .field("lmdb_env", &self.lmdb_env.info()) + .finish() + } +} diff --git a/crates/lib/src/core/db/merkle_node/lmdb/reader.rs b/crates/lib/src/core/db/merkle_node/lmdb/reader.rs new file mode 100644 index 000000000..aee14771a --- /dev/null +++ b/crates/lib/src/core/db/merkle_node/lmdb/reader.rs @@ -0,0 +1,250 @@ +use crate::core::db::merkle_node::LmdbBackend; +use crate::core::db::merkle_node::lmdb::{ + LmdbError, + value_structs::{LmdbLinkRef, LmdbNodeRef}, +}; +use crate::error::OxenError; +use crate::model::MerkleHash; +use crate::model::merkle_tree::merkle_reader::{MerkleEntry, MerkleReader}; +use crate::model::merkle_tree::node::{EMerkleTreeNode, MerkleTreeNode, MerkleTreeNodeType}; + +/// Implements the [`MerkleReader`] trait for the [`LmdbBackend`]. +/// +/// LMDB encourages short-lived read transactions, so each method opens its own +/// rtxn and uses it for the entire scope of the read. Inside that scope all +/// access is zero-copy: bytes returned by [`LmdbBackend::retrieve_bytes`] are +/// borrowed directly from the LMDB mmap and wrapped in [`LmdbNodeRef`] / +/// [`LmdbLinkRef`] without intermediate parsing or `Vec` allocation. The msgpack +/// decode of the node payload runs against the borrowed slice directly. +impl MerkleReader for LmdbBackend { + /// Checks if a node with the given `hash` exists in the store. + /// + /// NOTE: [`MerkleReader`]'s methods are **NOT** defined for files. + /// This will return `Ok(false)` on a file node that exists. + fn exists(&self, hash: &MerkleHash) -> Result { + let rtxn = self.read_txn()?; + let Some(bytes) = Self::retrieve_bytes(&rtxn, &self.merkle_tree_nodes, hash)? else { + return Ok(false); + }; + let node_ref = LmdbNodeRef::from_bytes(bytes)?; + use MerkleTreeNodeType::*; + match node_ref.kind()? 
{ + File | FileChunk => Ok(false), + Commit | Dir | VNode => Ok(true), + } + } + + /// Retrieves the node with the given `hash` from the store. `None` means no such node exists. + /// + /// NOTE: to comply with [`MerkleReader::get_node`]'s semantics, this method + /// has to consider present file nodes as not existing. + fn get_node(&self, hash: &MerkleHash) -> Result, OxenError> { + let rtxn = self.read_txn()?; + + // ── Read the node entry (zero-copy view into mmap). ──────────────── + let Some(node_bytes) = Self::retrieve_bytes(&rtxn, &self.merkle_tree_nodes, hash)? else { + return Ok(None); + }; + let node_ref = LmdbNodeRef::from_bytes(node_bytes)?; + let kind = node_ref.kind()?; + use MerkleTreeNodeType::*; + if matches!(kind, File | FileChunk) { + // Trait contract: file-typed nodes are reported as absent. + return Ok(None); + } + + // ── Read the link entry to recover the parent_id. ────────────────── + let Some(link_bytes) = Self::retrieve_bytes(&rtxn, &self.merkle_links, hash)? else { + // Node exists but no link row — table-cross integrity violation. + return Err(LmdbError::IntegrityNoLink(hash.to_hex_hash()).into()); + }; + let link_ref = LmdbLinkRef::from_bytes(link_bytes)?; + let parent_id = link_ref.parent_id(); + + // ── Decode the EMerkleTreeNode from the borrowed msgpack tail. ───── + Ok(Some(MerkleEntry { + node: EMerkleTreeNode::from_type_and_bytes(kind, node_ref.data)?, + parent_id, + })) + } + + /// Retrieves the children of the node with the given `hash` from the store. + /// An empty vec means that either the node is not a directory or virtual node, + /// or it is one but has no children. + fn get_children( + &self, + hash: &MerkleHash, + ) -> Result, OxenError> { + let rtxn = self.read_txn()?; + + let Some(link_bytes) = Self::retrieve_bytes(&rtxn, &self.merkle_links, hash)? else { + // Existing semantics: missing link is treated as "no children". 
+ return Ok(Vec::new()); + }; + let link_ref = LmdbLinkRef::from_bytes(link_bytes)?; + + let mut loaded = Vec::with_capacity(link_ref.num_children()); + for child_hash in link_ref.children_iter() { + let Some(child_bytes) = + Self::retrieve_bytes(&rtxn, &self.merkle_tree_nodes, &child_hash)? + else { + return Err(LmdbError::IntegrityNoHash(child_hash.to_hex_hash()).into()); + }; + let child_ref = LmdbNodeRef::from_bytes(child_bytes)?; + let child_kind = child_ref.kind()?; + loaded.push(( + child_hash, + MerkleTreeNode { + node: EMerkleTreeNode::from_type_and_bytes(child_kind, child_ref.data)?, + hash: child_hash, + parent_id: Some(*hash), + children: vec![], + }, + )); + } + Ok(loaded) + } +} + +#[cfg(test)] +mod tests { + + // ──────────────────────────────────────────────────────────────────────────── + // Reader semantics: file/file-chunk vs everything else. + // The trait `MerkleReader::{exists, get_node}` treats file-typed nodes as absent. + // The inherent `LmdbBackend::{full_exists, full_get_node}` see them. + // ──────────────────────────────────────────────────────────────────────────── + + use crate::{ + core::db::merkle_node::lmdb::tests::{ + commit_with_hash, dir_with_hash, file_chunk_node_with_hash, file_node_with_hash, h, + vnode_with_hash, with_test_backend, write_one, + }, + error::OxenError, + model::MerkleTreeNodeType, + model::merkle_tree::merkle_reader::MerkleReader, + }; + + #[test] + fn test_get_node_returns_none_for_file_node() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let f_h = h("11111111111111111111111111111111"); + let f = file_node_with_hash(repo, f_h); + write_one(backend, &f, None)?; + // The trait says `get_node` must treat file nodes as absent. 
+ assert!(backend.get_node(&f_h)?.is_none()); + Ok(()) + }) + } + + #[test] + fn test_get_node_returns_none_for_file_chunk_node() -> Result<(), OxenError> { + with_test_backend(|_repo, backend| { + let c_h = h("22222222222222222222222222222222"); + let c = file_chunk_node_with_hash(c_h); + write_one(backend, &c, None)?; + assert!(backend.get_node(&c_h)?.is_none()); + Ok(()) + }) + } + + #[test] + fn test_exists_returns_false_for_file_node() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let f_h = h("33333333333333333333333333333333"); + let f = file_node_with_hash(repo, f_h); + write_one(backend, &f, None)?; + // `exists` mirrors `get_node`'s file-as-absent semantics. + assert!(!backend.exists(&f_h)?); + Ok(()) + }) + } + + #[test] + fn test_exists_returns_false_for_file_chunk_node() -> Result<(), OxenError> { + with_test_backend(|_repo, backend| { + let c_h = h("44444444444444444444444444444444"); + let c = file_chunk_node_with_hash(c_h); + write_one(backend, &c, None)?; + assert!(!backend.exists(&c_h)?); + Ok(()) + }) + } + + #[test] + fn test_full_get_node_returns_some_for_file_node() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let f_h = h("55555555555555555555555555555555"); + let f = file_node_with_hash(repo, f_h); + write_one(backend, &f, None)?; + let stored = backend + .full_get_node(&f_h)? + .expect("file node should be stored"); + assert_eq!(stored.kind, MerkleTreeNodeType::File); + Ok(()) + }) + } + + #[test] + fn test_full_get_node_returns_some_for_file_chunk_node() -> Result<(), OxenError> { + with_test_backend(|_repo, backend| { + let c_h = h("66666666666666666666666666666666"); + let c = file_chunk_node_with_hash(c_h); + write_one(backend, &c, None)?; + let stored = backend + .full_get_node(&c_h)? 
+ .expect("file chunk node should be stored"); + assert_eq!(stored.kind, MerkleTreeNodeType::FileChunk); + Ok(()) + }) + } + + #[test] + fn test_full_exists_returns_true_for_file_and_chunk_nodes() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let f_h = h("77777777777777777777777777777777"); + let c_h = h("88888888888888888888888888888888"); + let f = file_node_with_hash(repo, f_h); + let c = file_chunk_node_with_hash(c_h); + write_one(backend, &f, None)?; + write_one(backend, &c, None)?; + assert!(backend.full_exists(&f_h)?); + assert!(backend.full_exists(&c_h)?); + Ok(()) + }) + } + + /// Across all five node kinds: the trait's `exists` matches `full_exists` for + /// non-file kinds, and disagrees for file/file-chunk. + #[test] + fn test_exists_vs_full_exists_for_each_node_kind() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let commit_h = h("11111111111111111111111111111111"); + let dir_h = h("22222222222222222222222222222222"); + let vnode_h = h("33333333333333333333333333333333"); + let file_h = h("44444444444444444444444444444444"); + let chunk_h = h("55555555555555555555555555555555"); + + write_one(backend, &commit_with_hash(repo, commit_h), None)?; + write_one(backend, &dir_with_hash(repo, dir_h), None)?; + write_one(backend, &vnode_with_hash(repo, vnode_h), None)?; + write_one(backend, &file_node_with_hash(repo, file_h), None)?; + write_one(backend, &file_chunk_node_with_hash(chunk_h), None)?; + + // full_exists sees everything. + for hash in [&commit_h, &dir_h, &vnode_h, &file_h, &chunk_h] { + assert!( + backend.full_exists(hash)?, + "full_exists should be true for {hash}" + ); + } + // exists agrees on commit/dir/vnode, hides file/file-chunk. 
+ assert!(backend.exists(&commit_h)?); + assert!(backend.exists(&dir_h)?); + assert!(backend.exists(&vnode_h)?); + assert!(!backend.exists(&file_h)?); + assert!(!backend.exists(&chunk_h)?); + Ok(()) + }) + } +} diff --git a/crates/lib/src/core/db/merkle_node/lmdb/value_structs.rs b/crates/lib/src/core/db/merkle_node/lmdb/value_structs.rs new file mode 100644 index 000000000..c5965560f --- /dev/null +++ b/crates/lib/src/core/db/merkle_node/lmdb/value_structs.rs @@ -0,0 +1,662 @@ +//! Zero-copy on-disk byte layouts for the LMDB Merkle tree backend. +//! +//! Every persisted value is `magic + version + fixed header + variable tail`. +//! The fixed headers are `#[repr(C)]` with alignment-1 fields so they can be +//! cast directly from LMDB-returned `&[u8]` regardless of pointer alignment; +//! the variable tail is interpreted via `<[T]>::ref_from_bytes`. +//! +//! Reads return [`LmdbNodeRef`] / [`LmdbLinkRef`] borrowed into the LMDB read +//! transaction's mmap region — no copying, no parsing. Writes go through the +//! owned [`LmdbNode`] / [`LmdbLink`] convenience types which encode the bytes +//! contiguously for [`heed::Database::put`]. +//! +//! Schema evolution is by versioning: never mutate `LmdbNodeHeaderV1` / +//! `LmdbLinkHeaderV1` after release. Add a `V2` variant to [`NodeVersion`] / +//! [`LinkVersion`] alongside a `LmdbNodeHeaderV2` / `LmdbLinkHeaderV2` and +//! dispatch on the version byte. + +use std::mem::size_of; + +// NOTE: All numeric values stored in LMDB are always in little-endian format. This ensures that +// LMDB files always contain the exact same data. Nearly every single processor architecture +// that we expect oxen to run on will be in little endian, so using these values means that +// all conversion operations are no-ops. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! 
+use zerocopy::byteorder::little_endian::U64; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, TryFromBytes, Unaligned}; + +use crate::core::db::merkle_node::lmdb::LmdbError; +use crate::model::{MerkleHash, MerkleTreeNodeType}; + +/// 4-byte magic prefix for `merkle_tree_nodes` values. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! +pub(super) const NODE_MAGIC: [u8; 4] = *b"OXNV"; + +/// 4-byte magic prefix for `merkle_links` values. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! +pub(super) const LINK_MAGIC: [u8; 4] = *b"OXLN"; + +// ───────────────────────────────────────────────────────────────────────────── +// Version enums. These are real Rust enums in code, single-byte on disk. +// +// `#[repr(u8)]` pins the discriminant size to 1 byte and avoids endianness +// concerns (a 2-byte+ repr would use native endianness for the discriminant, +// which is a cross-platform hazard for persisted data). 256 versions is far +// more headroom than we'll ever exhaust. +// +// `TryFromBytes` validates that the byte matches a declared variant during +// `try_ref_from_prefix`. `IntoBytes` lets `header.as_bytes()` work. +// ───────────────────────────────────────────────────────────────────────────── + +/// On-disk version tag for [`LmdbNodeHeaderV1`]. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! +#[derive( + TryFromBytes, IntoBytes, KnownLayout, Immutable, Unaligned, Copy, Clone, Debug, PartialEq, Eq, +)] +#[repr(u8)] +pub enum NodeVersion { + V1 = 1, +} + +/// On-disk version tag for [`LmdbLinkHeaderV1`]. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! 
+#[derive( + TryFromBytes, IntoBytes, KnownLayout, Immutable, Unaligned, Copy, Clone, Debug, PartialEq, Eq, +)] +#[repr(u8)] +pub enum LinkVersion { + V1 = 1, +} + +// ───────────────────────────────────────────────────────────────────────────── +// Node header — 8 bytes total, kind discriminant in the header, msgpack tail. +// ───────────────────────────────────────────────────────────────────────────── + +/// Fixed header of a `merkle_tree_nodes` value. +/// +/// Followed in storage by the msgpack-serialized +/// [`crate::model::merkle_tree::node::EMerkleTreeNode`] payload. Size and +/// offsets are pinned by tests so a future field reorder breaks CI immediately +/// rather than silently corrupting on-disk data. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! +// ****************** CHANGING STRUCT LAYOUT IS A BREAKING CHANGE!!! ****************** +// ****************** REMOVING `repr(c)` IS A BREAKING CHANGE!!!!!!! ****************** +#[derive(TryFromBytes, IntoBytes, KnownLayout, Immutable, Unaligned, Copy, Clone, Debug)] +#[repr(C)] +pub(super) struct LmdbNodeHeaderV1 { + pub magic: [u8; 4], + pub version: NodeVersion, + pub kind: u8, + pub _reserved: [u8; 2], +} + +impl LmdbNodeHeaderV1 { + pub const SIZE: usize = size_of::(); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Link header — 32 bytes, parent slot is always reserved (zeros if unused), +// children follow as `[[u8; 16]; num_children]` in little-endian. +// ───────────────────────────────────────────────────────────────────────────── + +/// Fixed header of a `merkle_links` value. +/// +/// Followed in storage by `num_children` children, each a 16-byte little-endian +/// merkle hash. The 16-byte `parent` slot is always written; consumers must +/// only read it when `has_parent == 1`. +// WARNING: DO NOT CHANGE THIS!!!! +// MIGRATION WARNING: CHANGING THIS **REQUIRES** A MIGRATION! 
+// ****************** CHANGING STRUCT LAYOUT IS A BREAKING CHANGE!!! ****************** +// ****************** REMOVING `repr(c)` IS A BREAKING CHANGE!!!!!!! ****************** +#[derive(TryFromBytes, IntoBytes, KnownLayout, Immutable, Unaligned, Copy, Clone, Debug)] +#[repr(C)] +pub(super) struct LmdbLinkHeaderV1 { + pub magic: [u8; 4], + pub version: LinkVersion, + pub has_parent: u8, + pub _reserved: [u8; 2], + pub num_children: U64, + pub parent: [u8; 16], +} + +impl LmdbLinkHeaderV1 { + pub const SIZE: usize = size_of::(); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Zero-copy reader views. +// `'a` is the LMDB read transaction's lifetime: the borrows end when the rtxn +// drops, which the borrow checker enforces. +// ───────────────────────────────────────────────────────────────────────────── + +/// Zero-copy view over a `merkle_tree_nodes` value. +/// +/// `header` is a typed view into the first [`LmdbNodeHeaderV1::SIZE`] bytes; +/// `data` is the msgpack tail still residing in the LMDB mmap. The msgpack +/// decoder runs against `data` directly when an owned node is needed. +#[derive(Debug)] +pub(crate) struct LmdbNodeRef<'a> { + pub header: &'a LmdbNodeHeaderV1, + pub data: &'a [u8], +} + +impl<'a> LmdbNodeRef<'a> { + /// Parse a stored `merkle_tree_nodes` value with no copying. + /// + /// Validation order is chosen for precise error reporting: + /// 1. Length check → [`LmdbError::NodeHeaderTruncated`]. + /// 2. Magic check → [`LmdbError::NodeBadMagic`]. + /// 3. `try_ref_from_prefix` → its only remaining failure mode is an + /// invalid `NodeVersion` byte, which we map to + /// [`LmdbError::NodeUnsupportedVersion`]. 
+ pub(crate) fn from_bytes(bytes: &'a [u8]) -> Result { + if bytes.len() < LmdbNodeHeaderV1::SIZE { + return Err(LmdbError::NodeHeaderTruncated { len: bytes.len() }); + } + let magic: [u8; 4] = bytes[0..4].try_into().expect("len pre-checked"); + if magic != NODE_MAGIC { + return Err(LmdbError::NodeBadMagic { actual: magic }); + } + let (header, tail) = LmdbNodeHeaderV1::try_ref_from_prefix(bytes).map_err(|_| { + // Length and magic pre-checked; only `NodeVersion` validity can fail. + LmdbError::NodeUnsupportedVersion(bytes[4]) + })?; + Ok(Self { header, data: tail }) + } + + /// Decode the kind discriminant. Errors if the byte isn't a known + /// [`MerkleTreeNodeType`]. + pub(crate) fn kind(&self) -> Result { + Ok(MerkleTreeNodeType::from_u8(self.header.kind)?) + } +} + +/// Zero-copy view over a `merkle_links` value. +#[derive(Debug)] +pub(crate) struct LmdbLinkRef<'a> { + pub header: &'a LmdbLinkHeaderV1, + /// Children stored as `[u8; 16]` little-endian merkle hashes. Length is + /// validated against `header.num_children` at parse time. + pub child_hashes: &'a [[u8; 16]], +} + +impl<'a> LmdbLinkRef<'a> { + /// Parse a stored `merkle_links` value with no copying. See + /// [`LmdbNodeRef::from_bytes`] for the validation-order rationale. + pub(crate) fn from_bytes(bytes: &'a [u8]) -> Result { + if bytes.len() < LmdbLinkHeaderV1::SIZE { + return Err(LmdbError::LinkHeaderTruncated { len: bytes.len() }); + } + let magic: [u8; 4] = bytes[0..4].try_into().expect("len pre-checked"); + if magic != LINK_MAGIC { + return Err(LmdbError::LinkBadMagic { actual: magic }); + } + let (header, tail) = LmdbLinkHeaderV1::try_ref_from_prefix(bytes).map_err(|_| { + // Length and magic pre-checked; only `LinkVersion` validity can fail + // structurally — `has_parent` is `u8` so it's always a valid bit + // pattern (we validate its semantic range below). 
+ LmdbError::LinkUnsupportedVersion(bytes[4]) + })?; + if header.has_parent > 1 { + return Err(LmdbError::InvalidIsParent(header.has_parent)); + } + let child_hashes = + <[[u8; 16]]>::ref_from_bytes(tail).map_err(|_| LmdbError::ChildrenTailMisaligned { + tail_len: tail.len(), + })?; + let claimed = header.num_children.get() as usize; + if child_hashes.len() != claimed { + return Err(LmdbError::ChildrenCountMismatch { + claimed, + actual: child_hashes.len(), + }); + } + Ok(Self { + header, + child_hashes, + }) + } + + pub(crate) fn parent_id(&self) -> Option { + if self.header.has_parent == 1 { + Some(MerkleHash::new(u128::from_le_bytes(self.header.parent))) + } else { + None + } + } + + pub(crate) fn children_iter(&self) -> impl Iterator + '_ { + self.child_hashes + .iter() + .map(|h| MerkleHash::new(u128::from_le_bytes(*h))) + } + + pub(crate) fn num_children(&self) -> usize { + self.child_hashes.len() + } +} + +// ───────────────────────────────────────────────────────────────────────────── +// Owned write-side types. +// These exist for write paths that build values in memory; encode produces the +// on-disk byte format. Reads should prefer the `Ref` types. +// ───────────────────────────────────────────────────────────────────────────── + +/// In-memory representation of a `merkle_tree_nodes` row. +/// +/// Properties: +/// - File nodes do not have children (those live in `merkle_links`). +#[derive(Debug, Clone)] +pub struct LmdbNode { + pub(crate) kind: MerkleTreeNodeType, + /// msgpack-serialized [`crate::model::merkle_tree::node::EMerkleTreeNode`]. + pub(crate) data: Vec, +} + +impl LmdbNode { + /// Encode for storage in `merkle_tree_nodes`. Produces a single contiguous + /// `Vec` of `[header][msgpack_tail]`. The header is a [`LmdbNodeHeaderV1`]. 
+ pub(super) fn encode(self) -> Vec { + let header = LmdbNodeHeaderV1 { + magic: NODE_MAGIC, + version: NodeVersion::V1, + kind: self.kind.to_u8(), + _reserved: [0; 2], + }; + let mut out = Vec::with_capacity(LmdbNodeHeaderV1::SIZE + self.data.len()); + out.extend_from_slice(header.as_bytes()); + out.extend_from_slice(&self.data); + out + } + + /// Eager-decode from on-disk bytes — copies the msgpack tail into a fresh + /// `Vec`. Prefer [`LmdbNodeRef::from_bytes`] when zero-copy is possible. + pub(super) fn decode(bytes: &[u8]) -> Result { + let r = LmdbNodeRef::from_bytes(bytes)?; + Ok(Self { + kind: r.kind()?, + data: r.data.to_vec(), + }) + } + + /// The type of Merkle tree node that's serialized. + pub fn kind(&self) -> MerkleTreeNodeType { + self.kind + } + + /// A reference to the msgpack-encoded bytes of the node. + pub fn data(&self) -> &[u8] { + &self.data + } +} + +/// In-memory representation of a `merkle_links` row. +/// +/// Properties: +/// - Commit nodes do not have a parent (so their `parent_id` is None). +/// - File nodes do not have children. +#[derive(Debug, Clone)] +pub struct LmdbLink { + pub(crate) parent_id: Option, + /// The children of this node, if it has any. + pub(crate) children: Vec, +} + +impl LmdbLink { + /// Encode for storage in `merkle_links`. Produces `[header][child_hashes]`. + /// The header is a [`LmdbLinkHeaderV1`]. 
+ pub(super) fn encode(self) -> Vec { + let (has_parent, parent_bytes) = match self.parent_id { + Some(p) => (1u8, p.to_le_bytes()), + None => (0u8, [0u8; 16]), + }; + let header = LmdbLinkHeaderV1 { + magic: LINK_MAGIC, + version: LinkVersion::V1, + has_parent, + _reserved: [0; 2], + num_children: U64::new(self.children.len() as u64), + parent: parent_bytes, + }; + let mut out = + Vec::with_capacity(LmdbLinkHeaderV1::SIZE + self.children.len() * size_of::()); + out.extend_from_slice(header.as_bytes()); + for c in &self.children { + out.extend_from_slice(&c.to_le_bytes()); + } + out + } + + /// Eager-decode from on-disk bytes. Prefer [`LmdbLinkRef::from_bytes`] when + /// zero-copy is possible. + pub(super) fn decode(bytes: &[u8]) -> Result { + let r = LmdbLinkRef::from_bytes(bytes)?; + Ok(Self { + parent_id: r.parent_id(), + children: r.children_iter().collect(), + }) + } + + /// A reference to this node's optional parent hash. + pub fn parent_id(&self) -> Option<&MerkleHash> { + self.parent_id.as_ref() + } + + /// A reference to this node's children. 
+ pub fn children(&self) -> &[MerkleHash] { + &self.children + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::mem::offset_of; + + // ── LmdbNodeHeaderV1 layout ────────────────────────────────────────────── + + #[test] + fn node_header_v1_size_is_stable() { + assert_eq!(size_of::(), 8); + assert_eq!(LmdbNodeHeaderV1::SIZE, 8); + } + + #[test] + fn node_header_v1_offsets_are_stable() { + assert_eq!(offset_of!(LmdbNodeHeaderV1, magic), 0); + assert_eq!(offset_of!(LmdbNodeHeaderV1, version), 4); + assert_eq!(offset_of!(LmdbNodeHeaderV1, kind), 5); + assert_eq!(offset_of!(LmdbNodeHeaderV1, _reserved), 6); + } + + #[test] + fn node_header_v1_golden_bytes() { + let h = LmdbNodeHeaderV1 { + magic: NODE_MAGIC, + version: NodeVersion::V1, + kind: 7, + _reserved: [0; 2], + }; + let expected: &[u8] = &[ + // magic "OXNV" + 0x4f, 0x58, 0x4e, 0x56, // + // version = NodeVersion::V1 (discriminant = 1) + 0x01, // + // kind = 7 + 0x07, // + // _reserved + 0x00, 0x00, + ]; + assert_eq!(h.as_bytes(), expected); + } + + // ── LmdbLinkHeaderV1 layout ────────────────────────────────────────────── + + #[test] + fn link_header_v1_size_is_stable() { + assert_eq!(size_of::(), 32); + assert_eq!(LmdbLinkHeaderV1::SIZE, 32); + } + + #[test] + fn link_header_v1_offsets_are_stable() { + assert_eq!(offset_of!(LmdbLinkHeaderV1, magic), 0); + assert_eq!(offset_of!(LmdbLinkHeaderV1, version), 4); + assert_eq!(offset_of!(LmdbLinkHeaderV1, has_parent), 5); + assert_eq!(offset_of!(LmdbLinkHeaderV1, _reserved), 6); + assert_eq!(offset_of!(LmdbLinkHeaderV1, num_children), 8); + assert_eq!(offset_of!(LmdbLinkHeaderV1, parent), 16); + } + + #[test] + fn link_header_v1_golden_bytes_with_parent() { + let h = LmdbLinkHeaderV1 { + magic: LINK_MAGIC, + version: LinkVersion::V1, + has_parent: 1, + _reserved: [0; 2], + num_children: U64::new(3), + parent: [0xab; 16], + }; + let expected: &[u8] = &[ + // magic "OXLN" + 0x4f, 0x58, 0x4c, 0x4e, // + // version = LinkVersion::V1 (discriminant = 1) + 
0x01, // + // has_parent = 1 + 0x01, // + // _reserved + 0x00, 0x00, // + // num_children = 3 LE + 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // + // parent [0xab; 16] + 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, // + 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, 0xab, + ]; + assert_eq!(h.as_bytes(), expected); + } + + // ── Version enums ──────────────────────────────────────────────────────── + + /// Sanity check that the version enums occupy exactly 1 byte. Catches a + /// future repr change before it silently breaks the header layout. + #[test] + fn version_enums_are_one_byte() { + assert_eq!(size_of::(), 1); + assert_eq!(size_of::(), 1); + assert_eq!((NodeVersion::V1 as u8), 1); + assert_eq!((LinkVersion::V1 as u8), 1); + } + + // ── Round-trips ────────────────────────────────────────────────────────── + + #[test] + fn lmdb_node_round_trip() { + let n = LmdbNode { + kind: MerkleTreeNodeType::Commit, + data: vec![1, 2, 3, 4, 5], + }; + let bytes = n.clone().encode(); + let r = LmdbNodeRef::from_bytes(&bytes).expect("ref parse"); + assert_eq!(r.kind().expect("kind"), MerkleTreeNodeType::Commit); + assert_eq!(r.header.version, NodeVersion::V1); + assert_eq!(r.data, &[1, 2, 3, 4, 5][..]); + + let owned = LmdbNode::decode(&bytes).expect("owned decode"); + assert_eq!(owned.kind, MerkleTreeNodeType::Commit); + assert_eq!(owned.data, vec![1, 2, 3, 4, 5]); + } + + #[test] + fn lmdb_link_round_trip_no_parent_no_children() { + let l = LmdbLink { + parent_id: None, + children: vec![], + }; + let bytes = l.encode(); + assert_eq!(bytes.len(), LmdbLinkHeaderV1::SIZE); + let r = LmdbLinkRef::from_bytes(&bytes).expect("ref parse"); + assert_eq!(r.header.version, LinkVersion::V1); + assert_eq!(r.parent_id(), None); + assert_eq!(r.num_children(), 0); + } + + #[test] + fn lmdb_link_round_trip_with_parent_and_many_children() { + let parent = MerkleHash::new(0xDEAD_BEEF); + let children: Vec = (0..7) + .map(|i| MerkleHash::new(0xC0DE_0000 + i as u128)) + .collect(); + let l 
= LmdbLink { + parent_id: Some(parent), + children: children.clone(), + }; + let bytes = l.encode(); + let r = LmdbLinkRef::from_bytes(&bytes).expect("ref parse"); + assert_eq!(r.parent_id(), Some(parent)); + assert_eq!(r.num_children(), 7); + let returned: Vec = r.children_iter().collect(); + assert_eq!(returned, children); + } + + #[test] + fn lmdb_node_ref_round_trip_unaligned() { + // Force a misaligned source pointer by prepending a byte; the ref + // parse must still succeed because every header field is alignment-1. + let n = LmdbNode { + kind: MerkleTreeNodeType::Dir, + data: vec![0xCD; 17], + }; + let canonical = n.encode(); + let mut buf = vec![0u8]; + buf.extend_from_slice(&canonical); + let unaligned = &buf[1..]; + let r = LmdbNodeRef::from_bytes(unaligned).expect("must parse unaligned"); + assert_eq!(r.kind().expect("kind"), MerkleTreeNodeType::Dir); + assert_eq!(r.data, &vec![0xCD; 17][..]); + } + + #[test] + fn lmdb_link_ref_round_trip_unaligned() { + let l = LmdbLink { + parent_id: Some(MerkleHash::new(42)), + children: vec![MerkleHash::new(1), MerkleHash::new(2)], + }; + let canonical = l.encode(); + let mut buf = vec![0u8]; + buf.extend_from_slice(&canonical); + let unaligned = &buf[1..]; + let r = LmdbLinkRef::from_bytes(unaligned).expect("must parse unaligned"); + assert_eq!(r.parent_id(), Some(MerkleHash::new(42))); + assert_eq!(r.num_children(), 2); + } + + #[test] + fn lmdb_node_bad_magic_rejected() { + let mut bytes = LmdbNode { + kind: MerkleTreeNodeType::Commit, + data: vec![], + } + .encode(); + bytes[0] = b'X'; + let err = LmdbNodeRef::from_bytes(&bytes).unwrap_err(); + assert!(matches!(err, LmdbError::NodeBadMagic { .. 
}), "{err:?}"); + } + + #[test] + fn lmdb_node_unsupported_version_rejected() { + let mut bytes = LmdbNode { + kind: MerkleTreeNodeType::Commit, + data: vec![], + } + .encode(); + bytes[4] = 99; // not a known NodeVersion discriminant + let err = LmdbNodeRef::from_bytes(&bytes).unwrap_err(); + assert!( + matches!(err, LmdbError::NodeUnsupportedVersion(99)), + "{err:?}" + ); + } + + #[test] + fn lmdb_link_unsupported_version_rejected() { + let mut bytes = LmdbLink { + parent_id: None, + children: vec![], + } + .encode(); + bytes[4] = 42; // not a known LinkVersion discriminant + let err = LmdbLinkRef::from_bytes(&bytes).unwrap_err(); + assert!( + matches!(err, LmdbError::LinkUnsupportedVersion(42)), + "{err:?}" + ); + } + + #[test] + fn lmdb_node_truncated_header_rejected() { + let bytes = vec![0u8; 5]; // too small for the 8-byte header + let err = LmdbNodeRef::from_bytes(&bytes).unwrap_err(); + assert!( + matches!(err, LmdbError::NodeHeaderTruncated { .. }), + "{err:?}" + ); + } + + #[test] + fn lmdb_link_truncated_header_rejected() { + let bytes = vec![0u8; 10]; // too small for the 32-byte header + let err = LmdbLinkRef::from_bytes(&bytes).unwrap_err(); + assert!( + matches!(err, LmdbError::LinkHeaderTruncated { .. }), + "{err:?}" + ); + } + + #[test] + fn lmdb_link_misaligned_tail_rejected() { + // Build a valid header that claims 1 child, then put 15 bytes of tail + // (not divisible by 16) — must fail with ChildrenTailMisaligned. + let header = LmdbLinkHeaderV1 { + magic: LINK_MAGIC, + version: LinkVersion::V1, + has_parent: 0, + _reserved: [0; 2], + num_children: U64::new(1), + parent: [0; 16], + }; + let mut bytes = Vec::new(); + bytes.extend_from_slice(header.as_bytes()); + bytes.extend_from_slice(&[0u8; 15]); + let err = LmdbLinkRef::from_bytes(&bytes).unwrap_err(); + assert!( + matches!(err, LmdbError::ChildrenTailMisaligned { .. 
}), + "{err:?}" + ); + } + + #[test] + fn lmdb_link_count_mismatch_rejected() { + // Header claims 5 children but only 1 follows. + let header = LmdbLinkHeaderV1 { + magic: LINK_MAGIC, + version: LinkVersion::V1, + has_parent: 0, + _reserved: [0; 2], + num_children: U64::new(5), + parent: [0; 16], + }; + let mut bytes = Vec::new(); + bytes.extend_from_slice(header.as_bytes()); + bytes.extend_from_slice(&[0u8; 16]); + let err = LmdbLinkRef::from_bytes(&bytes).unwrap_err(); + assert!( + matches!( + err, + LmdbError::ChildrenCountMismatch { + claimed: 5, + actual: 1 + } + ), + "{err:?}" + ); + } + + #[test] + fn lmdb_link_invalid_is_parent_rejected() { + let mut bytes = LmdbLink { + parent_id: None, + children: vec![], + } + .encode(); + bytes[5] = 2; // has_parent must be 0 or 1 (offset is 5 now, post-shrink) + let err = LmdbLinkRef::from_bytes(&bytes).unwrap_err(); + assert!(matches!(err, LmdbError::InvalidIsParent(2)), "{err:?}"); + } +} diff --git a/crates/lib/src/core/db/merkle_node/lmdb/writer.rs b/crates/lib/src/core/db/merkle_node/lmdb/writer.rs new file mode 100644 index 000000000..9008470de --- /dev/null +++ b/crates/lib/src/core/db/merkle_node/lmdb/writer.rs @@ -0,0 +1,485 @@ +use std::cell::Cell; +use std::rc::Rc; + +use heed::{Database, Env, WithTls}; + +use crate::core::db::merkle_node::lmdb::LmdbError; +use crate::core::db::merkle_node::lmdb::lmdb_backend::{KeyLmdb, LmdbBackend, ValueLmdb}; +use crate::core::db::merkle_node::lmdb::value_structs::{LmdbLink, LmdbNode}; +use crate::error::OxenError; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; +use crate::model::{MerkleHash, MerkleTreeNodeType, TMerkleTreeNode}; + +/// Merkle writer implementation for the [`LmdbBackend`]. +impl MerkleWriter for LmdbBackend { + /// Returns a new [`LmdbWriteSession`] that can be used to write nodes to the store. 
+ /// + /// No write transaction is opened here; the session buffers all writes via the shared + /// [`Self::pending`] queue and opens exactly one [`heed::RwTxn`] in [`Self::finish`] + /// to commit them atomically. + fn begin<'a>(&'a self) -> Result, OxenError> { + Ok(Box::new(LmdbWriteSession { + env: &self.lmdb_env, + merkle_tree_nodes: &self.merkle_tree_nodes, + merkle_links: &self.merkle_links, + pending: Rc::new(Cell::new(Vec::new())), + })) + } +} + +/// One node's worth of buffered writes — the unit produced by a [`LmdbNodeWriteSession`] +/// and consumed by [`LmdbWriteSession::finish`]. +struct PendingWrite { + node_hash: MerkleHash, + node_kind: MerkleTreeNodeType, + node_data: Vec, + parent_id: Option, + children: Vec<(MerkleHash, LmdbNode)>, +} + +/// Implements [`MerkleWriteSession`] for the [`LmdbBackend`] with all-or-nothing semantics. +/// +/// Each [`LmdbNodeWriteSession`] this hands out shares the [`Self::pending`] queue via +/// [`Rc>`]. Node sessions buffer their state in memory and push a [`PendingWrite`] +/// onto the queue when their `finish` is called. This session's [`Self::finish`] opens +/// a single [`heed::RwTxn`], drains the queue, and commits — so either every queued write +/// is persisted or none is. +/// +/// **Callers MUST call `finish()` on every node session and on the outer write session.** +/// There is intentionally no [`Drop`] guard: any session that goes out of scope without +/// an explicit `finish()` silently loses its buffered state. +/// +/// Single-threaded by construction: [`Rc`] makes the session itself `!Send`, and the +/// [`MerkleWriteSession`] trait doesn't require `Send`, so the session can never move +/// across threads. The borrow on `pending` lives only for one [`Vec::push`] inside a +/// node session's `finish`, with no nested re-entry — so the [`Cell::take`] / +/// [`Cell::set`] dance can never observe a concurrent or aliased access. 
If async or +/// multi-threading is needed one day, then this can be migrated to an [`Arc>`]. +/// +/// LMDB allows only one write transaction per environment, so other write transactions +/// against the same env block until this session's `finish` returns. Read transactions +/// taken before `finish` see pre-session data; reads taken after see the new state. +struct LmdbWriteSession<'env> { + env: &'env Env, + merkle_tree_nodes: &'env Database, + merkle_links: &'env Database, + pending: Rc>>, +} + +impl<'env> MerkleWriteSession for LmdbWriteSession<'env> { + /// Start writing a single node and allow callers to write its children. + /// On success, returns a [`LmdbNodeWriteSession`]. + fn create_node<'node_trans>( + &'node_trans self, + node: &dyn TMerkleTreeNode, + parent_id: Option, + ) -> Result, OxenError> { + Ok(Box::new(LmdbNodeWriteSession { + pending: Rc::clone(&self.pending), + node_hash: node.hash(), + node_kind: node.node_type(), + node_data: node.to_msgpack_bytes()?, + parent_id, + children_buffer: Vec::new(), + })) + } + + /// Drain every queued [`PendingWrite`] into a single [`heed::RwTxn`] and commit. + /// All-or-nothing: either every node and its children/links are persisted, or none is. + /// + /// Each write goes through [`LmdbNode::encode`] / [`LmdbLink::encode`] which + /// produce the zero-copy on-disk byte format (`magic + version + header + tail`) + /// readable via the corresponding [`super::value_structs::LmdbNodeRef`] / + /// [`super::value_structs::LmdbLinkRef`]. 
+ fn finish(self: Box) -> Result<(), OxenError> { + let pending = self.pending.take(); + if pending.is_empty() { + return Ok(()); + } + let mut wtxn = LmdbBackend::write_txn(self.env)?; + for pw in pending { + LmdbBackend::put_serialized( + &mut wtxn, + self.merkle_tree_nodes, + &pw.node_hash, + LmdbNode::encode, + LmdbNode { + kind: pw.node_kind, + data: pw.node_data, + }, + )?; + + let children = { + let mut children = Vec::with_capacity(pw.children.len()); + for (child_hash, child_node) in pw.children { + LmdbBackend::put_serialized( + &mut wtxn, + self.merkle_tree_nodes, + &child_hash, + LmdbNode::encode, + child_node, + )?; + children.push(child_hash); + } + children + }; + + LmdbBackend::put_serialized( + &mut wtxn, + self.merkle_links, + &pw.node_hash, + LmdbLink::encode, + LmdbLink { + parent_id: pw.parent_id, + children, + }, + )?; + } + wtxn.commit().map_err(LmdbError::Write)?; + Ok(()) + } +} + +/// Buffers one node's data + children in memory; on `finish`, hands the buffer off to the +/// parent [`LmdbWriteSession`]'s pending queue. +/// +/// **Callers MUST call `finish()` before this goes out of scope.** No [`Drop`] guard +/// exists; an unfinished node session silently discards its buffered state. +struct LmdbNodeWriteSession { + // **Always** points to the parent [`LmdbWriteSession::pending`] + pending: Rc>>, + node_hash: MerkleHash, + node_kind: MerkleTreeNodeType, + node_data: Vec, + parent_id: Option, + children_buffer: Vec<(MerkleHash, LmdbNode)>, +} + +impl NodeWriteSession for LmdbNodeWriteSession { + /// Hash of the node currently being written. + fn node_id(&self) -> &MerkleHash { + &self.node_hash + } + + /// Serialize this child and queue for writing. 
+ fn add_child(&mut self, child: &dyn TMerkleTreeNode) -> Result<(), OxenError> { + let child_as_lmdb_node = LmdbNode { + kind: child.node_type(), + data: child.to_msgpack_bytes()?, + }; + self.children_buffer + .push((child.hash(), child_as_lmdb_node)); + Ok(()) + } + + /// Hand the buffered node + children off to the parent session's queue. + /// The actual LMDB writes happen in [`LmdbWriteSession::finish`]. + fn finish(self: Box) -> Result<(), OxenError> { + let LmdbNodeWriteSession { + pending, + node_hash, + node_kind, + node_data, + parent_id, + children_buffer, + } = *self; + // Since Rc is !Send, there's never any parallel access to `pending`. + // Therefore, we will never accidentally lose a `PendingWrite` push. + let mut queue = pending.take(); + queue.push(PendingWrite { + node_hash, + node_kind, + node_data, + parent_id, + children: children_buffer, + }); + pending.set(queue); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::core::db::merkle_node::LmdbBackend; + use crate::error::OxenError; + use crate::model::MerkleHash; + use crate::model::merkle_tree::merkle_reader::MerkleReader; + use crate::model::merkle_tree::merkle_writer::MerkleWriter; + use crate::model::merkle_tree::node::CommitNode; + use crate::test; + + use crate::core::db::merkle_node::lmdb::tests::{ + commit_with_hash, dir_with_hash, file_chunk_node_with_hash, file_node_with_hash, h, + open_lmdb_at, vnode_with_hash, with_test_backend, write_one, + }; + + /// End-to-end smoke test for the queue-based design: write a parent commit with + /// a child commit added, verify both land in the store after `finish` and the + /// parent's link records the child. 
+ #[test] + fn test_lmdb_writer_queue_roundtrip_node_with_child_redundant_write() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let parent_h = h("11111111111111111111111111111111"); + let child_h = h("22222222222222222222222222222222"); + let parent = commit_with_hash(repo, parent_h); + let child = commit_with_hash(repo, child_h); + + let session = backend.begin()?; + { + let mut parent_ns = session.create_node(&parent, None)?; + parent_ns.add_child(&child)?; + parent_ns.finish()?; + } + { + // redundant, but this is using the interface more explicitly + // it will write the same node twice: last write wins + let child_ns = session.create_node(&child, Some(parent_h))?; + child_ns.finish()?; + } + session.finish()?; + + rt_node_with_child_verify(backend, parent_h, child_h) + }) + } + + #[test] + fn test_lmdb_writer_queue_roundtrip_node_with_child() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let parent_h = h("11111111111111111111111111111111"); + let child_h = h("22222222222222222222222222222222"); + let parent = commit_with_hash(repo, parent_h); + let child = commit_with_hash(repo, child_h); + + let session = backend.begin()?; + { + let mut parent_ns = session.create_node(&parent, None)?; + parent_ns.add_child(&child)?; + parent_ns.finish()?; + } + // relying on the write session's `finish` to ensure that children + // added via `add_child` are written into the node storage as well + session.finish()?; + + rt_node_with_child_verify(backend, parent_h, child_h) + }) + } + + fn rt_node_with_child_verify( + backend: &LmdbBackend, + parent_h: MerkleHash, + child_h: MerkleHash, + ) -> Result<(), OxenError> { + assert!(backend.exists(&parent_h)?); + assert!(backend.exists(&child_h)?); + let children = backend.get_children(&parent_h)?; + assert_eq!(children.len(), 1, "parent should have one child"); + assert_eq!(children[0].0, child_h, "child hash should match"); + Ok(()) + } + + /// Nothing was written, nothing should be in the store, 
but `finish` must succeed. + #[test] + fn test_lmdb_writer_empty_session_is_noop() -> Result<(), OxenError> { + with_test_backend(|_repo, backend| { + let session = backend.begin()?; + session.finish()?; + let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); + assert!(!backend.exists(&missing)?); + Ok(()) + }) + } + + /// Two `LmdbNodeWriteSession`s alive at the same time used to be a borrow-checker + /// violation under the shared-`RwTxn` design. With the pending-queue design they + /// can coexist freely; both writes must land after the parent's `finish`. + #[test] + fn test_lmdb_writer_concurrent_node_sessions() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let a_h = h("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let b_h = h("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + let a = commit_with_hash(repo, a_h); + let b = commit_with_hash(repo, b_h); + + let session = backend.begin()?; + let a_ns = session.create_node(&a, None)?; + let b_ns = session.create_node(&b, Some(a_h))?; + // finish in reverse order on purpose + b_ns.finish()?; + a_ns.finish()?; + session.finish()?; + + assert!(backend.exists(&a_h)?); + assert!(backend.exists(&b_h)?); + Ok(()) + }) + } + + // ──────────────────────────────────────────────────────────────────────────── + // get_links works on every node kind, including file/file-chunk. 
+ // ──────────────────────────────────────────────────────────────────────────── + + #[test] + fn test_get_links_returns_link_for_each_node_kind() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let parent = h("00000000000000000000000000000001"); + let commit_h = h("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + let dir_h = h("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + let vnode_h = h("cccccccccccccccccccccccccccccccc"); + let file_h = h("dddddddddddddddddddddddddddddddd"); + let chunk_h = h("eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"); + + write_one(backend, &commit_with_hash(repo, commit_h), None)?; + write_one(backend, &dir_with_hash(repo, dir_h), Some(parent))?; + write_one(backend, &vnode_with_hash(repo, vnode_h), Some(parent))?; + write_one(backend, &file_node_with_hash(repo, file_h), Some(parent))?; + write_one(backend, &file_chunk_node_with_hash(chunk_h), Some(parent))?; + + let commit_link = backend.get_links(&commit_h)?.expect("commit link"); + assert_eq!(commit_link.parent_id, None); + assert!(commit_link.children.is_empty()); + + for hash in [&dir_h, &vnode_h, &file_h, &chunk_h] { + let link = backend.get_links(hash)?.expect("link for non-commit kind"); + assert_eq!( + link.parent_id, + Some(parent), + "parent should round-trip for {hash}" + ); + assert!( + link.children.is_empty(), + "no add_child was called for {hash}" + ); + } + Ok(()) + }) + } + + #[test] + fn test_get_links_returns_none_for_unwritten_hash() -> Result<(), OxenError> { + with_test_backend(|_repo, backend| { + let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); + assert!(backend.get_links(&missing)?.is_none()); + Ok(()) + }) + } + + // ──────────────────────────────────────────────────────────────────────────── + // Missing-hash behavior across all four read methods. 
+ // ──────────────────────────────────────────────────────────────────────────── + + #[test] + fn test_reads_against_missing_hash() -> Result<(), OxenError> { + with_test_backend(|_repo, backend| { + let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); + assert!(!backend.exists(&missing)?); + assert!(backend.get_node(&missing)?.is_none()); + assert!(backend.get_children(&missing)?.is_empty()); + assert!(!backend.full_exists(&missing)?); + assert!(backend.full_get_node(&missing)?.is_none()); + assert!(backend.get_links(&missing)?.is_none()); + Ok(()) + }) + } + + // ──────────────────────────────────────────────────────────────────────────── + // Multi-child round-trip — exercises the `LmdbLink` serialization path with + // N>1 children. + // ──────────────────────────────────────────────────────────────────────────── + + #[test] + fn test_get_children_returns_all_children_in_order() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let parent_h = h("11111111111111111111111111111111"); + let parent = commit_with_hash(repo, parent_h); + + // 5 distinct children with deterministic, distinguishable hashes. 
+ let kid_hashes: Vec = (0..5) + .map(|i| h(&format!("{:032x}", 0xC0DE_0000_u64 + i))) + .collect(); + let kids: Vec = kid_hashes + .iter() + .map(|kh| commit_with_hash(repo, *kh)) + .collect(); + + let session = backend.begin()?; + { + let mut parent_ns = session.create_node(&parent, None)?; + for k in &kids { + parent_ns.add_child(k)?; + } + parent_ns.finish()?; + } + for k in &kids { + let ns = session.create_node(k, Some(parent_h))?; + ns.finish()?; + } + session.finish()?; + + let children = backend.get_children(&parent_h)?; + assert_eq!(children.len(), 5, "all 5 children should be present"); + let returned_hashes: Vec = children.iter().map(|(c, _)| *c).collect(); + assert_eq!( + returned_hashes, kid_hashes, + "children returned in add_child order" + ); + Ok(()) + }) + } + + #[test] + fn test_get_children_empty_for_node_with_no_add_child() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let commit_h = h("11111111111111111111111111111111"); + let commit = commit_with_hash(repo, commit_h); + write_one(backend, &commit, None)?; + assert!(backend.get_children(&commit_h)?.is_empty()); + Ok(()) + }) + } + + // ──────────────────────────────────────────────────────────────────────────── + // Persistence across env open/close. + // ──────────────────────────────────────────────────────────────────────────── + + #[test] + fn test_data_persists_across_env_reopen() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + let commit_h = h("11111111111111111111111111111111"); + let commit = commit_with_hash(&repo, commit_h); + + // First open: write, then drop the backend (env close). + { + let backend = open_lmdb_at(repo.path.clone()); + write_one(&backend, &commit, None)?; + } + // Second open: reopen, read. 
+ { + let backend = open_lmdb_at(repo.path.clone()); + assert!(backend.exists(&commit_h)?); + assert!(backend.get_node(&commit_h)?.is_some()); + } + Ok(()) + }) + } + + // ──────────────────────────────────────────────────────────────────────────── + // Idempotent re-writes (last-write-wins). + // ──────────────────────────────────────────────────────────────────────────── + + #[test] + fn test_repeated_writes_to_same_hash_do_not_error() -> Result<(), OxenError> { + with_test_backend(|repo, backend| { + let commit_h = h("11111111111111111111111111111111"); + let commit = commit_with_hash(repo, commit_h); + + // Write twice, in two separate sessions. + write_one(backend, &commit, None)?; + write_one(backend, &commit, None)?; + + assert!(backend.exists(&commit_h)?); + Ok(()) + }) + } +} diff --git a/crates/lib/src/core/db/merkle_node/merkle_node_db.rs b/crates/lib/src/core/db/merkle_node/merkle_node_db.rs index 3e0484941..dcce13dce 100644 --- a/crates/lib/src/core/db/merkle_node/merkle_node_db.rs +++ b/crates/lib/src/core/db/merkle_node/merkle_node_db.rs @@ -388,7 +388,7 @@ impl MerkleNodeDB { Ok(()) } - /// Write the base node info. + /// Writes the content of the Merkle tree node according to the specific `node` file format. /// WARNING: Sets the internal dtype, node_id, parent_id of `self` to the values from `node`. 
fn write_node( &mut self, @@ -406,15 +406,16 @@ impl MerkleNodeDB { let Some(node_file) = self.node_file.as_mut() else { return Err(MerkleDbError::WriteBeforeOpen); }; - // log::debug!("write_node node: {}", node); - // Write data type + log::trace!("write_node node: {}", node); + node_file.write_all(&node.node_type().to_u8().to_le_bytes())?; // Write parent id if let Some(parent_id) = parent_id { node_file.write_all(&parent_id.to_le_bytes())?; } else { + // write 16 bytes, each is zero => write a 0_u128 node_file.write_all(&[0u8; 16])?; } @@ -422,7 +423,7 @@ impl MerkleNodeDB { let buf = node.to_msgpack_bytes()?; let data_len = buf.len() as u32; node_file.write_all(&data_len.to_le_bytes())?; - // log::debug!("write_node Wrote data length {}", data_len); + log::trace!("write_node Wrote data length {}", data_len); // Write data node_file.write_all(&buf)?; @@ -438,6 +439,7 @@ impl MerkleNodeDB { Ok(()) } + /// Writes the content of a node's child as the child would appear in the `children` file. 
pub(crate) fn add_child(&mut self, item: &dyn TMerkleTreeNode) -> Result<(), MerkleDbError> { if self.read_only { return Err(MerkleDbError::ReadOnly); @@ -467,6 +469,7 @@ impl MerkleNodeDB { // log::debug!("--add_child-- children_file {:?}", children_file); // log::debug!("--add_child-- buf.len() {}", buf.len()); children_file.write_all(&buf)?; + self.data_offset += data_len; Ok(()) diff --git a/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs b/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs index 55b15c15c..eb89afd44 100644 --- a/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs +++ b/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs @@ -765,7 +765,7 @@ impl CommitMerkleTree { // Load the children for the vnode let vnode_with_children = CommitMerkleTree::read_depth(repo, &vnode_without_children.hash, 0)?; - // log::debug!("read_file vnode_with_children: {:?}", vnode_with_children); + log::trace!("read_file vnode_with_children: {:?}", vnode_with_children); let Some(vnode_with_children) = vnode_with_children else { return Ok(None); }; diff --git a/crates/lib/src/error.rs b/crates/lib/src/error.rs index 5de5412a3..a2406c762 100644 --- a/crates/lib/src/error.rs +++ b/crates/lib/src/error.rs @@ -14,6 +14,7 @@ use std::path::Path; use std::path::PathBuf; use tokio::task::JoinError; +use crate::core::db::merkle_node::lmdb::LmdbError; use crate::core::db::merkle_node::merkle_node_db::MerkleDbError; use crate::model::ParsedResource; use crate::model::RepoNew; @@ -260,6 +261,9 @@ pub enum OxenError { /// Contains the name of the incompatible type as reported by [`std::any::type_name_of_val`]. DisallowedNodeWrite(&'static str), + #[error("{0}")] + Lmdb(#[from] LmdbError), + // // Schema (dataframes) // @@ -546,6 +550,7 @@ pub enum OxenError { #[error("{0}")] Basic(StringError), + // TODO: remove all uses of `Basic` and replace with specific errors. 
#[error("{0}")] InternalError(StringError), } diff --git a/crates/lib/src/model/merkle_tree/node/commit_node.rs b/crates/lib/src/model/merkle_tree/node/commit_node.rs index d9f18d66c..9c66df7db 100644 --- a/crates/lib/src/model/merkle_tree/node/commit_node.rs +++ b/crates/lib/src/model/merkle_tree/node/commit_node.rs @@ -124,6 +124,7 @@ impl CommitNode { } } + #[inline(always)] pub fn deserialize(data: &[u8]) -> Result { // In order to support versions that didn't have the enum, // if it fails we will fall back to the old struct, then populate the enum diff --git a/crates/lib/src/model/merkle_tree/node/dir_node.rs b/crates/lib/src/model/merkle_tree/node/dir_node.rs index d026190ec..48187569b 100644 --- a/crates/lib/src/model/merkle_tree/node/dir_node.rs +++ b/crates/lib/src/model/merkle_tree/node/dir_node.rs @@ -118,6 +118,7 @@ impl DirNode { } } + #[inline(always)] pub fn deserialize(data: &[u8]) -> Result { // In order to support versions that didn't have the enum, // if it fails we will fall back to the old struct, then populate the enum diff --git a/crates/lib/src/model/merkle_tree/node/file_chunk_node.rs b/crates/lib/src/model/merkle_tree/node/file_chunk_node.rs index b0cd92a4b..6809b9e95 100644 --- a/crates/lib/src/model/merkle_tree/node/file_chunk_node.rs +++ b/crates/lib/src/model/merkle_tree/node/file_chunk_node.rs @@ -16,6 +16,7 @@ pub struct FileChunkNode { } impl FileChunkNode { + #[inline(always)] pub fn deserialize(data: &[u8]) -> Result { rmp_serde::from_slice(data) } diff --git a/crates/lib/src/model/merkle_tree/node/file_node.rs b/crates/lib/src/model/merkle_tree/node/file_node.rs index 5c4aa01ba..8ae6fd8de 100644 --- a/crates/lib/src/model/merkle_tree/node/file_node.rs +++ b/crates/lib/src/model/merkle_tree/node/file_node.rs @@ -94,6 +94,7 @@ impl FileNode { } } + #[inline(always)] pub fn deserialize(data: &[u8]) -> Result { let file_node: FileNode = match rmp_serde::from_slice(data) { Ok(file_node) => file_node, diff --git 
a/crates/lib/src/model/merkle_tree/node/vnode.rs b/crates/lib/src/model/merkle_tree/node/vnode.rs index 300286b08..a30ca5bfc 100644 --- a/crates/lib/src/model/merkle_tree/node/vnode.rs +++ b/crates/lib/src/model/merkle_tree/node/vnode.rs @@ -55,6 +55,7 @@ impl VNode { } } + #[inline(always)] pub fn deserialize(data: &[u8]) -> Result { // In order to support versions that didn't have the enum, // if it fails we will fall back to the old struct, then populate the enum diff --git a/crates/lib/src/repositories/commits/commit_writer.rs b/crates/lib/src/repositories/commits/commit_writer.rs index 0f816609e..713e5bd88 100644 --- a/crates/lib/src/repositories/commits/commit_writer.rs +++ b/crates/lib/src/repositories/commits/commit_writer.rs @@ -2160,9 +2160,9 @@ mod tests { /// open each per-node DB read-only via the `MerkleStore` API, and capture /// the **deserialized** structured contents: /// - /// - `parent_id: Option` — from the per-node `node` file's + /// - `parent_id: Option` => from the per-node `node` file's /// fixed-size header (16 raw bytes; deterministic). - /// - `node: EMerkleTreeNode` — msgpack-decoded from the data + /// - `node: EMerkleTreeNode` => msgpack-decoded from the data /// section of the per-node `node` file. Decoding undoes the HashMap /// key-order non-determinism described in the module-level comment. /// - `children: Vec<(MerkleHash, EMerkleTreeNode)>` — each child msgpack- @@ -2185,6 +2185,7 @@ mod tests { /// `HashMap` on every call, so even running the previous-node-writing /// code twice can produce non-identical on-disk bytes for `DirNode` /// bodies. + /// /// The merkle hash is computed from raw fields (not msgpack), so node /// hashes are stable, but the byte image of the msgpack body is not. /// @@ -2195,7 +2196,7 @@ mod tests { /// equivalence observable without changing the production map-typed /// fields to a deterministic container (e.g. `BTreeMap`) or to a /// fixed-seeded `BuildHasher`. 
- fn snapshot_tree_nodes( + fn snapshot_tree_nodes_file_backend( repo: &LocalRepository, ) -> Result, OxenError> { let nodes_dir = util::fs::oxen_hidden_dir(&repo.path) @@ -2363,7 +2364,7 @@ mod tests { commit_ns.finish()?; session.finish()?; } - let snap_previous = snapshot_tree_nodes(&repo)?; + let snap_previous = snapshot_tree_nodes_file_backend(&repo)?; wipe_tree_nodes(&repo)?; // ── Run 2: updated node writing (production) ────────────────────── @@ -2385,7 +2386,7 @@ mod tests { commit_ns.finish()?; session.finish()?; } - let snap_updated = snapshot_tree_nodes(&repo)?; + let snap_updated = snapshot_tree_nodes_file_backend(&repo)?; // ── Compare ──────────────────────────────────────────────────────── //