Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 101 additions & 78 deletions crates/lib/src/api/client/tree.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
use async_compression::futures::bufread::GzipDecoder;
use async_tar::Archive;
use flate2::Compression;
use flate2::write::GzEncoder;
use futures_util::TryStreamExt;
use std::collections::HashSet;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time;
use tempfile::TempDir;
use tokio_util::io::{ReaderStream, StreamReader, SyncIoBridge};

use crate::api;
use crate::api::client;
use crate::constants::{NODES_DIR, OXEN_HIDDEN_DIR, TREE_DIR};
use crate::core::progress::push_progress::PushProgress;
use crate::core::v_latest::index::CommitMerkleTree;
use crate::error::OxenError;
use crate::model::merkle_tree::merkle_transport::{PackOptions, UnpackOptions};
use crate::model::merkle_tree::node::MerkleTreeNode;
use crate::model::{LocalRepository, MerkleHash, RemoteRepository};
use crate::opts::download_tree_opts::DownloadTreeOpts;
use crate::opts::fetch_opts::FetchOpts;
use crate::view::tree::MerkleHashResponse;
use crate::view::tree::merkle_hashes::MerkleHashes;
use crate::view::{MerkleHashesResponse, StatusMessage};
use crate::{api, util};

/// Check if a node exists in the remote repository merkle tree by hash
#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -50,60 +47,93 @@ pub async fn has_node(
}
}

/// Upload a node to the remote repository merkle tree
/// Wraps a `Write` to call [`PushProgress::add_bytes`] on each `write` call.
struct CountingWriter<W> {
inner: W,
progress: Arc<PushProgress>,
}

impl<W: Write> Write for CountingWriter<W> {
#[inline(always)]
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let n = self.inner.write(buf)?;
self.progress.add_bytes(n as u64);
Ok(n)
}
#[inline(always)]
fn flush(&mut self) -> std::io::Result<()> {
self.inner.flush()
}
}

/// Upload a set of merkle nodes to the remote repository.
///
/// Packs the nodes into the canonical tar-gz wire format and streams the bytes straight
/// into the HTTP upload body — no intermediate `Vec<u8>` is materialized. The pack runs
/// on a blocking worker (`tokio::task::spawn_blocking`) that writes into one end of a
/// `tokio::io::duplex`; the HTTP body reads from the other end through `ReaderStream`,
/// so upload and pack progress together with back-pressure.
pub async fn create_nodes(
local_repo: &LocalRepository,
remote_repo: &RemoteRepository,
nodes: HashSet<MerkleHash>,
progress: &Arc<PushProgress>,
) -> Result<(), OxenError> {
// Compress the node
log::debug!("create_nodes starting compression");
// OPT: Try Compression::fast();
let enc = GzEncoder::new(Vec::new(), Compression::default());
log::debug!("create_nodes compressing nodes");
let mut tar = tar::Builder::new(enc);
log::debug!("create_nodes creating tar");
let node_path = local_repo
.path
.join(OXEN_HIDDEN_DIR)
.join(TREE_DIR)
.join(NODES_DIR);

for (i, node_hash) in nodes.iter().enumerate() {
let dir_prefix = node_hash.to_hex_hash().node_db_prefix();
let node_dir = node_path.join(&dir_prefix);
// log::debug!(
// "create_nodes appending objects dir {:?} to tar at path {:?}",
// dir_prefix,
// node_dir
// );
progress.set_message(format!("Packing {}/{} nodes", i + 1, nodes.len()));

log::debug!("create_nodes appending dir to tar");
tar.append_dir_all(dir_prefix, node_dir)?;
}

tar.finish()?;
log::debug!("create_nodes finished tar");
let buffer: Vec<u8> = tar.into_inner()?.finish()?;
let n = nodes.len();
progress.set_message(format!("Pushing {n} nodes"));

// Extend the progress bar's total length by the uncompressed bytes on disk for
// these Merkle tree nodes. The [`CountingWriter`] will update the progress bar
// each time we write the uncompressed bytes of a Merkle tree node.
let estimated_upload_bytes = local_repo.merkle_store().raw_byte_count(&nodes);
progress.inc_total_bytes(estimated_upload_bytes);

// Pack -> duplex writer (sync) -> duplex reader (async) -> HTTP body stream.
// 64 KiB duplex buffer mirrors the server-side streaming pattern in
// `crates/server/src/controllers/versions.rs`.
let (async_writer, async_reader) = tokio::io::duplex(64 * 1024);
let repo = local_repo.clone();
let progress_for_pack = Arc::clone(progress);
let pack_handle = tokio::task::spawn_blocking(move || -> Result<(), OxenError> {
let mut counting_writer = CountingWriter {
inner: SyncIoBridge::new(async_writer),
progress: progress_for_pack,
};
repo.merkle_store().pack_nodes(
&nodes,
// Legacy client-push wire format: required so older `oxen-server` deployments
// (which pre-pend `tree/nodes/` server-side at install time) install entries
// at the right paths.
PackOptions::LegacyClientPush,
&mut counting_writer,
)?;
Ok(())
});

let body_stream = ReaderStream::new(async_reader);

// Upload the node
let uri = "/tree/nodes".to_string();
let url = api::endpoint::url_from_repo(remote_repo, &uri)?;
let client = client::builder_for_url(&url)?
.timeout(time::Duration::from_secs(120))
.build()?;
log::debug!("uploading {n} nodes to {url}");

let size = buffer.len() as u64;
log::debug!(
"uploading node of size {} to {}",
bytesize::ByteSize::b(size),
url
);
let res = client.post(&url).body(buffer.to_owned()).send().await?;
let res = client
.post(&url)
.body(reqwest::Body::wrap_stream(body_stream))
.send()
.await?;
let body = client::parse_json_body(&url, res).await?;
log::debug!("upload node complete {body}");
progress.finish();

// Surface any pack error after the upload completes (the duplex reader reaching EOF
// signals pack end-of-stream; panics and Result::Err come through the join handle).
pack_handle.await.map_err(|e| OxenError::JoinError {
context: "pack task panicked: ".to_string(),
cause: e,
})??;

Ok(())
}
Expand Down Expand Up @@ -330,6 +360,13 @@ pub async fn download_trees_between(
Ok(())
}

