diff --git a/crates/lib/src/command/migrate/m20250111083535_add_child_counts_to_nodes.rs b/crates/lib/src/command/migrate/m20250111083535_add_child_counts_to_nodes.rs index 2dc5fc78a..28a2ddb6b 100644 --- a/crates/lib/src/command/migrate/m20250111083535_add_child_counts_to_nodes.rs +++ b/crates/lib/src/command/migrate/m20250111083535_add_child_counts_to_nodes.rs @@ -3,10 +3,12 @@ use std::path::Path; use super::Migrate; use crate::config::RepositoryConfig; -use crate::core::db::merkle_node::MerkleNodeDB; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; use crate::model::merkle_tree::merkle_tree_node_cache; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; use crate::model::merkle_tree::node::vnode::VNodeOpts; use crate::model::merkle_tree::node::{ CommitNode, DirNode, EMerkleTreeNode, MerkleTreeNode, VNode, @@ -135,14 +137,27 @@ fn run_on_commit(repository: &LocalRepository, commit: &Commit) -> Result<(), Ox dir_node_opts.num_entries = num_children as u64; let dir_node = DirNode::new(&new_repo, dir_node_opts)?; - // Write a new commit db + // Write a new commit node via a session on the old repo. let commit_node = CommitNode::from_commit(commit.clone()); - let mut root_commit_db = - MerkleNodeDB::open_read_write(&old_repo, &commit_node, root_node.parent_id)?; - root_commit_db.add_child(&dir_node)?; + let old_store = old_repo.merkle_store(); + let new_store = new_repo.merkle_store(); + let old_session = old_store.begin()?; + let new_session = new_store.begin()?; + let mut root_commit_ns = old_session.create_node(&commit_node, root_node.parent_id)?; + root_commit_ns.add_child(&dir_node)?; + root_commit_ns.finish()?; let current_path = Path::new(""); - rewrite_nodes(&old_repo, &new_repo, &root_node, current_path)?; + rewrite_nodes( + &old_repo, + &new_repo, + &old_session, + &new_session, + &root_node, + current_path, + )?; + new_session.finish()?; + old_session.finish()?; // println!("new tree for commit {}", commit); // repositories::tree::print_tree(&new_repo, commit)?; @@ -158,9 +173,12 @@ fn run_on_commit(repository: &LocalRepository, commit: &Commit) -> Result<(), Ox // Forgive me if you are reading this for reference, we don't have great writers for the // merkle tree yet - so there is a lot of duplicate logic with `commit_writer.rs` -fn rewrite_nodes( +#[allow(clippy::too_many_arguments)] +fn rewrite_nodes( old_repo: &LocalRepository, new_repo: &LocalRepository, + old_session: &OldS, + new_session: &NewS, node: &MerkleTreeNode, current_dir: &Path, ) -> Result<(), OxenError> { @@ -169,19 +187,10 @@ fn rewrite_nodes( EMerkleTreeNode::Directory(dir) => { // Load all the children of children (files and folders) // Then redistribute into buckets... - // and then just use the MerkleNodeDB to write the nodes - // to the new tree + // and then just use the session to write the nodes to the new tree let dir_children = repositories::tree::list_files_and_folders(child)?; let current_dir = current_dir.join(dir.name()); - // log::debug!( - // "rewrite_nodes {} children on current_dir {:?} DIRECTORY {} {}", - // dir_children.len(), - // current_dir, - // dir.hash(), - // dir - // ); - let total_children = dir_children.len(); let vnode_size = old_repo.vnode_size(); let num_vnodes = (total_children as f32 / vnode_size as f32).ceil() as u128; @@ -190,15 +199,7 @@ fn rewrite_nodes( let mut dir_node_opts = dir.get_opts(); dir_node_opts.num_entries = total_children as u64; let dir = DirNode::new(new_repo, dir_node_opts)?; - let mut dir_db = MerkleNodeDB::open_read_write(old_repo, &dir, node.parent_id)?; - - // log::debug!( - // "rewrite_nodes {} VNodes for {} children in {} with vnode size {}", - // num_vnodes, - // total_children, - // dir, - // vnode_size - // ); + let mut dir_ns = old_session.create_node(&dir, node.parent_id)?; // Compute buckets let mut buckets: Vec> = vec![vec![]; num_vnodes as usize]; @@ -206,14 +207,6 @@ fn rewrite_nodes( let path = current_dir.join(dir_child.maybe_path().unwrap()); let hash = hasher::hash_buffer_128bit(path.to_str().unwrap().as_bytes()); let bucket_idx = hash % num_vnodes; - // log::debug!( - // "\trewrite_nodes dir_child {:?} bucket {} num_vnodes {} hash {} {}", - // path, - // bucket_idx, - // num_vnodes, - // hash, - // dir_child - // ); buckets[bucket_idx as usize].push(dir_child); } @@ -242,51 +235,58 @@ fn rewrite_nodes( vnodes.push((vnode_id, bucket.clone())); } - // log::debug!("rewrite_nodes count vnodes: {}", vnodes.len()); for (hash, entries) in vnodes.iter() { - // create a new vnode obj and add the the db + // create a new vnode obj and add to the dir session let opts = VNodeOpts { hash: *hash, num_entries: entries.len() as u64, }; let vnode_obj = VNode::new(new_repo, opts)?; - // log::debug!("rewrite_nodes adding VNode to DirNode! {:?}", vnode_obj); - dir_db.add_child(&vnode_obj)?; + dir_ns.add_child(&vnode_obj)?; - let mut vnode_db = - MerkleNodeDB::open_read_write(new_repo, &vnode_obj, Some(dir_db.node_id))?; + let dir_ns_id = *dir_ns.node_id(); + let mut vnode_ns = new_session.create_node(&vnode_obj, Some(dir_ns_id))?; - // log::debug!("rewrite_nodes count entries {}", entries.len()); for entry in entries { match &entry.node { EMerkleTreeNode::File(f_node) => { - // log::debug!("rewrite_nodes adding FileNode to VNode! {}", f_node); - vnode_db.add_child(f_node)?; + vnode_ns.add_child(f_node)?; } EMerkleTreeNode::Directory(d_node) => { let mut d_node_opts = d_node.get_opts(); let d_children = repositories::tree::list_files_and_folders(entry)?; d_node_opts.num_entries = d_children.len() as u64; - // log::debug!( - // "rewrite_nodes adding DirNode to VNode with {} num_entries {}", - // d_node_opts.num_entries, - // d_node - // ); let d_node = DirNode::new(new_repo, d_node_opts)?; - vnode_db.add_child(&d_node)?; + vnode_ns.add_child(&d_node)?; } _ => { panic!("Shouldn't reach here.") } } } + vnode_ns.finish()?; } - - rewrite_nodes(old_repo, new_repo, child, ¤t_dir)?; + dir_ns.finish()?; + + rewrite_nodes( + old_repo, + new_repo, + old_session, + new_session, + child, + ¤t_dir, + )?; } EMerkleTreeNode::VNode(_) => { // VNode just needs to traverse to the next dirnode - rewrite_nodes(old_repo, new_repo, child, current_dir)?; + rewrite_nodes( + old_repo, + new_repo, + old_session, + new_session, + child, + current_dir, + )?; } _ => { // pass, FileNode was not changed, so it is on the latest version diff --git a/crates/lib/src/core/v_latest/commits.rs b/crates/lib/src/core/v_latest/commits.rs index c06f48f98..2c6344703 100644 --- a/crates/lib/src/core/v_latest/commits.rs +++ b/crates/lib/src/core/v_latest/commits.rs @@ -21,8 +21,10 @@ use std::str; use crate::constants::COMMIT_COUNT_DIR; use crate::core::db::key_val::{opts, str_val_db}; -use crate::core::db::merkle_node::MerkleNodeDB; use crate::core::v_latest::index::CommitMerkleTree; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; use rocksdb::{DBWithThreadMode, MultiThreaded, SingleThreaded}; /// Configuration for commit traversal operations @@ -283,10 +285,14 @@ pub fn create_empty_commit( )?; let parent_id = Some(existing_node.hash); - let mut commit_db = MerkleNodeDB::open_read_write(repo, &commit_node, parent_id)?; + let store = repo.merkle_store(); + let session = store.begin()?; + let mut commit_ns = session.create_node(&commit_node, parent_id)?; // There should always be one child, the root directory let dir_node = existing_node.children.first().unwrap().dir()?; - commit_db.add_child(&dir_node)?; + commit_ns.add_child(&dir_node)?; + commit_ns.finish()?; + session.finish()?; // Copy the dir hashes db to the new commit repositories::tree::cp_dir_hashes_to(repo, &existing_commit_id, commit_node.hash())?; @@ -362,9 +368,13 @@ pub fn create_initial_commit( }, )?; - // Open the commit database and add the root directory - let mut commit_db = MerkleNodeDB::open_read_write(repo, &commit_node, None)?; - commit_db.add_child(&dir_node)?; + // Open the commit write session and add the root directory + let store = repo.merkle_store(); + let session = store.begin()?; + let mut commit_ns = session.create_node(&commit_node, None)?; + commit_ns.add_child(&dir_node)?; + commit_ns.finish()?; + session.finish()?; // Initialize the dir_hash_db with the root directory hash let commit_id_string = commit_id.to_string(); diff --git a/crates/lib/src/core/v_latest/entries.rs b/crates/lib/src/core/v_latest/entries.rs index 1d16e3d0e..d7c8841b6 100644 --- a/crates/lib/src/core/v_latest/entries.rs +++ b/crates/lib/src/core/v_latest/entries.rs @@ -1,7 +1,9 @@ use crate::core; -use crate::core::db::merkle_node::MerkleNodeDB; use crate::error::OxenError; use crate::model::entry::metadata_entry::WorkspaceMetadataEntry; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; use crate::model::merkle_tree::node::{DirNode, EMerkleTreeNode, FileNode, MerkleTreeNode}; use crate::model::metadata::MetadataDir; use crate::model::metadata::generic_metadata::GenericMetadata; @@ -545,15 +547,18 @@ pub fn update_metadata(repo: &LocalRepository, revision: impl AsRef) -> Res // Initialize data structures for aggregation let mut num_bytes = 0; - // Start the recursive traversal - traverse_and_update_sizes_and_counts(repo, &mut node, &mut num_bytes)?; + // One merkle write session covers every node written during the traversal. + let store = repo.merkle_store(); + let session = store.begin()?; + traverse_and_update_sizes_and_counts(&session, &mut node, &mut num_bytes)?; + session.finish()?; Ok(()) } #[allow(clippy::type_complexity)] -fn traverse_and_update_sizes_and_counts( - repo: &LocalRepository, +fn traverse_and_update_sizes_and_counts( + session: &S, node: &mut MerkleTreeNode, num_bytes: &mut u64, ) -> Result<(HashMap, HashMap), OxenError> { @@ -566,31 +571,33 @@ fn traverse_and_update_sizes_and_counts( EMerkleTreeNode::Commit(commit_node) => { log::debug!("Traversing node {commit_node:?}"); process_children( - repo, + session, children, &mut local_counts, &mut local_sizes, num_bytes, )?; - let mut dir_db = MerkleNodeDB::open_read_write(repo, commit_node, node.parent_id)?; - add_children_to_db(&mut dir_db, &node.children)?; + let mut dir_ns = session.create_node(commit_node, node.parent_id)?; + add_children_to_session(&mut dir_ns, &node.children)?; + dir_ns.finish()?; } EMerkleTreeNode::VNode(vnode) => { log::debug!("Traversing vnode {vnode:?}"); process_children( - repo, + session, children, &mut local_counts, &mut local_sizes, num_bytes, )?; - let mut dir_db = MerkleNodeDB::open_read_write(repo, vnode, node.parent_id)?; - add_children_to_db(&mut dir_db, &node.children)?; + let mut dir_ns = session.create_node(vnode, node.parent_id)?; + add_children_to_session(&mut dir_ns, &node.children)?; + dir_ns.finish()?; } EMerkleTreeNode::Directory(dir_node) => { log::debug!("No need to aggregate dir {}", dir_node.name()); process_children( - repo, + session, children, &mut local_counts, &mut local_sizes, @@ -598,8 +605,9 @@ fn traverse_and_update_sizes_and_counts( )?; dir_node.set_data_type_counts(local_counts.clone()); dir_node.set_data_type_sizes(local_sizes.clone()); - let mut dir_db = MerkleNodeDB::open_read_write(repo, dir_node, node.parent_id)?; - add_children_to_db(&mut dir_db, &node.children)?; + let mut dir_ns = session.create_node(dir_node, node.parent_id)?; + add_children_to_session(&mut dir_ns, &node.children)?; + dir_ns.finish()?; } EMerkleTreeNode::File(file_node) => { log::debug!( @@ -626,8 +634,8 @@ fn traverse_and_update_sizes_and_counts( Ok((local_counts, local_sizes)) } -fn process_children( - repo: &LocalRepository, +fn process_children( + session: &S, children: &mut [MerkleTreeNode], local_counts: &mut HashMap, local_sizes: &mut HashMap, @@ -635,7 +643,7 @@ fn process_children( ) -> Result<(), OxenError> { for child in children.iter_mut() { let (child_counts, child_sizes) = - traverse_and_update_sizes_and_counts(repo, child, num_bytes)?; + traverse_and_update_sizes_and_counts(session, child, num_bytes)?; for (key, count) in child_counts { *local_counts.entry(key).or_insert(0) += count; } @@ -646,23 +654,23 @@ fn process_children( Ok(()) } -fn add_children_to_db( - dir_db: &mut MerkleNodeDB, +fn add_children_to_session( + ns: &mut S, children: &[MerkleTreeNode], ) -> Result<(), OxenError> { for child in children { match &child.node { EMerkleTreeNode::Commit(commit_node) => { - dir_db.add_child(commit_node)?; + ns.add_child(commit_node)?; } EMerkleTreeNode::Directory(dir_node) => { - dir_db.add_child(dir_node)?; + ns.add_child(dir_node)?; } EMerkleTreeNode::File(file_node) => { - dir_db.add_child(file_node)?; + ns.add_child(file_node)?; } EMerkleTreeNode::VNode(vnode) => { - dir_db.add_child(vnode)?; + ns.add_child(vnode)?; } _ => { return Err(OxenError::basic_str("Unsupported node type")); diff --git a/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs b/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs index 7b7506468..99b6553ac 100644 --- a/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs +++ b/crates/lib/src/core/v_latest/index/commit_merkle_tree.rs @@ -6,8 +6,7 @@ use std::str; use crate::core::db::dir_hashes::dir_hashes_db::{ dir_hash_db_path, dir_hash_db_path_from_commit_id, with_dir_hash_db_manager, }; -use crate::core::db::merkle_node::MerkleNodeDB; - +use crate::model::merkle_tree::merkle_reader::MerkleReader; use crate::model::merkle_tree::node::EMerkleTreeNode; use crate::model::merkle_tree::node::FileNode; @@ -207,7 +206,7 @@ impl CommitMerkleTree { unique_hashes: Option<&mut HashSet>, shared_hashes: Option<&mut HashSet>, ) -> Result, OxenError> { - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { // log::debug!("read_node merkle node db does not exist for hash: {}", hash); return Ok(None); } @@ -232,7 +231,7 @@ impl CommitMerkleTree { unique_hashes: Option<&mut HashMap<(MerkleHash, MerkleTreeNodeType), PathBuf>>, ) -> Result, OxenError> { // log::debug!("Read node hash [{}]", hash); - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { // log::debug!("read_node merkle node db does not exist for hash: {}", hash); return Ok(None); } @@ -258,7 +257,7 @@ impl CommitMerkleTree { partial_nodes: &mut HashMap, ) -> Result, OxenError> { // log::debug!("Read node hash [{}]", hash); - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { // log::debug!("read_node merkle node db does not exist for hash: {}", hash); return Ok(None); } @@ -282,7 +281,7 @@ impl CommitMerkleTree { depth: i32, ) -> Result, OxenError> { // log::debug!("Read depth {} node hash [{}]", depth, hash); - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { log::debug!("read_depth merkle node db does not exist for hash: {hash}"); return Ok(None); } @@ -314,7 +313,7 @@ impl CommitMerkleTree { shared_hashes: Option<&mut HashSet>, depth: i32, ) -> Result, OxenError> { - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { // log::debug!("read_node merkle node db does not exist for hash: {}", hash); return Ok(None); } @@ -343,7 +342,7 @@ impl CommitMerkleTree { depth: i32, ) -> Result, OxenError> { // log::debug!("Read node hash [{}]", hash); - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { // log::debug!("read_node merkle node db does not exist for hash: {}", hash); return Ok(None); } @@ -375,7 +374,7 @@ impl CommitMerkleTree { depth: i32, ) -> Result, OxenError> { // log::debug!("Read node hash [{}]", hash); - if !MerkleNodeDB::exists(repo, hash) { + if !repo.merkle_store().exists(hash)? { // log::debug!("read_node merkle node db does not exist for hash: {}", hash); return Ok(None); } diff --git a/crates/lib/src/model/merkle_tree/node/merkle_tree_node.rs b/crates/lib/src/model/merkle_tree/node/merkle_tree_node.rs index 36de0ba88..6fa191286 100644 --- a/crates/lib/src/model/merkle_tree/node/merkle_tree_node.rs +++ b/crates/lib/src/model/merkle_tree/node/merkle_tree_node.rs @@ -6,8 +6,8 @@ use std::path::Path; use std::path::PathBuf; use super::*; -use crate::core::db::merkle_node::MerkleNodeDB; use crate::error::OxenError; +use crate::model::merkle_tree::merkle_reader::MerkleReader; use crate::model::{LocalRepository, MerkleHash, MerkleTreeNodeType}; use serde::{Deserialize, Serialize}; @@ -41,11 +41,15 @@ impl MerkleTreeNode { /// Private implementation that loads from disk without caching fn from_hash_uncached(repo: &LocalRepository, hash: &MerkleHash) -> Result { - let node_db = MerkleNodeDB::open_read_only(repo, hash)?; - let parent_id = node_db.parent_id; + let record = repo + .merkle_store() + .get_node(hash)? + // TODO: use a structured error variant + .ok_or_else(|| OxenError::basic_str(format!("Merkle node not found: {hash}")))?; + let parent_id = record.parent_id().copied(); Ok(MerkleTreeNode { hash: *hash, - node: node_db.node()?, + node: record.into_node(), parent_id, children: Vec::new(), }) @@ -74,13 +78,15 @@ impl MerkleTreeNode { repo: &LocalRepository, hash: &MerkleHash, ) -> Result, OxenError> { - let Ok(mut node_db) = MerkleNodeDB::open_read_only(repo, hash) else { - // We don't return an error here because there are some situations where we won't have all the node files. - // For example, when working in a subtree clone. + let store = repo.merkle_store(); + // We don't return an error for missing-node here because there are some situations + // where we won't have all the node files, e.g. when working in a subtree clone. + // Parse/IO errors from an existing node DO still propagate. + if !store.exists(hash)? { log::warn!("no child node db: {hash:?}"); return Ok(Vec::new()); - }; - Ok(node_db.map()?) + } + Ok(store.get_children(hash)?) } /// Check if the node is a leaf node (i.e. it has no children) diff --git a/crates/lib/src/model/repository/local_repository.rs b/crates/lib/src/model/repository/local_repository.rs index 92222dcbb..391995957 100644 --- a/crates/lib/src/model/repository/local_repository.rs +++ b/crates/lib/src/model/repository/local_repository.rs @@ -1,8 +1,10 @@ use crate::config::RepositoryConfig; use crate::constants::SHALLOW_FLAG; use crate::constants::{self, DEFAULT_VNODE_SIZE, MIN_OXEN_VERSION}; +use crate::core::db::merkle_node::FileBackend; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; +use crate::model::merkle_tree::MerkleStore; use crate::model::merkle_tree::node::FileNode; use crate::model::{MetadataEntry, Remote, RemoteRepository}; use crate::opts::StorageOpts; @@ -52,6 +54,9 @@ pub struct LocalRepositoryWithEntries { pub entries: Option>, } +// Keep this private: it is an implementation detail for providing dynamic loading of the MerkleTree trait. +mod merkle_store_dispatch; + impl LocalRepository { /// Create a LocalRepository from a directory pub fn from_dir(path: impl AsRef) -> Result { @@ -93,6 +98,20 @@ impl LocalRepository { } } + /// Obtain the Merkle tree store for this repository. + /// + /// Returns an opaque `impl MerkleStore` whose concrete type is the private + /// dispatch enum in [`merkle_store_dispatch`]. Callers use it purely through the + /// trait surface (read, write); backend selection is an implementation detail + /// of this method. + /// + /// When new backends (e.g. LMDB) are added, they are registered in + /// `merkle_store_dispatch::define_merkle_store_dispatch!`, and the + /// dispatch logic for choosing among them lives here. + pub fn merkle_store(&self) -> impl MerkleStore + '_ { + merkle_store_dispatch::StoreEnum::File(FileBackend::new(self)) + } + pub fn init_version_store(&mut self, storage_opts: &StorageOpts) -> Result<(), OxenError> { let store = create_version_store(&self.path, storage_opts)?; self.version_store = Some(store); diff --git a/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs b/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs new file mode 100644 index 000000000..f456bd0f7 --- /dev/null +++ b/crates/lib/src/model/repository/local_repository/merkle_store_dispatch.rs @@ -0,0 +1,181 @@ +//! Private enum-dispatch for merkle-store backends. +//! +//! [`LocalRepository::merkle_store`] returns an opaque `impl MerkleStore` +//! backed by the [`StoreEnum`] generated here. Callers never name the enum — +//! it is `pub(super)` and only the parent `local_repository` module wraps +//! construction. +//! +//! To add a new backend, add one line to the +//! [`define_merkle_store_dispatch!`] invocation at the bottom of this file. +//! The macro expands every delegating trait impl and the unified error enum; +//! no hand-written match arms need to be edited. +//! +//! [`LocalRepository::merkle_store`]: super::LocalRepository::merkle_store + +use crate::core::db::merkle_node::FileBackend; +use crate::core::db::merkle_node::merkle_node_db::MerkleDbError; +use crate::error::{IntoOxenError, OxenError}; +use crate::model::MerkleHash; +use crate::model::TMerkleTreeNode; +use crate::model::merkle_tree::merkle_reader::{MerkleNodeRecord, MerkleReader}; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; +use crate::model::merkle_tree::node::MerkleTreeNode; + +/// Declare the merkle-store dispatch enum and every delegating trait impl +/// from a single list of backends. Each entry is +/// `VariantName => Backend<'repo>, error = BackendError`. +/// +/// The macro also generates a unified `StoreError` enum with one variant per +/// backend's native error. It derives `IntoOxenError` by delegating to each +/// backend's own `IntoOxenError` impl, so errors funnel cleanly into +/// `OxenError` via the crate-wide blanket `From for OxenError`. +/// +/// Implementation note: none of this should be visible to users depending on +/// `liboxen`. We use `pub(super)` so that `local_repository` is the only +/// module that can use this macro and associated dispatch-enum. +macro_rules! define_merkle_store_dispatch { + ( $( $variant:ident => $backend:ty, error = $err:ty ),* $(,)? ) => { + /// Unified error for the dispatch enum. One variant per backend. + #[derive(Debug, thiserror::Error)] + pub(super) enum StoreError { + $( + #[error(transparent)] + $variant($err), + )* + } + + impl IntoOxenError for StoreError { + fn into_oxen(self) -> OxenError { + match self { + $( Self::$variant(e) => e.into_oxen() ),* + } + } + } + + /// Dispatch enum over merkle-store backends. + pub(super) enum StoreEnum<'repo> { + $( $variant($backend) ),* + } + + /// Dispatch enum over per-backend [`MerkleWriteSession`] types. Each + /// variant type is named via GAT projection from the backend type, so + /// adding a backend doesn't require naming its session type. + pub(super) enum SessionEnum<'repo> { + $( $variant(<$backend as MerkleWriter>::Session<'repo>) ),* + } + + /// Dispatch enum over per-backend [`NodeWriteSession`] types. + pub(super) enum NodeSessionEnum<'a, 'repo: 'a> { + $( + $variant( + <<$backend as MerkleWriter>::Session<'repo> + as MerkleWriteSession>::NodeSession<'a> + ) + ),* + } + + impl<'repo> MerkleReader for StoreEnum<'repo> { + type Error = StoreError; + + fn exists(&self, hash: &MerkleHash) -> Result { + match self { + $( Self::$variant(b) => b.exists(hash).map_err(StoreError::$variant) ),* + } + } + + fn get_node( + &self, + hash: &MerkleHash, + ) -> Result, StoreError> { + match self { + $( Self::$variant(b) => b.get_node(hash).map_err(StoreError::$variant) ),* + } + } + + fn get_children( + &self, + hash: &MerkleHash, + ) -> Result, StoreError> { + match self { + $( Self::$variant(b) => b.get_children(hash).map_err(StoreError::$variant) ),* + } + } + } + + impl<'repo> MerkleWriter for StoreEnum<'repo> { + type Error = StoreError; + type Session<'a> = SessionEnum<'repo> where Self: 'a; + + fn begin(&self) -> Result, StoreError> { + match self { + $( + Self::$variant(b) => b + .begin() + .map(SessionEnum::$variant) + .map_err(StoreError::$variant) + ),* + } + } + } + + impl<'repo> MerkleWriteSession for SessionEnum<'repo> { + type Error = StoreError; + type NodeSession<'a> + = NodeSessionEnum<'a, 'repo> + where + Self: 'a; + + fn create_node<'a, N: TMerkleTreeNode>( + &'a self, + node: &N, + parent_id: Option, + ) -> Result, StoreError> { + match self { + $( + Self::$variant(s) => s + .create_node(node, parent_id) + .map(NodeSessionEnum::$variant) + .map_err(StoreError::$variant) + ),* + } + } + + fn finish(self) -> Result<(), StoreError> { + match self { + $( Self::$variant(s) => s.finish().map_err(StoreError::$variant) ),* + } + } + } + + impl<'a, 'repo: 'a> NodeWriteSession for NodeSessionEnum<'a, 'repo> { + type Error = StoreError; + + fn node_id(&self) -> &MerkleHash { + match self { + $( Self::$variant(n) => n.node_id() ),* + } + } + + fn add_child(&mut self, child: &N) -> Result<(), StoreError> { + match self { + $( Self::$variant(n) => n.add_child(child).map_err(StoreError::$variant) ),* + } + } + + fn finish(self) -> Result<(), StoreError> { + match self { + $( Self::$variant(n) => n.finish().map_err(StoreError::$variant) ),* + } + } + } + }; +} + +// === Backend registry. === +// Adding a new backend is a single line: `Variant => Backend<'repo>, error = Backend's Error type`. +// Every dispatch impl and the `StoreError` variant are regenerated by the macro. +define_merkle_store_dispatch! { + File => FileBackend<'repo>, error = MerkleDbError, +} diff --git a/crates/lib/src/repositories/commits/commit_writer.rs b/crates/lib/src/repositories/commits/commit_writer.rs index 0b9c4fcd2..30e341cea 100644 --- a/crates/lib/src/repositories/commits/commit_writer.rs +++ b/crates/lib/src/repositories/commits/commit_writer.rs @@ -17,7 +17,6 @@ use crate::constants::ORIG_HEAD_FILE; use crate::constants::{HEAD_FILE, STAGED_DIR}; use crate::core::db; use crate::core::db::key_val::str_val_db; -use crate::core::db::merkle_node::MerkleNodeDB; use crate::core::refs::with_ref_manager; use crate::core::v_latest::index::CommitMerkleTree; use crate::core::v_latest::status; @@ -27,6 +26,9 @@ use crate::model::MerkleTreeNodeType; use crate::model::NewCommit; use crate::model::NewCommitBody; use crate::model::User; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; use crate::model::merkle_tree::node::EMerkleTreeNode; use crate::model::merkle_tree::node::StagedMerkleTreeNode; use crate::model::merkle_tree::node::VNode; @@ -287,15 +289,20 @@ pub fn commit_dir_entries_with_parents( } } - let mut commit_db = MerkleNodeDB::open_read_write(repo, &node, parent_id)?; + let store = repo.merkle_store(); + let session = store.begin()?; + let mut commit_ns = session.create_node(&node, parent_id)?; write_commit_entries( repo, commit_id, - &mut commit_db, + &session, + &mut commit_ns, &dir_hash_db, &dir_hashes, &vnode_entries, )?; + commit_ns.finish()?; + session.finish()?; commit_progress_bar.finish_and_clear(); Ok(node.to_commit()) @@ -382,17 +389,22 @@ pub fn commit_dir_entries_new( } } - let mut commit_db = MerkleNodeDB::open_read_write(repo, &node, parent_id)?; + let store = repo.merkle_store(); + let session = store.begin()?; + let mut commit_ns = session.create_node(&node, parent_id)?; write_commit_entries( repo, commit_id, - &mut commit_db, + &session, + &mut commit_ns, &dir_hash_db, &dir_hashes, &vnode_entries, )?; + commit_ns.finish()?; + session.finish()?; commit_progress_bar.finish_and_clear(); // Remove all the directories that are staged for removal @@ -496,15 +508,20 @@ pub fn commit_dir_entries( } } - let mut commit_db = MerkleNodeDB::open_read_write(repo, &node, None)?; + let store = repo.merkle_store(); + let session = store.begin()?; + let mut commit_ns = session.create_node(&node, None)?; write_commit_entries( repo, commit_id, - &mut commit_db, + &session, + &mut commit_ns, &dir_hash_db, &dir_hashes, &vnode_entries, )?; + commit_ns.finish()?; + session.finish()?; commit_progress_bar.finish_and_clear(); // Remove all the directories that are staged for removal @@ -782,10 +799,12 @@ pub fn compute_commit_id(new_commit: &NewCommit) -> Result( repo: &LocalRepository, commit_id: MerkleHash, - commit_db: &mut MerkleNodeDB, + session: &'a S, + commit_ns: &mut S::NodeSession<'a>, dir_hash_db: &DBWithThreadMode, dir_hashes: &HashMap, entries: &HashMap, Vec)>, @@ -794,7 +813,7 @@ fn write_commit_entries( let mut total_written = 0; let root_path = PathBuf::from(""); let dir_node = compute_dir_node(repo, commit_id, entries, dir_hashes, &root_path)?; - commit_db.add_child(&dir_node)?; + commit_ns.add_child(&dir_node)?; total_written += 1; str_val_db::put( @@ -802,17 +821,19 @@ fn write_commit_entries( root_path.to_str().unwrap(), &dir_node.hash().to_string(), )?; - let dir_db = MerkleNodeDB::open_read_write(repo, &dir_node, Some(commit_id))?; + let mut dir_ns = session.create_node(&dir_node, Some(commit_id))?; r_create_dir_node( repo, + session, commit_id, - &mut Some(dir_db), + Some(&mut dir_ns), dir_hash_db, dir_hashes, entries, root_path, &mut total_written, )?; + dir_ns.finish()?; // The dir_hash_db was pre-populated from the previous commit, so // removed directories still have stale entries that must be deleted; @@ -845,10 +866,11 @@ fn cache_invalidate_dir_hash_db<'a>( } #[allow(clippy::too_many_arguments)] -fn r_create_dir_node( +fn r_create_dir_node<'a, S: MerkleWriteSession>( repo: &LocalRepository, + session: &'a S, commit_id: MerkleHash, - maybe_dir_db: &mut Option, + mut maybe_parent_ns: Option<&mut S::NodeSession<'a>>, dir_hash_db: &DBWithThreadMode, dir_hashes: &HashMap, entries: &HashMap, Vec)>, @@ -872,75 +894,56 @@ fn r_create_dir_node( num_entries: vnode.entries.len() as u64, }; let vnode_obj = VNode::new(repo, opts)?; - if let Some(dir_db) = maybe_dir_db { - dir_db.add_child(&vnode_obj)?; + // Capture the parent's hash before we reborrow `maybe_parent_ns` mutably. + let parent_id_for_vnode = maybe_parent_ns.as_deref().map(|ns| *ns.node_id()); + if let Some(parent_ns) = maybe_parent_ns.as_deref_mut() { + parent_ns.add_child(&vnode_obj)?; *total_written += 1; } - // log::debug!( - // "Processing vnode {} with {} entries", - // vnode.id, - // vnode.entries.len() - // ); - - let mut vnode_db = MerkleNodeDB::open_read_write( - repo, - &vnode_obj, - maybe_dir_db.as_ref().map(|db| db.node_id), - )?; + let mut vnode_ns = session.create_node(&vnode_obj, parent_id_for_vnode)?; for entry in vnode.entries.iter() { - // log::debug!("Processing entry {} in vnode {}", entry.node, vnode.id); + log::trace!("Processing entry {} in vnode {}", entry.node, vnode.id); match &entry.node.node { EMerkleTreeNode::Directory(node) => { // If the dir has updates, we need a new dir db let dir_path = entry.node.maybe_path()?; - // log::debug!("Processing dir node {:?}", dir_path); let dir_node = if entries.contains_key(&dir_path) { let dir_node = compute_dir_node(repo, commit_id, entries, dir_hashes, &dir_path)?; - // if let Some(vnode_db) = &mut maybe_vnode_db { - vnode_db.add_child(&dir_node)?; + vnode_ns.add_child(&dir_node)?; *total_written += 1; - // } - - // if the vnode is new, we need a new dir db - // let mut child_db = if maybe_vnode_db.is_some() { - let mut child_db = Some(MerkleNodeDB::open_read_write( - repo, - &dir_node, - Some(vnode.id), - )?); + let mut child_ns = session.create_node(&dir_node, Some(vnode.id))?; r_create_dir_node( repo, + session, commit_id, - &mut child_db, + Some(&mut child_ns), dir_hash_db, dir_hashes, entries, &dir_path, total_written, )?; + child_ns.finish()?; + dir_node } else { - // log::debug!("r_create_dir_node skipping {:?}", dir_path); // Look up the old dir node and reference it let Some(old_dir_node) = CommitMerkleTree::read_node(repo, node.hash(), false)? else { - // log::debug!( - // "r_create_dir_node could not read old dir node {:?}", - // node.hash - // ); + log::trace!( + "r_create_dir_node could not read old dir node {:?}", + node.hash(), + ); continue; }; let dir_node = old_dir_node.dir()?; - - // if let Some(vnode_db) = &mut maybe_vnode_db { - vnode_db.add_child(&dir_node)?; + vnode_ns.add_child(&dir_node)?; *total_written += 1; - // } dir_node }; @@ -955,13 +958,6 @@ fn r_create_dir_node( let file_path = PathBuf::from(&file_node.name()); let file_name = file_path.file_name().unwrap().to_str().unwrap(); - // log::debug!( - // "Processing file {:?} in vnode {} in commit {}", - // path, - // vnode.id, - // commit_id - // ); - // Just single file chunk for now let chunks = vec![file_node.hash().to_u128()]; file_node.set_chunk_hashes(chunks); @@ -973,9 +969,8 @@ fn r_create_dir_node( file_node.set_last_commit_id(&last_commit_id); file_node.set_name(file_name); - vnode_db.add_child(&file_node)?; + vnode_ns.add_child(&file_node)?; *total_written += 1; - // } } _ => { return Err(OxenError::basic_str(format!( @@ -985,6 +980,7 @@ fn r_create_dir_node( } } } + vnode_ns.finish()?; } log::debug!("Finished processing dir {path:?} total written {total_written} entries"); diff --git a/crates/lib/src/repositories/tree.rs b/crates/lib/src/repositories/tree.rs index 4e6334022..f1444304d 100644 --- a/crates/lib/src/repositories/tree.rs +++ b/crates/lib/src/repositories/tree.rs @@ -9,7 +9,6 @@ use tar::Archive; use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR}; use crate::core::commit_sync_status; -use crate::core::db::merkle_node::MerkleNodeDB; use crate::core::db::merkle_node::merkle_node_db::node_db_path; use crate::core::node_sync_status; use crate::core::v_latest::index::CommitMerkleTree as CommitMerkleTreeLatest; @@ -17,6 +16,9 @@ use crate::core::v_latest::index::CommitMerkleTree; use crate::core::v_old::v0_19_0::index::CommitMerkleTree as CommitMerkleTreeV0_19_0; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; +use crate::model::merkle_tree::merkle_writer::{ + MerkleWriteSession, MerkleWriter, NodeWriteSession, +}; use crate::model::merkle_tree::node::{ CommitNode, DirNodeWithPath, EMerkleTreeNode, FileNode, FileNodeWithDir, MerkleTreeNode, }; @@ -1024,12 +1026,16 @@ pub fn unpack_nodes( } /// Write a node to disk +// TODO: this should just accept `&CommitNode` pub fn write_tree(repo: &LocalRepository, node: &MerkleTreeNode) -> Result<(), OxenError> { let EMerkleTreeNode::Commit(commit_node) = &node.node else { return Err(OxenError::basic_str("Expected commit node")); }; let commit_node = CommitNode::new(repo, commit_node.get_opts())?; - p_write_tree(repo, node, &commit_node)?; + let store = repo.merkle_store(); + let session = store.begin()?; + p_write_tree(&session, node, &commit_node)?; + session.finish()?; Ok(()) } @@ -1038,34 +1044,36 @@ pub fn write_tree(repo: &LocalRepository, node: &MerkleTreeNode) -> Result<(), O /// Recursively writes the node and all its children to disk. To write a full tree, the node /// (`node_impl`) **MUST** be the root of the tree -- i.e. a `Commit` node. /// -/// [1] https://github.com/rust-lang/rust/issues/20041) -fn p_write_tree( - repo: &LocalRepository, +// [1] https://github.com/rust-lang/rust/issues/20041 +// TODO: this should just accept `MerkleTreeNode` since the `node_impl` always comes from this +fn p_write_tree( + session: &S, node: &MerkleTreeNode, node_impl: &N, ) -> Result<(), OxenError> { let parent_id = node.parent_id; - let mut db = MerkleNodeDB::open_read_write(repo, node_impl, parent_id)?; + let mut ns = session.create_node(node_impl, parent_id)?; for child in &node.children { match &child.node { EMerkleTreeNode::VNode(vnode) => { - db.add_child(vnode)?; - p_write_tree(repo, child, vnode)?; + ns.add_child(vnode)?; + p_write_tree(session, child, vnode)?; } EMerkleTreeNode::Directory(dir_node) => { - db.add_child(dir_node)?; - p_write_tree(repo, child, dir_node)?; + ns.add_child(dir_node)?; + p_write_tree(session, child, dir_node)?; } EMerkleTreeNode::File(file_node) => { - db.add_child(file_node)?; + ns.add_child(file_node)?; } node => { + // TODO: change this to `return Err(OxenError::DisallowedNodeWrite(node.clone()));` panic!("p_write_tree Unexpected node type: {node:?}"); } } } - db.close()?; + ns.finish()?; Ok(()) }