diff --git a/Cargo.lock b/Cargo.lock index dcee97ebd5..f90fa86953 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13192,8 +13192,10 @@ dependencies = [ "anyhow", "base64 0.22.1", "bcs", + "blake2", "clap", "criterion", + "digest 0.10.7", "enum_dispatch", "fastcrypto 0.1.9 (git+https://github.com/MystenLabs/fastcrypto?rev=4db0e90c732bbf7420ca20de808b698883148d9c)", "hex", @@ -13201,6 +13203,7 @@ dependencies = [ "p256", "peakmem-alloc", "rand 0.8.5", + "rayon", "reed-solomon-simd", "serde", "serde_test", diff --git a/crates/walrus-core/Cargo.toml b/crates/walrus-core/Cargo.toml index dbb96eab22..1771dafc46 100644 --- a/crates/walrus-core/Cargo.toml +++ b/crates/walrus-core/Cargo.toml @@ -13,11 +13,14 @@ test-utils = ["walrus-test-utils"] [dependencies] base64.workspace = true bcs.workspace = true +blake2 = "0.10.6" +digest = "0.10.7" enum_dispatch = { workspace = true } fastcrypto.workspace = true hex.workspace = true p256 = { workspace = true, features = ["pem", "pkcs8"] } rand.workspace = true +rayon.workspace = true reed-solomon-simd.workspace = true serde.workspace = true serde_with.workspace = true diff --git a/crates/walrus-core/benches/encoding_phases.rs b/crates/walrus-core/benches/encoding_phases.rs index 5b63d6e23a..a1c7d0917d 100644 --- a/crates/walrus-core/benches/encoding_phases.rs +++ b/crates/walrus-core/benches/encoding_phases.rs @@ -151,7 +151,7 @@ fn primary_encoding_with_hashing(c: &mut Criterion) { for (col_index, col) in columns.iter().enumerate() { let symbols = enc.encode_all_ref(col).unwrap(); for (row_index, symbol) in symbols.to_symbols().enumerate() { - hashes[n_shards * row_index + col_index] = + hashes[col_index * n_shards + row_index] = leaf_hash::(symbol); } } @@ -188,15 +188,13 @@ fn metadata_from_hashes(c: &mut Criterion) { // Build 2 * n_shards Merkle trees (primary + secondary per sliver pair). for sliver_index in 0..n_shards { let _primary = MerkleTree::::build_from_leaf_hashes( - hashes[n_shards * sliver_index..n_shards * (sliver_index + 1)] - .iter() - .cloned(), + (0..n_shards).map(|col| hashes[col * n_shards + sliver_index].clone()), ); + let sec_col = n_shards - 1 - sliver_index; let _secondary = MerkleTree::::build_from_leaf_hashes( - (0..n_shards).map(|symbol_index| { - hashes[n_shards * symbol_index + n_shards - 1 - sliver_index] - .clone() - }), + hashes[sec_col * n_shards..(sec_col + 1) * n_shards] + .iter() + .cloned(), ); } }); diff --git a/crates/walrus-core/examples/profile_encoding.rs b/crates/walrus-core/examples/profile_encoding.rs index 1906b10991..c712f9b78c 100644 --- a/crates/walrus-core/examples/profile_encoding.rs +++ b/crates/walrus-core/examples/profile_encoding.rs @@ -29,7 +29,7 @@ fn get_peak_rss_bytes() -> usize { unsafe { let mut usage: libc::rusage = std::mem::zeroed(); libc::getrusage(libc::RUSAGE_SELF, &mut usage); - let max_rss = usage.ru_maxrss as usize; + let max_rss = usize::try_from(usage.ru_maxrss).unwrap(); // macOS reports bytes, Linux reports KB if cfg!(target_os = "macos") { max_rss @@ -54,6 +54,10 @@ struct Args { #[arg(long, default_value_t = 1)] iterations: u32, + /// Number of rayon threads (0 = use rayon default, which is num CPUs) + #[arg(long, default_value_t = 0)] + threads: usize, + /// Number of blobs to encode concurrently (simulates multi-blob uploads) #[arg(long, default_value_t = 1)] concurrent_blobs: u32, @@ -61,22 +65,33 @@ struct Args { fn main() { let args = Args::parse(); + + if args.threads > 0 { + rayon::ThreadPoolBuilder::new() + .num_threads(args.threads) + .build_global() + .unwrap(); + } + let thread_count = rayon::current_num_threads(); + let config = ReedSolomonEncodingConfig::new(NonZeroU16::new(args.shards).unwrap()); if args.concurrent_blobs > 1 { print!( - "blob_size={} shards={} iterations={} concurrent_blobs={}", + "blob_size={} shards={} iterations={} threads={} concurrent_blobs={}", format_size(args.size), args.shards, args.iterations, + thread_count, args.concurrent_blobs ); } else { print!( - "blob_size={} shards={} iterations={}", + "blob_size={} shards={} iterations={} threads={}", format_size(args.size), args.shards, - args.iterations + args.iterations, + thread_count ); } diff --git a/crates/walrus-core/src/encoding/blob_encoding.rs b/crates/walrus-core/src/encoding/blob_encoding.rs index ddb11a5138..0d1e2f61dd 100644 --- a/crates/walrus-core/src/encoding/blob_encoding.rs +++ b/crates/walrus-core/src/encoding/blob_encoding.rs @@ -2,9 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use alloc::{collections::BTreeSet, vec, vec::Vec}; -use core::{cmp, marker::PhantomData, num::NonZeroU16, ops::Range, slice::Chunks}; +use core::{cell::RefCell, cmp, marker::PhantomData, num::NonZeroU16, ops::Range, slice::Chunks}; use fastcrypto::hash::Blake2b256; +use rayon::prelude::*; use tracing::{Level, Span}; use super::{ @@ -26,7 +27,7 @@ use crate::{ SliverPairIndex, encoding::{ReedSolomonEncoder, config::EncodingFactory as _}, ensure, - merkle::{MerkleTree, Node, leaf_hash}, + merkle::{MerkleTree, Node, leaf_hash_blake2b256}, metadata::{SliverPairMetadata, VerifiedBlobMetadataWithId}, }; @@ -152,7 +153,8 @@ impl BlobEncoderData { /// Computes the blob metadata from the provided leaf hashes of all symbols. /// /// The provided slice *must* be of length `n_shards * n_shards`, where `n_shards` is the number - /// of shards. The slice is interpreted as a matrix in row-major order. + /// of shards. The slice is interpreted as a matrix in column-major order: + /// `symbol_hashes[col * n_shards + row]`. /// /// # Panics /// @@ -168,25 +170,30 @@ impl BlobEncoderData { let n_shards = config.n_shards_as_usize(); assert_eq!(symbol_hashes.len(), n_shards * n_shards); - let mut metadata = Vec::with_capacity(n_shards); - for sliver_index in 0..n_shards { - let primary_hash = MerkleTree::::build_from_leaf_hashes( - symbol_hashes[n_shards * sliver_index..n_shards * (sliver_index + 1)] - .iter() - .cloned(), - ) - .root(); - let secondary_hash = MerkleTree::::build_from_leaf_hashes( - (0..n_shards).map(|symbol_index| { - symbol_hashes[n_shards * symbol_index + n_shards - 1 - sliver_index].clone() - }), - ) - .root(); - metadata.push(SliverPairMetadata { - primary_hash, - secondary_hash, + let metadata: Vec = (0..n_shards) + .into_par_iter() + .map(|sliver_index| { + // Column-major: primary tree gathers row `sliver_index` across all columns + // (strided access). + let primary_hash = MerkleTree::::build_from_leaf_hashes_fast( + (0..n_shards).map(|col| symbol_hashes[col * n_shards + sliver_index].clone()), + ) + .root(); + // Column-major: secondary tree reads column `n_shards - 1 - sliver_index` + // as a contiguous slice. + let sec_col = n_shards - 1 - sliver_index; + let secondary_hash = MerkleTree::::build_from_leaf_hashes_fast( + symbol_hashes[sec_col * n_shards..(sec_col + 1) * n_shards] + .iter() + .cloned(), + ) + .root(); + SliverPairMetadata { + primary_hash, + secondary_hash, + } }) - } + .collect(); VerifiedBlobMetadataWithId::new_verified_from_metadata( metadata, @@ -334,25 +341,67 @@ impl<'a> BlobEncoder<'a> { self.inner.n_rows.get()..self.inner.config.n_shards().get(), )); - let mut primary_encoder = self.inner.get_encoder::(); - for (col_index, column) in secondary_slivers.iter().enumerate() { - let symbols = primary_encoder - .encode_all_ref(column.symbols.data()) - .expect("size has already been checked"); - for (row_index, symbol) in symbols.to_symbols().enumerate() { - symbol_hashes[n_shards * row_index + col_index] = leaf_hash::(symbol); - } - if col_index < self.inner.n_columns_usize() { - for (symbol, sliver) in symbols - .to_symbols() - .zip(primary_slivers.iter_mut()) - .skip(self.inner.n_rows_usize()) - { - sliver.copy_symbol_to(col_index, symbol); - } + let n_columns = self.inner.n_columns_usize(); + let n_rows = self.inner.n_rows_usize(); + let symbol_usize = self.inner.symbol_usize(); + + // Thread-local encoder pool: reuse ReedSolomonEncoder across rayon tasks within this + // call, avoiding ~1000 FFT table constructions (reduced to ~8-16, one per thread). + std::thread_local! { + static PRIMARY_ENCODER: RefCell> = + const { RefCell::new(None) }; + } + + // Parallel phase: par_chunks_mut on column-major hashes gives each task an exclusive + // &mut [Node] slice, enabling direct writes with no intermediate allocation. + // Only repair symbol data is collected for the sequential scatter below. + let repair_results: Vec<(usize, Vec)> = symbol_hashes + .par_chunks_mut(n_shards) + .zip(secondary_slivers.par_iter()) + .enumerate() + .filter_map(|(col_index, (hash_col, column))| { + PRIMARY_ENCODER.with(|cell| { + let mut opt = cell.borrow_mut(); + let encoder = if opt + .as_ref() + .is_some_and(|e| usize::from(e.symbol_size().get()) == symbol_usize) + { + opt.as_mut().expect("checked above") + } else { + opt.insert(self.inner.get_encoder::()) + }; + let symbols = encoder + .encode_all(column.symbols.data()) + .expect("size has already been checked"); + + // Write hashes directly into the column-major slice. + for (row_index, symbol) in symbols.to_symbols().enumerate() { + hash_col[row_index] = leaf_hash_blake2b256(symbol); + } + + // Collect repair data only for systematic columns. + if col_index < n_columns { + let n_repair = n_shards - n_rows; + let mut data = vec![0u8; n_repair * symbol_usize]; + for (i, symbol) in symbols.to_symbols().skip(n_rows).enumerate() { + data[i * symbol_usize..i * symbol_usize + symbol.len()] + .copy_from_slice(symbol); + } + Some((col_index, data)) + } else { + None + } + }) + }) + .collect(); + + // Sequential scatter for repair symbols only. + for (col_index, data) in repair_results { + for (i, sliver) in primary_slivers.iter_mut().skip(n_rows).enumerate() { + let start = i * symbol_usize; + sliver.copy_symbol_to(col_index, &data[start..start + symbol_usize]); } } - drop(primary_encoder); let sliver_pairs = primary_slivers .into_iter() @@ -410,16 +459,19 @@ impl<'a> BlobEncoder<'a> { u64::try_from(self.blob.len()).expect("any valid blob size fits into a `u64`"); let n_shards = self.inner.n_shards_usize(); - let n_non_systematic_secondary_slivers = n_shards - self.inner.n_columns_usize(); - // Use a dummy sliver index for the non-systematic secondary slivers. - let mut non_systematic_secondary_slivers = - vec![ - self.inner.empty_sliver::(SliverIndex(0)); - n_non_systematic_secondary_slivers - ]; + // Materialize all secondary slivers (systematic + non-systematic). + let mut secondary_slivers = self.inner.empty_slivers::(); - // Compute the non-systematic secondary slivers by encoding the rows (i.e., primary - // slivers). + // Copy systematic columns from blob data. + for (column, sliver) in self.column_symbols().zip(secondary_slivers.iter_mut()) { + sliver + .symbols + .to_symbols_mut() + .zip(column) + .for_each(|(dest, src)| dest[..src.len()].copy_from_slice(src)) + } + + // Compute non-systematic secondary slivers by encoding rows (sequential). let mut secondary_encoder = self.inner.get_encoder::(); let mut buffer = Symbols::zeros(self.inner.n_columns_usize(), self.inner.symbol_size); let row_length_bytes = @@ -436,47 +488,48 @@ impl<'a> BlobEncoder<'a> { let encode_result = secondary_encoder .encode(data) .expect("size has already been checked"); - for (symbol, sliver) in encode_result - .recovery_iter() - .zip(non_systematic_secondary_slivers.iter_mut()) - { + for (symbol, sliver) in encode_result.recovery_iter().zip( + secondary_slivers + .iter_mut() + .skip(self.inner.n_columns_usize()), + ) { sliver.copy_symbol_to(r, symbol); } } drop(secondary_encoder); - // Now we can encode all secondary slivers, computing all symbol hashes. + // Parallel primary encoding + hashing over all secondary slivers. let mut symbol_hashes = vec![Node::Empty; n_shards * n_shards]; - // First encode the systematic secondary slivers. - let mut primary_encoder = self.inner.get_encoder::(); - let mut buffer = Symbols::zeros(self.inner.n_rows_usize(), self.inner.symbol_size); - for (col_index, column_symbols) in self.column_symbols().enumerate() { - buffer.data_mut().fill(0); - buffer - .to_symbols_mut() - .zip(column_symbols) - .for_each(|(dest, src)| dest[..src.len()].copy_from_slice(src)); - let symbols = primary_encoder - .encode_all_ref(buffer.data()) - .expect("size has already been checked"); - for (row_index, symbol) in symbols.to_symbols().enumerate() { - symbol_hashes[n_shards * row_index + col_index] = leaf_hash::(symbol); - } - } + let symbol_usize = self.inner.symbol_usize(); - // Then encode the non-systematic secondary slivers. - for (col_index, column) in non_systematic_secondary_slivers.iter().enumerate() { - let col_index = col_index + self.inner.n_columns_usize(); - let symbols = primary_encoder - .encode_all_ref(column.symbols.data()) - .expect("size has already been checked"); - for (row_index, symbol) in symbols.to_symbols().enumerate() { - symbol_hashes[n_shards * row_index + col_index] = leaf_hash::(symbol); - } + std::thread_local! { + static PRIMARY_ENCODER: RefCell> = + const { RefCell::new(None) }; } - drop(primary_encoder); + symbol_hashes + .par_chunks_mut(n_shards) + .zip(secondary_slivers.par_iter()) + .for_each(|(hash_col, column)| { + PRIMARY_ENCODER.with(|cell| { + let mut opt = cell.borrow_mut(); + let encoder = if opt + .as_ref() + .is_some_and(|e| usize::from(e.symbol_size().get()) == symbol_usize) + { + opt.as_mut().expect("checked above") + } else { + opt.insert(self.inner.get_encoder::()) + }; + let symbols = encoder + .encode_all(column.symbols.data()) + .expect("size has already been checked"); + for (row_index, symbol) in symbols.to_symbols().enumerate() { + hash_col[row_index] = leaf_hash_blake2b256(symbol); + } + }); + }); BlobEncoderData::compute_metadata_from_symbol_hashes( self.inner.config, @@ -720,9 +773,10 @@ impl<'a> ExpandedMessageMatrix<'a> { let n_shards = self.config.n_shards_as_usize(); let mut symbol_hashes = Vec::with_capacity(n_shards * n_shards); - for row in 0..n_shards { - for col in 0..n_shards { - symbol_hashes.push(leaf_hash::(&self.matrix[row][col])); + // Column-major layout: symbol_hashes[col * n_shards + row]. + for col in 0..n_shards { + for row in 0..n_shards { + symbol_hashes.push(leaf_hash_blake2b256(&self.matrix[row][col])); } } diff --git a/crates/walrus-core/src/merkle.rs b/crates/walrus-core/src/merkle.rs index f4bb7162ec..1f63c84768 100644 --- a/crates/walrus-core/src/merkle.rs +++ b/crates/walrus-core/src/merkle.rs @@ -5,6 +5,8 @@ use alloc::{format, vec::Vec}; use core::{fmt::Debug, marker::PhantomData}; +use blake2::Blake2b; +use digest::typenum::U32; use fastcrypto::hash::{Blake2b256, Digest, HashFunction}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -12,6 +14,34 @@ use tracing::Level; use crate::ensure; +type Blake2b256Inner = Blake2b; + +static LEAF_HASHER: std::sync::LazyLock = std::sync::LazyLock::new(|| { + let mut h = ::new(); + digest::Digest::update(&mut h, LEAF_PREFIX); + h +}); + +static INNER_HASHER: std::sync::LazyLock = std::sync::LazyLock::new(|| { + let mut h = ::new(); + digest::Digest::update(&mut h, INNER_PREFIX); + h +}); + +/// Leaf hash using pre-initialized Blake2b state (avoids `new_with_params` per call). +pub(crate) fn leaf_hash_blake2b256(input: &[u8]) -> Node { + let mut h = LEAF_HASHER.clone(); + digest::Digest::update(&mut h, input); + Node::Digest(digest::Digest::finalize(h).into()) +} + +fn inner_hash_blake2b256(left: &Node, right: &Node) -> Node { + let mut h = INNER_HASHER.clone(); + digest::Digest::update(&mut h, left.bytes()); + digest::Digest::update(&mut h, right.bytes()); + Node::Digest(digest::Digest::finalize(h).into()) +} + /// The length of the digests used in the merkle tree. pub const DIGEST_LEN: usize = 32; @@ -224,6 +254,14 @@ where /// Create the [`MerkleTree`] as a commitment to the provided data hashes. pub fn build_from_leaf_hashes(iter: I) -> Self + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + Self::build_tree(iter, inner_hash::) + } + + fn build_tree(iter: I, hash_fn: fn(&Node, &Node) -> Node) -> Self where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -252,7 +290,7 @@ where (prev_level_index..new_level_index) .step_by(2) - .for_each(|index| nodes.push(inner_hash::(&nodes[index], &nodes[index + 1]))); + .for_each(|index| nodes.push(hash_fn(&nodes[index], &nodes[index + 1]))); prev_level_index = new_level_index; level_nodes /= 2; @@ -309,6 +347,17 @@ where } } +impl MerkleTree { + /// Create the [`MerkleTree`] using pre-initialized Blake2b hashers for inner nodes. + pub(crate) fn build_from_leaf_hashes_fast(iter: I) -> Self + where + I: IntoIterator, + I::IntoIter: ExactSizeIterator, + { + Self::build_tree(iter, inner_hash_blake2b256) + } +} + /// Computes the hash of the provided input to be used as a leaf hash of a Merkle tree. pub fn leaf_hash(input: &[u8]) -> Node where