/// Download a merkle-tree tarball from the remote repository and unpack it into the
/// local store. Streams the response body straight into the `MerkleUnpacker` so nothing
/// buffers the whole payload in memory.
///
/// The VFS branch is preserved but no longer lives here: `FileBackend::unpack` handles
/// the `is_vfs` case internally (tempdir + `copy_dir_all` dance). That keeps the client
/// logic generic across backends.
async fn node_download_request(
local_repo: &LocalRepository,
url: impl AsRef<str>,
Expand All @@ -339,41 +376,27 @@ async fn node_download_request(
let client = client::builder_for_url(url)?
.timeout(time::Duration::from_secs(12000))
.build()?;
log::debug!("node_download_request about to send request {url}");
log::debug!("node_download_request sending request {url}");
let res = client.get(url).send().await?;
let res = client::handle_non_json_response(url, res).await?;

// The remote tar packs it in TREE_DIR/NODES_DIR
// So this will unpack it in OXEN_HIDDEN_DIR/TREE_DIR/NODES_DIR
let full_unpacked_path = local_repo.path.join(OXEN_HIDDEN_DIR);

// Create the temp path if it doesn't exist
util::fs::create_dir_all(&full_unpacked_path)?;

let reader = res
.bytes_stream()
.map_err(futures::io::Error::other)
.into_async_read();

let decoder = GzipDecoder::new(futures::io::BufReader::new(reader));
let archive = Archive::new(decoder);

// If the repo is stored on a virtual file system, re-route the nodes through a temp dir
if local_repo.is_vfs() {
let temp_dir = TempDir::new()?;
let temp_path = temp_dir.path();

// Unpack the tar in a temp dir
log::debug!("node_download_request unpacking to {temp_path:?}");
util::fs::unpack_async_tar_archive(archive, temp_path).await?;
log::debug!("Succesfully unpacked tar to temp dir");

// Copy to the repo
util::fs::copy_dir_all(&temp_dir, &full_unpacked_path)?;
} else {
// Else, unpack directly to the repo
util::fs::unpack_async_tar_archive(archive, &full_unpacked_path).await?;
}
// async Stream<Item = Result<Bytes, _>> → AsyncRead → sync Read, bridged across
// the spawn_blocking boundary so the sync trait consumes streamed bytes incrementally.
let async_reader = StreamReader::new(res.bytes_stream().map_err(std::io::Error::other));
let mut sync_reader = SyncIoBridge::new(async_reader);

let repo = local_repo.clone();
tokio::task::spawn_blocking(move || -> Result<(), OxenError> {
repo.merkle_store()
// Download path: overwrite existing files on disk
.unpack(&mut sync_reader, UnpackOptions::Overwrite)?;
Ok(())
})
.await
.map_err(|e| OxenError::JoinError {
context: "unpack task panicked: ".to_string(),
cause: e,
})??;

Ok(())
}
Expand Down
Loading
Loading