Refactor to use Merkle{Packer,Unpacker} traits throughout#516
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughMerkle node transport switched from client-side tar/gzip archives to streaming pack/unpack via the merkle store. Uploads stream backend pack output into HTTP bodies; downloads stream HTTP response bytes into backend unpack. A byte-estimate helper and progress APIs were added to support incremental upload progress. ChangesMerkle Node Streaming Transport
Sequence Diagram(s)sequenceDiagram
participant Client as API Client
participant PackTask as spawn_blocking Pack Task
participant Duplex as Duplex Pipe<br/>(Reader/Writer)
participant ReqBody as reqwest::Body<br/>with ReaderStream
participant Server as HTTP Server
Client->>PackTask: start packing nodes (SyncIoBridge writer)
Client->>Client: set progress "Pushing N nodes" and estimate total bytes
par stream
PackTask->>Duplex: write packed tar-gz bytes via writer
Client->>Duplex: read bytes via ReaderStream
Client->>Client: inc progress per chunk
Client->>ReqBody: wrap ReaderStream as Body
end
Client->>Server: POST /tree/nodes with streamed Body
Server-->>Client: HTTP response
Client->>PackTask: await join handle
PackTask-->>Client: pack complete or panic
Client->>Client: map panic to OxenError if needed
sequenceDiagram
participant Client as API Client
participant Server as HTTP Server
participant RespStream as HTTP Response Stream
participant StreamReader as StreamReader
participant SyncBridge as SyncIoBridge
participant UnpackTask as spawn_blocking Unpack Task
participant Backend as MerkleStore Backend
Client->>Server: GET /tree/nodes
Server-->>RespStream: stream tar-gz bytes
Client->>StreamReader: convert bytes_stream() -> AsyncRead
Client->>SyncBridge: wrap AsyncRead in SyncIoBridge
Client->>UnpackTask: start unpack(reader) in spawn_blocking
UnpackTask->>Backend: backend.unpack(reader, UnpackOptions::Overwrite)
Backend->>Backend: validate entries, extract hashes, write files
UnpackTask-->>Client: return result or panic
Client->>Client: map panic to OxenError if needed
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Comment |
There was a problem hiding this comment.
Actionable comments posted: 5
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/lib/src/core/db/merkle_node/file_backend.rs`:
- Around line 715-727: The loop currently continues before recording parsed
merkle hashes, so when SkipExisting short-circuits an entry the hash is never
added; modify the code in the loop around dst_path.exists() / overwrite_existing
so you call extract_hash_from_entry_path(&dst_path, oxen_hidden)? (or otherwise
record the parsed hash from MerkleUnpacker::unpack) before executing the
continue, ensuring existing node directories are included in the returned hash
set; update any handling around file.unpack and the SkipExisting behavior to
preserve existing hashes.
- Around line 687-692: The loop over tar entries (the `for entry in entries`
block in file_backend.rs) currently logs and continues on tar entry decode
failures; change this to fail the unpack instead of continuing: on the `let
Ok(mut file) = entry else { ... }` branch return an Err (propagating or mapping
the original error) from the enclosing unpack/extract function (rather than
continue), so parser/transport errors abort like the path-structure checks; you
can replace the manual match with `let mut file = entry.map_err(|e| /*
wrap/convert to your crate error */)?;` or explicitly return an Err after
logging to preserve the original error context.
- Around line 693-719: Reject absolute/rooted tar paths before joining by
extending the path-traversal guard: in the code handling each tar entry (the
block using variable path from file.path()), check for Component::RootDir and
Component::Prefix(_) (and/or path.is_absolute()) in addition to
Component::ParentDir and return Err(MerkleDbError::PathTraversal(...)) if any
are found; ensure this check runs before computing dst_path or calling
file.unpack, so absolute Windows or Unix paths cannot discard oxen_hidden when
joined (refer to symbols: path, Component::ParentDir, Component::RootDir,
Component::Prefix, dst_path, oxen_hidden, file.unpack).
- Around line 852-859: The VFS branch currently always calls
util::fs::copy_dir_all(tmp.path(), &oxen_hidden) which overwrites files and
ignores UnpackOptions::SkipExisting; change this so when overwrite_existing
(derived from UnpackOptions) is false you perform a copy that checks destination
existence and skips existing files instead of unconditionally overwriting.
Locate the VFS block around self.repo.is_vfs() where extract_tar_under(...)
returns hashes and replace the direct copy_dir_all call with either a copy
routine that accepts a skip-existing flag or an explicit walk of tmp.path() that
copies each file into oxen_hidden only if the destination doesn't already exist
(use the same path resolution logic as the non-VFS path to ensure consistent
behavior).
In `@crates/lib/src/model/repository/local_repository.rs`:
- Line 7: The method merkle_store() currently returns/uses dyn MerkleStore which
erases the transport methods; change merkle_store() to return a
TransportableMerkleStore so callers can call pack_* / unpack. Locate the
merkle_store() method and its signature(s) (and any usages inside
LocalRepository) and replace the return type and trait object (dyn MerkleStore)
with TransportableMerkleStore, update imports (ensure TransportableMerkleStore
is used instead of MerkleStore) and adjust any places that call merkle_store()
to match the new concrete trait type rather than the erased dyn trait.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 60de4f36-3d69-417c-bb74-dd7b790244ea
📒 Files selected for processing (6)
crates/lib/src/api/client/tree.rscrates/lib/src/core/db/merkle_node/file_backend.rscrates/lib/src/core/progress/push_progress.rscrates/lib/src/core/progress/sync_progress.rscrates/lib/src/model/repository/local_repository.rscrates/lib/src/repositories/tree.rs
| if dst_path.exists() && !overwrite_existing { | ||
| log::info!("Node already exists at {dst_path:?}, skipping"); | ||
| continue; | ||
| } | ||
| file.unpack(&dst_path)?; | ||
|
|
||
| // Extract the merkle hash from this entry's path, if it identifies one. | ||
| // | ||
| // After the path-resolution above, `dst_path` is of the form | ||
| // `<oxen_hidden>/tree/nodes/<rest>`. We classify entries by the SHAPE | ||
| // of `<rest>`, never by whether components happen to be hex. We assume that | ||
| // we have the hex-encoded hash as the `{prefix}/{suffix}` dirs. | ||
| if let Some(hash) = extract_hash_from_entry_path(&dst_path, oxen_hidden)? { |
There was a problem hiding this comment.
Record parsed hashes before SkipExisting short-circuits.
MerkleUnpacker::unpack is documented to return hashes parsed from the tarball even when entries are skipped for SkipExisting, but this continue happens before extract_hash_from_entry_path(...). Any already-present node directory therefore disappears from the returned set.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@crates/lib/src/core/db/merkle_node/file_backend.rs` around lines 715 - 727,
The loop currently continues before recording parsed merkle hashes, so when
SkipExisting short-circuits an entry the hash is never added; modify the code in
the loop around dst_path.exists() / overwrite_existing so you call
extract_hash_from_entry_path(&dst_path, oxen_hidden)? (or otherwise record the
parsed hash from MerkleUnpacker::unpack) before executing the continue, ensuring
existing node directories are included in the returned hash set; update any
handling around file.unpack and the SkipExisting behavior to preserve existing
hashes.
d9c8c29 to
3e16ed9
Compare
bc22990 to
7647fe2
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/lib/src/api/client/tree.rs`:
- Around line 360-374: The SyncIoBridge and StreamReader must be constructed
inside the blocking task to avoid capturing the async runtime; move the creation
of StreamReader::new(res.bytes_stream().map_err(std::io::Error::other)) and
SyncIoBridge::new(...) into the closure passed to tokio::task::spawn_blocking,
then call repo.merkle_store().unpack(&mut sync_reader, UnpackOptions::Overwrite)
inside that closure; ensure res (or its bytes stream) can be moved into the
closure (clone or take ownership) so the blocking task owns the StreamReader and
SyncIoBridge.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: af626864-5374-4768-a949-936d719d679b
📒 Files selected for processing (6)
crates/lib/src/api/client/tree.rscrates/lib/src/core/db/merkle_node/file_backend.rscrates/lib/src/core/progress/push_progress.rscrates/lib/src/core/progress/sync_progress.rscrates/lib/src/model/repository/local_repository.rscrates/lib/src/repositories/tree.rs
✅ Files skipped from review due to trivial changes (1)
- crates/lib/src/core/progress/push_progress.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/lib/src/core/db/merkle_node/file_backend.rs
3e16ed9 to
5009a3a
Compare
7647fe2 to
f878708
Compare
|
NOTE: Stacked PR! Must merge #515 before merging. |
5009a3a to
484843a
Compare
f878708 to
d16d83f
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (4)
crates/lib/src/core/db/merkle_node/file_backend.rs (4)
686-690:⚠️ Potential issue | 🟠 Major | ⚡ Quick winFail the unpack on tar entry decode errors.
This branch logs and continues on a broken tar entry, which can leave a partially-installed repo while still returning
Ok(...). The TODO at line 688 acknowledges this should be raised to the caller.for entry in entries { - let Ok(mut file) = entry else { - log::error!("Could not unpack file in merkle tar archive"); - // TODO: raise this error to the caller instead!? - continue; - }; + let mut file = entry.map_err(|e| MerkleDbError::CannotReadMerkle(e))?;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/lib/src/core/db/merkle_node/file_backend.rs` around lines 686 - 690, The block that currently does "let Ok(mut file) = entry else { log::error!(...); continue; }" should not swallow tar entry decode errors; instead propagate failure to the caller. Replace the logging+continue with returning an Err (or converting the tar::Entry error into the function's error type) so the unpack operation fails fast and the caller can abort cleanup; reference the exact pattern using "let Ok(mut file) = entry else" in file_backend.rs and ensure you wire the error with context (e.g., via anyhow::Context or map_err) so callers receive a descriptive error rather than a partial install.
713-716:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRecord parsed hashes before
SkipExistingshort-circuits.The
continueat line 715 happens beforeextract_hash_from_entry_pathat line 725, so skipped entries won't have their hashes recorded in the returned set. This breaks the documented contract thatunpackreturns all hashes from the tarball.Move the hash extraction before the skip check:
+ // Extract the merkle hash before potentially skipping + if let Some(hash) = extract_hash_from_entry_path(&dst_path, oxen_hidden)? { + hashes.insert(hash); + } if dst_path.exists() && !overwrite_existing { log::info!("Node already exists at {dst_path:?}, skipping"); continue; } file.unpack(&dst_path)?; - - if let Some(hash) = extract_hash_from_entry_path(&dst_path, oxen_hidden)? { - hashes.insert(hash); - } else {🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/lib/src/core/db/merkle_node/file_backend.rs` around lines 713 - 716, In unpack (file_backend.rs) the code skips entries early so extract_hash_from_entry_path is never called for skipped files; move the call to extract_hash_from_entry_path (and insertion into the hashes set) so it runs before the dst_path.exists() && !overwrite_existing check (i.e. compute and record the hash for the current tar entry immediately after parsing the entry path), ensuring the returned set from unpack contains hashes for skipped entries as well while leaving the existing SkipExisting behavior intact.
693-695:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winReject rooted tar paths before joining them.
The current guard only rejects
... Absolute Unix paths (/etc/passwd) and Windows-prefixed paths (C:\) still pass through, andPath::joinwill discardoxen_hiddenfor those inputs. A malicious archive can write outside.oxen/before validation fires.- if path.components().any(|c| matches!(c, Component::ParentDir)) { + if path.components().any(|c| matches!(c, Component::ParentDir | Component::RootDir | Component::Prefix(_))) { return Err(MerkleDbError::PathTraversal(path.display().to_string())); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/lib/src/core/db/merkle_node/file_backend.rs` around lines 693 - 695, The guard that rejects parent-directory components is insufficient because absolute/rooted paths (e.g., "/etc/passwd" or Windows "C:\...") will bypass it and cause Path::join to drop the oxen_hidden prefix; update the validation in file_backend.rs (where path and MerkleDbError::PathTraversal are used) to also reject rooted/absolute and prefix components before joining — e.g., check path.is_absolute() and/or ensure no Component::RootDir or Component::Prefix(_) are present (in addition to the existing ParentDir check) and return MerkleDbError::PathTraversal for those cases so absolute Windows/Unix paths cannot escape the oxen_hidden directory.
852-857:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftVFS staging copy ignores
UnpackOptions::SkipExisting.The VFS branch calls
copy_dir_allunconditionally at line 856, ignoring theoverwrite_existingflag. This breaks the API contract whenUnpackOptions::SkipExistingis passed — existing files will be overwritten anyway.Either pass a skip-existing flag to the copy routine, or walk the tempdir and check destination existence before copying each file.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/lib/src/core/db/merkle_node/file_backend.rs` around lines 852 - 857, The VFS branch currently always copies the temp dir into the VFS with util::fs::copy_dir_all (inside the self.repo.is_vfs() block), ignoring the UnpackOptions::SkipExisting represented by overwrite_existing; update this block so the copy honors the flag: after extracting with extract_tar_under, walk the TempDir tree and for each file/dir compute the destination path under oxen_hidden, and if overwrite_existing is false (i.e., SkipExisting) skip copying when the destination already exists; otherwise perform the copy/overwrite as before — modify the code around TempDir::new(), extract_tar_under(..., overwrite_existing) and replace the util::fs::copy_dir_all call with the conditional per-file copy logic (or use a copy helper that accepts a skip-existing boolean).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/lib/src/core/db/merkle_node/file_backend.rs`:
- Around line 817-835: The impl for MerklePacker refers to a non-existent
FileBackend<'repo> lifetime and accesses self.repo, but the actual struct is
declared without a lifetime and only has repo_path: PathBuf; fix by removing the
bogus lifetime from the impl signature (use impl MerklePacker for FileBackend)
and update pack_nodes and pack_all to call write_hashes_tar / write_all_tar with
the correct repository value (derive or open the repo from self.repo_path or
change the struct to include a repo field) so the implementation matches the
real FileBackend definition; also remove or merge any duplicate FileBackend
impls so only the correct, matching impl remains.
- Around line 530-572: There are duplicate implementations of several functions
(pack_nodes_byte_estimate, write_hashes_tar, pack_options_compression,
write_all_tar, extract_tar_under, extract_hash_from_entry_path) caused by a
merge/rebase; remove one of the duplicate groups so each symbol is defined
exactly once. Locate the two duplicated blocks (the earlier and later
implementations of those functions) and delete one whole block (prefer keeping
the more complete/updated version), then run cargo build to confirm no missing
imports/visibility changes; if any call sites expect a different signature,
reconcile the single remaining implementation to match usages.
---
Duplicate comments:
In `@crates/lib/src/core/db/merkle_node/file_backend.rs`:
- Around line 686-690: The block that currently does "let Ok(mut file) = entry
else { log::error!(...); continue; }" should not swallow tar entry decode
errors; instead propagate failure to the caller. Replace the logging+continue
with returning an Err (or converting the tar::Entry error into the function's
error type) so the unpack operation fails fast and the caller can abort cleanup;
reference the exact pattern using "let Ok(mut file) = entry else" in
file_backend.rs and ensure you wire the error with context (e.g., via
anyhow::Context or map_err) so callers receive a descriptive error rather than a
partial install.
- Around line 713-716: In unpack (file_backend.rs) the code skips entries early
so extract_hash_from_entry_path is never called for skipped files; move the call
to extract_hash_from_entry_path (and insertion into the hashes set) so it runs
before the dst_path.exists() && !overwrite_existing check (i.e. compute and
record the hash for the current tar entry immediately after parsing the entry
path), ensuring the returned set from unpack contains hashes for skipped entries
as well while leaving the existing SkipExisting behavior intact.
- Around line 693-695: The guard that rejects parent-directory components is
insufficient because absolute/rooted paths (e.g., "/etc/passwd" or Windows
"C:\...") will bypass it and cause Path::join to drop the oxen_hidden prefix;
update the validation in file_backend.rs (where path and
MerkleDbError::PathTraversal are used) to also reject rooted/absolute and prefix
components before joining — e.g., check path.is_absolute() and/or ensure no
Component::RootDir or Component::Prefix(_) are present (in addition to the
existing ParentDir check) and return MerkleDbError::PathTraversal for those
cases so absolute Windows/Unix paths cannot escape the oxen_hidden directory.
- Around line 852-857: The VFS branch currently always copies the temp dir into
the VFS with util::fs::copy_dir_all (inside the self.repo.is_vfs() block),
ignoring the UnpackOptions::SkipExisting represented by overwrite_existing;
update this block so the copy honors the flag: after extracting with
extract_tar_under, walk the TempDir tree and for each file/dir compute the
destination path under oxen_hidden, and if overwrite_existing is false (i.e.,
SkipExisting) skip copying when the destination already exists; otherwise
perform the copy/overwrite as before — modify the code around TempDir::new(),
extract_tar_under(..., overwrite_existing) and replace the
util::fs::copy_dir_all call with the conditional per-file copy logic (or use a
copy helper that accepts a skip-existing boolean).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 1e8f8ad6-c499-4014-a6f7-df4240f5de55
📒 Files selected for processing (7)
crates/lib/src/api/client/tree.rscrates/lib/src/core/db/merkle_node/file_backend.rscrates/lib/src/core/progress/push_progress.rscrates/lib/src/core/progress/sync_progress.rscrates/lib/src/core/v_latest/entries.rscrates/lib/src/model/repository/local_repository.rscrates/lib/src/repositories/tree.rs
✅ Files skipped from review due to trivial changes (4)
- crates/lib/src/core/progress/push_progress.rs
- crates/lib/src/core/v_latest/entries.rs
- crates/lib/src/core/progress/sync_progress.rs
- crates/lib/src/repositories/tree.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- crates/lib/src/model/repository/local_repository.rs
- crates/lib/src/api/client/tree.rs
484843a to
d524e90
Compare
d16d83f to
6a466d4
Compare
11e30a5 to
5a896ca
Compare
6a466d4 to
0bbcac5
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/lib/src/api/client/tree.rs`:
- Around line 101-113: The code currently returns early on errors from
client.post().send().await or client::parse_json_body(), which drops pack_handle
without awaiting and leaks the blocking task; to fix, capture the upload/parse
result into a Result (e.g. let upload_result = (async { let res =
client.post(&url).body(...).send().await?; client::parse_json_body(&url,
res).await }).await), then always await pack_handle
(pack_handle.await.map_err(...)? ) before propagating the upload_result (return
upload_result? or return Err(upload_result.err().unwrap())). Ensure you
reference and modify the block around client.post(...).send().await,
client::parse_json_body(&url, res).await and pack_handle.await so pack_handle is
awaited in all control paths.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 38a33090-6353-431e-842a-34caa157990b
📒 Files selected for processing (6)
crates/lib/src/api/client/tree.rscrates/lib/src/core/db/merkle_node/file_backend.rscrates/lib/src/core/progress/push_progress.rscrates/lib/src/core/progress/sync_progress.rscrates/lib/src/model/repository/local_repository.rscrates/lib/src/repositories/tree.rs
✅ Files skipped from review due to trivial changes (1)
- crates/lib/src/repositories/tree.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- crates/lib/src/core/progress/sync_progress.rs
- crates/lib/src/core/progress/push_progress.rs
- crates/lib/src/core/db/merkle_node/file_backend.rs
| let res = client | ||
| .post(&url) | ||
| .body(reqwest::Body::wrap_stream(body_stream)) | ||
| .send() | ||
| .await?; | ||
| let body = client::parse_json_body(&url, res).await?; | ||
| log::debug!("upload node complete {body}"); | ||
|
|
||
| // Surface any pack error after the upload completes (the duplex reader reaching EOF | ||
| // signals pack end-of-stream; panics and Result::Err come through the join handle). | ||
| pack_handle | ||
| .await | ||
| .map_err(|e| OxenError::basic_str(format!("pack task panicked: {e}")))??; |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
cat -n crates/lib/src/api/client/tree.rs | sed -n '90,125p'Repository: Oxen-AI/Oxen
Length of output: 1614
🏁 Script executed:
cat -n crates/lib/src/api/client/tree.rs | sed -n '40,120p'Repository: Oxen-AI/Oxen
Length of output: 4237
🏁 Script executed:
rg "spawn_blocking" -B 5 -A 10 crates/lib/src/api/client/Repository: Oxen-AI/Oxen
Length of output: 7948
Always join pack_handle, even when the upload request fails.
If send().await or parse_json_body().await fail, the function returns early via ? before reaching pack_handle.await on line 111–113. Dropping the JoinHandle without awaiting leaves the blocking task running detached on the thread pool, wasting a blocking worker and resources—particularly problematic on large pushes with many nodes.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@crates/lib/src/api/client/tree.rs` around lines 101 - 113, The code currently
returns early on errors from client.post().send().await or
client::parse_json_body(), which drops pack_handle without awaiting and leaks
the blocking task; to fix, capture the upload/parse result into a Result (e.g.
let upload_result = (async { let res =
client.post(&url).body(...).send().await?; client::parse_json_body(&url,
res).await }).await), then always await pack_handle
(pack_handle.await.map_err(...)? ) before propagating the upload_result (return
upload_result? or return Err(upload_result.err().unwrap())). Ensure you
reference and modify the block around client.post(...).send().await,
client::parse_json_body(&url, res).await and pack_handle.await so pack_handle is
awaited in all control paths.
599d25e to
8bd2dcf
Compare
5a896ca to
98cb275
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
crates/lib/src/api/client/tree.rs (1)
101-113:⚠️ Potential issue | 🟠 Major | ⚡ Quick winPack task is not awaited if HTTP request fails.
If
client.post().send().await?(line 104-105) orparse_json_body().await?(line 106) fail, the function returns early via?before reachingpack_handle.awaiton lines 111-113. This drops theJoinHandlewithout awaiting, leaving the blocking task detached on the thread pool.While the task will eventually complete (or error) when the duplex writer's reader-end is dropped, explicitly awaiting ensures:
- Resources are cleaned up promptly
- Any panic in the pack task is observed rather than silently ignored
- Consistent behavior regardless of upload success/failure
🛡️ Proposed fix: capture upload result and always await pack_handle
- let res = client + let upload_result = async { + let res = client - .post(&url) - .body(reqwest::Body::wrap_stream(body_stream)) - .send() - .await?; - let body = client::parse_json_body(&url, res).await?; - log::debug!("upload node complete {body}"); + .post(&url) + .body(reqwest::Body::wrap_stream(body_stream)) + .send() + .await?; + client::parse_json_body(&url, res).await + } + .await; // Surface any pack error after the upload completes (the duplex reader reaching EOF // signals pack end-of-stream; panics and Result::Err come through the join handle). pack_handle .await .map_err(|e| OxenError::basic_str(format!("pack task panicked: {e}")))??; + let body = upload_result?; + log::debug!("upload node complete {body}"); + Ok(()) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/lib/src/api/client/tree.rs` around lines 101 - 113, The upload flow currently returns early on errors from client.post().send().await? and client::parse_json_body(&url, res).await?, dropping pack_handle's JoinHandle; change the logic to capture the upload result (store the Result of send/parse or use a local variable like upload_res/body_res rather than using `?` immediately), then always await pack_handle (call pack_handle.await and handle its Err/panic) before propagating any upload error, so that pack_handle is observed and cleaned up even when client::parse_json_body or the HTTP send fails.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@crates/server/src/controllers/tree.rs`:
- Around line 274-279: The code uses an undefined variable `repo` when calling
repo.merkle_store() to pack nodes; replace `repo` with the correct repository
variable name `repository` so the block uses repository.merkle_store(). Ensure
the call that references `node_hashes`, `PackOptions::ServerCanonical`, and
`&mut buffer` is updated accordingly so the buffer assignment compiles.
- Around line 328-336: The pack_nodes call in the buffer construction is
ignoring its Result; change the block that builds buffer (the let buffer = { ...
} around repository.merkle_store().pack_nodes(..., &mut buffer) }) to propagate
errors by using ? on pack_nodes so failures return an Err instead of sending a
partial/empty buffer; ensure the enclosing function (the controller handler)
returns a Result compatible with OxenError (or map the error) so the ? compiles,
and keep the same PackOptions::ServerCanonical and HashSet::from_iter([hash])
usage.
---
Duplicate comments:
In `@crates/lib/src/api/client/tree.rs`:
- Around line 101-113: The upload flow currently returns early on errors from
client.post().send().await? and client::parse_json_body(&url, res).await?,
dropping pack_handle's JoinHandle; change the logic to capture the upload result
(store the Result of send/parse or use a local variable like upload_res/body_res
rather than using `?` immediately), then always await pack_handle (call
pack_handle.await and handle its Err/panic) before propagating any upload error,
so that pack_handle is observed and cleaned up even when client::parse_json_body
or the HTTP send fails.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: d19bb5e5-9686-4a15-aef3-6c757f9e87a9
📒 Files selected for processing (7)
crates/lib/src/api/client/tree.rscrates/lib/src/core/db/merkle_node/file_backend.rscrates/lib/src/core/progress/push_progress.rscrates/lib/src/core/progress/sync_progress.rscrates/lib/src/model/repository/local_repository.rscrates/lib/src/repositories/tree.rscrates/server/src/controllers/tree.rs
✅ Files skipped from review due to trivial changes (2)
- crates/lib/src/core/progress/sync_progress.rs
- crates/lib/src/core/progress/push_progress.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/lib/src/core/db/merkle_node/file_backend.rs
0433ea2 to
8588950
Compare
98cb275 to
120f270
Compare
8588950 to
9055cc1
Compare
120f270 to
c28bcd5
Compare
9055cc1 to
225cb0c
Compare
5507f6b to
b6022bd
Compare
a82b35d to
b12893f
Compare
b6022bd to
02d9373
Compare
b12893f to
ac7ffb3
Compare
02d9373 to
4bd68a4
Compare
ac7ffb3 to
9435616
Compare
4bd68a4 to
851557c
Compare
9435616 to
66acf41
Compare
851557c to
60d4651
Compare
66acf41 to
ac538f6
Compare
60d4651 to
e148d79
Compare
ac538f6 to
62d573b
Compare
e148d79 to
9c92dd4
Compare
62d573b to
894b7d6
Compare
9c92dd4 to
d5a2e13
Compare
CleanCut
left a comment
There was a problem hiding this comment.
Couple of small things worth a look, but nothing that needs to hold this up.
| // tarball so the upload phase has a known end and a meaningful ETA. Random-ish | ||
| // merkle hash bytes compress to ~1.0×, so the uncompressed estimate is a tight | ||
| // upper bound on the bytes that will actually flow over the wire. | ||
| let estimated_upload_bytes = file_backend::pack_nodes_byte_estimate(local_repo, &nodes); |
There was a problem hiding this comment.
Looks like this slipped past the interfaces and is calling the file backend directly. Let's route it through one of the traits.
There was a problem hiding this comment.
Great catch! 🙌 Thank you for spotting this. It's a bug and an oversight in the MerklePacker trait. I'm adding this as a new trait method and moving this implementation there. The LMDB backend will need an update as it can't reuse this.
| // Tick `progress` per chunk so the user sees upload progress moving. | ||
| let progress_for_stream = Arc::clone(progress); | ||
| let body_stream = ReaderStream::new(async_reader).inspect_ok(move |chunk| { | ||
| progress_for_stream.add_bytes(chunk.len() as u64); | ||
| }); |
There was a problem hiding this comment.
Nit: the progress bar is going to stop short of 100%, which might look a little confusing to users.
The reason is that "random-ish merkle hash bytes" are only a portion of the payload. If we drill down into the actual node definitions, there's also a bunch of integers and UTF-8 text, both of which compress really well.
A couple of options here:
- the simple fix would be to add a progress.finish() at the end so it jumps to 100%, or
- a more complete fix would be to measure progress in terms of just the uncompressed bytes — set the total to the uncompressed total, and increment by uncompressed bytes as they get fed in to be packed.
There was a problem hiding this comment.
Great call out. For (2), please check me on this -- this is inverting the counting from updating as we read the tar gz bytes to updating as we write them. So we'd need to wrap the Write impl. so we could read the bytes right at write time, correct?
There was a problem hiding this comment.
I added fn raw_byte_count to MerklePacker so each backend can provide the on-disk size of the stored Merkle nodes. And I took your advice to change the progress bar to count the uncompressed bytes right as we write them. I also added in a .finish() to the bar when we done for good measure.
Thank you for this suggestion!
894b7d6 to
2a9136e
Compare
d5a2e13 to
be546fc
Compare
2a9136e to
e3c5388
Compare
be546fc to
e7956bc
Compare
e3c5388 to
12124e2
Compare
a2e457c to
0311615
Compare
Refactors oxen's Merkle tree node transport between clients and servers to use the new `MerklePacker` and `MerkleUnpacker` traits. The `LocalRepository::merkle_store()` method has been updated to return a `impl TransportableMerkleStore + '_`. And the private `merkle_store_dispatch::StoreEnum` has been updated to include dispatch to the `MerkleTransport`'s methods. Updated codepaths: - `repositories::tree::compress_tree`, `compress_nodes`, `compress_node`, `compress_commits`, and `unpack_nodes` route through the trait, deleting the inline tar/gz/file-system blocks. - `api::client::tree::create_nodes` packs via `merkle_store().pack_nodes` with `PackOptions::LegacyClientPush`; `node_download_request` unpacks via `merkle_store().unpack` with `UnpackOptions::Overwrite`. The async ↔ sync boundary is bridged with `SyncIoBridge` inside `spawn_blocking`. - `push_progress` and `sync_progress` add a `pack_bytes` length field, used to extend the bar before the pack/upload kicks off so the bar has a known end and a meaningful ETA. - `file_backend.rs` test sites switch from `FileBackend::new(&repo)` to `repo.merkle_store()`, exercising the trait. **MerklePacker::raw_byte_count** Updates the packing trait to include `raw_byte_count`, which is used as an upper bound on the total number of bytes that will be transfered if `pack_nodes` is called on a set of Merkle hashes. This is used by user-facing progress bars in `api::client::tree::create_nodes`: the number of raw bytes written (_pre compression_) is incremented in the progress bar on each write.
0311615 to
6ad4dad
Compare
Refactors oxen's Merkle tree node transport between clients
and servers to use the new
MerklePackerandMerkleUnpackertraits. The
LocalRepository::merkle_store()method has beenupdated to return a
impl TransportableMerkleStore + '_. Andthe private
merkle_store_dispatch::StoreEnumhas been updatedto include dispatch to the
MerkleTransport's methods.Updated codepaths:
repositories::tree::compress_tree,compress_nodes,compress_node,compress_commits, andunpack_nodesroute through the trait, deletingthe inline tar/gz/file-system blocks.
api::client::tree::create_nodespacks viamerkle_store().pack_nodeswith
PackOptions::LegacyClientPush;node_download_requestunpacks viamerkle_store().unpackwithUnpackOptions::Overwrite. The async ↔ syncboundary is bridged with
SyncIoBridgeinsidespawn_blocking.push_progressandsync_progressadd apack_byteslength field, usedto extend the bar before the pack/upload kicks off so the bar has a known
end and a meaningful ETA.
file_backend.rstest sites switch fromFileBackend::new(&repo)torepo.merkle_store(), exercising the trait.MerklePacker::raw_byte_count
Updates the packing trait to include
raw_byte_count, which is usedas an upper bound on the total number of bytes that will be transfered if
pack_nodesis called on a set of Merkle hashes. This is used by user-facingprogress bars in
api::client::tree::create_nodes: the number of raw byteswritten (pre compression) is incremented in the progress bar on each write.