diff --git a/Cargo.lock b/Cargo.lock index f8011668d3..dcee97ebd5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7524,6 +7524,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "peakmem-alloc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb7428a977a472465aced57d8d2335d6167c0ce9c05c283fd6faed3d8d948f6" + [[package]] name = "pem" version = "3.0.5" @@ -13186,11 +13192,14 @@ dependencies = [ "anyhow", "base64 0.22.1", "bcs", + "clap", "criterion", "enum_dispatch", "fastcrypto 0.1.9 (git+https://github.com/MystenLabs/fastcrypto?rev=4db0e90c732bbf7420ca20de808b698883148d9c)", "hex", + "libc", "p256", + "peakmem-alloc", "rand 0.8.5", "reed-solomon-simd", "serde", @@ -13202,6 +13211,7 @@ dependencies = [ "tracing-subscriber", "utoipa", "walrus-test-utils", + "walrus-utils", ] [[package]] diff --git a/crates/walrus-core/Cargo.toml b/crates/walrus-core/Cargo.toml index ca4fc447d1..dbb96eab22 100644 --- a/crates/walrus-core/Cargo.toml +++ b/crates/walrus-core/Cargo.toml @@ -29,10 +29,14 @@ walrus-test-utils = { workspace = true, optional = true } [dev-dependencies] anyhow.workspace = true +clap.workspace = true criterion.workspace = true +libc.workspace = true +peakmem-alloc = "0.3" serde_test.workspace = true tracing-subscriber.workspace = true walrus-test-utils.workspace = true +walrus-utils = { path = "../walrus-utils" } [lints] workspace = true @@ -47,3 +51,10 @@ harness = false [[bench]] name = "blob_encoding" harness = false + +[[bench]] +name = "encoding_phases" +harness = false + +[[example]] +name = "profile_encoding" diff --git a/crates/walrus-core/benches/encoding_phases.rs b/crates/walrus-core/benches/encoding_phases.rs new file mode 100644 index 0000000000..5b63d6e23a --- /dev/null +++ b/crates/walrus-core/benches/encoding_phases.rs @@ -0,0 +1,248 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +// Allowing `unwrap`s in benchmarks. +#![allow(clippy::unwrap_used)] + +//! Phase-level benchmarks for the blob encoding pipeline at production parameters (n_shards=1000). +//! +//! Complements `blob_encoding.rs` (end-to-end) and `basic_encoding.rs` (individual RS/merkle ops) +//! by measuring each phase of `encode_with_metadata()` independently. + +use core::{num::NonZeroU16, time::Duration}; + +use criterion::{AxisScale, BenchmarkId, Criterion, PlotConfiguration}; +use fastcrypto::hash::Blake2b256; +use walrus_core::{ + encoding::{EncodingFactory as _, ReedSolomonEncoder, ReedSolomonEncodingConfig}, + merkle::{MerkleTree, Node, leaf_hash}, +}; +use walrus_test_utils::random_data; + +const N_SHARDS: u16 = 1000; + +const BLOB_SIZES: &[(u64, &str)] = &[(1 << 20, "1MiB"), (1 << 25, "32MiB"), (1 << 28, "256MiB")]; + +fn encoding_config() -> ReedSolomonEncodingConfig { + ReedSolomonEncodingConfig::new(NonZeroU16::new(N_SHARDS).unwrap()) +} + +/// Benchmark secondary encoding: RS-encode each row to produce repair secondary slivers. +fn secondary_encoding(c: &mut Criterion) { + let config = encoding_config(); + let mut group = c.benchmark_group("secondary_encoding"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + for (blob_size, size_str) in BLOB_SIZES { + let blob = random_data((*blob_size).try_into().unwrap()); + group.throughput(criterion::Throughput::Bytes(*blob_size)); + + let encoder = config.get_blob_encoder(&blob).unwrap(); + let symbol_size = encoder.symbol_usize(); + let n_rows: usize = config.n_primary_source_symbols().get().into(); + let n_cols: usize = config.n_secondary_source_symbols().get().into(); + let row_len = n_cols * symbol_size; + + // Build row data (same layout as primary slivers' data). + let mut rows: Vec> = Vec::with_capacity(n_rows); + for r in 0..n_rows { + let start = r * row_len; + let end = (start + row_len).min(blob.len()); + let mut row = vec![0u8; row_len]; + if start < blob.len() { + row[..end - start].copy_from_slice(&blob[start..end]); + } + rows.push(row); + } + + group.bench_with_input( + BenchmarkId::new("encode_rows", size_str), + &rows, + |b, rows| { + b.iter(|| { + let mut enc = ReedSolomonEncoder::new( + NonZeroU16::new(symbol_size.try_into().unwrap()).unwrap(), + config.n_secondary_source_symbols(), + NonZeroU16::new(N_SHARDS).unwrap(), + ) + .unwrap(); + for row in rows { + let _ = enc.encode_all_repair_symbols(row); + } + }); + }, + ); + } + + group.finish(); +} + +/// Benchmark primary encoding: RS-encode each column to produce all primary symbols. +fn primary_encoding(c: &mut Criterion) { + let config = encoding_config(); + let mut group = c.benchmark_group("primary_encoding"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + for (blob_size, size_str) in BLOB_SIZES { + let blob = random_data((*blob_size).try_into().unwrap()); + group.throughput(criterion::Throughput::Bytes(*blob_size)); + + let encoder = config.get_blob_encoder(&blob).unwrap(); + let symbol_size = encoder.symbol_usize(); + let n_shards: usize = N_SHARDS.into(); + let n_rows: usize = config.n_primary_source_symbols().get().into(); + + // Build column data (each column = one secondary sliver's symbols data). + let col_len = n_rows * symbol_size; + let columns: Vec> = (0..n_shards).map(|_| random_data(col_len)).collect(); + + group.bench_with_input( + BenchmarkId::new("encode_columns", size_str), + &columns, + |b, columns| { + b.iter(|| { + let mut enc = ReedSolomonEncoder::new( + NonZeroU16::new(symbol_size.try_into().unwrap()).unwrap(), + config.n_primary_source_symbols(), + NonZeroU16::new(N_SHARDS).unwrap(), + ) + .unwrap(); + for col in columns { + let _ = enc.encode_all_ref(col); + } + }); + }, + ); + } + + group.finish(); +} + +/// Benchmark primary encoding + leaf hashing of each symbol. +fn primary_encoding_with_hashing(c: &mut Criterion) { + let config = encoding_config(); + let mut group = c.benchmark_group("primary_encoding_with_hashing"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + for (blob_size, size_str) in BLOB_SIZES { + let blob = random_data((*blob_size).try_into().unwrap()); + group.throughput(criterion::Throughput::Bytes(*blob_size)); + + let encoder = config.get_blob_encoder(&blob).unwrap(); + let symbol_size = encoder.symbol_usize(); + let n_shards: usize = N_SHARDS.into(); + let n_rows: usize = config.n_primary_source_symbols().get().into(); + + let col_len = n_rows * symbol_size; + let columns: Vec> = (0..n_shards).map(|_| random_data(col_len)).collect(); + + group.bench_with_input( + BenchmarkId::new("encode_columns_and_hash", size_str), + &columns, + |b, columns| { + b.iter(|| { + let mut enc = ReedSolomonEncoder::new( + NonZeroU16::new(symbol_size.try_into().unwrap()).unwrap(), + config.n_primary_source_symbols(), + NonZeroU16::new(N_SHARDS).unwrap(), + ) + .unwrap(); + let mut hashes = vec![Node::Empty; n_shards * n_shards]; + for (col_index, col) in columns.iter().enumerate() { + let symbols = enc.encode_all_ref(col).unwrap(); + for (row_index, symbol) in symbols.to_symbols().enumerate() { + hashes[n_shards * row_index + col_index] = + leaf_hash::(symbol); + } + } + }); + }, + ); + } + + group.finish(); +} + +/// Benchmark metadata computation from pre-computed symbol hashes (Merkle tree construction). +fn metadata_from_hashes(c: &mut Criterion) { + let mut group = c.benchmark_group("metadata_from_hashes"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + for (blob_size, size_str) in BLOB_SIZES { + group.throughput(criterion::Throughput::Bytes(*blob_size)); + + let n_shards: usize = N_SHARDS.into(); + // Pre-compute random hashes to isolate metadata construction time. + let hashes: Vec = (0..n_shards * n_shards) + .map(|i| { + let data = i.to_le_bytes(); + leaf_hash::(&data) + }) + .collect(); + + group.bench_with_input( + BenchmarkId::new("build_merkle_trees", size_str), + &hashes, + |b, hashes| { + b.iter(|| { + // Build 2 * n_shards Merkle trees (primary + secondary per sliver pair). + for sliver_index in 0..n_shards { + let _primary = MerkleTree::::build_from_leaf_hashes( + hashes[n_shards * sliver_index..n_shards * (sliver_index + 1)] + .iter() + .cloned(), + ); + let _secondary = MerkleTree::::build_from_leaf_hashes( + (0..n_shards).map(|symbol_index| { + hashes[n_shards * symbol_index + n_shards - 1 - sliver_index] + .clone() + }), + ); + } + }); + }, + ); + } + + group.finish(); +} + +/// Benchmark the full `encode_with_metadata()` pipeline for comparison. +fn full_pipeline(c: &mut Criterion) { + let config = encoding_config(); + let mut group = c.benchmark_group("full_pipeline"); + group.plot_config(PlotConfiguration::default().summary_scale(AxisScale::Logarithmic)); + + for (blob_size, size_str) in BLOB_SIZES { + let blob = random_data((*blob_size).try_into().unwrap()); + group.throughput(criterion::Throughput::Bytes(*blob_size)); + + group.bench_with_input( + BenchmarkId::new("encode_with_metadata", size_str), + &blob, + |b, blob| { + b.iter(|| { + let encoder = config.get_blob_encoder(blob).unwrap(); + let _result = encoder.encode_with_metadata(); + }); + }, + ); + } + + group.finish(); +} + +fn main() { + let mut criterion = Criterion::default() + .configure_from_args() + .sample_size(10) + .warm_up_time(Duration::from_millis(10)); + + secondary_encoding(&mut criterion); + primary_encoding(&mut criterion); + primary_encoding_with_hashing(&mut criterion); + metadata_from_hashes(&mut criterion); + full_pipeline(&mut criterion); + + criterion.final_summary(); +} diff --git a/crates/walrus-core/examples/profile_encoding.rs b/crates/walrus-core/examples/profile_encoding.rs new file mode 100644 index 0000000000..1906b10991 --- /dev/null +++ b/crates/walrus-core/examples/profile_encoding.rs @@ -0,0 +1,211 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +// Allowing `unwrap`s in examples. +#![allow(clippy::unwrap_used)] + +//! Profiling binary for blob encoding. +//! +//! Runs `encode_with_metadata()` on configurable blob sizes with wall-clock timing. +//! Designed to be run under `samply record` or `cargo flamegraph` without criterion overhead. +//! +//! ```bash +//! cargo build --release --example profile_encoding +//! samply record ./target/release/examples/profile_encoding --size 32m +//! ``` + +use std::{alloc::System, num::NonZeroU16, time::Instant}; + +use clap::Parser; +use peakmem_alloc::{PeakMemAlloc, PeakMemAllocTrait}; +use walrus_core::encoding::{BlobEncoder, ReedSolomonEncodingConfig}; +use walrus_test_utils::random_data; +use walrus_utils::size::{format_size, parse_size}; + +#[global_allocator] +static PEAK_ALLOC: PeakMemAlloc = PeakMemAlloc::new(System); + +fn get_peak_rss_bytes() -> usize { + unsafe { + let mut usage: libc::rusage = std::mem::zeroed(); + libc::getrusage(libc::RUSAGE_SELF, &mut usage); + let max_rss = usage.ru_maxrss as usize; + // macOS reports bytes, Linux reports KB + if cfg!(target_os = "macos") { + max_rss + } else { + max_rss * 1024 + } + } +} + +#[derive(Parser)] +#[command(about = "Profile blob encoding pipeline")] +struct Args { + /// Blob size (e.g. 1k, 1m, 32m, 256m, 1g) + #[arg(long, default_value = "32m", value_parser = parse_size)] + size: usize, + + /// Number of shards + #[arg(long, default_value_t = 1000)] + shards: u16, + + /// Number of iterations + #[arg(long, default_value_t = 1)] + iterations: u32, + + /// Number of blobs to encode concurrently (simulates multi-blob uploads) + #[arg(long, default_value_t = 1)] + concurrent_blobs: u32, +} + +fn main() { + let args = Args::parse(); + let config = ReedSolomonEncodingConfig::new(NonZeroU16::new(args.shards).unwrap()); + + if args.concurrent_blobs > 1 { + print!( + "blob_size={} shards={} iterations={} concurrent_blobs={}", + format_size(args.size), + args.shards, + args.iterations, + args.concurrent_blobs + ); + } else { + print!( + "blob_size={} shards={} iterations={}", + format_size(args.size), + args.shards, + args.iterations + ); + } + + let blob = random_data(args.size); + let symbol_size = { + let encoder = config.get_blob_encoder(&blob).unwrap(); + encoder.symbol_usize() + }; + println!("\nsymbol_size={symbol_size}"); + + if args.concurrent_blobs <= 1 { + run_single_blob(&args, &config, &blob); + } else { + run_concurrent_blobs(&args, &config, &blob); + } +} + +fn run_single_blob(args: &Args, config: &ReedSolomonEncodingConfig, blob: &[u8]) { + let mut durations = Vec::with_capacity(args.iterations.try_into().unwrap()); + let mut max_peak_heap: usize = 0; + + for i in 0..args.iterations { + let blob_copy = blob.to_vec(); + let encoder = config.get_blob_encoder(&blob_copy).unwrap(); + + PEAK_ALLOC.reset_peak_memory(); + let start = Instant::now(); + let (_sliver_pairs, _metadata) = encoder.encode_with_metadata(); + let elapsed = start.elapsed(); + let peak_heap = PEAK_ALLOC.get_peak_memory(); + let peak_rss = get_peak_rss_bytes(); + + durations.push(elapsed); + max_peak_heap = max_peak_heap.max(peak_heap); + let throughput_mbs = args.size as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0); + let expansion = peak_heap as f64 / args.size as f64; + println!( + " iteration {}: {:.3}s ({:.1} MiB/s) peak_heap={} peak_rss={} expansion={:.1}x", + i + 1, + elapsed.as_secs_f64(), + throughput_mbs, + format_size(peak_heap), + format_size(peak_rss), + expansion + ); + } + + if args.iterations > 1 { + let total: f64 = durations.iter().map(|d| d.as_secs_f64()).sum(); + let avg = total / f64::from(args.iterations); + let throughput_mbs = args.size as f64 / avg / (1024.0 * 1024.0); + println!( + "average: {avg:.3}s ({throughput_mbs:.1} MiB/s) max_peak_heap={}", + format_size(max_peak_heap) + ); + } +} + +fn run_concurrent_blobs(args: &Args, config: &ReedSolomonEncodingConfig, blob: &[u8]) { + let n: usize = args.concurrent_blobs.try_into().unwrap(); + let mut durations = Vec::with_capacity(args.iterations.try_into().unwrap()); + let mut max_peak_heap: usize = 0; + + for i in 0..args.iterations { + // Pre-generate N blob copies and N encoders. + let blob_copies: Vec> = (0..n).map(|_| blob.to_vec()).collect(); + let encoders: Vec> = blob_copies + .iter() + .map(|b| config.get_blob_encoder(b).unwrap()) + .collect(); + + PEAK_ALLOC.reset_peak_memory(); + let start = Instant::now(); + + // Use std::thread::scope to spawn N threads, each encoding one blob. + let per_blob_elapsed: Vec<_> = std::thread::scope(|s| { + let handles: Vec<_> = encoders + .into_iter() + .map(|encoder| { + s.spawn(move || { + let blob_start = Instant::now(); + let (_sliver_pairs, _metadata) = encoder.encode_with_metadata(); + blob_start.elapsed() + }) + }) + .collect(); + handles.into_iter().map(|h| h.join().unwrap()).collect() + }); + + let wall_time = start.elapsed(); + let peak_heap = PEAK_ALLOC.get_peak_memory(); + let peak_rss = get_peak_rss_bytes(); + + durations.push(wall_time); + max_peak_heap = max_peak_heap.max(peak_heap); + + println!( + " iteration {}: {:.3}s total wall time", + i + 1, + wall_time.as_secs_f64() + ); + for (j, elapsed) in per_blob_elapsed.iter().enumerate() { + let throughput_mbs = args.size as f64 / elapsed.as_secs_f64() / (1024.0 * 1024.0); + println!( + " blob {}: {:.3}s ({:.1} MiB/s)", + j + 1, + elapsed.as_secs_f64(), + throughput_mbs + ); + } + let total_data = args.size * n; + let expansion = peak_heap as f64 / total_data as f64; + println!( + " peak_heap={} peak_rss={} expansion={:.1}x (per blob: {})", + format_size(peak_heap), + format_size(peak_rss), + expansion, + format_size(peak_heap / n) + ); + } + + if args.iterations > 1 { + let total: f64 = durations.iter().map(|d| d.as_secs_f64()).sum(); + let avg = total / f64::from(args.iterations); + let total_data = args.size * n; + let throughput_mbs = total_data as f64 / avg / (1024.0 * 1024.0); + println!( + "average: {avg:.3}s ({throughput_mbs:.1} MiB/s) max_peak_heap={}", + format_size(max_peak_heap) + ); + } +} diff --git a/crates/walrus-core/src/merkle.rs b/crates/walrus-core/src/merkle.rs index cd0eef3692..f4bb7162ec 100644 --- a/crates/walrus-core/src/merkle.rs +++ b/crates/walrus-core/src/merkle.rs @@ -310,7 +310,7 @@ where } /// Computes the hash of the provided input to be used as a leaf hash of a Merkle tree. -pub(crate) fn leaf_hash(input: &[u8]) -> Node +pub fn leaf_hash(input: &[u8]) -> Node where T: HashFunction, { diff --git a/crates/walrus-e2e-tests/tests/test_client.rs b/crates/walrus-e2e-tests/tests/test_client.rs index 62dbfebe81..3a224b9188 100644 --- a/crates/walrus-e2e-tests/tests/test_client.rs +++ b/crates/walrus-e2e-tests/tests/test_client.rs @@ -806,7 +806,7 @@ async fn test_store_with_existing_storage_resource( ) }) .collect(); - let encoded_blobs = walrus_sdk::node_client::encode_blobs(unencoded_blobs, None, None)?; + let encoded_blobs = walrus_sdk::node_client::encode_blobs(unencoded_blobs, None, None, 1)?; let encoded_sizes = encoded_blobs .iter() .map(|blob| { diff --git a/crates/walrus-sdk/src/config/communication_config.rs b/crates/walrus-sdk/src/config/communication_config.rs index 0929a8d96c..6659588705 100644 --- a/crates/walrus-sdk/src/config/communication_config.rs +++ b/crates/walrus-sdk/src/config/communication_config.rs @@ -226,6 +226,10 @@ pub struct ClientCommunicationConfig { #[serde(rename = "registration_delay_millis")] #[serde_as(as = "DurationMilliSeconds")] pub registration_delay: Duration, + /// Maximum number of blobs to encode concurrently. Limits peak memory when storing + /// multiple blobs. If `None`, defaults to 1 (sequential encoding). + #[serde(default)] + pub max_concurrent_blob_encodings: Option, /// The maximum total blob size allowed to store if multiple blobs are uploaded. pub max_total_blob_size: usize, /// The configuration for the backoff after committee change is detected. @@ -266,6 +270,7 @@ impl Default for ClientCommunicationConfig { Duration::from_secs(5), Some(5), ), + max_concurrent_blob_encodings: None, sui_client_request_timeout: None, } } @@ -322,6 +327,8 @@ pub struct CommunicationLimits { pub max_data_in_flight: usize, /// Configuration for auto-tuning concurrency during writes. pub auto_tune: DataInFlightAutoTuneConfig, + /// The maximum number of blobs to encode concurrently. + pub max_concurrent_blob_encodings: usize, } impl CommunicationLimits { @@ -345,6 +352,9 @@ impl CommunicationLimits { max_concurrent_status_reads, max_data_in_flight: communication_config.max_data_in_flight, auto_tune: communication_config.data_in_flight_auto_tune.clone(), + max_concurrent_blob_encodings: communication_config + .max_concurrent_blob_encodings + .unwrap_or(1), } } diff --git a/crates/walrus-sdk/src/node_client.rs b/crates/walrus-sdk/src/node_client.rs index 115bcbdd7f..f5fe644bfc 100644 --- a/crates/walrus-sdk/src/node_client.rs +++ b/crates/walrus-sdk/src/node_client.rs @@ -27,7 +27,6 @@ use futures::{ }; use indicatif::MultiProgress; use rand::{RngCore as _, rngs::ThreadRng}; -use rayon::{iter::IntoParallelIterator, prelude::*}; use sui_types::base_types::ObjectID; use tokio::{ sync::{Mutex, Semaphore}, @@ -3415,11 +3414,17 @@ mod internal { let upload_relay_client = store_args.upload_relay_client.clone(); let encoding_event_tx = store_args.encoding_event_tx.clone(); + let max_concurrent_blob_encodings = self + .client() + .await? + .communication_limits + .max_concurrent_blob_encodings; let maybe_encoded_blobs = tokio::task::spawn_blocking(move || { encode_blobs( walrus_store_blobs, upload_relay_client, encoding_event_tx.as_ref(), + max_concurrent_blob_encodings, ) }) .await @@ -3587,6 +3592,7 @@ pub fn encode_blobs( walrus_store_blobs: Vec>, upload_relay_client: Option>, encoding_event_tx: Option<&tokio::sync::mpsc::UnboundedSender>, + max_concurrent_blob_encodings: usize, ) -> ClientResult>> { let total_blobs_count = walrus_store_blobs.len(); if total_blobs_count == 0 { @@ -3607,36 +3613,59 @@ pub fn encode_blobs( let completed_blobs_count = Arc::new(AtomicUsize::new(0)); let show_spinner = encoding_event_tx.is_none(); - // Encode each blob into sliver pairs and metadata. Filters out failed blobs and continue. - let blobs = walrus_store_blobs - .into_par_iter() - .map(|blob| { - let _entered = - tracing::info_span!(parent: parent.clone(), "encode_blobs__par_iter").entered(); - let encoding_config = blob.common.encoding_config; - let encode_fn = |blob: UnencodedBlob| { - encode_blob( - blob, - encoding_config, - multi_pb.as_ref(), - upload_relay_client.clone(), - show_spinner, - ) - }; - let encode_result = blob.map(encode_fn, "encode"); - - if let Some(tx) = encoding_event_tx.as_ref() { - let finished_blobs_count = completed_blobs_count.fetch_add(1, Ordering::SeqCst) + 1; - if let Err(err) = tx.send(EncodingProgressEvent::BlobCompleted { - completed: finished_blobs_count, - total: total_blobs_count, - }) { - tracing::warn!(%err, "failed to send encoding blob completed event"); - } + // Encode each blob into sliver pairs and metadata. Filters out failed blobs and continues. + let encode_one = |blob: WalrusStoreBlobMaybeFinished| { + let _entered = + tracing::info_span!(parent: parent.clone(), "encode_blobs__encode_one").entered(); + let encoding_config = blob.common.encoding_config; + let encode_fn = |blob: UnencodedBlob| { + encode_blob( + blob, + encoding_config, + multi_pb.as_ref(), + upload_relay_client.clone(), + show_spinner, + ) + }; + let encode_result = blob.map(encode_fn, "encode"); + + if let Some(tx) = encoding_event_tx.as_ref() { + let finished_blobs_count = completed_blobs_count.fetch_add(1, Ordering::SeqCst) + 1; + if let Err(err) = tx.send(EncodingProgressEvent::BlobCompleted { + completed: finished_blobs_count, + total: total_blobs_count, + }) { + tracing::warn!(%err, "failed to send encoding blob completed event"); } - encode_result - }) - .collect(); + } + encode_result + }; + + // Use OS threads (not rayon) for outer concurrency to avoid deadlocking the rayon pool, + // since inner encode_with_metadata() also uses rayon. + let max_concurrent = max_concurrent_blob_encodings.max(1); + let blobs = if max_concurrent <= 1 || walrus_store_blobs.len() <= 1 { + walrus_store_blobs.into_iter().map(&encode_one).collect() + } else { + let mut results = Vec::with_capacity(walrus_store_blobs.len()); + let mut remaining = walrus_store_blobs; + while !remaining.is_empty() { + let batch_size = max_concurrent.min(remaining.len()); + let batch: Vec<_> = remaining.drain(..batch_size).collect(); + let batch_results: ClientResult> = std::thread::scope(|s| { + let handles: Vec<_> = batch + .into_iter() + .map(|blob| s.spawn(|| encode_one(blob))) + .collect(); + handles + .into_iter() + .map(|h| h.join().expect("encode thread panicked")) + .collect() + }); + results.extend(batch_results?); + } + Ok(results) + }; if let Some(tx) = encoding_event_tx && let Err(error) = tx.send(EncodingProgressEvent::Finished) diff --git a/crates/walrus-utils/src/lib.rs b/crates/walrus-utils/src/lib.rs index b196120f38..d154168296 100644 --- a/crates/walrus-utils/src/lib.rs +++ b/crates/walrus-utils/src/lib.rs @@ -8,6 +8,8 @@ use std::{env, fs, path::Path}; use anyhow::Context; use serde::de::DeserializeOwned; +pub mod size; + #[cfg(feature = "backoff")] pub mod backoff; diff --git a/crates/walrus-utils/src/size.rs b/crates/walrus-utils/src/size.rs new file mode 100644 index 0000000000..f65fac0ab3 --- /dev/null +++ b/crates/walrus-utils/src/size.rs @@ -0,0 +1,89 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Human-readable size parsing and formatting. + +/// Parses a human-readable size string into bytes. +/// +/// Accepts optional suffixes `k`/`m`/`g` (case-insensitive) for KiB/MiB/GiB. +/// Without a suffix, the value is interpreted as bytes. +/// +/// # Examples +/// +/// ``` +/// use walrus_utils::size::parse_size; +/// +/// assert_eq!(parse_size("1k").unwrap(), 1024); +/// assert_eq!(parse_size("32m").unwrap(), 32 * 1024 * 1024); +/// assert_eq!(parse_size("1G").unwrap(), 1 << 30); +/// assert_eq!(parse_size("4096").unwrap(), 4096); +/// ``` +pub fn parse_size(s: &str) -> Result { + let s = s.to_lowercase(); + let (num, mult) = if let Some(n) = s.strip_suffix('g') { + (n, 1 << 30) + } else if let Some(n) = s.strip_suffix('m') { + (n, 1 << 20) + } else if let Some(n) = s.strip_suffix('k') { + (n, 1 << 10) + } else { + (s.as_str(), 1) + }; + let n: usize = num.parse().map_err(|e| format!("invalid size: {e}"))?; + Ok(n * mult) +} + +/// Formats a byte count as a human-readable size string. +/// +/// Uses binary units (GiB, MiB, KiB) with integer truncation. +/// +/// # Examples +/// +/// ``` +/// use walrus_utils::size::format_size; +/// +/// assert_eq!(format_size(1 << 30), "1GiB"); +/// assert_eq!(format_size(32 * 1024 * 1024), "32MiB"); +/// assert_eq!(format_size(512 * 1024), "512KiB"); +/// assert_eq!(format_size(100), "100B"); +/// ``` +pub fn format_size(bytes: usize) -> String { + if bytes >= 1 << 30 { + format!("{}GiB", bytes >> 30) + } else if bytes >= 1 << 20 { + format!("{}MiB", bytes >> 20) + } else if bytes >= 1 << 10 { + format!("{}KiB", bytes >> 10) + } else { + format!("{bytes}B") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_size() { + assert_eq!(parse_size("0").unwrap(), 0); + assert_eq!(parse_size("100").unwrap(), 100); + assert_eq!(parse_size("1k").unwrap(), 1024); + assert_eq!(parse_size("1K").unwrap(), 1024); + assert_eq!(parse_size("32m").unwrap(), 32 << 20); + assert_eq!(parse_size("32M").unwrap(), 32 << 20); + assert_eq!(parse_size("1g").unwrap(), 1 << 30); + assert_eq!(parse_size("1G").unwrap(), 1 << 30); + assert!(parse_size("abc").is_err()); + assert!(parse_size("m").is_err()); + } + + #[test] + fn test_format_size() { + assert_eq!(format_size(0), "0B"); + assert_eq!(format_size(512), "512B"); + assert_eq!(format_size(1024), "1KiB"); + assert_eq!(format_size(1 << 20), "1MiB"); + assert_eq!(format_size(1 << 30), "1GiB"); + assert_eq!(format_size(3 * (1 << 30)), "3GiB"); + } +}