diff --git a/crates/lib/src/api/client/tree.rs b/crates/lib/src/api/client/tree.rs index 238ef4cbc..ade78ff72 100644 --- a/crates/lib/src/api/client/tree.rs +++ b/crates/lib/src/api/client/tree.rs @@ -1,19 +1,17 @@ -use async_compression::futures::bufread::GzipDecoder; -use async_tar::Archive; -use flate2::Compression; -use flate2::write::GzEncoder; use futures_util::TryStreamExt; use std::collections::HashSet; +use std::io::Write; use std::path::PathBuf; use std::sync::Arc; use std::time; -use tempfile::TempDir; +use tokio_util::io::{ReaderStream, StreamReader, SyncIoBridge}; +use crate::api; use crate::api::client; -use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR}; use crate::core::progress::push_progress::PushProgress; use crate::core::v_latest::index::CommitMerkleTree; use crate::error::OxenError; +use crate::model::merkle_tree::merkle_transport::{PackOptions, UnpackOptions}; use crate::model::merkle_tree::node::MerkleTreeNode; use crate::model::{LocalRepository, MerkleHash, RemoteRepository}; use crate::opts::download_tree_opts::DownloadTreeOpts; @@ -21,7 +19,6 @@ use crate::opts::fetch_opts::FetchOpts; use crate::view::tree::MerkleHashResponse; use crate::view::tree::merkle_hashes::MerkleHashes; use crate::view::{MerkleHashesResponse, StatusMessage}; -use crate::{api, util}; /// Check if a node exists in the remote repository merkle tree by hash #[tracing::instrument(skip_all)] @@ -50,60 +47,93 @@ pub async fn has_node( } } -/// Upload a node to the remote repository merkle tree +/// Wraps a `Write` to call [`PushProgress::add_bytes`] on each `write` call. +struct CountingWriter { + inner: W, + progress: Arc, +} + +impl Write for CountingWriter { + #[inline(always)] + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let n = self.inner.write(buf)?; + self.progress.add_bytes(n as u64); + Ok(n) + } + #[inline(always)] + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + +/// Upload a set of merkle nodes to the remote repository. +/// +/// Packs the nodes into the canonical tar-gz wire format and streams the bytes straight +/// into the HTTP upload body — no intermediate `Vec` is materialized. The pack runs +/// on a blocking worker (`tokio::task::spawn_blocking`) that writes into one end of a +/// `tokio::io::duplex`; the HTTP body reads from the other end through `ReaderStream`, +/// so upload and pack progress together with back-pressure. pub async fn create_nodes( local_repo: &LocalRepository, remote_repo: &RemoteRepository, nodes: HashSet, progress: &Arc, ) -> Result<(), OxenError> { - // Compress the node - log::debug!("create_nodes starting compression"); - // OPT: Try Compression::fast(); - let enc = GzEncoder::new(Vec::new(), Compression::default()); - log::debug!("create_nodes compressing nodes"); - let mut tar = tar::Builder::new(enc); - log::debug!("create_nodes creating tar"); - let node_path = local_repo - .path - .join(OXEN_HIDDEN_DIR) - .join(TREE_DIR) - .join(NODES_DIR); - - for (i, node_hash) in nodes.iter().enumerate() { - let dir_prefix = node_hash.to_hex_hash().node_db_prefix(); - let node_dir = node_path.join(&dir_prefix); - // log::debug!( - // "create_nodes appending objects dir {:?} to tar at path {:?}", - // dir_prefix, - // node_dir - // ); - progress.set_message(format!("Packing {}/{} nodes", i + 1, nodes.len())); - - log::debug!("create_nodes appending dir to tar"); - tar.append_dir_all(dir_prefix, node_dir)?; - } - - tar.finish()?; - log::debug!("create_nodes finished tar"); - let buffer: Vec = tar.into_inner()?.finish()?; + let n = nodes.len(); + progress.set_message(format!("Pushing {n} nodes")); + + // Extend the progress bar's total length by the uncompressed bytes on disk for + // these Merkle tree nodes. The [`CountingWriter`] will update the progress bar + // each time we write the uncompressed bytes of a Merkle tree node. + let estimated_upload_bytes = local_repo.merkle_store().raw_byte_count(&nodes); + progress.inc_total_bytes(estimated_upload_bytes); + + // Pack -> duplex writer (sync) -> duplex reader (async) -> HTTP body stream. + // 64 KiB duplex buffer mirrors the server-side streaming pattern in + // `crates/server/src/controllers/versions.rs`. + let (async_writer, async_reader) = tokio::io::duplex(64 * 1024); + let repo = local_repo.clone(); + let progress_for_pack = Arc::clone(progress); + let pack_handle = tokio::task::spawn_blocking(move || -> Result<(), OxenError> { + let mut counting_writer = CountingWriter { + inner: SyncIoBridge::new(async_writer), + progress: progress_for_pack, + }; + repo.merkle_store().pack_nodes( + &nodes, + // Legacy client-push wire format: required so older `oxen-server` deployments + // (which pre-pend `tree/nodes/` server-side at install time) install entries + // at the right paths. + PackOptions::LegacyClientPush, + &mut counting_writer, + )?; + Ok(()) + }); + + let body_stream = ReaderStream::new(async_reader); - // Upload the node let uri = "/tree/nodes".to_string(); let url = api::endpoint::url_from_repo(remote_repo, &uri)?; let client = client::builder_for_url(&url)? .timeout(time::Duration::from_secs(120)) .build()?; + log::debug!("uploading {n} nodes to {url}"); - let size = buffer.len() as u64; - log::debug!( - "uploading node of size {} to {}", - bytesize::ByteSize::b(size), - url - ); - let res = client.post(&url).body(buffer.to_owned()).send().await?; + let res = client + .post(&url) + .body(reqwest::Body::wrap_stream(body_stream)) + .send() + .await?; let body = client::parse_json_body(&url, res).await?; log::debug!("upload node complete {body}"); + progress.finish(); + + // Surface any pack error after the upload completes (the duplex reader reaching EOF + // signals pack end-of-stream; panics and Result::Err come through the join handle). + pack_handle.await.map_err(|e| OxenError::JoinError { + context: "pack task panicked: ".to_string(), + cause: e, + })??; Ok(()) } @@ -330,6 +360,13 @@ pub async fn download_trees_between( Ok(()) } +/// Download a merkle-tree tarball from the remote repository and unpack it into the +/// local store. Streams the response body straight into the `MerkleUnpacker` so nothing +/// buffers the whole payload in memory. +/// +/// The VFS branch is preserved but no longer lives here: `FileBackend::unpack` handles +/// the `is_vfs` case internally (tempdir + `copy_dir_all` dance). That keeps the client +/// logic generic across backends. async fn node_download_request( local_repo: &LocalRepository, url: impl AsRef, @@ -339,41 +376,27 @@ async fn node_download_request( let client = client::builder_for_url(url)? .timeout(time::Duration::from_secs(12000)) .build()?; - log::debug!("node_download_request about to send request {url}"); + log::debug!("node_download_request sending request {url}"); let res = client.get(url).send().await?; let res = client::handle_non_json_response(url, res).await?; - // The remote tar packs it in TREE_DIR/NODES_DIR - // So this will unpack it in OXEN_HIDDEN_DIR/TREE_DIR/NODES_DIR - let full_unpacked_path = local_repo.path.join(OXEN_HIDDEN_DIR); - - // Create the temp path if it doesn't exist - util::fs::create_dir_all(&full_unpacked_path)?; - - let reader = res - .bytes_stream() - .map_err(futures::io::Error::other) - .into_async_read(); - - let decoder = GzipDecoder::new(futures::io::BufReader::new(reader)); - let archive = Archive::new(decoder); - - // If the repo is stored on a virtual file system, re-route the nodes through a temp dir - if local_repo.is_vfs() { - let temp_dir = TempDir::new()?; - let temp_path = temp_dir.path(); - - // Unpack the tar in a temp dir - log::debug!("node_download_request unpacking to {temp_path:?}"); - util::fs::unpack_async_tar_archive(archive, temp_path).await?; - log::debug!("Succesfully unpacked tar to temp dir"); - - // Copy to the repo - util::fs::copy_dir_all(&temp_dir, &full_unpacked_path)?; - } else { - // Else, unpack directly to the repo - util::fs::unpack_async_tar_archive(archive, &full_unpacked_path).await?; - } + // async Stream> → AsyncRead → sync Read, bridged across + // the spawn_blocking boundary so the sync trait consumes streamed bytes incrementally. + let async_reader = StreamReader::new(res.bytes_stream().map_err(std::io::Error::other)); + let mut sync_reader = SyncIoBridge::new(async_reader); + + let repo = local_repo.clone(); + tokio::task::spawn_blocking(move || -> Result<(), OxenError> { + repo.merkle_store() + // Download path: overwrite existing files on disk + .unpack(&mut sync_reader, UnpackOptions::Overwrite)?; + Ok(()) + }) + .await + .map_err(|e| OxenError::JoinError { + context: "unpack task panicked: ".to_string(), + cause: e, + })??; Ok(()) } diff --git a/crates/lib/src/core/db/merkle_node/file_backend.rs b/crates/lib/src/core/db/merkle_node/file_backend.rs index d80a07d40..9fc766533 100644 --- a/crates/lib/src/core/db/merkle_node/file_backend.rs +++ b/crates/lib/src/core/db/merkle_node/file_backend.rs @@ -21,8 +21,7 @@ use crate::model::merkle_tree::node::MerkleTreeNode; use crate::model::{LocalRepository, MerkleHash, TMerkleTreeNode}; use crate::util; -/// File-based Merkle node store backend. Implements the [`MerkleReader`], -/// [`MerkleWriter`], [`MerklePacker`], and [`MerkleUnpacker`] traits. +/// File-based Merkle node store backend. Implements the [`MerkleStore`] trait. /// /// Borrows the path to a local repository (via the [`LocalRepository`]'s [`PathBuf`]) so it /// can delegate straight to [`MerkleNodeDB`]'s existing repository-based methods without any @@ -193,50 +192,6 @@ impl NodeWriteSession for FileNodeSession { } } -/// Estimate the **uncompressed** tar payload size for [`MerklePacker::pack_nodes`] -/// over `hashes`, in bytes. Used to extend an upload progress bar's total length -/// before the pack/upload kicks off, so the bar has a known end and a meaningful ETA. -/// -/// The estimate sums each present node directory's file content sizes plus tar's -/// fixed-size overhead (one 512-byte header per file/directory entry, file content -/// padded to 512-byte multiples). It ignores the gzip ratio because the merkle -/// `node` and `children` files contain mostly random-looking hash bytes, which -/// compress to ~1.0× — so this uncompressed total is a tight upper bound on the -/// post-gzip bytes that will flow over the wire. -/// -/// Hashes whose node directory is missing on disk contribute 0, matching the -/// silent-skip semantics of [`MerklePacker::pack_nodes`]. -pub fn pack_nodes_byte_estimate(repo: &LocalRepository, hashes: &HashSet) -> u64 { - const TAR_HEADER_BYTES: u64 = 512; - const TAR_BLOCK_SIZE: u64 = 512; - - let mut total: u64 = 0; - for hash in hashes { - let node_dir = node_db_path(&repo.path, hash); - if !node_dir.exists() { - continue; - } - // The directory entry itself. - total = total.saturating_add(TAR_HEADER_BYTES); - let Ok(entries) = std::fs::read_dir(&node_dir) else { - continue; - }; - for entry in entries.flatten() { - let Ok(meta) = entry.metadata() else { - continue; - }; - if meta.is_file() { - let len = meta.len(); - let padded = len.div_ceil(TAR_BLOCK_SIZE).saturating_mul(TAR_BLOCK_SIZE); - total = total.saturating_add(TAR_HEADER_BYTES.saturating_add(padded)); - } else if meta.is_dir() { - total = total.saturating_add(TAR_HEADER_BYTES); - } - } - } - total -} - /// Pack the tar-gz wire format for a set of merkle hashes into `out`, using the /// in-tar layout and gzip compression level selected by `opts`. /// @@ -508,6 +463,48 @@ impl MerklePacker for FileBackend { write_all_tar(&self.repo_path, out, true)?; Ok(()) } + + /// Estimate the **uncompressed** packed node tar payload. + /// + /// The estimate sums each present node directory's file content sizes plus tar's + /// fixed-size overhead (one 512-byte header per file/directory entry, file content + /// padded to 512-byte multiples). It ignores the gzip ratio because the merkle + /// `node` and `children` files contain mostly random-looking hash bytes, which + /// compress to ~1.0× — so this uncompressed total is a tight upper bound on the + /// post-gzip bytes that will flow over the wire. + /// + /// Hashes whose node directory is missing on disk contribute 0, matching the + /// silent-skip semantics of [`MerklePacker::pack_nodes`]. + fn raw_byte_count(&self, hashes: &HashSet) -> u64 { + const TAR_HEADER_BYTES: u64 = 512; + const TAR_BLOCK_SIZE: u64 = 512; + + let mut total: u64 = 0; + for hash in hashes { + let node_dir = node_db_path(&self.repo_path, hash); + if !node_dir.exists() { + continue; + } + // The directory entry itself. + total = total.saturating_add(TAR_HEADER_BYTES); + let Ok(entries) = std::fs::read_dir(&node_dir) else { + continue; + }; + for entry in entries.flatten() { + let Ok(meta) = entry.metadata() else { + continue; + }; + if meta.is_file() { + let len = meta.len(); + let padded = len.div_ceil(TAR_BLOCK_SIZE).saturating_mul(TAR_BLOCK_SIZE); + total = total.saturating_add(TAR_HEADER_BYTES.saturating_add(padded)); + } else if meta.is_dir() { + total = total.saturating_add(TAR_HEADER_BYTES); + } + } + } + total + } } /// Merkle tree node unpacking implementation for the [`FileBackend`]. @@ -562,7 +559,7 @@ mod tests { session.finish()?; drop(store); - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); assert!( store .exists(commit_hash) @@ -848,23 +845,22 @@ mod tests { async fn test_transport_round_trip() -> Result<(), OxenError> { test::run_one_commit_local_repo_test(|repo| { let mut packed = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut packed) .expect("pack_all failed"); assert!(!packed.is_empty(), "pack_all produced empty buffer"); let tmp = tempfile::TempDir::new()?; let clone = repositories::init(tmp.path())?; - let installed = FileBackend::new(&clone) + let installed = clone + .merkle_store() .unpack(&mut &packed[..], UnpackOptions::Overwrite) .expect("unpack failed"); assert!(!installed.is_empty(), "unpack installed no nodes"); for hash in &installed { assert!( - FileBackend::new(&clone) - .exists(hash) - .expect("exists failed"), + clone.merkle_store().exists(hash).expect("exists failed"), "expected installed hash {hash} to be readable" ); } @@ -888,7 +884,7 @@ mod tests { let new_pack_method = { let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut via_trait) .expect("pack_nodes failed"); via_trait @@ -913,7 +909,7 @@ mod tests { let new_pack_method = { let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut via_trait) .expect("pack_all failed"); via_trait @@ -944,7 +940,7 @@ mod tests { let new_pack_method = { let hashes = HashSet::from_iter([hash]); let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut via_trait) .expect("pack_nodes failed"); via_trait @@ -978,7 +974,7 @@ mod tests { hashes.insert(c.hash().expect("no hash for commit")); } let mut via_trait = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut via_trait) .expect("pack_nodes failed"); via_trait @@ -1043,7 +1039,8 @@ mod tests { let repo_new = repositories::init(tmp_new.path())?; // Old `unpack_nodes` skipped existing files; mirror that with // `UnpackOptions::SkipExisting` so the parity check is semantically faithful. - let new_hashes = FileBackend::new(&repo_new) + let new_hashes = repo_new + .merkle_store() .unpack(&mut &bytes[..], UnpackOptions::SkipExisting) .expect("new unpack failed"); @@ -1059,13 +1056,15 @@ mod tests { // Every installed hash must be readable through both stores. for h in &new_hashes { assert!( - FileBackend::new(&repo_old) + repo_old + .merkle_store() .exists(h) .expect("old repo exists check failed"), "hash {h} not readable in repo unpacked via legacy unpack_nodes" ); assert!( - FileBackend::new(&repo_new) + repo_new + .merkle_store() .exists(h) .expect("new repo exists check failed"), "hash {h} not readable in repo unpacked via trait unpack" @@ -1104,7 +1103,7 @@ mod tests { /// today), feed the **same** bytes to: /// - the old client unpack: `node_download_request_unpack_old` (the verbatim /// `unpack_async_tar_archive` install from `main`'s `node_download_request`), - /// - the new client unpack: `FileBackend::unpack(...)` (overwrite-existing + /// - the new client unpack: `merkle_store().unpack(...)` (overwrite-existing /// default, matching `unpack_async_tar_archive`'s behaviour). /// The on-disk merkle-node tree under `/tree/nodes/` must be identical /// in both target repos. The set of hashes the trait reports is also asserted to @@ -1113,7 +1112,7 @@ mod tests { async fn test_node_download_request_unpack_unchanged() -> Result<(), OxenError> { test::run_one_commit_local_repo_test_async(|repo| async move { let mut packed = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut packed) .expect("pack_all failed"); assert!(!packed.is_empty(), "pack_all produced empty buffer"); @@ -1128,7 +1127,8 @@ mod tests { // New client install path: trait, with download-path overwrite semantics. let tmp_new = tempfile::TempDir::new()?; let repo_new = repositories::init(tmp_new.path())?; - let installed = FileBackend::new(&repo_new) + let installed = repo_new + .merkle_store() .unpack(&mut &packed[..], UnpackOptions::Overwrite) .expect("new unpack failed"); @@ -1161,9 +1161,7 @@ mod tests { assert!(!installed.is_empty(), "trait unpack reported no hashes"); for h in &installed { assert!( - FileBackend::new(&repo_new) - .exists(h) - .expect("exists failed"), + repo_new.merkle_store().exists(h).expect("exists failed"), "hash {h} not readable in repo unpacked via trait unpack" ); } @@ -1200,7 +1198,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect_err("path traversal must be rejected"); let msg = format!("{err}"); @@ -1238,7 +1237,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect_err("unsupported entry type must be rejected"); let msg = format!("{err}"); @@ -1285,14 +1285,15 @@ mod tests { // Pack just this hash. let hashes = HashSet::from_iter([stripped_hash]); let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes failed"); // Unpack into a fresh repo and confirm the short hash made it out. let tmp = tempfile::TempDir::new()?; let target = repositories::init(tmp.path())?; - let installed = FileBackend::new(&target) + let installed = target + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect("unpack failed"); @@ -1330,7 +1331,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect_err("non-hex node id must be rejected"); let msg = format!("{err}"); @@ -1380,7 +1382,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect_err("over-deep entry must be rejected"); let msg = format!("{err}"); @@ -1424,7 +1427,8 @@ mod tests { let tmp = tempfile::TempDir::new()?; let repo = repositories::init(tmp.path())?; - let err = FileBackend::new(&repo) + let err = repo + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect_err("unknown leaf filename must be rejected"); let msg = format!("{err}"); @@ -1544,7 +1548,7 @@ mod tests { let new_pack = { let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::LegacyClientPush, &mut buf) .expect("new pack failed"); buf @@ -1566,7 +1570,7 @@ mod tests { #[test] fn test_exists_returns_false_for_missing_hash() -> Result<(), OxenError> { test::run_empty_local_repo_test(|repo| { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); assert!( !store.exists(&missing).expect("exists must not error"), @@ -1580,7 +1584,7 @@ mod tests { #[test] fn test_get_node_returns_none_for_missing_hash() -> Result<(), OxenError> { test::run_empty_local_repo_test(|repo| { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let missing = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); assert!( store @@ -1602,7 +1606,7 @@ mod tests { let commit = CommitNode::default(); let commit_hash = *commit.hash(); { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let session = store.begin().expect("begin failed"); let ns = session .create_node(&commit, None) @@ -1610,7 +1614,7 @@ mod tests { ns.finish().expect("finish node session failed"); session.finish().expect("finish session failed"); } - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let children = store .get_children(&commit_hash) .expect("get_children must not error"); @@ -1628,7 +1632,7 @@ mod tests { #[test] fn test_writer_session_with_no_nodes() -> Result<(), OxenError> { test::run_empty_local_repo_test(|repo| { - let store = FileBackend::new(&repo); + let store = repo.merkle_store(); let session = store.begin().expect("begin failed"); session .finish() @@ -1643,13 +1647,14 @@ mod tests { async fn test_unpack_empty_tarball() -> Result<(), OxenError> { test::run_one_commit_local_repo_test(|repo| { let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&HashSet::new(), PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes(empty) must not error"); let tmp = tempfile::TempDir::new()?; let target = repositories::init(tmp.path())?; - let installed = FileBackend::new(&target) + let installed = target + .merkle_store() .unpack(&mut &buf[..], UnpackOptions::Overwrite) .expect("unpack of empty tarball must not error"); assert!( @@ -1677,7 +1682,7 @@ mod tests { hashes.insert(absent); let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes failed"); @@ -1714,11 +1719,11 @@ mod tests { let mut hashes = HashSet::new(); hashes.insert(head_hash); - let estimate = pack_nodes_byte_estimate(&repo, &hashes); + let estimate = repo.merkle_store().raw_byte_count(&hashes); assert!(estimate > 0, "estimate must be non-zero for a present hash"); let mut buf = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_nodes(&hashes, PackOptions::ServerCanonical, &mut buf) .expect("pack_nodes failed"); assert!( @@ -1732,7 +1737,7 @@ mod tests { let absent = MerkleHash::new(0xDEAD_BEEF_DEAD_BEEF_DEAD_BEEF_DEAD_BEEF_u128); let absent_only: HashSet<_> = HashSet::from_iter([absent]); assert_eq!( - pack_nodes_byte_estimate(&repo, &absent_only), + repo.merkle_store().raw_byte_count(&absent_only), 0, "absent hash should contribute 0 to the estimate" ); @@ -1742,8 +1747,8 @@ mod tests { mixed.insert(head_hash); mixed.insert(absent); assert_eq!( - pack_nodes_byte_estimate(&repo, &mixed), - pack_nodes_byte_estimate(&repo, &hashes), + repo.merkle_store().raw_byte_count(&mixed), + repo.merkle_store().raw_byte_count(&hashes), "absent hash must not change the estimate when added to a present hash" ); Ok(()) @@ -1758,7 +1763,7 @@ mod tests { async fn test_unpack_via_vfs_branch() -> Result<(), OxenError> { test::run_one_commit_local_repo_test(|repo| { let mut packed = Vec::new(); - FileBackend::new(&repo) + repo.merkle_store() .pack_all(&mut packed) .expect("pack_all failed"); assert!(!packed.is_empty(), "pack_all produced empty buffer"); @@ -1768,7 +1773,8 @@ mod tests { clone.set_vfs(Some(true)); assert!(clone.is_vfs(), "vfs flag should be on for this test"); - let installed = FileBackend::new(&clone) + let installed = clone + .merkle_store() .unpack(&mut &packed[..], UnpackOptions::Overwrite) .expect("unpack via vfs branch failed"); assert!( @@ -1777,7 +1783,7 @@ mod tests { ); for h in &installed { assert!( - FileBackend::new(&clone).exists(h).expect("exists failed"), + clone.merkle_store().exists(h).expect("exists failed"), "hash {h} not readable in vfs-cloned repo" ); } diff --git a/crates/lib/src/core/progress/push_progress.rs b/crates/lib/src/core/progress/push_progress.rs index 490207258..ec4bb6552 100644 --- a/crates/lib/src/core/progress/push_progress.rs +++ b/crates/lib/src/core/progress/push_progress.rs @@ -41,6 +41,12 @@ impl PushProgress { self.sync_progress.add_bytes(bytes); } + /// Extend the underlying progress bar's total length by `delta`. See + /// [`SyncProgress::inc_total_bytes`] for usage notes. + pub fn inc_total_bytes(&self, delta: u64) { + self.sync_progress.inc_total_bytes(delta); + } + pub fn get_num_files(&self) -> u64 { self.sync_progress.get_num_files() } diff --git a/crates/lib/src/core/progress/sync_progress.rs b/crates/lib/src/core/progress/sync_progress.rs index 0297c852e..fa6a7fc02 100644 --- a/crates/lib/src/core/progress/sync_progress.rs +++ b/crates/lib/src/core/progress/sync_progress.rs @@ -72,6 +72,15 @@ impl SyncProgress { self.total_bytes = Some(total_bytes); } + /// Extend the underlying progress bar's total length by `delta`. Used when an + /// additional pre-known byte count needs to be added to a bar that was already + /// constructed with an initial total (e.g., the merkle-tree upload bytes layered + /// on top of file-content upload bytes during a push). Uses `ProgressBar`'s + /// interior mutability, so this works through `&Arc`. + pub fn inc_total_bytes(&self, delta: u64) { + self.progress_bar.inc_length(delta); + } + pub fn set_message(&self, message: impl Into>) { self.progress_bar.set_message(message); } diff --git a/crates/lib/src/error.rs b/crates/lib/src/error.rs index 9a0c22fa8..0956d057a 100644 --- a/crates/lib/src/error.rs +++ b/crates/lib/src/error.rs @@ -441,8 +441,12 @@ pub enum OxenError { RmpDecodeError(#[from] rmp_serde::decode::Error), /// Wraps any error that we get from joining tasks. - #[error("{0}")] - JoinError(#[from] JoinError), + #[error("{context}{cause}")] + JoinError { + context: String, + #[source] + cause: JoinError, + }, /// A synchronization primitive (Mutex/RwLock) was found poisoned because a thread panicked /// while holding it. Indicates a bug; should not occur in normal operation. @@ -1025,6 +1029,15 @@ impl OxenError { } } +impl From for OxenError { + fn from(error: JoinError) -> Self { + OxenError::JoinError { + context: "".to_string(), + cause: error, + } + } +} + // Manual From impls for types that need transformation impl From for OxenError { fn from(error: String) -> Self { diff --git a/crates/lib/src/model/merkle_tree/merkle_transport.rs b/crates/lib/src/model/merkle_tree/merkle_transport.rs index e5c8782fc..c4ebbb1ee 100644 --- a/crates/lib/src/model/merkle_tree/merkle_transport.rs +++ b/crates/lib/src/model/merkle_tree/merkle_transport.rs @@ -76,6 +76,12 @@ pub trait MerklePacker: Send + Sync { /// whole-tree pack on `main`. There is no legacy whole-tree variant, so this /// method does not accept [`PackOptions`]. fn pack_all(&self, out: &mut dyn Write) -> Result<(), OxenError>; + + /// Provide the total byte count necessary to store the set of Merkle hashes. + /// Used as an upper bound when packing nodes using [`Self::pack_nodes`], + /// sending them between client and server. This bound is used in the user-facing + /// progress bar so there's a known and meaningful ETA. + fn raw_byte_count(&self, hashes: &HashSet) -> u64; } /// Consume transport bytes and install the nodes they contain into the backend. diff --git a/crates/lib/src/model/repository/local_repository.rs b/crates/lib/src/model/repository/local_repository.rs index f3b41a3a8..e0e9bf21c 100644 --- a/crates/lib/src/model/repository/local_repository.rs +++ b/crates/lib/src/model/repository/local_repository.rs @@ -4,7 +4,7 @@ use crate::constants::{self, DEFAULT_VNODE_SIZE, MIN_OXEN_VERSION}; use crate::core::db::merkle_node::file_backend::FileBackend; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; -use crate::model::merkle_tree::MerkleStore; +use crate::model::merkle_tree::TransportableMerkleStore; use crate::model::merkle_tree::node::FileNode; use crate::model::{MetadataEntry, Remote, RemoteRepository}; use crate::storage::{NoopVersionStore, StorageConfig, VersionStore, create_version_store}; @@ -111,7 +111,7 @@ impl LocalRepository { // NOTE: When new backends (e.g. LMDB) are added, branch on the appropriate config // here and return a `Box::new()`. #[inline] - fn load_merkle_store(repo_path: PathBuf, _is_vfs: bool) -> Box { + fn load_merkle_store(repo_path: PathBuf, _is_vfs: bool) -> Box { // TODO: Add reading config to select Merkle store implementation that // the repository uses. // => Right now, the only option is the FileBackend. @@ -126,7 +126,7 @@ impl LocalRepository { /// Returns a boxed trait object so the trait surface stays simple and dyn-dispatch /// handles backend selection. Callers use it purely through the trait surface /// (read, write); backend selection is an implementation detail of this method. - pub fn merkle_store(&self) -> Box { + pub fn merkle_store(&self) -> Box { // **self.merkle_store Self::load_merkle_store(self.path.clone(), self.is_vfs()) } diff --git a/crates/lib/src/repositories/tree.rs b/crates/lib/src/repositories/tree.rs index b0b03e99d..7b27b30ab 100644 --- a/crates/lib/src/repositories/tree.rs +++ b/crates/lib/src/repositories/tree.rs @@ -1,21 +1,15 @@ use bytesize::ByteSize; -use flate2::Compression; -use flate2::read::GzDecoder; -use flate2::write::GzEncoder; use std::any::type_name_of_val; use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; -use std::str; -use tar::Archive; -use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR}; -use crate::core::db::merkle_node::merkle_node_db::node_db_path; use crate::core::node_sync_status; use crate::core::v_latest::index::CommitMerkleTree as CommitMerkleTreeLatest; use crate::core::v_latest::index::CommitMerkleTree; use crate::core::v_old::v0_19_0::index::CommitMerkleTree as CommitMerkleTreeV0_19_0; use crate::core::versions::MinOxenVersion; use crate::error::OxenError; +use crate::model::merkle_tree::merkle_transport::UnpackOptions; use crate::model::merkle_tree::merkle_writer::MerkleWriteSession; use crate::model::merkle_tree::node::{ CommitNode, DirNodeWithPath, EMerkleTreeNode, FileNode, FileNodeWithDir, MerkleTreeNode, @@ -841,180 +835,24 @@ pub fn cp_dir_hashes_to( } pub fn compress_tree(repository: &LocalRepository) -> Result, OxenError> { - let enc = GzEncoder::new(Vec::new(), Compression::fast()); - let mut tar = tar::Builder::new(enc); - compress_full_tree(repository, &mut tar)?; - tar.finish()?; - - let buffer: Vec = tar.into_inner()?.finish()?; - let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX); - + let mut buf = Vec::new(); + repository.merkle_store().pack_all(&mut buf)?; + let total_size: u64 = u64::try_from(buf.len()).unwrap_or(u64::MAX); log::debug!("Compressed entire tree size is {}", ByteSize::b(total_size)); - - Ok(buffer) -} - -pub fn compress_full_tree( - repository: &LocalRepository, - tar: &mut tar::Builder>>, -) -> Result<(), OxenError> { - // This will be the subdir within the tarball, - // so when we untar it, all the subdirs will be extracted to - // tree/nodes/... - let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR); - let nodes_dir = repository - .path - .join(OXEN_HIDDEN_DIR) - .join(TREE_DIR) - .join(NODES_DIR); - - log::debug!("Compressing tree in dir {nodes_dir:?}"); - - if nodes_dir.exists() { - tar.append_dir_all(&tar_subdir, nodes_dir)?; - } - - Ok(()) -} - -pub fn compress_nodes( - repository: &LocalRepository, - hashes: &HashSet, -) -> Result, OxenError> { - // zip up the node directories for each commit tree - let enc = GzEncoder::new(Vec::new(), Compression::fast()); - let mut tar = tar::Builder::new(enc); - - log::debug!("Compressing {} unique nodes...", hashes.len()); - for hash in hashes { - // This will be the subdir within the tarball - // so when we untar it, all the subdirs will be extracted to - // tree/nodes/... - let dir_prefix = hash.to_hex_hash().node_db_prefix(); - let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix); - - let node_dir = node_db_path(&repository.path, hash); - // log::debug!("Compressing node from dir {:?}", node_dir); - if node_dir.exists() { - tar.append_dir_all(&tar_subdir, node_dir)?; - } - } - tar.finish()?; - - let buffer: Vec = tar.into_inner()?.finish()?; - Ok(buffer) -} - -pub fn compress_node( - repository: &LocalRepository, - hash: &MerkleHash, -) -> Result, OxenError> { - // This will be the subdir within the tarball - // so when we untar it, all the subdirs will be extracted to - // tree/nodes/... - let dir_prefix = hash.to_hex_hash().node_db_prefix(); - let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix); - - // zip up the node directory - let enc = GzEncoder::new(Vec::new(), Compression::fast()); - let mut tar = tar::Builder::new(enc); - let node_dir = node_db_path(&repository.path, hash); - - // log::debug!("Compressing node {} from dir {:?}", hash, node_dir); - if node_dir.exists() { - tar.append_dir_all(&tar_subdir, node_dir)?; - } - tar.finish()?; - - let buffer: Vec = tar.into_inner()?.finish()?; - let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX); - log::debug!( - "Compressed node {} size is {}", - hash, - ByteSize::b(total_size) - ); - - Ok(buffer) -} - -pub fn compress_commits( - repository: &LocalRepository, - commits: &Vec, -) -> Result, OxenError> { - // zip up the node directory - let enc = GzEncoder::new(Vec::new(), Compression::fast()); - let mut tar = tar::Builder::new(enc); - - for commit in commits { - let hash = commit.hash()?; - // This will be the subdir within the tarball - // so when we untar it, all the subdirs will be extracted to - // tree/nodes/... - let dir_prefix = hash.to_hex_hash().node_db_prefix(); - let tar_subdir = Path::new(TREE_DIR).join(NODES_DIR).join(dir_prefix); - - let node_dir = node_db_path(&repository.path, &hash); - log::debug!("Compressing commit from dir {node_dir:?}"); - if node_dir.exists() { - tar.append_dir_all(&tar_subdir, node_dir)?; - } - } - tar.finish()?; - - let buffer: Vec = tar.into_inner()?.finish()?; - Ok(buffer) + Ok(buf) } pub fn unpack_nodes( repository: &LocalRepository, buffer: &[u8], ) -> Result, OxenError> { - let mut hashes: HashSet = HashSet::new(); - log::debug!("Unpacking nodes from buffer..."); - let decoder = GzDecoder::new(buffer); - log::debug!("Decoder created"); - let mut archive = Archive::new(decoder); - log::debug!("Archive created"); - let Ok(entries) = archive.entries() else { - return Err(OxenError::basic_str( - "Could not unpack tree database from archive", - )); - }; - log::debug!("Extracting entries..."); - for file in entries { - let Ok(mut file) = file else { - log::error!("Could not unpack file in archive..."); - continue; - }; - let path = file.path().unwrap(); - let oxen_hidden_path = repository.path.join(OXEN_HIDDEN_DIR); - let dst_path = oxen_hidden_path.join(TREE_DIR).join(NODES_DIR).join(path); - - if let Some(parent) = dst_path.parent() { - util::fs::create_dir_all(parent).expect("Could not create parent dir"); - } - // log::debug!("create_node writing {:?}", dst_path); - if dst_path.exists() { - log::debug!("Node already exists at {dst_path:?}"); - continue; - } - file.unpack(&dst_path)?; - - // the hash is the last two path components combined - if !dst_path.ends_with("node") && !dst_path.ends_with("children") { - let id = dst_path - .components() - .rev() - .take(2) - .map(|c| c.as_os_str().to_str().unwrap()) - .collect::>() - .into_iter() - .rev() - .collect::(); - hashes.insert(id.parse()?); - } - } - Ok(hashes) + log::debug!("Unpacking nodes from buffer ({} bytes)", buffer.len()); + // `&[u8]` doesn't auto-coerce to `&mut dyn Read`, so we hand the unpacker + // a `&mut` over a fresh slice cursor that advances as bytes are consumed. + let mut reader: &[u8] = buffer; + repository + .merkle_store() + .unpack(&mut reader, UnpackOptions::SkipExisting) } /// Write a node to disk diff --git a/crates/server/src/controllers/tree.rs b/crates/server/src/controllers/tree.rs index ee1d40c68..98cb577e7 100644 --- a/crates/server/src/controllers/tree.rs +++ b/crates/server/src/controllers/tree.rs @@ -5,11 +5,13 @@ use liboxen::core::node_sync_status; use liboxen::error::OxenError; use liboxen::model::Commit; use liboxen::model::LocalRepository; +use liboxen::model::merkle_tree::PackOptions; use liboxen::view::MerkleHashesResponse; use liboxen::view::StatusMessage; use liboxen::view::tree::MerkleHashResponse; use liboxen::view::tree::merkle_hashes::MerkleHashes; +use std::collections::HashSet; use std::path::PathBuf; use liboxen::model::merkle_tree::node::{EMerkleTreeNode, MerkleTreeNode}; @@ -269,7 +271,16 @@ pub async fn download_tree_nodes( )? }; - let buffer = repositories::tree::compress_nodes(&repository, &node_hashes)?; + let buffer = { + let mut buffer = Vec::new(); + repository.merkle_store().pack_nodes( + &node_hashes, + PackOptions::ServerCanonical, + &mut buffer, + )?; + buffer + }; + let total_size: u64 = u64::try_from(buffer.len()).unwrap_or(u64::MAX); log::debug!( "Compressed {} commits size is {}", @@ -317,7 +328,15 @@ pub async fn download_node(req: HttpRequest) -> actix_web::Result