diff --git a/crates/spfs-cli/main/Cargo.toml b/crates/spfs-cli/main/Cargo.toml index 32dbe3409d..88aa7cab83 100644 --- a/crates/spfs-cli/main/Cargo.toml +++ b/crates/spfs-cli/main/Cargo.toml @@ -69,4 +69,5 @@ version = "0.51" [dev-dependencies] rstest = { workspace = true } +spfs = { workspace = true, features = ["test-fixtures"] } tempfile = { workspace = true } diff --git a/crates/spfs-cli/main/src/cmd_check.rs b/crates/spfs-cli/main/src/cmd_check.rs index 8099b302fe..ec24f604ab 100644 --- a/crates/spfs-cli/main/src/cmd_check.rs +++ b/crates/spfs-cli/main/src/cmd_check.rs @@ -90,14 +90,14 @@ impl CmdCheck { drop(checker); // clean up progress bars let spfs::check::CheckSummary { missing_tags, - checked_tags, + valid_tags, missing_objects, repaired_objects, - checked_objects, + valid_objects, missing_payloads, repaired_payloads, - checked_payloads, - checked_payload_bytes, + valid_payloads, + valid_payload_bytes, } = summary; let missing_objects = missing_objects.len(); let missing_payloads = missing_payloads.len(); @@ -105,14 +105,14 @@ impl CmdCheck { println!("{} after {duration:.0?}:", "Finished".bold()); let missing = "missing".red().italic(); let repaired = "repaired".cyan().italic(); - println!("{checked_tags:>12} tags visited ({missing_tags} {missing})"); + println!("{valid_tags:>12} tags visited ({missing_tags} {missing})"); println!( - "{checked_objects:>12} objects visited ({missing_objects} {missing}, {repaired_objects} {repaired})", + "{valid_objects:>12} objects visited ({missing_objects} {missing}, {repaired_objects} {repaired})", ); println!( - "{checked_payloads:>12} payloads visited ({missing_payloads} {missing}, {repaired_payloads} {repaired})", + "{valid_payloads:>12} payloads visited ({missing_payloads} {missing}, {repaired_payloads} {repaired})", ); - let human_bytes = match NumberPrefix::binary(checked_payload_bytes as f64) { + let human_bytes = match NumberPrefix::binary(valid_payload_bytes as f64) { NumberPrefix::Standalone(amt) => 
format!("{amt} bytes"), NumberPrefix::Prefixed(p, amt) => format!("{amt:.2} {}B", p.symbol()), }; diff --git a/crates/spfs-cli/main/src/cmd_info.rs b/crates/spfs-cli/main/src/cmd_info.rs index 8e463f90e3..6e8fe0b168 100644 --- a/crates/spfs-cli/main/src/cmd_info.rs +++ b/crates/spfs-cli/main/src/cmd_info.rs @@ -13,9 +13,16 @@ use spfs::find_path::ObjectPathEntry; use spfs::graph::Annotation; use spfs::io::{self, DigestFormat, Pluralize}; use spfs::prelude::*; -use spfs::{self}; +use spfs::{ + Error, + {self}, +}; use spfs_cli_common as cli; +#[cfg(test)] +#[path = "./cmd_info_test.rs"] +mod cmd_info_test; + /// Display information about the current environment, or specific items #[derive(Debug, Args)] pub struct CmdInfo { @@ -69,9 +76,21 @@ impl CmdInfo { self.pretty_print_file(&reference, &repo, self.logging.verbose as usize) .await?; } else { - let item = repo.read_ref(reference.as_str()).await?; - self.pretty_print_ref(item, &repo, self.logging.verbose as usize) - .await?; + match repo.read_ref(reference.as_str()).await { + Ok(item) => { + self.pretty_print_ref(item, &repo, self.logging.verbose as usize) + .await?; + } + Err(Error::UnknownObject(_)) => { + let digest = repo.resolve_ref(reference.as_str()).await?; + let payload_size = repo.payload_size(digest).await?; + self.pretty_print_payload(digest, payload_size, &repo) + .await?; + } + Err(err) => { + return Err(err.into()); + } + } } if !self.to_process.is_empty() { println!(); @@ -216,6 +235,31 @@ impl CmdInfo { Ok(()) } + /// Display the spfs payload information + async fn pretty_print_payload( + &mut self, + digest: spfs::encoding::Digest, + payload_size: u64, + repo: &spfs::storage::RepositoryHandle, + ) -> Result<()> { + println!( + "{}:\n {}:", + self.format_digest(digest, repo).await?, + "blob".green() + ); + println!( + " {} {}", + "digest:".bright_blue(), + self.format_digest(digest, repo).await? 
+ ); + println!( + " {} {}", + "size:".bright_blue(), + spfs::io::format_size(payload_size) + ); + Ok(()) + } + /// Display the status of the current runtime. async fn print_global_info(&self, repo: &spfs::storage::RepositoryHandle) -> Result<()> { let runtime = spfs::active_runtime().await?; diff --git a/crates/spfs-cli/main/src/cmd_info_test.rs b/crates/spfs-cli/main/src/cmd_info_test.rs new file mode 100644 index 0000000000..3c8762c386 --- /dev/null +++ b/crates/spfs-cli/main/src/cmd_info_test.rs @@ -0,0 +1,55 @@ +// Copyright (c) Contributors to the SPK project. +// SPDX-License-Identifier: Apache-2.0 +// https://github.com/spkenv/spk + +use clap::Parser; +use rstest::rstest; +use spfs::Config; +use spfs::fixtures::*; +use spfs::prelude::*; + +use super::CmdInfo; + +#[derive(Parser)] +struct Opt { + #[command(flatten)] + info: CmdInfo, +} + +#[rstest] +#[case::fs(tmprepo("fs"))] +#[tokio::test] +async fn info_on_payload( + #[case] + #[future] + repo: TempRepo, + #[values(true, false)] use_partial_digest: bool, +) { + let repo = repo.await; + + let manifest = generate_tree(&repo).await.to_graph_manifest(); + let file = manifest + .iter_entries() + .find(|entry| entry.is_regular_file()) + .expect("at least one regular file"); + + let digest_arg = if use_partial_digest { + file.object() + .to_string() + .chars() + .take(12) + .collect::() + } else { + file.object().to_string() + }; + + let mut opt = + Opt::try_parse_from(["info", "-r", &repo.address().to_string(), &digest_arg]).unwrap(); + let config = Config::default(); + let code = opt + .info + .run(&config) + .await + .expect("`spfs info` on a file digest is successful"); + assert_eq!(code, 0, "`spfs info` on a file digest returns exit code 0"); +} diff --git a/crates/spfs-cli/main/src/cmd_pull.rs b/crates/spfs-cli/main/src/cmd_pull.rs index adf901f8fa..d8214f381f 100644 --- a/crates/spfs-cli/main/src/cmd_pull.rs +++ b/crates/spfs-cli/main/src/cmd_pull.rs @@ -4,6 +4,7 @@ use clap::Args; use miette::Result; 
+use spfs::sync::reporter::Summary; use spfs_cli_common as cli; /// Pull one or more objects to the local repository diff --git a/crates/spfs-cli/main/src/cmd_push.rs b/crates/spfs-cli/main/src/cmd_push.rs index c16f4367ad..74d2419fb7 100644 --- a/crates/spfs-cli/main/src/cmd_push.rs +++ b/crates/spfs-cli/main/src/cmd_push.rs @@ -4,6 +4,7 @@ use clap::Args; use miette::Result; +use spfs::sync::reporter::Summary; use spfs_cli_common as cli; /// Push one or more objects to a remote repository diff --git a/crates/spfs-cli/main/src/cmd_read.rs b/crates/spfs-cli/main/src/cmd_read.rs index 0b7eab99e3..c1208130c7 100644 --- a/crates/spfs-cli/main/src/cmd_read.rs +++ b/crates/spfs-cli/main/src/cmd_read.rs @@ -8,6 +8,10 @@ use spfs::Error; use spfs::prelude::*; use spfs_cli_common as cli; +#[cfg(test)] +#[path = "./cmd_read_test.rs"] +mod cmd_read_test; + /// Output the contents of a blob to stdout #[derive(Debug, Args)] #[clap(visible_aliases = &["read-file", "cat", "cat-file"])] @@ -26,24 +30,27 @@ pub struct CmdRead { impl CmdRead { pub async fn run(&mut self, config: &spfs::Config) -> Result { + use spfs::graph::object::Enum; + let repo = spfs::config::open_repository_from_string(config, self.repos.remote.as_ref()).await?; #[cfg(feature = "sentry")] tracing::info!(target: "sentry", "using repo: {}", repo.address()); - let item = repo.read_ref(&self.reference.to_string()).await?; - use spfs::graph::object::Enum; - let blob = match item.to_enum() { - Enum::Blob(blob) => blob, - _ => { + let digest = match repo.read_ref(&self.reference.to_string()).await.map(|fb| { + let fb_enum = fb.to_enum(); + (fb, fb_enum) + }) { + Ok((_, Enum::Blob(blob))) => *blob.digest(), + Ok((obj, _)) => { let path = match &self.path { None => { - miette::bail!("PATH must be given to read from {:?}", item.kind()); + miette::bail!("PATH must be given to read from {:?}", obj.kind()); } Some(p) => p.strip_prefix("/spfs").unwrap_or(p).to_string(), }; - let manifest = 
spfs::compute_object_manifest(item, &repo).await?; + let manifest = spfs::compute_object_manifest(obj, &repo).await?; let entry = match manifest.get_path(&path) { Some(e) => e, None => { @@ -55,11 +62,15 @@ impl CmdRead { tracing::error!("path is a directory or masked file: {path}"); return Ok(1); } - repo.read_blob(entry.object).await? + entry.object + } + Err(Error::UnknownObject(digest)) => digest, + Err(err) => { + return Err(err.into()); } }; - let (mut payload, filename) = repo.open_payload(*blob.digest()).await?; + let (mut payload, filename) = repo.open_payload(digest).await?; tokio::io::copy(&mut payload, &mut tokio::io::stdout()) .await .map_err(|err| Error::StorageReadError("copy of payload to stdout", filename, err))?; diff --git a/crates/spfs-cli/main/src/cmd_read_test.rs b/crates/spfs-cli/main/src/cmd_read_test.rs new file mode 100644 index 0000000000..8d5105bae7 --- /dev/null +++ b/crates/spfs-cli/main/src/cmd_read_test.rs @@ -0,0 +1,55 @@ +// Copyright (c) Contributors to the SPK project. 
+// SPDX-License-Identifier: Apache-2.0 +// https://github.com/spkenv/spk + +use clap::Parser; +use rstest::rstest; +use spfs::Config; +use spfs::fixtures::*; +use spfs::prelude::*; + +use super::CmdRead; + +#[derive(Parser)] +struct Opt { + #[command(flatten)] + read: CmdRead, +} + +#[rstest] +#[case::fs(tmprepo("fs"))] +#[tokio::test] +async fn read_on_payload( + #[case] + #[future] + repo: TempRepo, + #[values(true, false)] use_partial_digest: bool, +) { + let repo = repo.await; + + let manifest = generate_tree(&repo).await.to_graph_manifest(); + let file = manifest + .iter_entries() + .find(|entry| entry.is_regular_file()) + .expect("at least one regular file"); + + let digest_arg = if use_partial_digest { + file.object() + .to_string() + .chars() + .take(12) + .collect::() + } else { + file.object().to_string() + }; + + let mut opt = + Opt::try_parse_from(["read", "-r", &repo.address().to_string(), &digest_arg]).unwrap(); + let config = Config::default(); + let code = opt + .read + .run(&config) + .await + .expect("`spfs read` on a file digest is successful"); + assert_eq!(code, 0, "`spfs read` on a file digest returns exit code 0"); +} diff --git a/crates/spfs-cli/main/src/cmd_write.rs b/crates/spfs-cli/main/src/cmd_write.rs index e45ba437df..848dda6a2e 100644 --- a/crates/spfs-cli/main/src/cmd_write.rs +++ b/crates/spfs-cli/main/src/cmd_write.rs @@ -55,7 +55,7 @@ impl CmdWrite { None => Box::pin(tokio::io::BufReader::new(tokio::io::stdin())), }; - let digest = repo.commit_blob(reader).await?; + let digest = repo.commit_payload(reader).await?; tracing::info!(%digest, "created"); for tag in self.tags.iter() { diff --git a/crates/spfs-encoding/src/hash.rs b/crates/spfs-encoding/src/hash.rs index b19b433f37..dd3b773e9c 100644 --- a/crates/spfs-encoding/src/hash.rs +++ b/crates/spfs-encoding/src/hash.rs @@ -259,7 +259,10 @@ impl Decodable for String { } } -/// The first N bytes of a digest that may still be unambiguous as a reference +/// The first N bytes of a 
digest that may still be unambiguous as a reference. +/// +/// This type can represent a full digest, but it does not require that an item +/// exists with that digest or what type of item it is. #[derive(Deserialize, Debug, Hash, Eq, PartialEq, Ord, PartialOrd, Clone)] pub struct PartialDigest(Vec); diff --git a/crates/spfs/Cargo.toml b/crates/spfs/Cargo.toml index f3850f6b9d..4585bbfb3c 100644 --- a/crates/spfs/Cargo.toml +++ b/crates/spfs/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [features] default = [] +test-fixtures = ["dep:rstest", "dep:tracing-subscriber"] # If enabled, will create the "local" repository in a subdirectory # of the standard storage root, named "ci/pipeline_${CI_PIPELINE_ID}". @@ -68,6 +69,8 @@ prost = { workspace = true } rand = { workspace = true } relative-path = { workspace = true, features = ["serde"] } ring = { workspace = true } +criterion = { version = "0.3", features = ["async_tokio", "html_reports"] } +rstest = { workspace = true, optional = true } semver = "1.0" sentry = { workspace = true, optional = true } serde = { workspace = true, features = ["derive"] } @@ -97,6 +100,9 @@ tokio-stream = { version = "0.1", features = ["fs", "net"] } tokio-util = { version = "0.7.3", features = ["compat", "io"] } tonic = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true, features = [ + "env-filter", +], optional = true } ulid = { workspace = true } unix_mode = "0.1.3" url = { version = "2.2", features = ["serde"] } diff --git a/crates/spfs/src/check.rs b/crates/spfs/src/check.rs index 4db6c6f0b8..47f8214ec4 100644 --- a/crates/spfs/src/check.rs +++ b/crates/spfs/src/check.rs @@ -7,15 +7,16 @@ use std::future::ready; use std::sync::Arc; use colored::Colorize; +use futures::StreamExt; use futures::stream::{FuturesUnordered, TryStreamExt}; use once_cell::sync::OnceCell; use progress_bar_derive_macro::ProgressBar; use tokio::sync::Semaphore; -use crate::graph::AnnotationValue; +use 
crate::graph::{AnnotationValue, FoundDigest}; use crate::prelude::*; use crate::sync::SyncPolicy; -use crate::sync::reporter::{SyncObjectResult, SyncPayloadResult}; +use crate::sync::reporter::{Summary, SyncObjectResult, SyncPayloadResult}; use crate::{Error, Result, encoding, graph, storage, tracking}; #[cfg(test)] @@ -120,8 +121,19 @@ where /// Validate that all of the children exist for all of the objects in the repository. pub async fn check_all_objects(&self) -> Result> { self.repo - .find_digests(graph::DigestSearchCriteria::All) - .and_then(|digest| ready(Ok(self.check_digest(digest)))) + .find_digests(&graph::DigestSearchCriteria::All) + .filter_map(|res| async move { + match res { + Ok(FoundDigest::Object(digest)) => Some(Ok(digest)), + Ok(FoundDigest::Payload(_digest)) => { + // Payloads are skipped on the basis that they have no + // child objects. + None + } + Err(err) => Some(Err(err)), + } + }) + .and_then(|digest| ready(Ok(self.check_object_digest(digest)))) .try_buffer_unordered(50) .try_collect() .await @@ -150,13 +162,12 @@ where pub async fn check_env_item(&self, item: tracking::EnvSpecItem) -> Result { let res = match item { tracking::EnvSpecItem::Digest(digest) => self - .check_digest(digest) - .await - .map(CheckEnvItemResult::Object)?, - tracking::EnvSpecItem::PartialDigest(digest) => self - .check_partial_digest(digest) + .check_object_digest(digest) .await .map(CheckEnvItemResult::Object)?, + tracking::EnvSpecItem::PartialDigest(digest) => { + self.check_partial_digest(digest).await.map(Into::into)? 
+ } tracking::EnvSpecItem::TagSpec(tag_spec) => self .check_tag_spec(tag_spec) .await @@ -205,7 +216,7 @@ where pub async fn check_tag(&self, tag: tracking::Tag) -> Result { tracing::debug!(?tag, "Checking tag"); self.reporter.visit_tag(&tag); - let result = self.check_digest(tag.target).await?; + let result = self.check_item_digest(tag.target).await?; let res = CheckTagResult::Checked { tag: Box::new(tag), result, @@ -218,21 +229,38 @@ where pub async fn check_partial_digest( &self, partial: encoding::PartialDigest, - ) -> Result { - let digest = self.repo.resolve_full_digest(&partial).await?; - self.check_digest(digest).await + ) -> Result { + match self.repo.resolve_full_digest(&partial).await? { + FoundDigest::Object(digest) => self + .check_object_digest(digest) + .await + .map(CheckItemResult::Object), + FoundDigest::Payload(digest) => self + .check_payload(digest) + .await + .map(CheckItemResult::Payload), + } } - /// Validate that the identified object exists and all of its children. - pub async fn check_digest(&self, digest: encoding::Digest) -> Result { - self.check_digest_with_perms_opt(digest, None).await + /// Validate that the identified item exists and all of its children. + /// + /// This works for object and payload digests. + pub async fn check_item_digest(&self, digest: encoding::Digest) -> Result { + match self.check_object_digest(digest).await? { + CheckObjectResult::Missing(_) => self + .check_payload(digest) + .await + .map(CheckItemResult::Payload), + obj_result => Ok(CheckItemResult::Object(obj_result)), + } } - async fn check_digest_with_perms_opt( - &self, - digest: encoding::Digest, - perms: Option, - ) -> Result { + /// Validate that the identified object exists and all of its children. + /// + /// This only checks objects, not payloads. 
To check payloads, use + /// [`Self::check_payload`] + #[async_recursion::async_recursion] + pub async fn check_object_digest(&self, digest: encoding::Digest) -> Result { // don't write the digest here, as that is the responsibility // of the function that actually handles the data copying. // a short-circuit is still nice when possible, though @@ -254,12 +282,7 @@ where } Err(err) => Err(err), Ok((obj, fallback)) => { - let mut res = unsafe { - // Safety: it's unsafe to call this unless the object - // is known to exist, but we just loaded it from the repo - // or had it synced via the callback - self.check_object_with_perms_opt(obj, perms).await? - }; + let mut res = self.check_object(obj).await?; if matches!(fallback, Fallback::Repaired) { tracing::trace!(?digest, ?res, "setting repaired flag"); res.set_repaired(); @@ -271,34 +294,8 @@ where /// Validate that the identified object's children all exist. /// - /// To also check if the object exists, use [`Self::check_digest`] - /// - /// # Safety - /// - /// This function may sync payloads without checking blob data, - /// which is unsafe. This function is unsafe to call unless the object - /// is known to exist in the repository being checked - pub async unsafe fn check_object(&self, obj: graph::Object) -> Result { - // Safety: unsafe unless the object exists, we pass this up to the caller - unsafe { self.check_object_with_perms_opt(obj, None).await } - } - - /// Validate that all children of this object exist. - /// - /// Any provided permissions are associated with the blob when - /// syncing it. See [`tracking::BlobRead::permissions`]. - /// - /// # Safety - /// - /// This function may sync payloads without checking blob data, - /// which is unsafe. 
This function is unsafe to call unless the object - /// is known to exist in the repository being checked - #[async_recursion::async_recursion] - async unsafe fn check_object_with_perms_opt( - &self, - obj: graph::Object, - perms: Option, - ) -> Result { + /// To also check if the object exists, use [`Self::check_object_digest`] + pub async fn check_object(&self, obj: graph::Object) -> Result { use graph::object::Enum; if let Some(CheckProgress::CheckStarted) = self .processed_digests @@ -310,11 +307,9 @@ where let res = match obj.into_enum() { Enum::Layer(obj) => CheckObjectResult::Layer(self.check_layer(obj).await?.into()), Enum::Platform(obj) => CheckObjectResult::Platform(self.check_platform(obj).await?), - Enum::Blob(obj) => CheckObjectResult::Blob(unsafe { - // Safety: it is unsafe to call this function unless the blob - // is known to exist, which is the same rule we pass up to the caller - self.must_check_blob_with_perms_opt(&obj, perms).await? - }), + Enum::Blob(_obj) => { + return Err("internal error: check_object called on blob".into()); + } Enum::Manifest(obj) => CheckObjectResult::Manifest(self.check_manifest(obj).await?), }; self.reporter.checked_object(&res); @@ -323,11 +318,11 @@ where /// Validate that the identified platform's children all exist. /// - /// To also check if the platform object exists, use [`Self::check_digest`] + /// To also check if the platform object exists, use [`Self::check_object_digest`] pub async fn check_platform(&self, platform: graph::Platform) -> Result { let futures: FuturesUnordered<_> = platform .iter_bottom_up() - .map(|d| self.check_digest(*d)) + .map(|d| self.check_object_digest(*d)) .collect(); let results = futures.try_collect().await?; let res = CheckPlatformResult { @@ -340,10 +335,10 @@ where /// Validate that the identified layer's children all exist. 
/// - /// To also check if the layer object exists, use [`Self::check_digest`] + /// To also check if the layer object exists, use [`Self::check_object_digest`] pub async fn check_layer(&self, layer: graph::Layer) -> Result { let manifest_result = if let Some(manifest_digest) = layer.manifest() { - self.check_digest(*manifest_digest).await? + self.check_object_digest(*manifest_digest).await? } else { // This layer has no manifest, don't worry about it CheckObjectResult::Ignorable @@ -376,14 +371,14 @@ where /// Validate that the identified manifest's children all exist. /// - /// To also check if the manifest object exists, use [`Self::check_digest`] + /// To also check if the manifest object exists, use [`Self::check_object_digest`] pub async fn check_manifest(&self, manifest: graph::Manifest) -> Result { let futures: FuturesUnordered<_> = manifest .iter_entries() .filter(|e| e.kind().is_blob()) - // run through check_digest to ensure that blobs can be loaded + // run through check_payload_with_perms_opt to ensure that payloads can be loaded // from the db and allow for possible repairs - .map(|e| self.check_digest_with_perms_opt(*e.object(), Some(e.mode()))) + .map(|e| self.check_payload_with_perms_opt(*e.object(), Some(e.mode()))) .collect(); let results = futures.try_collect().await?; let res = CheckManifestResult { @@ -402,11 +397,11 @@ where let res = match annotation.value() { AnnotationValue::String(_) => CheckAnnotationResult::InternalValue, AnnotationValue::Blob(d) => { - // Need to check the child blob object and its payload exist. - let result = self.check_digest_with_perms_opt(*d, None).await?; + // Need to check the child payload exists. + let result = self.check_payload_with_perms_opt(*d, None).await?; CheckAnnotationResult::Checked { digest: *d, - result: result.try_into()?, + result, repaired: false, } } @@ -414,100 +409,28 @@ where Ok(res) } - /// Validate that the identified blob has its payload. 
- /// - /// To also check if the blob object exists, use [`Self::check_digest`] - /// - /// # Safety - /// This function may sync a payload without - /// syncing the blob, which is unsafe unless the blob - /// is known to exist in the repository being checked - pub async unsafe fn check_blob(&self, blob: &graph::Blob) -> Result { - let digest = blob.digest(); - if let Some(CheckProgress::CheckStarted) = self - .processed_digests - .insert(*digest, CheckProgress::CheckStarted) - { - return Ok(CheckBlobResult::Duplicate); - } - // Safety: this function may sync a payload and so - // is unsafe to call unless we know the blob exists, - // which is why this is an unsafe function - unsafe { self.must_check_blob_with_perms_opt(blob, None).await } - } - - /// Checks a blob, ignoring whether it has already been checked and - /// without logging that it has been checked. - /// - /// Any provided permissions are associated with the blob when - /// syncing it. See [`tracking::BlobRead::permissions`]. - /// - /// # Safety - /// - /// This function may sync a payload without - /// syncing the blob, which is unsafe unless the blob - /// is known to exist in the repository being checked - async unsafe fn must_check_blob_with_perms_opt( - &self, - blob: &graph::Blob, - perms: Option, - ) -> Result { - self.reporter.visit_blob(blob); - let result = unsafe { - // Safety: this function may sync a payload and so - // is unsafe to call unless we know the blob exists, - // which is why this is an unsafe function - self.check_payload_with_perms_opt(*blob.payload(), perms) - .await? - }; - let res = CheckBlobResult::Checked { - blob: blob.to_owned(), - result, - repaired: result == CheckPayloadResult::Repaired, - }; - self.reporter.checked_blob(&res); - Ok(res) - } - /// Check a payload with the provided digest, repairing it if possible. 
- /// - /// # Safety - /// - /// This function may repair a payload, which - /// is unsafe to do if the associated blob is not synced - /// with it or already present. - pub async unsafe fn check_payload( - &self, - digest: encoding::Digest, - ) -> Result { - // Safety: unsafe unless the blob exists, we pass this up to the caller - unsafe { self.check_payload_with_perms_opt(digest, None).await } + pub async fn check_payload(&self, digest: encoding::Digest) -> Result { + self.check_payload_with_perms_opt(digest, None).await } /// Any provided permissions are associated with the blob when /// syncing it. See [`tracking::BlobRead::permissions`]. - /// - /// # Safety - /// - /// This function may repair a payload, which - /// is unsafe to do if the associated blob is not synced - /// with it or already present. - async unsafe fn check_payload_with_perms_opt( + async fn check_payload_with_perms_opt( &self, digest: encoding::Digest, perms: Option, ) -> Result { self.reporter.visit_payload(digest); let mut result = CheckPayloadResult::Missing(digest); - if self.repo.has_payload(digest).await { - result = CheckPayloadResult::Ok; - } else if let Some(syncer) = &self.repair_with { - // Safety: this sync is unsafe unless the blob is also created - // or exists. We pass this rule up to the caller. 
- if let Ok(r) = unsafe { syncer.sync_payload_with_perms_opt(digest, perms).await } { - self.reporter.repaired_payload(&r); - result = CheckPayloadResult::Repaired; - } + if let Ok(size_bytes) = self.repo.payload_size(digest).await { + result = CheckPayloadResult::Ok { size_bytes }; + } else if let Some(syncer) = &self.repair_with + && let Ok(r) = syncer.sync_payload_with_perms_opt(digest, perms).await + { + let size_bytes = r.summary().synced_payload_bytes; + self.reporter.repaired_payload(&r); + result = CheckPayloadResult::Repaired { size_bytes }; } self.reporter.checked_payload(&result); Ok(result) @@ -527,7 +450,7 @@ where let Some(syncer) = &self.repair_with else { return Err(err); }; - if let Ok(result) = syncer.sync_digest(digest).await { + if let Ok(result) = syncer.sync_object_digest(digest).await { self.reporter.repaired_object(&result); self.repo .read_object(digest) @@ -594,9 +517,6 @@ pub trait CheckReporter: Send + Sync { /// Called when a blob has been identified to check fn visit_blob(&self, _blob: &graph::Blob) {} - /// Called when a blob has finished being checked - fn checked_blob(&self, _result: &CheckBlobResult) {} - /// Called when a payload has been identified to check fn visit_payload(&self, _digest: encoding::Digest) {} @@ -695,27 +615,27 @@ pub struct CheckSummary { /// The number of missing tags pub missing_tags: usize, /// The number of tags checked and found to be okay - pub checked_tags: usize, + pub valid_tags: usize, /// The missing objects that were discovered pub missing_objects: HashSet, /// The number of missing objects that were repaired pub repaired_objects: usize, /// The number of objects checked and found to be okay - pub checked_objects: usize, + pub valid_objects: usize, /// The missing payloads that were discovered pub missing_payloads: HashSet, /// The number of missing payloads that were repaired pub repaired_payloads: usize, /// The number of payloads checked and found to be okay - pub checked_payloads: usize, - /// 
The total number of payload bytes checked - pub checked_payload_bytes: u64, + pub valid_payloads: usize, + /// The total size of valid payloads checked, in bytes + pub valid_payload_bytes: u64, } impl CheckSummary { fn checked_one_object() -> Self { Self { - checked_objects: 1, + valid_objects: 1, ..Default::default() } } @@ -727,22 +647,22 @@ impl std::ops::AddAssign for CheckSummary { // (causing compile errors for new ones that need to be added) let CheckSummary { missing_tags, - checked_tags, + valid_tags, missing_objects, - checked_objects, - checked_payloads, + valid_objects, + valid_payloads, missing_payloads, - checked_payload_bytes, + valid_payload_bytes, repaired_objects, repaired_payloads, } = rhs; self.missing_tags += missing_tags; - self.checked_tags += checked_tags; + self.valid_tags += valid_tags; self.missing_objects.extend(missing_objects); - self.checked_objects += checked_objects; - self.checked_payloads += checked_payloads; + self.valid_objects += valid_objects; + self.valid_payloads += valid_payloads; self.missing_payloads.extend(missing_payloads); - self.checked_payload_bytes += checked_payload_bytes; + self.valid_payload_bytes += valid_payload_bytes; self.repaired_objects += repaired_objects; self.repaired_payloads += repaired_payloads; } @@ -773,6 +693,7 @@ impl CheckEnvResult { pub enum CheckEnvItemResult { Tag(CheckTagResult), Object(CheckObjectResult), + Payload(CheckPayloadResult), } impl CheckEnvItemResult { @@ -780,6 +701,16 @@ impl CheckEnvItemResult { match self { Self::Tag(r) => r.summary(), Self::Object(r) => r.summary(), + Self::Payload(r) => r.summary(), + } + } +} + +impl From for CheckEnvItemResult { + fn from(value: CheckItemResult) -> Self { + match value { + CheckItemResult::Object(r) => CheckEnvItemResult::Object(r), + CheckItemResult::Payload(r) => CheckEnvItemResult::Payload(r), } } } @@ -820,7 +751,7 @@ pub enum CheckTagResult { /// The tag was checked Checked { tag: Box, - result: CheckObjectResult, + result: 
CheckItemResult, }, } @@ -833,25 +764,39 @@ impl CheckTagResult { }, Self::Checked { result, .. } => { let mut summary = result.summary(); - summary.checked_tags += 1; + summary.valid_tags += 1; summary } } } } +#[derive(Clone, Debug)] +pub enum CheckItemResult { + Object(CheckObjectResult), + Payload(CheckPayloadResult), +} + +impl CheckItemResult { + pub fn summary(&self) -> CheckSummary { + match self { + CheckItemResult::Object(res) => res.summary(), + CheckItemResult::Payload(res) => res.summary(), + } + } +} + #[derive(Clone, Debug)] pub enum CheckObjectResult { - /// The object was already checked in this session + /// The item was already checked in this session Duplicate, - /// The object can be ignored, it does not need checking, it has + /// The item can be ignored, it does not need checking, it has /// no representation in the database. Ignorable, - /// The object was found to be missing from the database + /// The item was found to be missing from the database Missing(encoding::Digest), Platform(CheckPlatformResult), Layer(Box), - Blob(CheckBlobResult), Manifest(CheckManifestResult), Annotation(CheckAnnotationResult), } @@ -866,26 +811,23 @@ impl CheckObjectResult { CheckObjectResult::Missing(_) => (), CheckObjectResult::Platform(r) => r.set_repaired(), CheckObjectResult::Layer(r) => r.set_repaired(), - CheckObjectResult::Blob(r) => r.set_repaired(), CheckObjectResult::Manifest(r) => r.set_repaired(), CheckObjectResult::Annotation(r) => r.set_repaired(), } } pub fn summary(&self) -> CheckSummary { - use CheckObjectResult::*; match self { - Duplicate => CheckSummary::default(), - Ignorable => CheckSummary::default(), - Missing(digest) => CheckSummary { + CheckObjectResult::Duplicate => CheckSummary::default(), + CheckObjectResult::Ignorable => CheckSummary::default(), + CheckObjectResult::Missing(digest) => CheckSummary { missing_objects: Some(*digest).into_iter().collect(), ..Default::default() }, - Platform(res) => res.summary(), - Layer(res) => 
res.summary(), - Blob(res) => res.summary(), - Manifest(res) => res.summary(), - Annotation(res) => res.summary(), + CheckObjectResult::Platform(res) => res.summary(), + CheckObjectResult::Layer(res) => res.summary(), + CheckObjectResult::Manifest(res) => res.summary(), + CheckObjectResult::Annotation(res) => res.summary(), } } } @@ -940,7 +882,7 @@ impl CheckLayerResult { pub struct CheckManifestResult { pub repaired: bool, pub manifest: graph::Manifest, - pub results: Vec, + pub results: Vec, } impl CheckManifestResult { @@ -967,7 +909,7 @@ pub enum CheckAnnotationResult { /// The annotation was stored in a blob and was checked Checked { digest: encoding::Digest, - result: CheckBlobResult, + result: CheckPayloadResult, repaired: bool, }, } @@ -992,7 +934,7 @@ pub enum CheckEntryResult { /// The entry was not one that needed checking Skipped, /// The entry was checked - Checked { result: CheckBlobResult }, + Checked { result: CheckPayloadResult }, } impl CheckEntryResult { @@ -1003,80 +945,14 @@ impl CheckEntryResult { } } } - -#[derive(Clone, Debug)] -pub enum CheckBlobResult { - /// The blob was already checked in this session - Duplicate, - /// The blob was not found in the database - Missing(encoding::Digest), - /// The blob was checked - Checked { - repaired: bool, - blob: graph::Blob, - result: CheckPayloadResult, - }, -} - -impl CheckBlobResult { - /// Marks this result as being repaired. - fn set_repaired(&mut self) { - if let Self::Checked { repaired, .. 
} = self { - *repaired = true; - } - } - - pub fn summary(&self) -> CheckSummary { - match self { - Self::Duplicate => CheckSummary::default(), - Self::Missing(digest) => CheckSummary { - checked_objects: 1, - missing_objects: Some(*digest).into_iter().collect(), - ..Default::default() - }, - Self::Checked { - repaired, - result, - blob, - } => { - let mut summary = result.summary(); - summary += CheckSummary { - checked_objects: 1, - checked_payload_bytes: blob.size(), - repaired_objects: *repaired as usize, - ..Default::default() - }; - summary - } - } - } -} - -impl TryFrom for CheckBlobResult { - type Error = Error; - - fn try_from(value: CheckObjectResult) -> std::result::Result { - Ok(match value { - CheckObjectResult::Duplicate => CheckBlobResult::Duplicate, - CheckObjectResult::Missing(digest) => CheckBlobResult::Missing(digest), - CheckObjectResult::Blob(check_blob_result) => check_blob_result, - err => { - return Err(Error::String(format!( - "Unexpected CheckObjectResult variant when converting to CheckBlobResult: {err:?}" - ))); - } - }) - } -} - #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum CheckPayloadResult { /// The payload is missing from this repository Missing(encoding::Digest), /// The payload was missing from this repository but was repaired - Repaired, + Repaired { size_bytes: u64 }, /// The payload was checked and is present - Ok, + Ok { size_bytes: u64 }, } impl CheckPayloadResult { @@ -1086,13 +962,15 @@ impl CheckPayloadResult { missing_payloads: Some(*digest).into_iter().collect(), ..Default::default() }, - Self::Repaired => CheckSummary { - checked_payloads: 1, + Self::Repaired { size_bytes } => CheckSummary { + valid_payloads: 1, repaired_payloads: 1, + valid_payload_bytes: *size_bytes, ..Default::default() }, - Self::Ok => CheckSummary { - checked_payloads: 1, + Self::Ok { size_bytes } => CheckSummary { + valid_payloads: 1, + valid_payload_bytes: *size_bytes, ..Default::default() }, } diff --git a/crates/spfs/src/check_test.rs 
b/crates/spfs/src/check_test.rs index 5bf70fb325..129b193d5f 100644 --- a/crates/spfs/src/check_test.rs +++ b/crates/spfs/src/check_test.rs @@ -10,7 +10,7 @@ use spfs_encoding::prelude::*; use super::{CheckSummary, Checker}; use crate::check::CheckReporter; use crate::fixtures::*; -use crate::graph::{Database, DatabaseExt}; +use crate::graph::DatabaseExt; use crate::storage::{PayloadStorage, RepositoryExt}; #[rstest] @@ -35,7 +35,7 @@ async fn test_check_missing_payload(#[future] tmprepo: TempRepo) { .iter_entries() .filter(|e| e.is_regular_file()) .count(); - let total_objects = total_blobs + 1; //the manifest + let total_objects = 1; //the manifest let results = Checker::new(&tmprepo.repo()) .check_all_objects() @@ -45,11 +45,11 @@ async fn test_check_missing_payload(#[future] tmprepo: TempRepo) { let summary: CheckSummary = results.iter().map(|r| r.summary()).sum(); tracing::info!("{summary:#?}"); assert_eq!( - summary.checked_objects, total_objects, - "expected all items to be visited" + summary.valid_objects, total_objects, + "expected all objects to be visited" ); assert_eq!( - summary.checked_payloads, + summary.valid_payloads, total_blobs - 1, "expected all payloads to be visited except missing one" ); @@ -64,57 +64,6 @@ async fn test_check_missing_payload(#[future] tmprepo: TempRepo) { ); } -#[rstest] -#[tokio::test] -async fn test_check_missing_object(#[future] tmprepo: TempRepo) { - init_logging(); - let tmprepo = tmprepo.await; - - let manifest = generate_tree(&tmprepo).await.to_graph_manifest(); - let file = manifest - .iter_entries() - .find(|entry| entry.is_regular_file()) - .expect("at least one regular file"); - - tracing::info!(digest=%file.object(), "remove object"); - tmprepo - .remove_object(*file.object()) - .await - .expect("failed to remove object"); - - let total_blobs = manifest - .iter_entries() - .filter(|e| e.is_regular_file()) - .count(); - let total_objects = total_blobs + 1; // the manifest - - let results = 
Checker::new(&tmprepo.repo()) - .check_all_objects() - .await - .unwrap(); - - let summary: CheckSummary = results.iter().map(|r| r.summary()).sum(); - tracing::info!("{summary:#?}"); - assert_eq!( - summary.checked_objects, - total_objects - 1, - "expected all items to be visited except missing one" - ); - assert_eq!( - summary.checked_payloads, - total_blobs - 1, - "one payload should not be seen because of missing object" - ); - assert!( - summary.missing_objects.contains(file.object()), - "should find one missing object" - ); - assert!( - summary.missing_payloads.is_empty(), - "should see no missing payloads" - ); -} - #[rstest] #[tokio::test] async fn test_check_missing_payload_recover(#[future] tmprepo: TempRepo) { @@ -125,7 +74,7 @@ async fn test_check_missing_payload_recover(#[future] tmprepo: TempRepo) { let manifest = generate_tree(&tmprepo).await.to_graph_manifest(); let digest = manifest.digest().unwrap(); crate::Syncer::new(&tmprepo.repo(), &repo2.repo()) - .sync_digest(digest) + .sync_object_digest(digest) .await .expect("Failed to sync repos"); @@ -144,7 +93,7 @@ async fn test_check_missing_payload_recover(#[future] tmprepo: TempRepo) { .iter_entries() .filter(|e| e.is_regular_file()) .count(); - let total_objects = total_blobs + 1; //the manifest + let total_objects = 1; //the manifest let results = Checker::new(&tmprepo.repo()) .with_repair_source(&repo2.repo()) @@ -155,11 +104,11 @@ async fn test_check_missing_payload_recover(#[future] tmprepo: TempRepo) { let summary: CheckSummary = results.iter().map(|r| r.summary()).sum(); tracing::info!("{summary:#?}"); assert_eq!( - summary.checked_objects, total_objects, + summary.valid_objects, total_objects, "expected all items to be visited" ); assert_eq!( - summary.checked_payloads, total_blobs, + summary.valid_payloads, total_blobs, "expected all payloads to be visited after repair" ); assert!( @@ -186,7 +135,7 @@ async fn test_check_missing_object_recover(#[future] tmprepo: TempRepo) { let manifest = 
generate_tree(&tmprepo).await.to_graph_manifest(); let digest = manifest.digest().unwrap(); crate::Syncer::new(&tmprepo.repo(), &repo2.repo()) - .sync_digest(digest) + .sync_object_digest(digest) .await .expect("Failed to sync repos"); @@ -197,7 +146,7 @@ async fn test_check_missing_object_recover(#[future] tmprepo: TempRepo) { tracing::info!(digest=%file.object(), "remove object"); tmprepo - .remove_object(*file.object()) + .remove_payload(*file.object()) .await .expect("failed to remove object"); @@ -205,7 +154,7 @@ async fn test_check_missing_object_recover(#[future] tmprepo: TempRepo) { .iter_entries() .filter(|e| e.is_regular_file()) .count(); - let total_objects = total_blobs + 1; //the manifest + let total_objects = 1; //the manifest let results = Checker::new(&tmprepo.repo()) .with_repair_source(&repo2.repo()) @@ -216,18 +165,21 @@ async fn test_check_missing_object_recover(#[future] tmprepo: TempRepo) { let summary: CheckSummary = results.iter().map(|r| r.summary()).sum(); tracing::info!("{summary:#?}"); assert_eq!( - summary.checked_objects, total_objects, + summary.valid_objects, total_objects, "expected all items to be visited after repair" ); assert_eq!( - summary.checked_payloads, total_blobs, + summary.valid_payloads, total_blobs, "all payloads should be seen after repair" ); assert!( summary.missing_objects.is_empty(), "should repair missing object" ); - assert_eq!(summary.repaired_objects, 1, "should repair missing object"); + assert_eq!( + summary.repaired_payloads, 1, + "should repair missing payload" + ); assert!( summary.missing_payloads.is_empty(), "should see no missing payloads", @@ -237,6 +189,7 @@ async fn test_check_missing_object_recover(#[future] tmprepo: TempRepo) { #[derive(Default)] struct DebugReporter { checked_object_results: Mutex>, + checked_payload_results: Mutex>, } impl CheckReporter for &DebugReporter { @@ -246,19 +199,23 @@ impl CheckReporter for &DebugReporter { .unwrap() .push(result.clone()); } + + fn 
checked_payload(&self, result: &super::CheckPayloadResult) { + self.checked_payload_results.lock().unwrap().push(*result); + } } -/// A check on a repo that is missing an annotation blob. +/// A check on a repo that is missing an annotation payload. /// -/// The check should complete successfully and report a missing object. +/// The check should complete successfully and report a missing payload. #[rstest] #[tokio::test] -async fn check_missing_annotation_blob(#[future] tmprepo: TempRepo) { +async fn check_missing_annotation_payload(#[future] tmprepo: TempRepo) { init_logging(); let tmprepo = tmprepo.await; let blob = tmprepo - .commit_blob(Box::pin(b"this is some data".as_slice())) + .commit_payload(Box::pin(b"this is some data".as_slice())) .await .unwrap(); @@ -278,12 +235,12 @@ async fn check_missing_annotation_blob(#[future] tmprepo: TempRepo) { let summary: CheckSummary = results.iter().map(|r| r.summary()).sum(); tracing::info!("{summary:#?}"); - assert_eq!(summary.checked_objects, 2); - assert_eq!(summary.checked_payloads, 1); + assert_eq!(summary.valid_objects, 1); + assert_eq!(summary.valid_payloads, 1); } - // Remove the blob backing the annotation. - tmprepo.remove_object(blob).await.unwrap(); + // Remove the payload backing the annotation. 
+ tmprepo.remove_payload(blob).await.unwrap(); let debug_reporter = DebugReporter::default(); @@ -295,19 +252,19 @@ async fn check_missing_annotation_blob(#[future] tmprepo: TempRepo) { let summary: CheckSummary = results.iter().map(|r| r.summary()).sum(); tracing::info!("{summary:#?}"); - assert_eq!(summary.checked_objects, 2); - assert_eq!(summary.checked_payloads, 0); + assert_eq!(summary.valid_objects, 1); + assert_eq!(summary.valid_payloads, 0); assert!( - summary.missing_objects.contains(&blob), - "should report missing annotation blob" + summary.missing_payloads.contains(&blob), + "should report missing annotation payload" ); - let checked_objects = debug_reporter.checked_object_results.lock().unwrap(); - assert_eq!(checked_objects.len(), 2); - // Confirm there is a Missing result for the annotation blob + let checked_payloads = debug_reporter.checked_payload_results.lock().unwrap(); + assert_eq!(checked_payloads.len(), 1); + // Confirm there is a Missing result for the annotation payload assert!( - checked_objects + checked_payloads .iter() - .any(|r| matches!(r, super::CheckObjectResult::Missing(digest) if *digest == blob)) + .any(|r| matches!(r, super::CheckPayloadResult::Missing(digest) if *digest == blob)) ); } diff --git a/crates/spfs/src/clean.rs b/crates/spfs/src/clean.rs index 8d6a0c3ca1..766d2b356f 100644 --- a/crates/spfs/src/clean.rs +++ b/crates/spfs/src/clean.rs @@ -529,7 +529,7 @@ where result.visited_objects += 1; if let graph::object::Enum::Blob(b) = obj.to_enum() { result.visited_payloads += 1; - self.reporter.visit_payload(&b); + self.reporter.visit_payload(b.digest()); } // This recursively calls visit_attached_objects() (this @@ -549,115 +549,74 @@ where .repo .iter_objects() // we have no interest in removing attached items - .try_filter(|(digest, _object)| ready(!self.attached.contains(digest))) + .try_filter(|item| ready(!self.attached.contains(item.digest()))) // we have already visited all attached objects // but also want to report 
these ones - .and_then(|obj| { - self.reporter.visit_object(&obj.1); - result.visited_objects += 1; - ready(Ok(obj)) + .inspect_ok(|item| match item { + graph::DatabaseItem::Object(_digest, flat_object) => { + self.reporter.visit_object(flat_object); + result.visited_objects += 1; + } + graph::DatabaseItem::Payload(digest) => { + self.reporter.visit_payload(digest); + result.visited_payloads += 1; + } }) - .and_then(|(digest, object)| { + .and_then(|item| { if self.dry_run { - return ready(Ok(ready(Ok((digest, object, true))).boxed())); + // XXX: this ignores must_be_older_than + return ready(Ok(ready(Ok((item, true))).boxed())); } - let future = self - .repo - .remove_object_if_older_than(self.must_be_older_than, digest) - .map(|res| { - if let Err(Error::UnknownObject(_)) = res { - return Ok(true); - } - res - }) - .map_ok(move |removed| (digest, object, removed)); - ready(Ok(future.boxed())) + let future = (match &item { + graph::DatabaseItem::Object(digest, _flat_object) => self + .repo + .remove_object_if_older_than(self.must_be_older_than, *digest) + .boxed(), + graph::DatabaseItem::Payload(digest) => self + .repo + .remove_payload_if_older_than(self.must_be_older_than, *digest) + .boxed(), + }) + .map(|res| { + if let Err(Error::UnknownObject(_)) = res { + return Ok(true); + } + res + }) + .map_ok(move |removed| (item, removed)) + .boxed(); + ready(Ok(future)) }) .try_buffer_unordered(self.removal_concurrency) - .try_filter_map(|(digest, obj, removed)| { + .try_filter_map(|(item, removed)| { if !removed { // objects that are too new to be removed become // implicitly attached - self.attached.insert(digest); + self.attached.insert(*item.digest()); } - ready(Ok(removed.then_some(obj))) + ready(Ok(removed.then_some(item))) }) - .and_then(|obj| { - self.reporter.object_removed(&obj); - ready(Ok(obj)) - }) - // also try to remove the corresponding payload - // each removed blob - .try_filter_map(|obj| match obj.into_enum() { - graph::object::Enum::Blob(blob) => 
ready(Ok(Some(blob))), - _ => ready(Ok(None)), - }) - .and_then(|blob| { - if self.dry_run { - return ready(Ok(ready(Ok(blob)).boxed())); + .inspect_ok(|item| match item { + graph::DatabaseItem::Object(_digest, flat_object) => { + self.reporter.object_removed(flat_object); + } + graph::DatabaseItem::Payload(digest) => { + self.reporter.payload_removed(digest); } - self.reporter.visit_payload(&blob); - result.visited_payloads += 1; - let future = self - .repo - .remove_payload(*blob.payload()) - .map(|res| { - if let Err(Error::UnknownObject(_)) = res { - return Ok(()); - } - res - }) - .map_ok(|_| blob); - ready(Ok(future.boxed())) }) - .try_buffer_unordered(self.removal_concurrency) .boxed(); let mut result = CleanResult::default(); - while let Some(blob) = stream.try_next().await? { - result.removed_payloads.insert(*blob.payload()); - self.reporter.payload_removed(&blob) - } - drop(stream); - - let mut stream = self - .repo - .iter_payload_digests() - .try_filter_map(|payload| { - if self.attached.contains(&payload) { - return ready(Ok(None)); + while let Some(item) = stream.try_next().await? 
{ + match item { + graph::DatabaseItem::Object(digest, _flat_object) => { + result.removed_objects.insert(digest); } - // TODO: this should be able to get the size of the payload, but - // currently there is no way to do this unless you start with - // the blob - let blob = graph::Blob::new(payload, 0); - ready(Ok(Some(blob))) - }) - .and_then(|blob| { - self.reporter.visit_payload(&blob); - result.visited_payloads += 1; - if self.dry_run { - return ready(Ok(ready(Ok(blob)).boxed())); + graph::DatabaseItem::Payload(digest) => { + result.removed_payloads.insert(digest); } - let future = self - .repo - .remove_payload(*blob.payload()) - .map(|res| { - if let Err(Error::UnknownObject(_)) = res { - return Ok(()); - } - res - }) - .map_ok(|_| blob); - ready(Ok(future.boxed())) - }) - .try_buffer_unordered(self.removal_concurrency) - .boxed(); - while let Some(blob) = stream.try_next().await? { - result.removed_payloads.insert(*blob.payload()); - self.reporter.payload_removed(&blob) + }; } drop(stream); - Ok(result) } @@ -948,10 +907,10 @@ pub trait CleanReporter: Send + Sync { fn object_removed(&self, _object: &graph::Object) {} /// Called when the cleaner visits a payload during scanning - fn visit_payload(&self, _payload: &graph::Blob) {} + fn visit_payload(&self, _payload: &encoding::Digest) {} /// Called when the cleaner removes a payload from the database - fn payload_removed(&self, _payload: &graph::Blob) {} + fn payload_removed(&self, _payload: &encoding::Digest) {} /// Called when the cleaner visits a proxy during scanning fn visit_proxy(&self, _proxy: &encoding::Digest) {} @@ -993,11 +952,11 @@ impl CleanReporter for TracingCleanReporter { tracing::info!(%object, "object removed"); } - fn visit_payload(&self, payload: &graph::Blob) { + fn visit_payload(&self, payload: &encoding::Digest) { tracing::info!(?payload, "visit payload"); } - fn payload_removed(&self, payload: &graph::Blob) { + fn payload_removed(&self, payload: &encoding::Digest) { 
tracing::info!(?payload, "payload removed"); } @@ -1051,11 +1010,11 @@ impl CleanReporter for ConsoleCleanReporter { self.get_bars().objects.inc_length(1); } - fn visit_payload(&self, _payload: &graph::Blob) { + fn visit_payload(&self, _payload: &encoding::Digest) { self.get_bars().payloads.inc(1); } - fn payload_removed(&self, _payload: &graph::Blob) { + fn payload_removed(&self, _payload: &encoding::Digest) { self.get_bars().payloads.inc_length(1); } diff --git a/crates/spfs/src/commit.rs b/crates/spfs/src/commit.rs index 6cae9b9a7b..a9fdfb9ff5 100644 --- a/crates/spfs/src/commit.rs +++ b/crates/spfs/src/commit.rs @@ -53,7 +53,7 @@ pub struct WriteToRepositoryBlobHasher<'repo> { #[tonic::async_trait] impl BlobHasher for WriteToRepositoryBlobHasher<'_> { async fn hash_blob(&self, reader: Pin>) -> Result { - self.repo.commit_blob(reader).await + self.repo.commit_payload(reader).await } } @@ -297,7 +297,7 @@ where .into_bytes(); let reader = Box::pin(tokio::io::BufReader::new(std::io::Cursor::new(content))); - self.repo.commit_blob(reader).await? + self.repo.commit_payload(reader).await? } else { let file = tokio::fs::File::open(&local_path).await.map_err(|err| { // TODO: add better message for file missing @@ -308,7 +308,7 @@ where ) })?; let reader = Box::pin(tokio::io::BufReader::new(file)); - self.repo.commit_blob(reader).await? + self.repo.commit_payload(reader).await? 
}; if created != entry.object { return Err(Error::String(format!( diff --git a/crates/spfs/src/graph/database.rs b/crates/spfs/src/graph/database.rs index 3382350b76..d9b2228a6b 100644 --- a/crates/spfs/src/graph/database.rs +++ b/crates/spfs/src/graph/database.rs @@ -7,7 +7,7 @@ use std::pin::Pin; use std::task::Poll; use chrono::{DateTime, Utc}; -use futures::{Future, Stream, StreamExt, TryStreamExt}; +use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use super::{FlatObject, Object, ObjectProto}; use crate::{Error, Result, encoding}; @@ -77,11 +77,8 @@ impl Stream for DatabaseWalker<'_> { #[allow(clippy::type_complexity)] pub struct DatabaseIterator<'db> { db: &'db dyn DatabaseView, - next: Option<( - encoding::Digest, - Pin> + Send + 'db>>, - )>, - inner: Pin> + Send>>, + next: Option> + Send + 'db>>>, + inner: Pin> + Send>>, } impl<'db> DatabaseIterator<'db> { @@ -91,7 +88,7 @@ impl<'db> DatabaseIterator<'db> { /// # Errors /// The same as [`DatabaseView::read_object`] pub fn new(db: &'db dyn DatabaseView) -> Self { - let iter = db.find_digests(crate::graph::DigestSearchCriteria::All); + let iter = db.find_digests(&crate::graph::DigestSearchCriteria::All); DatabaseIterator { db, inner: iter, @@ -100,32 +97,57 @@ impl<'db> DatabaseIterator<'db> { } } +/// An item returned by [`DatabaseIterator`]. +pub enum DatabaseItem { + Object(encoding::Digest, Object), + Payload(encoding::Digest), +} + +impl DatabaseItem { + /// Borrow the inner digest. 
+ #[inline] + pub fn digest(&self) -> &encoding::Digest { + match self { + DatabaseItem::Object(d, _) => d, + DatabaseItem::Payload(d) => d, + } + } +} + impl Stream for DatabaseIterator<'_> { - type Item = Result<(encoding::Digest, Object)>; + type Item = Result; fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let (digest, mut current_future) = match self.next.take() { + let mut current_future = match self.next.take() { Some(f) => f, None => match Pin::new(&mut self.inner).poll_next(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(inner_next) => match inner_next { None => return Poll::Ready(None), Some(Err(err)) => return Poll::Ready(Some(Err(err))), - Some(Ok(digest)) => (digest, self.db.read_object(digest)), + Some(Ok(FoundDigest::Object(digest))) => self + .db + .read_object(digest) + .map_ok(move |x| DatabaseItem::Object(digest, x)) + .map_err(move |err| format!("Error reading object {digest}: {err}").into()) + .boxed(), + Some(Ok(FoundDigest::Payload(digest))) => { + futures::future::ready(Ok(DatabaseItem::Payload(digest))).boxed() + } }, }, }; match Pin::new(&mut current_future).poll(cx) { Poll::Pending => { - self.next = Some((digest, current_future)); + self.next = Some(current_future); Poll::Pending } Poll::Ready(res) => Poll::Ready(match res { - Ok(obj) => Some(Ok((digest, obj))), - Err(err) => Some(Err(format!("Error reading object {digest}: {err}").into())), + Ok(obj) => Some(Ok(obj)), + Err(err) => Some(Err(err)), }), } } @@ -137,6 +159,42 @@ pub enum DigestSearchCriteria { StartsWith(encoding::PartialDigest), } +/// The types of digests that can exist in a database. +#[derive(PartialEq, Eq)] +pub enum FoundDigest { + Object(encoding::Digest), + Payload(encoding::Digest), +} + +impl FoundDigest { + /// Borrow the inner digest. + #[inline] + pub fn digest(&self) -> &encoding::Digest { + match self { + FoundDigest::Object(d) => d, + FoundDigest::Payload(d) => d, + } + } + + /// Return the inner digest. 
+ #[inline] + pub fn into_digest(self) -> encoding::Digest { + match self { + FoundDigest::Object(d) => d, + FoundDigest::Payload(d) => d, + } + } + + /// Return the digest as a byte slice. + #[inline] + pub fn as_bytes(&self) -> &[u8] { + match self { + FoundDigest::Object(d) => d.as_bytes(), + FoundDigest::Payload(d) => d.as_bytes(), + } + } +} + /// A read-only object database. #[async_trait::async_trait] pub trait DatabaseView: Sync + Send { @@ -146,19 +204,23 @@ pub trait DatabaseView: Sync + Send { /// - [`Error::UnknownObject`]: if the object is not in this database async fn read_object(&self, digest: encoding::Digest) -> Result; - /// Find the object digests in this database matching a search criteria. - fn find_digests( + /// Find the digests in this database matching a search criteria. + /// + /// This can include both object digests and payload digests. + fn find_digests<'a>( &self, - search_criteria: DigestSearchCriteria, - ) -> Pin> + Send>>; + search_criteria: &'a DigestSearchCriteria, + ) -> Pin> + Send + 'a>>; - /// Return true if this database contains the identified object + /// Return true if this database contains the identified object. + /// + /// This does not check for payloads. async fn has_object(&self, digest: encoding::Digest) -> bool; - /// Iterate all the object in this database. + /// Iterate all the objects and payloads in this database. fn iter_objects(&self) -> DatabaseIterator<'_>; - /// Walk all objects connected to the given root object. + /// Walk all objects and payloads connected to the given root object. fn walk_objects<'db>(&'db self, root: &encoding::Digest) -> DatabaseWalker<'db>; /// Return the shortened version of the given digest. 
@@ -169,9 +231,9 @@ pub trait DatabaseView: Sync + Send { const SIZE_STEP: usize = 5; // creates 8 char string at base 32 let mut shortest_size: usize = SIZE_STEP; let mut shortest = &digest.as_bytes()[..shortest_size]; - let mut digests = self.find_digests(DigestSearchCriteria::StartsWith( - encoding::PartialDigest::from(shortest), - )); + let search_criteria = + DigestSearchCriteria::StartsWith(encoding::PartialDigest::from(shortest)); + let mut digests = self.find_digests(&search_criteria); while let Some(other) = digests.next().await { match other { Err(_) => continue, @@ -179,7 +241,7 @@ pub trait DatabaseView: Sync + Send { if &other.as_bytes()[0..shortest_size] != shortest { continue; } - if other == digest { + if *other.digest() == digest { continue; } while &other.as_bytes()[..shortest_size] == shortest { @@ -192,23 +254,17 @@ pub trait DatabaseView: Sync + Send { data_encoding::BASE32.encode(shortest) } - /// Resolve the complete object digest from a shortened one. + /// Resolve the complete item digest from a shortened one. /// - /// By default this is an O(n) operation defined by the number of objects. + /// By default this is an O(n) operation defined by the number of items. /// Other implementations may provide better results. 
/// /// # Errors /// - UnknownReferenceError: if the digest cannot be resolved - /// - AmbiguousReferenceError: if the digest could point to multiple objects - async fn resolve_full_digest( - &self, - partial: &encoding::PartialDigest, - ) -> Result { - if let Some(digest) = partial.to_digest() { - return Ok(digest); - } + /// - AmbiguousReferenceError: if the digest could point to multiple items + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { let options: Vec<_> = self - .find_digests(crate::graph::DigestSearchCriteria::StartsWith( + .find_digests(&crate::graph::DigestSearchCriteria::StartsWith( partial.clone(), )) .try_collect() @@ -216,7 +272,7 @@ pub trait DatabaseView: Sync + Send { match options.len() { 0 => Err(Error::UnknownReference(partial.to_string())), - 1 => Ok(options.first().unwrap().to_owned()), + 1 => Ok(options.into_iter().next().unwrap()), _ => Err(Error::AmbiguousReference(partial.to_string())), } } @@ -232,10 +288,10 @@ impl DatabaseView for &T { DatabaseView::read_object(&**self, digest).await } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { DatabaseView::find_digests(&**self, search_criteria) } diff --git a/crates/spfs/src/graph/mod.rs b/crates/spfs/src/graph/mod.rs index ee64fb823b..b5637594b0 100644 --- a/crates/spfs/src/graph/mod.rs +++ b/crates/spfs/src/graph/mod.rs @@ -28,10 +28,12 @@ pub use blob::Blob; pub use database::{ Database, DatabaseExt, + DatabaseItem, DatabaseIterator, DatabaseView, DatabaseWalker, DigestSearchCriteria, + FoundDigest, }; pub use entry::Entry; pub use kind::{HasKind, Kind, ObjectKind}; diff --git a/crates/spfs/src/lib.rs b/crates/spfs/src/lib.rs index a8d04c7f07..3a0ae7d136 100644 --- a/crates/spfs/src/lib.rs +++ b/crates/spfs/src/lib.rs @@ -6,7 +6,7 @@ pub const VERSION: &str = env!("CARGO_PKG_VERSION"); -#[cfg(test)] +#[cfg(any(test, 
feature = "test-fixtures"))] pub mod fixtures; pub mod bootstrap; diff --git a/crates/spfs/src/proto/conversions.rs b/crates/spfs/src/proto/conversions.rs index ec4b115d92..44da100093 100644 --- a/crates/spfs/src/proto/conversions.rs +++ b/crates/spfs/src/proto/conversions.rs @@ -49,6 +49,37 @@ impl<'a> TryFrom<&'a super::Digest> for &'a encoding::Digest { } } +impl From for super::FoundDigest { + fn from(source: graph::FoundDigest) -> Self { + use super::found_digest::Kind; + match source { + graph::FoundDigest::Object(digest) => Self { + kind: Some(Kind::Object(digest.into())), + }, + graph::FoundDigest::Payload(digest) => Self { + kind: Some(Kind::Payload(digest.into())), + }, + } + } +} + +impl TryFrom for graph::FoundDigest { + type Error = Error; + fn try_from(source: super::FoundDigest) -> Result { + match source.kind { + Some(kind) => match kind { + super::found_digest::Kind::Object(digest) => { + Ok(graph::FoundDigest::Object(digest.try_into()?)) + } + super::found_digest::Kind::Payload(digest) => { + Ok(graph::FoundDigest::Payload(digest.try_into()?)) + } + }, + None => Err("Unknown found digest kind".into()), + } + } +} + impl From for super::Digest { fn from(source: encoding::Digest) -> Self { Self { diff --git a/crates/spfs/src/proto/defs/database.proto b/crates/spfs/src/proto/defs/database.proto index 437ee3e263..902062f7c2 100644 --- a/crates/spfs/src/proto/defs/database.proto +++ b/crates/spfs/src/proto/defs/database.proto @@ -37,9 +37,10 @@ message DigestSearchCriteria { message FindDigestsRequest { DigestSearchCriteria search_criteria = 1; } message FindDigestsResponse { + reserved 2; oneof result { Error error = 1; - Digest ok = 2; + FoundDigest ok = 3; } } diff --git a/crates/spfs/src/proto/defs/payload.proto b/crates/spfs/src/proto/defs/payload.proto index 117fb6420b..b7b2f0e6b4 100644 --- a/crates/spfs/src/proto/defs/payload.proto +++ b/crates/spfs/src/proto/defs/payload.proto @@ -23,6 +23,16 @@ message HasPayloadResponse{ bool exists =3; } 
+message PayloadSizeRequest{ + Digest digest = 1; +} +message PayloadSizeResponse{ + oneof result { + Error error = 1; + uint64 ok = 2; + } +} + message WritePayloadRequest{} message WritePayloadResponse{ message UploadOption { @@ -69,10 +79,23 @@ message RemovePayloadResponse{ } } +message RemovePayloadIfOlderThanRequest{ + DateTime older_than = 1; + Digest digest = 2; +} +message RemovePayloadIfOlderThanResponse{ + oneof result { + Error error = 1; + bool ok = 2; + } +} + service PayloadService { rpc IterDigests(IterDigestsRequest) returns (stream IterDigestsResponse); rpc HasPayload(HasPayloadRequest) returns (HasPayloadResponse); + rpc PayloadSize(PayloadSizeRequest) returns (PayloadSizeResponse); rpc WritePayload(WritePayloadRequest) returns (WritePayloadResponse); rpc OpenPayload(OpenPayloadRequest) returns (OpenPayloadResponse); rpc RemovePayload(RemovePayloadRequest) returns (RemovePayloadResponse); + rpc RemovePayloadIfOlderThan(RemovePayloadIfOlderThanRequest) returns (RemovePayloadIfOlderThanResponse); } diff --git a/crates/spfs/src/proto/defs/types.proto b/crates/spfs/src/proto/defs/types.proto index 063715c60b..ee881af5be 100644 --- a/crates/spfs/src/proto/defs/types.proto +++ b/crates/spfs/src/proto/defs/types.proto @@ -9,6 +9,13 @@ message Digest { bytes bytes = 1; } +message FoundDigest { + oneof kind { + Digest object = 1; + Digest payload = 2; + } +} + message DateTime { string iso_timestamp = 1; } diff --git a/crates/spfs/src/proto/result.rs b/crates/spfs/src/proto/result.rs index cc32cf4eff..c299e7ce00 100644 --- a/crates/spfs/src/proto/result.rs +++ b/crates/spfs/src/proto/result.rs @@ -102,7 +102,7 @@ rpc_result!( rpc_result!( g::FindDigestsResponse, g::find_digests_response::Result, - g::Digest + g::FoundDigest ); rpc_result!( g::IterDigestsResponse, @@ -127,6 +127,11 @@ rpc_result!( bool ); +rpc_result!( + g::PayloadSizeResponse, + g::payload_size_response::Result, + u64 +); rpc_result!( g::WritePayloadResponse, 
g::write_payload_response::Result, @@ -143,3 +148,8 @@ rpc_result!( g::write_payload_response::upload_response::Result, g::write_payload_response::upload_response::UploadResult ); +rpc_result!( + g::RemovePayloadIfOlderThanResponse, + g::remove_payload_if_older_than_response::Result, + bool +); diff --git a/crates/spfs/src/resolve.rs b/crates/spfs/src/resolve.rs index 429d4af78a..f79b0af0ca 100644 --- a/crates/spfs/src/resolve.rs +++ b/crates/spfs/src/resolve.rs @@ -148,7 +148,18 @@ pub async fn compute_environment_manifest( .iter() .filter_map(|i| match i { tracking::EnvSpecItem::Digest(d) => Some(std::future::ready(Ok(*d)).boxed()), - tracking::EnvSpecItem::PartialDigest(p) => Some(repo.resolve_full_digest(p).boxed()), + tracking::EnvSpecItem::PartialDigest(p) => Some( + repo.resolve_full_digest(p) + .and_then(|found_digest| async move { + match found_digest { + graph::FoundDigest::Object(digest) => Ok(digest), + graph::FoundDigest::Payload(_digest) => { + Err("unexpected payload digest in environment spec".into()) + } + } + }) + .boxed(), + ), tracking::EnvSpecItem::TagSpec(t) => { Some(repo.resolve_tag(t).map_ok(|t| t.target).boxed()) } diff --git a/crates/spfs/src/resolve_test.rs b/crates/spfs/src/resolve_test.rs index e3fa4088a9..5297f0d0f6 100644 --- a/crates/spfs/src/resolve_test.rs +++ b/crates/spfs/src/resolve_test.rs @@ -163,7 +163,6 @@ async fn test_auto_merge_layers_with_edit(tmpdir: tempfile::TempDir) { let d = dirs.last().unwrap(); for node in d.to_tracking_manifest().walk_abs("/spfs") { // There should only be one node/entry, the file. 
- let blob = repo.read_blob(node.entry.object).await.unwrap(); println!( " {} {}", node.path, @@ -172,7 +171,7 @@ async fn test_auto_merge_layers_with_edit(tmpdir: tempfile::TempDir) { .unwrap(), ); - let (mut payload, _filename) = repo.open_payload(*blob.digest()).await.unwrap(); + let (mut payload, _filename) = repo.open_payload(node.entry.object).await.unwrap(); let mut writer: Vec = vec![]; tokio::io::copy(&mut payload, &mut writer).await.unwrap(); let contents = String::from_utf8(writer).unwrap(); diff --git a/crates/spfs/src/runtime/storage.rs b/crates/spfs/src/runtime/storage.rs index d036ccee8a..682fbdf361 100644 --- a/crates/spfs/src/runtime/storage.rs +++ b/crates/spfs/src/runtime/storage.rs @@ -1063,7 +1063,7 @@ impl Storage { /// Create a new blob payload to hold the given string value pub(crate) async fn create_blob_for_string(&self, payload: String) -> Result { self.inner - .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .commit_payload(Box::pin(std::io::Cursor::new(payload.into_bytes()))) .await } @@ -1168,8 +1168,7 @@ impl Storage { let data = match annotation.value() { AnnotationValue::String(s) => s, AnnotationValue::Blob(digest) => { - let blob = self.inner.read_blob(*digest).await?; - let (mut payload, filename) = self.inner.open_payload(*blob.digest()).await?; + let (mut payload, filename) = self.inner.open_payload(*digest).await?; let mut writer: Vec = vec![]; tokio::io::copy(&mut payload, &mut writer) .await @@ -1262,7 +1261,7 @@ impl Storage { let (_, config_digest) = tokio::try_join!( self.inner.write_object(&platform), self.inner - .commit_blob(Box::pin(std::io::Cursor::new(config_data.into_bytes())),) + .commit_payload(Box::pin(std::io::Cursor::new(config_data.into_bytes())),) )?; tokio::try_join!( diff --git a/crates/spfs/src/server/database.rs b/crates/spfs/src/server/database.rs index 19c41b64a5..72ac409fd9 100644 --- a/crates/spfs/src/server/database.rs +++ b/crates/spfs/src/server/database.rs @@ -6,8 +6,9 @@ use 
std::convert::TryInto; use std::pin::Pin; use std::sync::Arc; +use async_stream::try_stream; use chrono::{DateTime, Utc}; -use futures::{Stream, StreamExt}; +use futures::Stream; use tonic::{Request, Response, Status}; use crate::prelude::*; @@ -61,11 +62,12 @@ impl proto::database_service_server::DatabaseService for DatabaseService { .search_criteria .try_into() .map_err(|err: crate::Error| Status::invalid_argument(err.to_string()))?; - let stream = self - .repo - .find_digests(search_criteria) - .map(proto::FindDigestsResponse::from_result) - .map(Ok); + let repo = Arc::clone(&self.repo); + let stream = try_stream! { + for await result in repo.find_digests(&search_criteria) { + yield proto::FindDigestsResponse::from_result(result); + } + }; let stream: Self::FindDigestsStream = Box::pin(stream); let response = Response::new(stream); Ok(response) diff --git a/crates/spfs/src/server/payload.rs b/crates/spfs/src/server/payload.rs index e44bcdd6cf..7772188696 100644 --- a/crates/spfs/src/server/payload.rs +++ b/crates/spfs/src/server/payload.rs @@ -5,13 +5,14 @@ use std::pin::Pin; use std::sync::Arc; +use chrono::{DateTime, Utc}; use futures::{Stream, StreamExt, TryStreamExt}; use prost::Message; use tonic::{Request, Response, Status}; use crate::prelude::*; use crate::proto::payload_service_server::PayloadServiceServer; -use crate::proto::{self, RpcResult, convert_digest}; +use crate::proto::{self, RpcResult, convert_digest, convert_to_datetime}; use crate::storage; /// The payload service is both a gRPC service AND an http server @@ -70,6 +71,18 @@ impl proto::payload_service_server::PayloadService for PayloadService { Ok(Response::new(result)) } + async fn payload_size( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let digest = convert_digest(request.digest) + .map_err(|err| Status::invalid_argument(err.to_string()))?; + let size = proto::handle_error!(self.repo.payload_size(digest).await); + let result = 
proto::PayloadSizeResponse::ok(size); + Ok(Response::new(result)) + } + async fn open_payload( &self, request: Request, @@ -99,6 +112,23 @@ impl proto::payload_service_server::PayloadService for PayloadService { let result = proto::RemovePayloadResponse::ok(proto::Ok {}); Ok(Response::new(result)) } + + async fn remove_payload_if_older_than( + &self, + request: Request, + ) -> Result, Status> { + let request = request.into_inner(); + let older_than: DateTime = + proto::handle_error!(convert_to_datetime(request.older_than)); + let digest: crate::encoding::Digest = proto::handle_error!(convert_digest(request.digest)); + let deleted = proto::handle_error!( + self.repo + .remove_payload_if_older_than(older_than, digest) + .await + ); + let result = proto::RemovePayloadIfOlderThanResponse::ok(deleted); + Ok(Response::new(result)) + } } impl hyper::service::Service> for PayloadService @@ -180,11 +210,7 @@ async fn handle_uncompressed_upload( repo: Arc, reader: Pin>, ) -> crate::Result> { - // Safety: it is unsafe to create a payload without its corresponding - // blob, but this payload http server is part of a larger repository - // and does not intend to be responsible for ensuring the integrity - // of the object graph - only the up/down of payload data - let result = unsafe { repo.write_data(reader).await }; + let result = repo.write_data(reader).await; let (digest, size) = result.map_err(|err| { crate::Error::String(format!( "An error occurred while spawning a thread for this operation: {err:?}" diff --git a/crates/spfs/src/storage/blob.rs b/crates/spfs/src/storage/blob.rs deleted file mode 100644 index 71d005bbea..0000000000 --- a/crates/spfs/src/storage/blob.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright (c) Contributors to the SPK project. 
-// SPDX-License-Identifier: Apache-2.0 -// https://github.com/spkenv/spk - -use std::pin::Pin; - -use futures::Stream; -use tokio_stream::StreamExt; - -use crate::{Error, Result, encoding, graph}; - -pub type BlobStreamItem = Result<(encoding::Digest, graph::Blob)>; - -#[async_trait::async_trait] -pub trait BlobStorage: graph::Database + Sync + Send { - /// Iterate the objects in this storage which are blobs. - fn iter_blobs<'db>(&'db self) -> Pin + 'db>> { - let stream = self.iter_objects().filter_map(|res| match res { - Ok((digest, obj)) => obj.into_blob().map(|b| Ok((digest, b))), - Err(err) => Some(Err(err)), - }); - Box::pin(stream) - } - - /// Return the blob identified by the given digest. - async fn read_blob(&self, digest: encoding::Digest) -> Result { - match self.read_object(digest).await.map(graph::Object::into_blob) { - Err(err) => Err(err), - Ok(Some(blob)) => Ok(blob), - Ok(None) => Err(Error::NotCorrectKind { - desired: graph::ObjectKind::Blob, - digest, - }), - } - } -} - -/// Blanket implementation. -impl BlobStorage for T where T: graph::Database + Sync + Send {} - -#[async_trait::async_trait] -pub trait BlobStorageExt: graph::DatabaseExt { - /// Store the given blob - async fn write_blob(&self, blob: graph::Blob) -> Result<()> { - self.write_object(&blob).await - } -} - -/// Blanket implementation. 
-impl BlobStorageExt for T where T: graph::DatabaseExt + Sync + Send {} diff --git a/crates/spfs/src/storage/database_test.rs b/crates/spfs/src/storage/database_test.rs index b84ca9a8de..6af386fc9d 100644 --- a/crates/spfs/src/storage/database_test.rs +++ b/crates/spfs/src/storage/database_test.rs @@ -2,25 +2,31 @@ // SPDX-License-Identifier: Apache-2.0 // https://github.com/spkenv/spk +use std::borrow::Cow; + use rstest::rstest; use crate::fixtures::*; +use crate::graph; use crate::prelude::*; -use crate::{encoding, graph}; #[rstest] #[case::fs(tmprepo("fs"))] #[case::tar(tmprepo("tar"))] #[cfg_attr(feature = "server", case::rpc(tmprepo("rpc")))] #[tokio::test] +#[serial_test::serial(config)] async fn test_object_existence( #[case] #[future] tmprepo: TempRepo, ) { let tmprepo = tmprepo.await; - let digest = encoding::EMPTY_DIGEST.into(); - let obj = graph::Blob::new(digest, 0); + let obj = graph::Layer::new_with_annotation( + "test", + graph::AnnotationValue::String(Cow::Owned("data".to_owned())), + ); + let digest = obj.digest().unwrap(); tmprepo .write_object(&obj) .await diff --git a/crates/spfs/src/storage/fallback/repository.rs b/crates/spfs/src/storage/fallback/repository.rs index b18564ce46..c1bd8a2177 100644 --- a/crates/spfs/src/storage/fallback/repository.rs +++ b/crates/spfs/src/storage/fallback/repository.rs @@ -11,7 +11,7 @@ use futures::Stream; use relative_path::RelativePath; use crate::config::{ToAddress, default_fallback_repo_include_secondary_tags}; -use crate::graph::ObjectProto; +use crate::graph::{FoundDigest, ObjectProto}; use crate::prelude::*; use crate::storage::fs::{FsHashStore, ManifestRenderPath, OpenFsRepository, RenderStore}; use crate::storage::proxy::ProxyRepositoryExt; @@ -223,10 +223,10 @@ impl graph::DatabaseView for FallbackProxy { res } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 
'a>> { self.primary.find_digests(search_criteria) } @@ -280,17 +280,16 @@ impl PayloadStorage for FallbackProxy { false } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + crate::storage::proxy::payload_size(self, digest).await + } + fn iter_payload_digests(&self) -> Pin> + Send>> { self.primary.iter_payload_digests() } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { - // Safety: we are wrapping the same underlying unsafe function and - // so the same safety holds for our callers - let res = unsafe { self.primary.write_data(reader).await? }; + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { + let res = self.primary.write_data(reader).await?; Ok(res) } @@ -304,16 +303,7 @@ impl PayloadStorage for FallbackProxy { let missing_payload_error = match self.primary.open_payload(digest).await { Ok(r) => return Ok(r), Err(err @ Error::ObjectMissingPayload(_, _)) => err, - Err(err @ Error::UnknownObject(_)) => { - // Try to repair the missing blob. There can be hash - // collisions so use `read_blob` specifically in case - // there is an object of some other type with the same - // digest. - if self.read_blob(digest).await.is_ok() { - continue; - } - return Err(err); - } + Err(err @ Error::UnknownObject(_)) => err, Err(err) => return Err(err), }; @@ -328,7 +318,7 @@ impl PayloadStorage for FallbackProxy { // context, so don't make another one here. SyncReporters::silent(), ); - match syncer.sync_digest(digest).await { + match syncer.sync_payload(digest).await { Ok(_) => { // Warn for non-sentry users; info for sentry users. #[cfg(not(feature = "sentry"))] @@ -370,6 +360,17 @@ impl PayloadStorage for FallbackProxy { self.primary.remove_payload(digest).await?; Ok(()) } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + Ok(self + .primary + .remove_payload_if_older_than(older_than, digest) + .await?) 
+ } } impl ProxyRepositoryExt for FallbackProxy { diff --git a/crates/spfs/src/storage/fallback/repository_test.rs b/crates/spfs/src/storage/fallback/repository_test.rs index cd21305809..dec534ec0b 100644 --- a/crates/spfs/src/storage/fallback/repository_test.rs +++ b/crates/spfs/src/storage/fallback/repository_test.rs @@ -26,12 +26,12 @@ async fn test_proxy_payload_repair(tmpdir: tempfile::TempDir) { ); let digest = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); diff --git a/crates/spfs/src/storage/fs/database.rs b/crates/spfs/src/storage/fs/database.rs index 74bb83e3d7..8f1e7d3765 100644 --- a/crates/spfs/src/storage/fs/database.rs +++ b/crates/spfs/src/storage/fs/database.rs @@ -4,6 +4,7 @@ #[cfg(unix)] use std::os::unix::fs::PermissionsExt; +use std::path::Path; use std::pin::Pin; use chrono::{DateTime, Utc}; @@ -12,7 +13,7 @@ use encoding::prelude::*; use futures::{Stream, StreamExt, TryFutureExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use crate::graph::{DatabaseView, Object, ObjectProto}; +use crate::graph::{DatabaseView, FoundDigest, Object, ObjectProto}; use crate::{Error, Result, encoding, graph}; #[async_trait::async_trait] @@ -28,10 +29,10 @@ impl DatabaseView for super::MaybeOpenFsRepository { self.opened().await?.read_object(digest).await } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { self.opened() .map_ok(|opened| opened.find_digests(search_criteria)) .try_flatten_stream() @@ -46,10 +47,7 @@ impl DatabaseView for super::MaybeOpenFsRepository { graph::DatabaseWalker::new(self, *root) } - async fn resolve_full_digest( - &self, - partial: &encoding::PartialDigest, - ) -> Result { + 
async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { self.opened().await?.resolve_full_digest(partial).await } } @@ -105,11 +103,20 @@ impl DatabaseView for super::OpenFsRepository { Object::new(buf) } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { - Box::pin(self.objects.find(search_criteria)) + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { + Box::pin( + self.objects + .find(search_criteria) + .map(|d| d.map(FoundDigest::Object)) + .chain( + self.payloads + .find(search_criteria) + .map(|d| d.map(FoundDigest::Payload)), + ), + ) } fn iter_objects(&self) -> graph::DatabaseIterator<'_> { @@ -120,12 +127,57 @@ impl DatabaseView for super::OpenFsRepository { graph::DatabaseWalker::new(self, *root) } - async fn resolve_full_digest( - &self, - partial: &encoding::PartialDigest, - ) -> Result { - self.objects.resolve_full_digest(partial).await + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { + match self.objects.resolve_full_digest(partial).await { + Ok(digest) => Ok(FoundDigest::Object(digest)), + Err(_) => { + let digest = self.payloads.resolve_full_digest(partial).await?; + Ok(FoundDigest::Payload(digest)) + } + } + } +} + +/// Remove the file at `filepath` if it is older than `older_than`. +/// +/// Returns true if the file was removed, false if it was not. 
+pub(crate) async fn remove_file_if_older_than( + older_than: DateTime, + filepath: &Path, + digest: encoding::Digest, +) -> crate::Result { + let metadata = tokio::fs::symlink_metadata(&filepath) + .await + .map_err(|err| match err.kind() { + std::io::ErrorKind::NotFound => Error::UnknownObject(digest), + _ => { + Error::StorageReadError("symlink_metadata on digest path", filepath.to_owned(), err) + } + })?; + + let mtime = metadata.modified().map_err(|err| { + Error::StorageReadError( + "modified on symlink metadata of digest path", + filepath.to_owned(), + err, + ) + })?; + + if DateTime::::from(mtime) >= older_than { + return Ok(false); + } + + if let Err(err) = tokio::fs::remove_file(&filepath).await { + return match err.kind() { + std::io::ErrorKind::NotFound => Ok(true), + _ => Err(Error::StorageWriteError( + "remove_file in remove_file_if_older_than", + filepath.to_owned(), + err, + )), + }; } + Ok(true) } #[async_trait::async_trait] @@ -157,51 +209,17 @@ impl graph::Database for super::OpenFsRepository { digest: encoding::Digest, ) -> crate::Result { let filepath = self.objects.build_digest_path(&digest); - - // this might fail but we don't consider that fatal just yet - #[cfg(unix)] - let _ = tokio::fs::set_permissions(&filepath, std::fs::Permissions::from_mode(0o777)).await; - - let metadata = tokio::fs::symlink_metadata(&filepath) - .await - .map_err(|err| match err.kind() { - std::io::ErrorKind::NotFound => Error::UnknownObject(digest), - _ => Error::StorageReadError( - "symlink_metadata on digest path", - filepath.clone(), - err, - ), - })?; - - let mtime = metadata.modified().map_err(|err| { - Error::StorageReadError( - "modified on symlink metadata of digest path", - filepath.clone(), - err, - ) - })?; - - if DateTime::::from(mtime) >= older_than { - return Ok(false); - } - - if let Err(err) = tokio::fs::remove_file(&filepath).await { - return match err.kind() { - std::io::ErrorKind::NotFound => Ok(true), - _ => Err(Error::StorageWriteError( - 
"remove_file on object file in remove_object_if_older_than", - filepath, - err, - )), - }; - } - Ok(true) + remove_file_if_older_than(older_than, &filepath, digest).await } } #[async_trait::async_trait] impl graph::DatabaseExt for super::OpenFsRepository { async fn write_object(&self, obj: &graph::FlatObject) -> Result<()> { + if matches!(obj.to_enum(), graph::object::Enum::Blob(_)) { + return Err("writing blob objects is not permitted".into()); + }; + let digest = obj.digest()?; let filepath = self.objects.build_digest_path(&digest); if filepath.exists() { diff --git a/crates/spfs/src/storage/fs/hash_store.rs b/crates/spfs/src/storage/fs/hash_store.rs index bb68b5ed90..80f97df2c1 100644 --- a/crates/spfs/src/storage/fs/hash_store.rs +++ b/crates/spfs/src/storage/fs/hash_store.rs @@ -79,10 +79,10 @@ impl FsHashStore { self.root.join(WORK_DIRNAME) } - async fn find_in_entry( - search_criteria: crate::graph::DigestSearchCriteria, + async fn find_in_entry<'a>( + search_criteria: &'a crate::graph::DigestSearchCriteria, entry: DirEntry, - ) -> Pin> + Send + Sync + 'static>> { + ) -> Pin> + Send + Sync + 'a>> { let entry_filename = entry.file_name(); let entry_filename = entry_filename.to_string_lossy().into_owned(); if entry_filename == WORK_DIRNAME || entry_filename == PROXY_DIRNAME { @@ -163,10 +163,10 @@ impl FsHashStore { }) } - pub fn find( + pub fn find<'a>( &self, - search_criteria: crate::graph::DigestSearchCriteria, - ) -> impl Stream> + use<> { + search_criteria: &'a crate::graph::DigestSearchCriteria, + ) -> impl Stream> + use<'a> { // Don't capture self inside try_stream. let root = self.root.clone(); @@ -176,7 +176,7 @@ impl FsHashStore { let entry_filename = entry.file_name(); let entry_filename = entry_filename.to_string_lossy(); - let mut entry_stream = Self::find_in_entry(search_criteria.clone(), entry).await; + let mut entry_stream = Self::find_in_entry(search_criteria, entry).await; while let Some(digest) = entry_stream.try_next().await? 
{ yield digest } @@ -198,7 +198,7 @@ impl FsHashStore { } pub fn iter(&self) -> impl Stream> + use<> { - self.find(crate::graph::DigestSearchCriteria::All) + self.find(&crate::graph::DigestSearchCriteria::All) } /// Return true if the given digest is stored in this storage diff --git a/crates/spfs/src/storage/fs/hash_store_test.rs b/crates/spfs/src/storage/fs/hash_store_test.rs index 91231131fd..01e2d35584 100644 --- a/crates/spfs/src/storage/fs/hash_store_test.rs +++ b/crates/spfs/src/storage/fs/hash_store_test.rs @@ -53,7 +53,7 @@ async fn test_hash_store_find_digest(tmpdir: tempfile::TempDir) { let partial = crate::encoding::PartialDigest::parse(starts_with).expect("valid partial digest"); let mut matches: Vec<_> = store - .find(DigestSearchCriteria::StartsWith(partial)) + .find(&DigestSearchCriteria::StartsWith(partial)) .try_collect() .await .expect("should not fail to search"); diff --git a/crates/spfs/src/storage/fs/mod.rs b/crates/spfs/src/storage/fs/mod.rs index 8712436a44..b2c61ab6b4 100644 --- a/crates/spfs/src/storage/fs/mod.rs +++ b/crates/spfs/src/storage/fs/mod.rs @@ -33,7 +33,7 @@ pub use renderer::{ RenderType, Renderer, }; -#[cfg(test)] +#[cfg(any(test, feature = "test-fixtures"))] pub use repository::MaybeOpenFsRepositoryImpl; pub use repository::{ Config, diff --git a/crates/spfs/src/storage/fs/payloads.rs b/crates/spfs/src/storage/fs/payloads.rs index d1205ceabe..444a02138e 100644 --- a/crates/spfs/src/storage/fs/payloads.rs +++ b/crates/spfs/src/storage/fs/payloads.rs @@ -5,13 +5,14 @@ use std::io::ErrorKind; use std::pin::Pin; +use chrono::{DateTime, Utc}; use futures::future::ready; use futures::{Stream, StreamExt, TryFutureExt}; use super::{MaybeOpenFsRepository, OpenFsRepository}; -use crate::storage::prelude::*; +use crate::storage::fs::database::remove_file_if_older_than; use crate::tracking::BlobRead; -use crate::{Error, Result, encoding, graph}; +use crate::{Error, Result, encoding}; #[async_trait::async_trait] impl 
crate::storage::PayloadStorage for MaybeOpenFsRepository { @@ -22,6 +23,11 @@ impl crate::storage::PayloadStorage for MaybeOpenFsRepository { opened.has_payload(digest).await } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + let opened = self.opened().await?; + opened.payload_size(digest).await + } + fn iter_payload_digests(&self) -> Pin> + Send>> { self.opened() .and_then(|opened| ready(Ok(opened.iter_payload_digests()))) @@ -29,14 +35,9 @@ impl crate::storage::PayloadStorage for MaybeOpenFsRepository { .boxed() } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { let opened = self.opened().await?; - // Safety: we are simply deferring this function to the inner - // one and so the same safety rules apply to our caller - unsafe { opened.write_data(reader).await } + opened.write_data(reader).await } async fn open_payload( @@ -49,6 +50,17 @@ impl crate::storage::PayloadStorage for MaybeOpenFsRepository { async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { self.opened().await?.remove_payload(digest).await } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + self.opened() + .await? 
+ .remove_payload_if_older_than(older_than, digest) + .await + } } #[async_trait::async_trait] @@ -58,14 +70,22 @@ impl crate::storage::PayloadStorage for OpenFsRepository { tokio::fs::symlink_metadata(path).await.is_ok() } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + let path = self.payloads.build_digest_path(&digest); + tokio::fs::symlink_metadata(&path) + .await + .map(|meta| meta.len()) + .map_err(|err| match err.kind() { + ErrorKind::NotFound => Error::UnknownObject(digest), + _ => Error::StorageReadError("symlink_metadata on payload", path, err), + }) + } + fn iter_payload_digests(&self) -> Pin> + Send>> { Box::pin(self.payloads.iter()) } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { self.payloads.write_data(reader).await } @@ -77,25 +97,15 @@ impl crate::storage::PayloadStorage for OpenFsRepository { match tokio::fs::File::open(&path).await { Ok(file) => Ok((Box::pin(tokio::io::BufReader::new(file)), path)), Err(err) => match err.kind() { - ErrorKind::NotFound => { - // Return an error specific to this situation, whether the - // blob is really unknown or just the payload is missing. - match self.read_blob(digest).await { - Ok(blob) => Err(Error::ObjectMissingPayload(blob.into(), digest)), - Err( - err @ Error::NotCorrectKind { - desired: graph::ObjectKind::Blob, - .. - }, - ) => Err(err), - Err(_) => Err(Error::UnknownObject(digest)), - } - } + ErrorKind::NotFound => Err(Error::UnknownObject(digest)), _ => Err(Error::StorageReadError("open on payload", path, err)), }, } } + // TODO: This operation is unsafe, a payload must not be removed if _any_ + // reference to it still exists, which could be in Manifests, Layer + // annotations, tags (can you tag a blob directly?), etc. 
async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { let path = self.payloads.build_digest_path(&digest); match tokio::fs::remove_file(&path).await { @@ -110,4 +120,13 @@ impl crate::storage::PayloadStorage for OpenFsRepository { }, } } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + let filepath = self.payloads.build_digest_path(&digest); + remove_file_if_older_than(older_than, &filepath, digest).await + } } diff --git a/crates/spfs/src/storage/fs/renderer_test.rs b/crates/spfs/src/storage/fs/renderer_test.rs index 30e967b449..9c92512977 100644 --- a/crates/spfs/src/storage/fs/renderer_test.rs +++ b/crates/spfs/src/storage/fs/renderer_test.rs @@ -49,7 +49,7 @@ async fn test_render_manifest( .await .unwrap(); storage - .commit_blob(Box::pin(tokio::io::BufReader::new(data))) + .commit_payload(Box::pin(tokio::io::BufReader::new(data))) .await .unwrap(); } diff --git a/crates/spfs/src/storage/handle.rs b/crates/spfs/src/storage/handle.rs index 846b763078..c0b3278e11 100644 --- a/crates/spfs/src/storage/handle.rs +++ b/crates/spfs/src/storage/handle.rs @@ -14,7 +14,7 @@ use spfs_encoding as encoding; use super::prelude::*; use super::tag::TagSpecAndTagStream; use super::{TagNamespace, TagNamespaceBuf, TagStorageMut}; -use crate::graph::ObjectProto; +use crate::graph::{FoundDigest, ObjectProto}; use crate::tracking::{self, BlobRead}; use crate::{Error, Result, graph}; @@ -257,17 +257,16 @@ impl PayloadStorage for RepositoryHandle { each_variant!(self, repo, { repo.has_payload(digest).await }) } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + each_variant!(self, repo, { repo.payload_size(digest).await }) + } + fn iter_payload_digests(&self) -> Pin> + Send>> { each_variant!(self, repo, { repo.iter_payload_digests() }) } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { - // Safety: we are wrapping the same 
underlying unsafe function and - // so the same safety holds for our callers - unsafe { each_variant!(self, repo, { repo.write_data(reader).await }) } + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { + each_variant!(self, repo, { repo.write_data(reader).await }) } async fn open_payload( @@ -280,6 +279,16 @@ impl PayloadStorage for RepositoryHandle { async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { each_variant!(self, repo, { repo.remove_payload(digest).await }) } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + each_variant!(self, repo, { + repo.remove_payload_if_older_than(older_than, digest).await + }) + } } #[async_trait::async_trait] @@ -292,10 +301,10 @@ impl DatabaseView for RepositoryHandle { each_variant!(self, repo, { repo.read_object(digest).await }) } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { each_variant!(self, repo, { repo.find_digests(search_criteria) }) } @@ -432,17 +441,16 @@ impl PayloadStorage for Arc { each_variant!(&**self, repo, { repo.has_payload(digest).await }) } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + each_variant!(&**self, repo, { repo.payload_size(digest).await }) + } + fn iter_payload_digests(&self) -> Pin> + Send>> { each_variant!(&**self, repo, { repo.iter_payload_digests() }) } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { - // Safety: we are wrapping the same underlying unsafe function and - // so the same safety holds for our callers - unsafe { each_variant!(&**self, repo, { repo.write_data(reader).await }) } + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { + each_variant!(&**self, repo, { repo.write_data(reader).await }) } async fn 
open_payload( @@ -455,6 +463,16 @@ impl PayloadStorage for Arc { async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { each_variant!(&**self, repo, { repo.remove_payload(digest).await }) } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + each_variant!(&**self, repo, { + repo.remove_payload_if_older_than(older_than, digest).await + }) + } } #[async_trait::async_trait] @@ -467,10 +485,10 @@ impl DatabaseView for Arc { each_variant!(&**self, repo, { repo.read_object(digest).await }) } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { each_variant!(&**self, repo, { repo.find_digests(search_criteria) }) } diff --git a/crates/spfs/src/storage/layer.rs b/crates/spfs/src/storage/layer.rs index 28d01f22c5..bddff3a43d 100644 --- a/crates/spfs/src/storage/layer.rs +++ b/crates/spfs/src/storage/layer.rs @@ -17,7 +17,10 @@ pub trait LayerStorage: graph::Database + Sync + Send { /// Iterate the objects in this storage which are layers. fn iter_layers<'db>(&'db self) -> Pin + 'db>> { let stream = self.iter_objects().filter_map(|res| match res { - Ok((digest, obj)) => obj.into_layer().map(|b| Ok((digest, b))), + Ok(graph::DatabaseItem::Object(digest, obj)) => { + obj.into_layer().map(|b| Ok((digest, b))) + } + Ok(graph::DatabaseItem::Payload(_digest)) => None, Err(err) => Some(Err(err)), }); Box::pin(stream) diff --git a/crates/spfs/src/storage/manifest.rs b/crates/spfs/src/storage/manifest.rs index 455a630ff4..fa719d81fe 100644 --- a/crates/spfs/src/storage/manifest.rs +++ b/crates/spfs/src/storage/manifest.rs @@ -20,7 +20,10 @@ pub trait ManifestStorage: graph::Database + Sync + Send { /// Iterate the objects in this storage which are manifests. 
fn iter_manifests<'db>(&'db self) -> Pin + 'db>> { let stream = self.iter_objects().filter_map(|res| match res { - Ok((digest, obj)) => obj.into_manifest().map(|b| Ok((digest, b))), + Ok(graph::DatabaseItem::Object(digest, obj)) => { + obj.into_manifest().map(|b| Ok((digest, b))) + } + Ok(graph::DatabaseItem::Payload(_digest)) => None, Err(err) => Some(Err(err)), }); Box::pin(stream) diff --git a/crates/spfs/src/storage/manifest_test.rs b/crates/spfs/src/storage/manifest_test.rs index 18b839fc82..17403ac77c 100644 --- a/crates/spfs/src/storage/manifest_test.rs +++ b/crates/spfs/src/storage/manifest_test.rs @@ -20,6 +20,8 @@ async fn test_read_write_manifest( repo: TempRepo, tmpdir: tempfile::TempDir, ) { + use crate::graph::FoundDigest; + let dir = tmpdir.path(); let repo = repo.await; std::fs::File::create(dir.join("file.txt")).unwrap(); @@ -40,11 +42,11 @@ async fn test_read_write_manifest( repo.write_object(&manifest2).await.unwrap(); let digests: crate::Result> = repo - .find_digests(crate::graph::DigestSearchCriteria::All) + .find_digests(&crate::graph::DigestSearchCriteria::All) .collect() .await; let digests = digests.unwrap(); - assert!(digests.contains(&expected)); + assert!(digests.contains(&FoundDigest::Object(expected))); } #[rstest] diff --git a/crates/spfs/src/storage/mod.rs b/crates/spfs/src/storage/mod.rs index b719f2c2b5..d586498435 100644 --- a/crates/spfs/src/storage/mod.rs +++ b/crates/spfs/src/storage/mod.rs @@ -3,7 +3,6 @@ // https://github.com/spkenv/spk mod address; -mod blob; mod error; mod layer; mod manifest; @@ -24,7 +23,6 @@ pub mod rpc; pub mod tar; pub use address::Address; -pub use blob::{BlobStorage, BlobStorageExt}; pub use error::OpenRepositoryError; pub use handle::RepositoryHandle; pub use layer::{LayerStorage, LayerStorageExt}; diff --git a/crates/spfs/src/storage/payload.rs b/crates/spfs/src/storage/payload.rs index d04a75d651..2fc4d66c99 100644 --- a/crates/spfs/src/storage/payload.rs +++ b/crates/spfs/src/storage/payload.rs 
@@ -4,6 +4,7 @@ use std::pin::Pin; +use chrono::{DateTime, Utc}; use futures::Stream; use crate::tracking::BlobRead; @@ -22,17 +23,11 @@ pub trait PayloadStorage: Sync + Send { /// Return true if the identified payload exists. async fn has_payload(&self, digest: encoding::Digest) -> bool; + /// Return the payload size if the identified payload exists. + async fn payload_size(&self, digest: encoding::Digest) -> Result; + /// Store the contents of the given stream, returning its digest and size - /// - /// # Safety - /// - /// It is unsafe to write payload data without also creating a blob - /// to track that payload in the database. Usually, its better to - /// call [`super::RepositoryExt::commit_blob`] instead. - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)>; + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)>; /// Return a handle and filename to the full content of a payload. /// @@ -48,6 +43,19 @@ pub trait PayloadStorage: Sync + Send { /// Errors: /// - [`crate::Error::UnknownObject`]: if the payload does not exist in this storage async fn remove_payload(&self, digest: encoding::Digest) -> Result<()>; + + /// Remove the payload identified by the given digest. + /// + /// It is only removed if it is older than the given timestamp. Returns true + /// if the payload was removed, false if it was not. 
+ /// + /// Errors: + /// - [`crate::Error::UnknownObject`]: if the payload does not exist in this storage + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result; } #[async_trait::async_trait] @@ -56,17 +64,16 @@ impl PayloadStorage for &T { PayloadStorage::has_payload(&**self, digest).await } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + PayloadStorage::payload_size(&**self, digest).await + } + fn iter_payload_digests(&self) -> Pin> + Send>> { PayloadStorage::iter_payload_digests(&**self) } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { - // Safety: we are wrapping the same underlying unsafe function and - // so the same safety holds for our callers - unsafe { PayloadStorage::write_data(&**self, reader).await } + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { + PayloadStorage::write_data(&**self, reader).await } async fn open_payload( @@ -79,4 +86,12 @@ impl PayloadStorage for &T { async fn remove_payload(&self, digest: encoding::Digest) -> Result<()> { PayloadStorage::remove_payload(&**self, digest).await } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + PayloadStorage::remove_payload_if_older_than(&**self, older_than, digest).await + } } diff --git a/crates/spfs/src/storage/payload_test.rs b/crates/spfs/src/storage/payload_test.rs index 7896143e8e..aee6f8e53a 100644 --- a/crates/spfs/src/storage/payload_test.rs +++ b/crates/spfs/src/storage/payload_test.rs @@ -23,13 +23,10 @@ async fn test_payload_io( let bytes = "simple string data".as_bytes(); let reader = Box::pin(bytes); - // Safety: we are intentionally calling this function to test it - let (digest, size) = unsafe { - tmprepo - .write_data(reader) - .await - .expect("failed to write payload data") - }; + let (digest, size) = tmprepo + .write_data(reader) + .await + 
.expect("failed to write payload data"); assert_eq!(size, bytes.len() as u64); let mut actual = String::new(); @@ -58,13 +55,10 @@ async fn test_payload_existence( let bytes = "simple string data".as_bytes(); let reader = Box::pin(bytes); - // Safety: we are intentionally calling this unsafe function to test it - let (digest, size) = unsafe { - tmprepo - .write_data(reader) - .await - .expect("failed to write payload data") - }; + let (digest, size) = tmprepo + .write_data(reader) + .await + .expect("failed to write payload data"); assert_eq!(size, bytes.len() as u64); let actual = tmprepo.has_payload(digest).await; @@ -99,15 +93,15 @@ async fn test_payloads_iter( let mut expected = vec![ tmprepo - .commit_blob(reader_0) + .commit_payload(reader_0) .await .expect("failed to write payload data"), tmprepo - .commit_blob(reader_1) + .commit_payload(reader_1) .await .expect("failed to write payload data"), tmprepo - .commit_blob(reader_2) + .commit_payload(reader_2) .await .expect("failed to write payload data"), ]; diff --git a/crates/spfs/src/storage/pinned/repository.rs b/crates/spfs/src/storage/pinned/repository.rs index bb7a272d57..9f2958d84b 100644 --- a/crates/spfs/src/storage/pinned/repository.rs +++ b/crates/spfs/src/storage/pinned/repository.rs @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc}; use futures::Stream; use spfs_encoding as encoding; -use crate::graph::ObjectProto; +use crate::graph::{FoundDigest, ObjectProto}; use crate::storage::prelude::*; use crate::tracking::BlobRead; use crate::{Error, Result, graph}; @@ -63,10 +63,10 @@ where self.inner.read_object(digest).await } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { self.inner.find_digests(search_criteria) } @@ -78,10 +78,7 @@ where self.inner.walk_objects(root) } - async fn resolve_full_digest( - &self, - partial: &encoding::PartialDigest, - ) -> 
Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { self.inner.resolve_full_digest(partial).await } } @@ -127,21 +124,20 @@ where self.inner.has_payload(digest).await } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + self.inner.payload_size(digest).await + } + fn iter_payload_digests(&self) -> Pin> + Send>> { self.inner.iter_payload_digests() } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { // payloads are stored by digest, not time, and so can still // be safely written to a past repository view. In practice, // this allows some recovery and sync operations to still function // on pinned repositories - - // Safety: we are simply calling the same inner unsafe function - unsafe { self.inner.write_data(reader).await } + self.inner.write_data(reader).await } async fn open_payload( @@ -154,6 +150,14 @@ where async fn remove_payload(&self, _digest: encoding::Digest) -> Result<()> { Err(Error::RepositoryIsPinned) } + + async fn remove_payload_if_older_than( + &self, + _older_than: DateTime, + _digest: encoding::Digest, + ) -> Result { + Err(Error::RepositoryIsPinned) + } } impl Address for PinnedRepository diff --git a/crates/spfs/src/storage/platform.rs b/crates/spfs/src/storage/platform.rs index d4ca6e3731..567beed180 100644 --- a/crates/spfs/src/storage/platform.rs +++ b/crates/spfs/src/storage/platform.rs @@ -16,7 +16,10 @@ pub trait PlatformStorage: graph::Database + Sync + Send { /// Iterate the objects in this storage which are platforms. 
fn iter_platforms<'db>(&'db self) -> Pin + 'db>> { let stream = self.iter_objects().filter_map(|res| match res { - Ok((digest, obj)) => obj.into_platform().map(|b| Ok((digest, b))), + Ok(graph::DatabaseItem::Object(digest, obj)) => { + obj.into_platform().map(|b| Ok((digest, b))) + } + Ok(graph::DatabaseItem::Payload(_digest)) => None, Err(err) => Some(Err(err)), }); Box::pin(stream) diff --git a/crates/spfs/src/storage/prelude.rs b/crates/spfs/src/storage/prelude.rs index bfa5723345..2be1c18291 100644 --- a/crates/spfs/src/storage/prelude.rs +++ b/crates/spfs/src/storage/prelude.rs @@ -5,8 +5,6 @@ pub use super::config::{FromConfig, FromUrl}; pub use super::{ Address, - BlobStorage, - BlobStorageExt, LayerStorage, LayerStorageExt, ManifestStorage, diff --git a/crates/spfs/src/storage/proxy/mod.rs b/crates/spfs/src/storage/proxy/mod.rs index 3fa442b3be..d331eadf5b 100644 --- a/crates/spfs/src/storage/proxy/mod.rs +++ b/crates/spfs/src/storage/proxy/mod.rs @@ -12,6 +12,7 @@ pub(crate) use repository::{ find_tags_in_namespace, iter_tag_streams_in_namespace, ls_tags_in_namespace, + payload_size, read_tag_in_namespace, }; diff --git a/crates/spfs/src/storage/proxy/repository.rs b/crates/spfs/src/storage/proxy/repository.rs index 216676e77b..eb2d1a92cc 100644 --- a/crates/spfs/src/storage/proxy/repository.rs +++ b/crates/spfs/src/storage/proxy/repository.rs @@ -13,7 +13,7 @@ use futures::{Stream, StreamExt, future}; use relative_path::RelativePath; use crate::config::{ToAddress, default_proxy_repo_include_secondary_tags}; -use crate::graph::ObjectProto; +use crate::graph::{FoundDigest, ObjectProto}; use crate::prelude::*; use crate::storage::proxy::ProxyRepositoryExt; use crate::storage::tag::TagSpecAndTagStream; @@ -159,10 +159,10 @@ impl graph::DatabaseView for ProxyRepository { res } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + 
Send + 'a>> { self.primary.find_digests(search_criteria) } @@ -202,6 +202,21 @@ impl graph::DatabaseExt for ProxyRepository { } } +pub(crate) async fn payload_size(repo: R, digest: encoding::Digest) -> Result +where + R: ProxyRepositoryExt, +{ + if let Ok(size) = repo.primary().payload_size(digest).await { + return Ok(size); + } + for secondary in repo.secondary().iter() { + if let Ok(size) = secondary.payload_size(digest).await { + return Ok(size); + } + } + Err(crate::Error::UnknownObject(digest)) +} + #[async_trait::async_trait] impl PayloadStorage for ProxyRepository { async fn has_payload(&self, digest: encoding::Digest) -> bool { @@ -216,18 +231,16 @@ impl PayloadStorage for ProxyRepository { false } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + payload_size(self, digest).await + } + fn iter_payload_digests(&self) -> Pin> + Send>> { self.primary.iter_payload_digests() } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { - // Safety: we are wrapping the same underlying unsafe function and - // so the same safety holds for our callers - let res = unsafe { self.primary.write_data(reader).await? }; - Ok(res) + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { + self.primary.write_data(reader).await } async fn open_payload( @@ -253,6 +266,17 @@ impl PayloadStorage for ProxyRepository { self.primary.remove_payload(digest).await?; Ok(()) } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + Ok(self + .primary + .remove_payload_if_older_than(older_than, digest) + .await?) 
+ } } impl ProxyRepositoryExt for ProxyRepository { diff --git a/crates/spfs/src/storage/proxy/repository_test.rs b/crates/spfs/src/storage/proxy/repository_test.rs index d0b17a05e4..c7bc741ccd 100644 --- a/crates/spfs/src/storage/proxy/repository_test.rs +++ b/crates/spfs/src/storage/proxy/repository_test.rs @@ -26,7 +26,7 @@ async fn test_proxy_payload_read_through(tmpdir: tempfile::TempDir) { .unwrap(); let digest = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); @@ -56,7 +56,7 @@ async fn test_proxy_object_read_through(tmpdir: tempfile::TempDir) { .unwrap(); let payload = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); @@ -66,10 +66,10 @@ async fn test_proxy_object_read_through(tmpdir: tempfile::TempDir) { include_secondary_tags: default_proxy_repo_include_secondary_tags(), }; - proxy - .read_object(payload) - .await - .expect("object should be loadable via the secondary repo"); + assert!( + proxy.has_payload(payload).await, + "payload should exist via the secondary repo" + ); } #[rstest] @@ -86,7 +86,7 @@ async fn test_proxy_tag_read_through(tmpdir: tempfile::TempDir) { .unwrap(); let payload = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -114,7 +114,7 @@ async fn test_proxy_tag_ls(tmpdir: tempfile::TempDir) { .unwrap(); let payload1 = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -126,7 +126,7 @@ async fn test_proxy_tag_ls(tmpdir: tempfile::TempDir) { .unwrap(); let payload2 = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + 
.commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); secondary.push_tag(&tag_spec, &payload2).await.unwrap(); @@ -163,7 +163,7 @@ async fn test_proxy_tag_ls_config_for_primary_only(tmpdir: tempfile::TempDir) { .unwrap(); let payload1 = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -175,7 +175,7 @@ async fn test_proxy_tag_ls_config_for_primary_only(tmpdir: tempfile::TempDir) { .unwrap(); let payload2 = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec2 = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through2").unwrap(); @@ -223,7 +223,7 @@ async fn test_proxy_tag_find(tmpdir: tempfile::TempDir) { .unwrap(); let payload1 = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -235,7 +235,7 @@ async fn test_proxy_tag_find(tmpdir: tempfile::TempDir) { .unwrap(); let payload2 = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); secondary.push_tag(&tag_spec, &payload2).await.unwrap(); @@ -271,7 +271,7 @@ async fn test_proxy_tag_find_for_primary_only(tmpdir: tempfile::TempDir) { .unwrap(); let payload1 = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -283,7 +283,7 @@ async fn test_proxy_tag_find_for_primary_only(tmpdir: tempfile::TempDir) { .unwrap(); let payload2 = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some 
data".as_slice())) .await .unwrap(); let tag_spec2 = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through2").unwrap(); @@ -322,7 +322,7 @@ async fn test_proxy_tag_iter_streams(tmpdir: tempfile::TempDir) { .unwrap(); let payload1 = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -334,7 +334,7 @@ async fn test_proxy_tag_iter_streams(tmpdir: tempfile::TempDir) { .unwrap(); let payload2 = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); secondary.push_tag(&tag_spec, &payload2).await.unwrap(); @@ -370,7 +370,7 @@ async fn test_proxy_tag_iter_streams_for_primary_only(tmpdir: tempfile::TempDir) .unwrap(); let payload1 = primary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through").unwrap(); @@ -382,7 +382,7 @@ async fn test_proxy_tag_iter_streams_for_primary_only(tmpdir: tempfile::TempDir) .unwrap(); let payload2 = secondary - .commit_blob(Box::pin(b"some data".as_slice())) + .commit_payload(Box::pin(b"some data".as_slice())) .await .unwrap(); let tag_spec2 = crate::tracking::TagSpec::parse("spfs-test/proxy-read-through2").unwrap(); diff --git a/crates/spfs/src/storage/repository.rs b/crates/spfs/src/storage/repository.rs index aba782d1f4..469124d1f9 100644 --- a/crates/spfs/src/storage/repository.rs +++ b/crates/spfs/src/storage/repository.rs @@ -43,7 +43,6 @@ pub trait Repository: + super::TagStorage + super::PayloadStorage + super::ManifestStorage - + super::BlobStorage + super::LayerStorage + super::PlatformStorage + graph::Database @@ -53,6 +52,8 @@ pub trait Repository: + Sync { /// Return true if this repository contains the given reference. 
+ /// + /// This does not work for payload digests. async fn has_ref(&self, reference: &str) -> bool { self.read_ref(reference).await.is_ok() } @@ -67,10 +68,18 @@ pub trait Repository: let partial = encoding::PartialDigest::parse(reference) .map_err(|_| Error::UnknownReference(reference.to_string()))?; - self.resolve_full_digest(&partial).await + // This will discover the type of item but discard it. Do callers want + // this information? A new type could be added to wrap FoundDigest with + // a variant that doesn't have the type information. Note how resolving + // a tag above does not determine the type, or require the item exists. + self.resolve_full_digest(&partial) + .await + .map(|found_digest| found_digest.into_digest()) } /// Read an object of unknown type by tag or digest. + /// + /// This does not work for payload digests. async fn read_ref(&self, reference: &str) -> Result { let digest = self.resolve_ref(reference).await?; self.read_object(digest).await @@ -107,7 +116,6 @@ impl Repository for T where + super::TagStorage + super::PayloadStorage + super::ManifestStorage - + super::BlobStorage + super::LayerStorage + super::PlatformStorage + graph::Database @@ -120,13 +128,9 @@ impl Repository for T where #[async_trait] pub trait RepositoryExt: super::PayloadStorage + graph::DatabaseExt { - /// Commit the data from 'reader' as a blob in this repository - async fn commit_blob(&self, reader: Pin>) -> Result { - // Safety: it is unsafe to write data without also creating a blob - // to track that payload, which is exactly what this function is doing - let (digest, size) = unsafe { self.write_data(reader).await? 
}; - let blob = graph::Blob::new(digest, size); - self.write_object(&blob).await?; + /// Commit the data from 'reader' as a payload in this repository + async fn commit_payload(&self, reader: Pin>) -> Result { + let (digest, _size) = self.write_data(reader).await?; Ok(digest) } } diff --git a/crates/spfs/src/storage/rpc/database.rs b/crates/spfs/src/storage/rpc/database.rs index 16f3649416..4aebc795ba 100644 --- a/crates/spfs/src/storage/rpc/database.rs +++ b/crates/spfs/src/storage/rpc/database.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use futures::{Stream, TryStreamExt}; use proto::RpcResult; -use crate::graph::{self, ObjectProto}; +use crate::graph::{self, FoundDigest, ObjectProto}; use crate::{Result, encoding, proto}; #[async_trait::async_trait] @@ -40,12 +40,12 @@ impl graph::DatabaseView for super::RpcRepository { obj.try_into() } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { let request = proto::FindDigestsRequest { - search_criteria: Some(search_criteria.into()), + search_criteria: Some(search_criteria.clone().into()), }; let mut client = self.db_client.clone(); let stream = futures::stream::once(async move { client.find_digests(request).await }) diff --git a/crates/spfs/src/storage/rpc/payload.rs b/crates/spfs/src/storage/rpc/payload.rs index 210fedde4c..d41630d4b8 100644 --- a/crates/spfs/src/storage/rpc/payload.rs +++ b/crates/spfs/src/storage/rpc/payload.rs @@ -5,6 +5,7 @@ use std::convert::TryInto; use std::pin::Pin; +use chrono::{DateTime, Utc}; use futures::{Stream, TryStreamExt}; use prost::Message; @@ -27,6 +28,20 @@ impl storage::PayloadStorage for super::RpcRepository { .unwrap_or(false) } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + let request = proto::PayloadSizeRequest { + digest: Some(digest.into()), + }; + let response = self + .payload_client + .clone() + 
.payload_size(request) + .await? + .into_inner() + .to_result()?; + Ok(response) + } + fn iter_payload_digests(&self) -> Pin> + Send>> { let request = proto::IterDigestsRequest {}; let mut client = self.payload_client.clone(); @@ -39,10 +54,7 @@ impl storage::PayloadStorage for super::RpcRepository { Box::pin(stream) } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { let request = proto::WritePayloadRequest {}; let option = self .payload_client @@ -137,6 +149,24 @@ impl storage::PayloadStorage for super::RpcRepository { .to_result()?; Ok(()) } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + let request = proto::RemovePayloadIfOlderThanRequest { + older_than: Some(proto::convert_from_datetime(&older_than)), + digest: Some(digest.into()), + }; + Ok(self + .payload_client + .clone() + .remove_payload_if_older_than(request) + .await? + .into_inner() + .to_result()?) 
+ } } impl super::RpcRepository { diff --git a/crates/spfs/src/storage/tar/repository.rs b/crates/spfs/src/storage/tar/repository.rs index a17dbc7d49..38e448e860 100644 --- a/crates/spfs/src/storage/tar/repository.rs +++ b/crates/spfs/src/storage/tar/repository.rs @@ -14,7 +14,7 @@ use relative_path::RelativePath; use tar::{Archive, Builder}; use crate::config::{ToAddress, pathbuf_deserialize_with_tilde_expansion}; -use crate::graph::ObjectProto; +use crate::graph::{FoundDigest, ObjectProto}; use crate::prelude::*; use crate::storage::fs::DURABLE_EDITS_DIR; use crate::storage::tag::TagSpecAndTagStream; @@ -225,10 +225,10 @@ impl graph::DatabaseView for TarRepository { self.repo.read_object(digest).await } - fn find_digests( + fn find_digests<'a>( &self, - search_criteria: graph::DigestSearchCriteria, - ) -> Pin> + Send>> { + search_criteria: &'a graph::DigestSearchCriteria, + ) -> Pin> + Send + 'a>> { self.repo.find_digests(search_criteria) } @@ -240,10 +240,7 @@ impl graph::DatabaseView for TarRepository { self.repo.walk_objects(root) } - async fn resolve_full_digest( - &self, - partial: &encoding::PartialDigest, - ) -> Result { + async fn resolve_full_digest(&self, partial: &encoding::PartialDigest) -> Result { self.repo.resolve_full_digest(partial).await } } @@ -287,17 +284,16 @@ impl PayloadStorage for TarRepository { self.repo.has_payload(digest).await } + async fn payload_size(&self, digest: encoding::Digest) -> Result { + self.repo.payload_size(digest).await + } + fn iter_payload_digests(&self) -> Pin> + Send>> { self.repo.iter_payload_digests() } - async unsafe fn write_data( - &self, - reader: Pin>, - ) -> Result<(encoding::Digest, u64)> { - // Safety: we are wrapping the same underlying unsafe function and - // so the same safety holds for our callers - let res = unsafe { self.repo.write_data(reader).await? 
}; + async fn write_data(&self, reader: Pin>) -> Result<(encoding::Digest, u64)> { + let res = self.repo.write_data(reader).await?; self.up_to_date .store(false, std::sync::atomic::Ordering::Release); Ok(res) @@ -316,6 +312,22 @@ impl PayloadStorage for TarRepository { .store(false, std::sync::atomic::Ordering::Release); Ok(()) } + + async fn remove_payload_if_older_than( + &self, + older_than: DateTime, + digest: encoding::Digest, + ) -> Result { + let deleted = self + .repo + .remove_payload_if_older_than(older_than, digest) + .await?; + if deleted { + self.up_to_date + .store(false, std::sync::atomic::Ordering::Release); + } + Ok(deleted) + } } #[async_trait::async_trait] diff --git a/crates/spfs/src/sync.rs b/crates/spfs/src/sync.rs index 2848e8c041..3ac389a9e2 100644 --- a/crates/spfs/src/sync.rs +++ b/crates/spfs/src/sync.rs @@ -26,6 +26,7 @@ use tokio::sync::Semaphore; use crate::graph::AnnotationValue; use crate::prelude::*; +use crate::sync::reporter::SyncItemResult; use crate::{Error, Result, encoding, graph, storage, tracking}; /// The default limit for concurrent manifest sync operations @@ -202,14 +203,17 @@ impl<'src, 'dst> Syncer<'src, 'dst> { tracing::debug!(?item, "Syncing item"); self.reporter.visit_env_item(&item); let res = match item { - tracking::EnvSpecItem::Digest(digest) => self - .sync_digest(digest) - .await - .map(SyncEnvItemResult::Object)?, - tracking::EnvSpecItem::PartialDigest(digest) => self - .sync_partial_digest(digest) - .await - .map(SyncEnvItemResult::Object)?, + tracking::EnvSpecItem::Digest(digest) => match self.sync_object_digest(digest).await { + Ok(r) => SyncEnvItemResult::Object(r), + Err(Error::UnknownObject(digest)) => self + .sync_payload(digest) + .await + .map(SyncEnvItemResult::Payload)?, + Err(e) => return Err(e), + }, + tracking::EnvSpecItem::PartialDigest(digest) => { + self.sync_partial_digest(digest).await.map(Into::into)? 
+ } tracking::EnvSpecItem::TagSpec(tag_spec) => { self.sync_tag(tag_spec).await.map(SyncEnvItemResult::Tag)? } @@ -229,7 +233,14 @@ impl<'src, 'dst> Syncer<'src, 'dst> { } self.reporter.visit_tag(&tag); let resolved = self.src.resolve_tag(&tag).await?; - let result = self.sync_digest(resolved.target).await?; + let result = match self.sync_object_digest(resolved.target).await { + Ok(r) => SyncItemResult::Object(r), + Err(Error::UnknownObject(digest)) => self + .sync_payload(digest) + .await + .map(SyncItemResult::Payload)?, + Err(e) => return Err(e), + }; self.dest.insert_tag(&resolved).await?; let res = SyncTagResult::Synced { tag, result }; self.reporter.synced_tag(&res); @@ -239,9 +250,9 @@ impl<'src, 'dst> Syncer<'src, 'dst> { pub async fn sync_partial_digest( &self, partial: encoding::PartialDigest, - ) -> Result { - let mut res = self.src.resolve_full_digest(&partial).await; - res = match res { + ) -> Result { + let res = self.src.resolve_full_digest(&partial).await; + let found_digest = match res { Err(err) if self.policy.check_existing_objects() => { // there is a chance that this digest points to an existing object in // dest, which we don't want to fail on unless requested. In theory, @@ -257,12 +268,20 @@ impl<'src, 'dst> Syncer<'src, 'dst> { .map_err(|_| err) } res => res, - }; - let obj = self.read_object_with_fallback(res?).await?; - self.sync_object(obj).await + }?; + match found_digest { + graph::FoundDigest::Object(digest) => { + let obj_result = self.sync_object_digest(digest).await?; + Ok(SyncItemResult::Object(obj_result)) + } + graph::FoundDigest::Payload(digest) => { + let payload_result = self.sync_payload(digest).await?; + Ok(SyncItemResult::Payload(payload_result)) + } + } } - pub async fn sync_digest(&self, digest: encoding::Digest) -> Result { + pub async fn sync_object_digest(&self, digest: encoding::Digest) -> Result { // don't write the digest here, as that is the responsibility // of the function that actually handles the data copying. 
// a short-circuit is still nice when possible, though @@ -299,7 +318,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> { let mut futures = FuturesUnordered::new(); for digest in platform.iter_bottom_up() { - futures.push(self.sync_digest(*digest)); + futures.push(self.sync_object_digest(*digest)); } let mut results = Vec::with_capacity(futures.len()); while let Some(result) = futures.try_next().await? { @@ -405,7 +424,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> { return Ok(SyncAnnotationResult::Duplicate); } self.reporter.visit_annotation(&annotation); - let sync_result = self.sync_digest(*digest).await?; + let sync_result = self.sync_payload(*digest).await?; let res = SyncAnnotationResult::Synced { digest: *digest, result: Box::new(sync_result), @@ -447,21 +466,14 @@ impl<'src, 'dst> Syncer<'src, 'dst> { return Ok(SyncBlobResult::Duplicate); } - if self.policy.check_existing_objects() - && self.dest.has_object(*digest).await - && self.dest.has_payload(*blob.payload()).await - { + if self.policy.check_existing_objects() && self.dest.has_payload(*blob.payload()).await { self.processed_digests.insert(*digest); return Ok(SyncBlobResult::Skipped); } self.reporter.visit_blob(blob); - // Safety: sync_payload is unsafe to call unless the blob - // is synced with it, which is the purpose of this function. - let result = unsafe { - self.sync_payload_with_perms_opt(*blob.payload(), perms) - .await? 
- }; - self.dest.write_blob(blob.to_owned()).await?; + let result = self + .sync_payload_with_perms_opt(*blob.payload(), perms) + .await?; self.processed_digests.insert(*digest); let res = SyncBlobResult::Synced { blob: blob.to_owned(), @@ -472,26 +484,13 @@ impl<'src, 'dst> Syncer<'src, 'dst> { } /// Sync a payload with the provided digest - /// - /// # Safety - /// - /// It is unsafe to call this sync function on its own, - /// as any payload should be synced alongside its - /// corresponding Blob instance - use [`Self::sync_blob`] instead - pub async unsafe fn sync_payload(&self, digest: encoding::Digest) -> Result { - // Safety: these concerns are passed on to the caller - unsafe { self.sync_payload_with_perms_opt(digest, None).await } + pub async fn sync_payload(&self, digest: encoding::Digest) -> Result { + self.sync_payload_with_perms_opt(digest, None).await } /// Sync a payload with the provided digest and optional set /// of desired permissions. - /// - /// # Safety - /// - /// It is unsafe to call this sync function on its own, - /// as any payload should be synced alongside its - /// corresponding Blob instance - use [`Self::sync_blob`] instead - pub(crate) async unsafe fn sync_payload_with_perms_opt( + pub(crate) async fn sync_payload_with_perms_opt( &self, digest: encoding::Digest, perms: Option, @@ -515,9 +514,7 @@ impl<'src, 'dst> Syncer<'src, 'dst> { payload = Box::pin(payload.with_permissions(perms)); } - // Safety: this is the unsafe part where we actually create - // the payload without a corresponding blob - let (created_digest, size) = unsafe { self.dest.write_data(payload).await? }; + let (created_digest, size) = self.dest.write_data(payload).await?; if digest != created_digest { return Err(Error::String(format!( "Source repository provided payload that did not match the requested digest: wanted {digest}, got {created_digest}. 
wrote {size} bytes", diff --git a/crates/spfs/src/sync/reporter.rs b/crates/spfs/src/sync/reporter.rs index 6fd94c725f..c3e0dbdac6 100644 --- a/crates/spfs/src/sync/reporter.rs +++ b/crates/spfs/src/sync/reporter.rs @@ -438,33 +438,56 @@ impl std::iter::Sum for SyncSummary { } } +#[enum_dispatch::enum_dispatch] +pub trait Summary { + fn summary(&self) -> SyncSummary; +} + +impl Summary for &T +where + T: Summary, +{ + fn summary(&self) -> SyncSummary { + (**self).summary() + } +} + #[derive(Debug)] pub struct SyncEnvResult { pub env: tracking::EnvSpec, pub results: Vec, } -impl SyncEnvResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncEnvResult { + fn summary(&self) -> SyncSummary { self.results.iter().map(|r| r.summary()).sum() } } #[derive(Debug)] +#[enum_dispatch::enum_dispatch(Summary)] pub enum SyncEnvItemResult { Tag(SyncTagResult), Object(SyncObjectResult), + Payload(SyncPayloadResult), } -impl SyncEnvItemResult { - pub fn summary(&self) -> SyncSummary { - match self { - Self::Tag(r) => r.summary(), - Self::Object(r) => r.summary(), +impl From for SyncEnvItemResult { + fn from(value: SyncItemResult) -> Self { + match value { + SyncItemResult::Object(obj) => Self::Object(obj), + SyncItemResult::Payload(payload) => Self::Payload(payload), } } } +#[derive(Debug)] +#[enum_dispatch::enum_dispatch(Summary)] +pub enum SyncItemResult { + Object(SyncObjectResult), + Payload(SyncPayloadResult), +} + #[derive(Debug)] pub enum SyncTagResult { /// The tag did not need to be synced @@ -474,12 +497,12 @@ pub enum SyncTagResult { /// The tag was synced Synced { tag: tracking::TagSpec, - result: SyncObjectResult, + result: SyncItemResult, }, } -impl SyncTagResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncTagResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary { skipped_tags: 1, @@ -508,8 +531,8 @@ pub enum SyncObjectResult { Annotation(SyncAnnotationResult), } -impl 
SyncObjectResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncObjectResult { + fn summary(&self) -> SyncSummary { use SyncObjectResult as R; match self { R::Duplicate => SyncSummary { @@ -539,8 +562,8 @@ pub enum SyncPlatformResult { }, } -impl SyncPlatformResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncPlatformResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::skipped_one_object(), Self::Synced { results, .. } => { @@ -565,8 +588,8 @@ pub enum SyncLayerResult { }, } -impl SyncLayerResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncLayerResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::skipped_one_object(), Self::Synced { results, .. } => { @@ -591,8 +614,8 @@ pub enum SyncManifestResult { }, } -impl SyncManifestResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncManifestResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::skipped_one_object(), Self::Synced { results, .. 
} => { @@ -610,15 +633,15 @@ pub enum SyncAnnotationResult { InternalValue, /// The annotation was already synced in this session Duplicate, - /// The annotation was stored in a blob and was synced + /// The annotation was stored in a payload and was synced Synced { digest: encoding::Digest, - result: Box, + result: Box, }, } -impl SyncAnnotationResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncAnnotationResult { + fn summary(&self) -> SyncSummary { match self { Self::InternalValue | Self::Duplicate => SyncSummary::default(), Self::Synced { @@ -639,8 +662,8 @@ pub enum SyncEntryResult { Synced { result: SyncBlobResult }, } -impl SyncEntryResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncEntryResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::default(), Self::Synced { result, .. } => { @@ -665,8 +688,8 @@ pub enum SyncBlobResult { }, } -impl SyncBlobResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncBlobResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary::skipped_one_object(), Self::Synced { result, .. 
} => { @@ -688,8 +711,8 @@ pub enum SyncPayloadResult { Synced { size: u64 }, } -impl SyncPayloadResult { - pub fn summary(&self) -> SyncSummary { +impl Summary for SyncPayloadResult { + fn summary(&self) -> SyncSummary { match self { Self::Skipped | Self::Duplicate => SyncSummary { skipped_payloads: 1, diff --git a/crates/spfs/src/sync_test.rs b/crates/spfs/src/sync_test.rs index c8bbea30a1..d142835122 100644 --- a/crates/spfs/src/sync_test.rs +++ b/crates/spfs/src/sync_test.rs @@ -170,7 +170,7 @@ async fn test_sync_ref_including_annotation_blob( // Set up an annotation in a blob for this test let value = "this is a test annotation blob for syncing"; let annotation_digest = repo_a - .commit_blob(Box::pin(std::io::Cursor::new( + .commit_payload(Box::pin(std::io::Cursor::new( value.to_owned().into_bytes(), ))) .await @@ -205,7 +205,7 @@ async fn test_sync_ref_including_annotation_blob( .has_object(layer_with_annotation.digest().unwrap()) .await ); - assert!(repo_b.has_object(annotation_digest).await); + assert!(repo_b.has_payload(annotation_digest).await); Syncer::new(&repo_b, &repo_a) .sync_ref("testing") @@ -218,7 +218,7 @@ async fn test_sync_ref_including_annotation_blob( .has_object(layer_with_annotation.digest().unwrap()) .await ); - assert!(repo_a.has_object(annotation_digest).await); + assert!(repo_a.has_payload(annotation_digest).await); } #[rstest] @@ -275,7 +275,7 @@ async fn test_sync_missing_from_source( let platform_digest = platform.digest().unwrap(); let partial = platform_digest[..10].into(); syncer - .sync_digest(platform_digest) + .sync_object_digest(platform_digest) .await .expect("Should not fail when object is already in destination"); syncer diff --git a/crates/spfs/src/tracking/env.rs b/crates/spfs/src/tracking/env.rs index a1cd34dce2..2866d3c21b 100644 --- a/crates/spfs/src/tracking/env.rs +++ b/crates/spfs/src/tracking/env.rs @@ -228,16 +228,25 @@ pub enum EnvSpecItem { } impl EnvSpecItem { - /// Find the object digest for this item. 
+ /// Find the digest for this env spec item. /// - /// Any necessary lookups are done using the provided repository + /// Any necessary lookups are done using the provided repository. + /// + /// It is possible for this to succeed for tags even when no object or + /// payload exists with the digest. + /// + /// The returned digest may refer to an object, a payload, or a non-existent + /// item. pub async fn resolve_digest(&self, repo: &R) -> Result where R: crate::storage::Repository + ?Sized, { match self { Self::TagSpec(spec) => repo.resolve_tag(spec).await.map(|t| t.target), - Self::PartialDigest(part) => repo.resolve_full_digest(part).await, + Self::PartialDigest(part) => repo + .resolve_full_digest(part) + .await + .map(|found_digest| found_digest.into_digest()), Self::Digest(digest) => Ok(*digest), Self::SpecFile(_) => Err(Error::String(String::from( "Impossible operation: spfs env files do not have digests", diff --git a/crates/spk-cli/cmd-debug/src/cmd_debug.rs b/crates/spk-cli/cmd-debug/src/cmd_debug.rs index 86d44fe901..cdacd9f46e 100644 --- a/crates/spk-cli/cmd-debug/src/cmd_debug.rs +++ b/crates/spk-cli/cmd-debug/src/cmd_debug.rs @@ -96,7 +96,7 @@ impl Run for Debug { { let syncer = spfs::Syncer::new(repo, &local_repo) .with_reporter(spfs::sync::reporter::SyncReporters::console()); - syncer.sync_digest(layer).await?; + syncer.sync_object_digest(layer).await?; } rt.push_digest(layer); diff --git a/crates/spk-cli/group3/src/cmd_import.rs b/crates/spk-cli/group3/src/cmd_import.rs index 764f00a1fe..730c83f609 100644 --- a/crates/spk-cli/group3/src/cmd_import.rs +++ b/crates/spk-cli/group3/src/cmd_import.rs @@ -6,6 +6,7 @@ use clap::Args; use futures::TryStreamExt; use miette::{Context, Result}; use spfs::storage::TagStorage; +use spfs::sync::reporter::Summary; use spk_cli_common::{CommandArgs, Run}; #[cfg(test)] diff --git a/crates/spk-exec/src/exec.rs b/crates/spk-exec/src/exec.rs index e76e1e60d0..1d61fb75dc 100644 --- a/crates/spk-exec/src/exec.rs +++ 
b/crates/spk-exec/src/exec.rs @@ -318,7 +318,7 @@ where spec.ident().format_ident(), ); let syncer = spfs::Syncer::new(repo, &local_repo).with_reporter(reporter()); - syncer.sync_digest(digest).await?; + syncer.sync_object_digest(digest).await?; } } diff --git a/crates/spk-storage/src/fixtures.rs b/crates/spk-storage/src/fixtures.rs index 9ba97719db..b29321505f 100644 --- a/crates/spk-storage/src/fixtures.rs +++ b/crates/spk-storage/src/fixtures.rs @@ -111,7 +111,7 @@ pub async fn make_repo(kind: RepoKind) -> TempRepo { .await .expect("failed to establish temporary local repo for test"); let written = spfs_repo - .commit_blob(Box::pin(std::io::Cursor::new(b""))) + .commit_payload(Box::pin(std::io::Cursor::new(b""))) .await .expect("failed to add an empty object to spfs"); let empty_manifest = spfs::graph::Manifest::default(); diff --git a/crates/spk-storage/src/storage/spfs.rs b/crates/spk-storage/src/storage/spfs.rs index f6fa4375db..2e5f4d0678 100644 --- a/crates/spk-storage/src/storage/spfs.rs +++ b/crates/spk-storage/src/storage/spfs.rs @@ -393,7 +393,7 @@ impl Storage for SpfsRepository { })?; let digest = self .inner - .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .commit_payload(Box::pin(std::io::Cursor::new(payload.into_bytes()))) .await?; self.inner.push_tag(&tag_spec, &digest).await?; self.invalidate_caches(); @@ -442,7 +442,7 @@ impl Storage for SpfsRepository { })?; let digest = self .inner - .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .commit_payload(Box::pin(std::io::Cursor::new(payload.into_bytes()))) .await?; self.inner.push_tag(&tag_spec, &digest).await?; self.invalidate_caches(); @@ -470,7 +470,7 @@ impl Storage for SpfsRepository { })?; let digest = self .inner - .commit_blob(Box::pin(std::io::Cursor::new(payload.into_bytes()))) + .commit_payload(Box::pin(std::io::Cursor::new(payload.into_bytes()))) .await?; self.inner.push_tag(&tag_spec, &digest).await?; self.invalidate_caches(); @@ -1019,7 +1019,7 
@@ impl SpfsRepository { let yaml = serde_yaml::to_string(meta).map_err(Error::InvalidRepositoryMetadata)?; let digest = self .inner - .commit_blob(Box::pin(std::io::Cursor::new(yaml.into_bytes()))) + .commit_payload(Box::pin(std::io::Cursor::new(yaml.into_bytes()))) .await?; self.inner.push_tag(&tag_spec, &digest).await?; self.invalidate_caches();