diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index af8cd477d6c..fa1e028a8f5 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3311,28 +3311,32 @@ impl BeaconChain { .await } - /// Process a gossip-verified partial data column by attempting to merge it in the assembler. - /// Returns the merge result which indicates if a column was completed. + /// Process a gossip-verified partial data column by attempting to merge it into the appropriate + /// store for its fork (the Fulu assembler, or the Gloas pending payload cache). Returns the + /// merge result, which indicates whether any column was completed. + /// + /// `verified_header` must be `Some` for Fulu partials (the assembler needs it) and `None` + /// for Gloas partials. #[instrument(skip_all, level = "debug")] pub async fn process_gossip_partial_data_column( self: &Arc, verified_partial: KzgVerifiedPartialDataColumn, - verified_header: GossipVerifiedPartialDataColumnHeader, + verified_header: Option>, slot: Slot, ) -> Result, BlockError> { let block_root = verified_partial.block_root(); let partial = verified_partial.as_data_column(); - let index_str = partial.index.to_string(); + let index_str = partial.index().to_string(); metrics::inc_counter_vec_by( &metrics::BEACON_PARTIAL_MESSAGE_CELLS_RECEIVED_TOTAL, &[index_str.as_str()], - partial.sidecar.column.len() as u64, + partial.sidecar().column().len() as u64, ); // Check if we have custody of this column let sampling_columns = self.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch())); - let verified_partial = if sampling_columns.contains(&partial.index) { + let verified_partial = if sampling_columns.contains(partial.index()) { KzgVerifiedCustodyPartialDataColumn::from_asserted_custody(verified_partial) } else { return Ok(None); @@ -3342,19 +3346,39 @@ impl BeaconChain { return Err(BlockError::DuplicateFullyImported(block_root)); } - let Some(assembler) = self.data_availability_checker.partial_assembler() else { - // Partial messages are apparently not activated - return Ok(None); - }; + let added_cells = verified_partial.as_data_column().sidecar().column().len(); - // Merge the partial into the assembler - let merge_result = assembler - .merge_partials( - block_root, - vec![verified_partial], - verified_header.into_header(), - ) - .ok_or_else(|| BlockError::InternalError("No assembly found for block".to_string()))?; + let merge_result = match verified_header { + Some(header) => { + // Fulu: merge via the partial assembler. + let Some(assembler) = self.data_availability_checker.partial_assembler() else { + // Partial messages are apparently not activated + return Ok(None); + }; + + assembler + .merge_partials(block_root, vec![verified_partial], header.into_header()) + .ok_or_else(|| { + BlockError::InternalError("No assembly found for block".to_string()) + })? + } + None => { + // Gloas: merge directly into the pending payload cache. We don't track updated + // partials there yet, so we hand back the input partial alone as the "updated" + // set; full columns come from the cache merge. + let full_columns = self + .pending_payload_cache + .merge_partial_data_columns(block_root, std::slice::from_ref(&verified_partial)) + .map_err(BlockError::from)?; + + PartialMergeResult { + added_cells, + local_blobs: self.pending_payload_cache.has_local_blobs(&block_root), + full_columns, + updated_partials: vec![verified_partial], + } + } + }; metrics::inc_counter_vec_by( &metrics::BEACON_PARTIAL_MESSAGE_USEFUL_CELLS_TOTAL, diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index cfd8ee7d34a..a1ec386a0af 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -22,7 +22,7 @@ use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn}; use types::{ BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError, - PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize, + PartialDataColumnView, SignedBeaconBlock, Slot, new_non_zero_usize, }; mod error; @@ -194,7 +194,7 @@ impl DataAvailabilityChecker { pub fn missing_cells_for_column_sidecar<'a>( &'_ self, data_column: &'a DataColumnSidecar, - ) -> Result>, MissingCellsError> { + ) -> Result>, MissingCellsError> { let block_root = data_column.block_root(); let column_index = *data_column.index(); @@ -220,8 +220,8 @@ impl DataAvailabilityChecker { if let Some(assembler) = &self.partial_assembler { match assembler.get_partial(&block_root, column_index) { Some(AssemblyColumn::Incomplete(cached_partial)) => { - return data_column.try_filter_to_partial_ref(|idx, cell, proof| { - match cached_partial.as_data_column().sidecar.get(idx) { + return data_column.try_filter_to_partial_view(|idx, cell, proof| { + match cached_partial.as_data_column().sidecar().get(idx) { None => Ok(true), Some((cached_cell, cached_proof)) => { if cell == cached_cell && proof == cached_proof { @@ -244,7 +244,7 @@ impl DataAvailabilityChecker { } } // No cached data, all cells are "missing" (new data we want) - data_column.try_filter_to_partial_ref(|_, _, _| Ok(true)) + data_column.try_filter_to_partial_view(|_, _, _| Ok(true)) } /// Filter out all cells that are already cached for the given `block_root`. @@ -252,9 +252,9 @@ impl DataAvailabilityChecker { pub fn missing_cells_for_partial_column_sidecar<'a>( &'_ self, partial_data_column: &'a PartialDataColumn, - ) -> Result>, MissingCellsError> { - let column_index = partial_data_column.index; - let block_root = partial_data_column.block_root; + ) -> Result>, MissingCellsError> { + let column_index = *partial_data_column.index(); + let block_root = *partial_data_column.block_root(); // Check DA checker cache first - if we have a full column cached, nothing is missing. if self @@ -270,8 +270,8 @@ impl DataAvailabilityChecker { if let Some(assembler) = &self.partial_assembler { match assembler.get_partial(&block_root, column_index) { Some(AssemblyColumn::Incomplete(cached_partial)) => { - return Ok(partial_data_column.sidecar.filter(|idx| { - cached_partial.as_data_column().sidecar.get(idx).is_none() + return Ok(partial_data_column.sidecar().filter(|idx| { + cached_partial.as_data_column().sidecar().get(idx).is_none() })?); } // This can happen if the column has been marked as completed already but has not @@ -285,7 +285,7 @@ impl DataAvailabilityChecker { } } // No cached data, all cells are "missing" (new data we want) - Ok(partial_data_column.sidecar.filter(|_| true)?) + Ok(partial_data_column.sidecar().filter(|_| true)?) } /// Get a blob from the availability cache. diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 71562b376b3..f862a282b66 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -25,13 +25,13 @@ use store::DatabaseBlock; use tracing::{debug, instrument}; use tree_hash::TreeHash; use types::data::{ - ColumnIndex, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnSidecar, - PartialDataColumnSidecarError, + ColumnIndex, PartialDataColumn, PartialDataColumnFulu, PartialDataColumnGloas, + PartialDataColumnHeader, PartialDataColumnSidecarError, PartialDataColumnSidecarFulu, + PartialDataColumnSidecarGloas, }; use types::{ BeaconStateError, ChainSpec, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, - KzgCommitment, PartialDataColumnSidecarRef, SignedBeaconBlockHeader, SignedExecutionPayloadBid, - Slot, + KzgCommitment, PartialDataColumnView, SignedBeaconBlockHeader, SignedExecutionPayloadBid, Slot, }; /// An error occurred while validating a gossip data column. @@ -543,11 +543,11 @@ impl KzgVerifiedPartialDataColumn { } pub fn index(&self) -> ColumnIndex { - self.data.index + *self.data.index() } pub fn block_root(&self) -> Hash256 { - self.data.block_root + *self.data.block_root() } } @@ -754,7 +754,7 @@ impl KzgVerifiedCustodyPartialDataColumn { } pub fn index(&self) -> ColumnIndex { - self.data.index + *self.data.index() } /// Merge two verified partial data columns. @@ -766,41 +766,43 @@ impl KzgVerifiedCustodyPartialDataColumn { /// If both columns contain the same cell, the cell from `self` is used - however, as they are /// KZG verified, they will be the same. pub fn merge(&self, other: &Self) -> Result { - let self_sidecar = &self.data.sidecar; - let other_sidecar = &other.data.sidecar; + let self_sidecar = self.data.sidecar(); + let other_sidecar = other.data.sidecar(); // Check that each sidecar is internally consistent by checking the lengths. self_sidecar.verify_len()?; other_sidecar.verify_len()?; - if self.data.block_root != other.data.block_root || self.data.index != other.data.index { + if self.data.block_root() != other.data.block_root() + || self.data.index() != other.data.index() + { return Err(PartialDataColumnSidecarError::ConflictingData); } - if self_sidecar.cells_present_bitmap.len() != other_sidecar.cells_present_bitmap.len() { + if self_sidecar.cells_present_bitmap().len() != other_sidecar.cells_present_bitmap().len() { return Err(PartialDataColumnSidecarError::DifferingLengths { - lhs_len: self_sidecar.cells_present_bitmap.len(), - rhs_len: other_sidecar.cells_present_bitmap.len(), + lhs_len: self_sidecar.cells_present_bitmap().len(), + rhs_len: other_sidecar.cells_present_bitmap().len(), }); } let new_bitmap = self_sidecar - .cells_present_bitmap - .union(&other_sidecar.cells_present_bitmap); + .cells_present_bitmap() + .union(other_sidecar.cells_present_bitmap()); let len = new_bitmap.num_set_bits(); let mut new_column = Vec::with_capacity(len); let mut new_proofs = Vec::with_capacity(len); let mut self_iter = self_sidecar - .column + .column() .iter() - .zip(self_sidecar.kzg_proofs.iter()); + .zip(self_sidecar.kzg_proofs().iter()); let mut other_iter = other_sidecar - .column + .column() .iter() - .zip(other_sidecar.kzg_proofs.iter()); + .zip(other_sidecar.kzg_proofs().iter()); for presence_bits in self_sidecar - .cells_present_bitmap + .cells_present_bitmap() .iter() - .zip(other_sidecar.cells_present_bitmap.iter()) + .zip(other_sidecar.cells_present_bitmap().iter()) { match presence_bits { (false, false) => {} @@ -826,32 +828,59 @@ impl KzgVerifiedCustodyPartialDataColumn { } } - Ok(Self { - data: Arc::new(PartialDataColumn { - block_root: self.data.block_root, - index: self.data.index, - sidecar: PartialDataColumnSidecar { - cells_present_bitmap: new_bitmap, - column: new_column - .try_into() - .map_err(|_| PartialDataColumnSidecarError::UnexpectedBounds)?, - kzg_proofs: new_proofs - .try_into() - .map_err(|_| PartialDataColumnSidecarError::UnexpectedBounds)?, - header: if self_sidecar.header.is_some() { - self_sidecar.header.clone() - } else { - other_sidecar.header.clone() + let new_column = new_column + .try_into() + .map_err(|_| PartialDataColumnSidecarError::UnexpectedBounds)?; + let new_proofs = new_proofs + .try_into() + .map_err(|_| PartialDataColumnSidecarError::UnexpectedBounds)?; + + let block_root = *self.data.block_root(); + let index = *self.data.index(); + let data = match (&*self.data, &*other.data) { + (PartialDataColumn::Fulu(self_fulu), PartialDataColumn::Fulu(other_fulu)) => { + let header = if self_fulu.sidecar.header.is_some() { + self_fulu.sidecar.header.clone() + } else { + other_fulu.sidecar.header.clone() + }; + PartialDataColumnFulu { + block_root, + index, + sidecar: PartialDataColumnSidecarFulu { + cells_present_bitmap: new_bitmap, + column: new_column, + kzg_proofs: new_proofs, + header, }, - }, - }), + } + .into() + } + (PartialDataColumn::Gloas(self_gloas), PartialDataColumn::Gloas(_)) => { + PartialDataColumnGloas { + block_root, + slot: self_gloas.slot, + index, + sidecar: PartialDataColumnSidecarGloas { + cells_present_bitmap: new_bitmap, + column: new_column, + kzg_proofs: new_proofs, + }, + } + .into() + } + _ => return Err(PartialDataColumnSidecarError::ConflictingData), + }; + + Ok(Self { + data: Arc::new(data), latest_cell_timestamp: self.latest_cell_timestamp.max(other.latest_cell_timestamp), }) } pub fn try_clone_full( &self, - header: &PartialDataColumnHeader, + header: Option<&PartialDataColumnHeader>, ) -> Option> { self.data .try_clone_full(header) @@ -866,7 +895,7 @@ impl KzgVerifiedCustodyPartialDataColumn { /// May clone the column if the Arc cannot be unwrapped. pub fn try_into_full( self, - header: &PartialDataColumnHeader, + header: Option<&PartialDataColumnHeader>, ) -> Option> { match Arc::try_unwrap(self.data) { Ok(data) => data.try_into_full(header), @@ -885,7 +914,7 @@ impl KzgVerifiedCustodyPartialDataColumn { #[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_data_column( data_column: Arc>, - cells_to_verify: PartialDataColumnSidecarRef, + cells_to_verify: PartialDataColumnView, kzg: &Kzg, seen_timestamp: Duration, ) -> Result, (Option, KzgError)> { @@ -910,7 +939,7 @@ pub fn verify_kzg_for_data_column( #[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_data_column_with_commitments( data_column: Arc>, - cells_to_verify: PartialDataColumnSidecarRef, + cells_to_verify: PartialDataColumnView, kzg_commitments: &[KzgCommitment], kzg: &Kzg, seen_timestamp: Duration, @@ -933,16 +962,16 @@ pub fn verify_kzg_for_data_column_with_commitments( #[instrument(skip_all, level = "debug")] pub fn verify_kzg_for_partial_data_column( data_column: Arc>, - cells_to_verify: PartialDataColumnSidecarRef, - header: &GossipVerifiedPartialDataColumnHeader, + cells_to_verify: PartialDataColumnView, + kzg_commitments: &[KzgCommitment], kzg: &Kzg, seen_timestamp: Duration, ) -> Result, GossipPartialDataColumnError> { let _timer = metrics::start_timer(&metrics::KZG_VERIFICATION_DATA_COLUMN_SINGLE_TIMES); validate_partial_data_columns( kzg, - iter::once((data_column.index, cells_to_verify)), - header.header.kzg_commitments.as_ref(), + iter::once((*data_column.index(), cells_to_verify)), + kzg_commitments, ) .map_err(|(_, e)| GossipDataColumnError::InvalidKzgProof(e))?; Ok(KzgVerifiedPartialDataColumn { @@ -1130,7 +1159,26 @@ pub fn validate_data_column_sidecar_for_gossip_gloas< #[instrument(skip_all, level = "debug")] pub fn validate_partial_data_column_sidecar_for_gossip( - mut column: Box>, + column: Box>, + chain: &BeaconChain, + seen_timestamp: Duration, +) -> PartialColumnVerificationResult { + match *column { + PartialDataColumn::Fulu(fulu) => validate_partial_data_column_sidecar_for_gossip_fulu( + Box::new(fulu), + chain, + seen_timestamp, + ), + PartialDataColumn::Gloas(gloas) => validate_partial_data_column_sidecar_for_gossip_gloas( + Box::new(gloas), + chain, + seen_timestamp, + ), + } +} + +fn validate_partial_data_column_sidecar_for_gossip_fulu( + mut column: Box>, chain: &BeaconChain, seen_timestamp: Duration, ) -> PartialColumnVerificationResult { @@ -1156,7 +1204,7 @@ pub fn validate_partial_data_column_sidecar_for_gossip( // There is no header, so we check if we have a cached one to use let Some(header) = assembler - .get_header(&column.block_root) + .get_header(&block_root) .map(GossipVerifiedPartialDataColumnHeader::new_from_cached) else { return PartialColumnVerificationResult::Err( @@ -1175,31 +1223,96 @@ pub fn validate_partial_data_column_sidecar_for_gossip( header }; - // The number of cells nad proofs must match the population count of the bitmap. - let bitmap_popcount = column.sidecar.cells_present_bitmap.num_set_bits(); - let cells_len = column.sidecar.column.len(); - let proofs_len = column.sidecar.kzg_proofs.len(); + let slot = header.as_header().slot(); + let column: PartialDataColumn = (*column).into(); + let kzg_commitments = header.as_header().kzg_commitments.clone(); + match validate_partial_data_column_kzg( + Box::new(column), + kzg_commitments.as_ref(), + chain, + seen_timestamp, + ) { + Ok(column) => PartialColumnVerificationResult::Ok { + column, + slot, + verified_header: Some(header), + }, + Err(err) => PartialColumnVerificationResult::ErrWithValidHeader { err, header }, + } +} + +fn validate_partial_data_column_sidecar_for_gossip_gloas( + column: Box>, + chain: &BeaconChain, + seen_timestamp: Duration, +) -> PartialColumnVerificationResult { + let block_root = column.block_root; + let slot = column.slot; + + // The bid carries the commitments. It is gossip-verified and inserted into the pending + // payload cache on its own path — we don't cache anything from this code path. + let bid = match load_gloas_payload_bid(block_root, chain) { + Ok(Some(bid)) => bid, + Ok(None) => { + return PartialColumnVerificationResult::Err( + GossipPartialDataColumnError::MissingHeader, + ); + } + Err(e) => { + return PartialColumnVerificationResult::Err(GossipPartialDataColumnError::from(e)); + } + }; + + // A Gloas partial column has no header to take, so it must always carry at least one cell. + if column.sidecar.column.is_empty() { + return PartialColumnVerificationResult::Err(GossipPartialDataColumnError::EmptyMessage); + } + + let column: PartialDataColumn = (*column).into(); + match validate_partial_data_column_kzg( + Box::new(column), + bid.message.blob_kzg_commitments.as_ref(), + chain, + seen_timestamp, + ) { + Ok(column) => PartialColumnVerificationResult::Ok { + column, + slot, + verified_header: None, + }, + Err(err) => PartialColumnVerificationResult::Err(err), + } +} + +/// Shared structural + KZG checks for partial data columns, agnostic to which fork the +/// commitments came from (Fulu header vs Gloas bid). +fn validate_partial_data_column_kzg( + column: Box>, + kzg_commitments: &[KzgCommitment], + chain: &BeaconChain, + seen_timestamp: Duration, +) -> Result, GossipPartialDataColumnError> { + // The number of cells and proofs must match the population count of the bitmap. + let bitmap_popcount = column.sidecar().cells_present_bitmap().num_set_bits(); + let cells_len = column.sidecar().column().len(); + let proofs_len = column.sidecar().kzg_proofs().len(); if bitmap_popcount != cells_len || bitmap_popcount != proofs_len { - return PartialColumnVerificationResult::ErrWithValidHeader { - err: GossipPartialDataColumnError::InconsistentPresentCount { - bitmap_popcount, - cells_len, - proofs_len, - }, - header, - }; + return Err(GossipPartialDataColumnError::InconsistentPresentCount { + bitmap_popcount, + cells_len, + proofs_len, + }); } - let bitmap_len = column.sidecar.cells_present_bitmap.len(); - let commitments_len = header.as_header().kzg_commitments.len(); + let bitmap_len = column.sidecar().cells_present_bitmap().len(); + let commitments_len = kzg_commitments.len(); if bitmap_len != commitments_len { - return PartialColumnVerificationResult::ErrWithValidHeader { - err: GossipPartialDataColumnError::InconsistentCommitmentsLength { + return Err( + GossipPartialDataColumnError::InconsistentCommitmentsLength { bitmap_len, commitments_len, }, - header, - }; + ); } let column = Arc::from(column); @@ -1209,53 +1322,45 @@ pub fn validate_partial_data_column_sidecar_for_gossip( { Ok(Some(cells_to_kzg_verify)) => cells_to_kzg_verify, Ok(None) => { - return PartialColumnVerificationResult::ErrWithValidHeader { - err: GossipDataColumnError::PriorKnownUnpublished.into(), - header, - }; + return Err(GossipDataColumnError::PriorKnownUnpublished.into()); } Err(MissingCellsError::MismatchesCachedColumn) => { - return PartialColumnVerificationResult::ErrWithValidHeader { - err: GossipDataColumnError::MismatchesCachedColumn.into(), - header, - }; + return Err(GossipDataColumnError::MismatchesCachedColumn.into()); } Err(MissingCellsError::UnexpectedError(e)) => todo!("handle unexpected error {:?}", e), }; // We do not have to check block related data here, as we create the verifiable column from // gossip accepted block - let kzg = &chain.kzg; - let column = match verify_kzg_for_partial_data_column( + verify_kzg_for_partial_data_column( column.clone(), cells_to_kzg_verify, - &header, - kzg, + kzg_commitments, + &chain.kzg, seen_timestamp, - ) { - Ok(column) => column, - Err(err) => { - return PartialColumnVerificationResult::ErrWithValidHeader { err, header }; - } - }; - - PartialColumnVerificationResult::Ok { column, header } + ) } -/// The result of a `validate_partial_data_column_sidecar_for_gossip` call. Any headers returned -/// herein were cached during this call or previously cached. +/// The result of a `validate_partial_data_column_sidecar_for_gossip` call. The `verified_header` +/// is only set for Fulu, where the header travels with the partial column on gossip and may be +/// newly cached during this call. For Gloas, the bid that backs the verification arrives on its +/// own gossip path and is never cached as part of partial-column processing — there is nothing +/// to return. pub enum PartialColumnVerificationResult { /// Verification succeeded fully. Ok { column: KzgVerifiedPartialDataColumn, - header: GossipVerifiedPartialDataColumnHeader, + slot: Slot, + verified_header: Option>, }, - /// Verification of the column failed, but the header is valid. + /// Verification of the column failed, but the Fulu header is valid. Gloas has no equivalent + /// variant because the bid that would correspond to the header is not produced as a result + /// of this verification. ErrWithValidHeader { err: GossipPartialDataColumnError, header: GossipVerifiedPartialDataColumnHeader, }, - /// Verification of the column or header failed, and no valid header was cached previously. + /// Verification of the column or its commitments-source failed. Err(GossipPartialDataColumnError), } @@ -1365,7 +1470,7 @@ pub(crate) fn load_gloas_payload_bid( fn missing_cells_for_column_sidecar<'a, T: BeaconChainTypes>( chain: &'_ BeaconChain, data_column: &'a DataColumnSidecar, -) -> Result>, GossipDataColumnError> { +) -> Result>, GossipDataColumnError> { let result = if chain .spec .fork_name_at_slot::(data_column.slot()) @@ -1655,8 +1760,8 @@ mod test { use std::time::UNIX_EPOCH; use types::{ Cell, CellBitmap, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSubnetId, EthSpec, - ForkName, MainnetEthSpec, PartialDataColumn, PartialDataColumnHeader, - PartialDataColumnSidecar, + ForkName, MainnetEthSpec, PartialDataColumn, PartialDataColumnFulu, + PartialDataColumnHeader, PartialDataColumnSidecarFulu, }; type E = MainnetEthSpec; @@ -1869,16 +1974,17 @@ mod test { BitList::<::MaxBlobCommitmentsPerBlock>::with_capacity(num_commitments) .unwrap(); - let column = PartialDataColumn { + let column: PartialDataColumn = PartialDataColumnFulu { block_root, index: 0, - sidecar: PartialDataColumnSidecar { + sidecar: PartialDataColumnSidecarFulu { cells_present_bitmap: empty_bitmap, column: vec![].try_into().unwrap(), kzg_proofs: vec![].try_into().unwrap(), header: None.into(), }, - }; + } + .into(); let result = validate_partial_data_column_sidecar_for_gossip( Box::new(column), @@ -1909,10 +2015,10 @@ mod test { .unwrap(); bitmap.set(0, true).unwrap(); - let column = PartialDataColumn { + let column: PartialDataColumn = PartialDataColumnFulu { block_root, index: 0, - sidecar: PartialDataColumnSidecar { + sidecar: PartialDataColumnSidecarFulu { cells_present_bitmap: bitmap, column: vec![types::Cell::::default()].try_into().unwrap(), // Provide 2 proofs but only 1 cell ← mismatch with popcount=1 @@ -1921,7 +2027,8 @@ mod test { .unwrap(), header: None.into(), }, - }; + } + .into(); let result = validate_partial_data_column_sidecar_for_gossip( Box::new(column), @@ -1951,16 +2058,17 @@ mod test { BitList::<::MaxBlobCommitmentsPerBlock>::with_capacity(3).unwrap(); bitmap.set(0, true).unwrap(); - let column = PartialDataColumn { + let column: PartialDataColumn = PartialDataColumnFulu { block_root, index: 0, - sidecar: PartialDataColumnSidecar { + sidecar: PartialDataColumnSidecarFulu { cells_present_bitmap: bitmap, column: vec![types::Cell::::default()].try_into().unwrap(), kzg_proofs: vec![types::KzgProof::empty()].try_into().unwrap(), header: None.into(), }, - }; + } + .into(); let result = validate_partial_data_column_sidecar_for_gossip( Box::new(column), @@ -2081,16 +2189,19 @@ mod test { .unwrap(); KzgVerifiedCustodyPartialDataColumn { - data: Arc::new(PartialDataColumn { - block_root: Default::default(), - index: 0, - sidecar: PartialDataColumnSidecar { - cells_present_bitmap: bitmap, - column, - kzg_proofs: proofs, - header: None.into(), - }, - }), + data: Arc::new( + PartialDataColumnFulu { + block_root: Default::default(), + index: 0, + sidecar: PartialDataColumnSidecarFulu { + cells_present_bitmap: bitmap, + column, + kzg_proofs: proofs, + header: None.into(), + }, + } + .into(), + ), latest_cell_timestamp: Default::default(), } } @@ -2107,12 +2218,12 @@ mod test { let a = make_partial(6, &[0, 2]); let b = make_partial(6, &[1, 3]); let merged = a.merge(&b).unwrap(); - assert_eq!(merged.data.sidecar.column.len(), 4); - assert_eq!(merged.data.sidecar.kzg_proofs.len(), 4); + assert_eq!(merged.data.sidecar().column().len(), 4); + assert_eq!(merged.data.sidecar().kzg_proofs().len(), 4); for i in 0..4 { - assert!(merged.data.sidecar.cells_present_bitmap.get(i).unwrap()); + assert!(merged.data.sidecar().cells_present_bitmap().get(i).unwrap()); } - assert!(!merged.data.sidecar.cells_present_bitmap.get(4).unwrap()); + assert!(!merged.data.sidecar().cells_present_bitmap().get(4).unwrap()); } #[test] @@ -2120,10 +2231,10 @@ mod test { let a = make_partial_with_marker(4, &[0, 1], 0); let b = make_partial_with_marker(4, &[1, 2], 100); let merged = a.merge(&b).unwrap(); - assert_eq!(merged.data.sidecar.column.len(), 3); + assert_eq!(merged.data.sidecar().column().len(), 3); // Cell at bitmap index 1 is the second cell in the merged column. // It should come from `a` (marker_base=0, so marker=0+1=1), not `b` (marker=100+1=101). - assert_eq!(merged.data.sidecar.column[1][0], 1); + assert_eq!(merged.data.sidecar().column()[1][0], 1); } #[test] @@ -2131,10 +2242,10 @@ mod test { let a = make_partial(4, &[0, 2]); let b = make_partial(4, &[]); let merged = a.merge(&b).unwrap(); - assert_eq!(merged.data.sidecar.column.len(), 2); + assert_eq!(merged.data.sidecar().column().len(), 2); assert_eq!( - merged.data.sidecar.cells_present_bitmap, - a.data.sidecar.cells_present_bitmap + merged.data.sidecar().cells_present_bitmap(), + a.data.sidecar().cells_present_bitmap() ); } } diff --git a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs index f5ba647fce8..e54ae017636 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/fetch_blobs_beacon_adapter.rs @@ -1,6 +1,7 @@ use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError}; use crate::observed_data_sidecars::ObservationKey; use crate::partial_data_column_assembler::PartialDataColumnAssembler; +use crate::pending_payload_cache::PendingPayloadCache; use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes}; use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2, BlobAndProofV3}; use kzg::Kzg; @@ -43,6 +44,10 @@ impl FetchBlobsBeaconAdapter { .cloned() } + pub(crate) fn pending_payload_cache(&self) -> &Arc> { + &self.chain.pending_payload_cache + } + pub(crate) async fn get_blobs_v1( &self, versioned_hashes: Vec, diff --git a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs index 351e35666a9..fcdd8250325 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/mod.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/mod.rs @@ -34,7 +34,7 @@ use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_h use std::sync::Arc; use tracing::{debug, instrument, warn}; use types::data::{BlobSidecarError, ColumnIndex, DataColumnSidecarError, PartialDataColumnHeader}; -use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, VersionedHash}; +use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, PartialHeaderOrBid, VersionedHash}; /// Result from engine get blobs to be passed onto `DataAvailabilityChecker` and published to the /// gossip network. The blobs / data columns have not been marked as observed yet, as they may not @@ -68,14 +68,14 @@ pub enum FetchEngineBlobError { pub async fn fetch_and_process_engine_blobs( chain: Arc>, block_root: Hash256, - header: Arc>, + header_or_bid: PartialHeaderOrBid, custody_columns: &[ColumnIndex], publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { fetch_and_process_engine_blobs_inner( FetchBlobsBeaconAdapter::new(chain), block_root, - header, + header_or_bid, custody_columns, publish_fn, ) @@ -87,12 +87,12 @@ pub async fn fetch_and_process_engine_blobs( async fn fetch_and_process_engine_blobs_inner( chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, - header: Arc>, + header_or_bid: PartialHeaderOrBid, custody_columns: &[ColumnIndex], publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { - let versioned_hashes = header - .kzg_commitments + let versioned_hashes = header_or_bid + .kzg_commitments() .iter() .map(kzg_commitment_to_versioned_hash) .collect::>(); @@ -108,26 +108,27 @@ async fn fetch_and_process_engine_blobs_inner( if chain_adapter .spec() - .is_peer_das_enabled_for_epoch(header.slot().epoch(T::EthSpec::slots_per_epoch())) + .is_peer_das_enabled_for_epoch(header_or_bid.slot().epoch(T::EthSpec::slots_per_epoch())) { fetch_and_process_blobs_v2_or_v3( chain_adapter, block_root, - header, + header_or_bid, versioned_hashes, custody_columns, publish_fn, ) .await } else { - fetch_and_process_blobs_v1( + todo!("ridiculous") + /*fetch_and_process_blobs_v1( chain_adapter, block_root, &header, versioned_hashes, publish_fn, ) - .await + .await*/ } } @@ -222,13 +223,13 @@ async fn fetch_and_process_blobs_v1( async fn fetch_and_process_blobs_v2_or_v3( chain_adapter: FetchBlobsBeaconAdapter, block_root: Hash256, - header: Arc>, + header_or_bid: PartialHeaderOrBid, versioned_hashes: Vec, custody_columns_indices: &[ColumnIndex], publish_fn: impl Fn(EngineGetBlobsOutput) + Send + 'static, ) -> Result, FetchEngineBlobError> { let num_expected_blobs = versioned_hashes.len(); - let slot = header.slot(); + let slot = header_or_bid.slot(); metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64); @@ -315,7 +316,7 @@ async fn fetch_and_process_blobs_v2_or_v3( let custody_columns_to_import = compute_custody_columns_to_import( &chain_adapter, block_root, - &header, + &header_or_bid, blobs_and_proofs, custody_columns_indices, ) @@ -329,26 +330,40 @@ async fn fetch_and_process_blobs_v2_or_v3( return Ok(None); } - let full_columns = match chain_adapter.partial_assembler() { - Some(assembler) => { - // Initialize the partial assembler with the columns from the engine and return any full - // columns for publishing - assembler - .merge_partials(block_root, custody_columns_to_import, header) - .ok_or_else(|| { - FetchEngineBlobError::InternalError( - "Failed to merge partials into assembler".to_string(), - ) + let full_columns = match &header_or_bid { + PartialHeaderOrBid::PartialHeader(header) => match chain_adapter.partial_assembler() { + Some(assembler) => { + // Initialize the partial assembler with the columns from the engine and return any + // full columns for publishing. + assembler + .merge_partials(block_root, custody_columns_to_import, header.clone()) + .ok_or_else(|| { + FetchEngineBlobError::InternalError( + "Failed to merge partials into assembler".to_string(), + ) + })? + .full_columns + } + None => { + // Partial columns are disabled, so let's try to directly convert the columns we got + // from the EL into full columns. + custody_columns_to_import + .into_iter() + .filter_map(|col| col.try_into_full(Some(header))) + .collect() + } + }, + PartialHeaderOrBid::Bid(_bid) => { + // Merge partials into the pending payload cache and return any full columns for + // publishing. + chain_adapter + .pending_payload_cache() + .merge_partial_data_columns(block_root, &custody_columns_to_import) + .map_err(|e| { + FetchEngineBlobError::InternalError(format!( + "Failed to merge partials into pending payload cache: {e:?}" + )) })? - .full_columns - } - None => { - // Partial columns are disabled, so let's try to directly convert the columns we got - // from the EL into full columns. - custody_columns_to_import - .into_iter() - .filter_map(|col| col.try_into_full(&header)) - .collect() } }; @@ -380,7 +395,7 @@ async fn fetch_and_process_blobs_v2_or_v3( async fn compute_custody_columns_to_import( chain_adapter: &Arc>, block_root: Hash256, - header: &PartialDataColumnHeader, + header_or_bid: &PartialHeaderOrBid, blobs_and_proofs: Vec>, custody_columns_indices: &[ColumnIndex], ) -> Result>, FetchEngineBlobError> { @@ -388,7 +403,7 @@ async fn compute_custody_columns_to_import( let spec = chain_adapter.spec().clone(); let chain_adapter_cloned = chain_adapter.clone(); let custody_columns_indices = custody_columns_indices.to_vec(); - let header = header.clone(); + let header_or_bid = header_or_bid.clone(); chain_adapter .executor() .spawn_blocking_handle( @@ -406,9 +421,14 @@ async fn compute_custody_columns_to_import( .map(|BlobAndProofV2 { blob, proofs }| (blob, proofs.as_ref())) }) .collect::>(); - let data_columns_result = - blobs_to_partial_data_columns(blob_and_proof_refs, &header, &kzg, &spec) - .discard_timer_on_break(&mut timer); + let data_columns_result = blobs_to_partial_data_columns( + block_root, + blob_and_proof_refs, + &header_or_bid, + &kzg, + &spec, + ) + .discard_timer_on_break(&mut timer); drop(timer); // This filtering ensures we only import and publish the custody columns. @@ -418,7 +438,7 @@ async fn compute_custody_columns_to_import( .map(|data_columns| { data_columns .into_iter() - .filter(|col| custody_columns_indices.contains(&col.index)) + .filter(|col| custody_columns_indices.contains(&col.index())) .map(|col| { KzgVerifiedCustodyPartialDataColumn::from_asserted_custody( KzgVerifiedPartialDataColumn::from_execution_verified( @@ -431,8 +451,15 @@ async fn compute_custody_columns_to_import( .map_err(FetchEngineBlobError::DataColumnSidecarError)?; // Only consider columns that are not already observed on gossip. - let observation_key = - ObservationKey::from_partial_column_header(&header, block_root, &spec); + let observation_key = match &header_or_bid { + PartialHeaderOrBid::PartialHeader(header) => ObservationKey::new_proposer_key( + header.signed_block_header.message.proposer_index, + header.slot(), + ), + PartialHeaderOrBid::Bid(bid) => { + ObservationKey::new_block_root_key(block_root, bid.message.slot) + } + }; if let Some(observed_columns) = chain_adapter_cloned.data_column_known_for_observation_key(observation_key) @@ -444,8 +471,8 @@ async fn compute_custody_columns_to_import( } // Only consider columns that are not already known to data availability. - if let Some(known_columns) = - chain_adapter_cloned.cached_data_column_indexes(&block_root, header.slot()) + if let Some(known_columns) = chain_adapter_cloned + .cached_data_column_indexes(&block_root, header_or_bid.slot()) { custody_columns.retain(|col| !known_columns.contains(&col.index())); if custody_columns.is_empty() { diff --git a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs index 37d40f3a270..6f613666da1 100644 --- a/beacon_node/beacon_chain/src/fetch_blobs/tests.rs +++ b/beacon_node/beacon_chain/src/fetch_blobs/tests.rs @@ -15,7 +15,7 @@ use std::sync::{Arc, Mutex}; use task_executor::test_utils::TestRuntime; use types::{ BeaconBlock, BeaconBlockFulu, EmptyBlock, EthSpec, ForkName, Hash256, MainnetEthSpec, - SignedBeaconBlock, SignedBeaconBlockFulu, + PartialHeaderOrBid, SignedBeaconBlock, SignedBeaconBlockFulu, }; type E = MainnetEthSpec; @@ -43,7 +43,7 @@ mod get_blobs_v2 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new((&block).try_into().unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new((&block).try_into().unwrap())), &custody_columns, publish_fn, ) @@ -68,7 +68,9 @@ mod get_blobs_v2 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -96,7 +98,9 @@ mod get_blobs_v2 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -129,7 +133,9 @@ mod get_blobs_v2 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -168,7 +174,9 @@ mod get_blobs_v2 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -210,7 +218,9 @@ mod get_blobs_v2 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -278,7 +288,9 @@ mod get_blobs_v1 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(&block_no_blobs).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(&block_no_blobs).unwrap(), + )), &custody_columns, publish_fn, ) @@ -305,7 +317,9 @@ mod get_blobs_v1 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -351,7 +365,9 @@ mod get_blobs_v1 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -391,7 +407,9 @@ mod get_blobs_v1 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -439,7 +457,9 @@ mod get_blobs_v1 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) @@ -483,7 +503,9 @@ mod get_blobs_v1 { let processing_status = fetch_and_process_engine_blobs_inner( mock_adapter, block_root, - Arc::new(PartialDataColumnHeader::try_from(block.as_ref()).unwrap()), + PartialHeaderOrBid::PartialHeader(Arc::new( + PartialDataColumnHeader::try_from(block.as_ref()).unwrap(), + )), &custody_columns, publish_fn, ) diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index bc803efe932..fab80c0ade7 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -8,13 +8,13 @@ use tracing::instrument; use tree_hash::TreeHash; use types::data::{ Cell, CellBitmap, ColumnIndex, DataColumn, DataColumnSidecarError, PartialDataColumn, - PartialDataColumnHeader, PartialDataColumnSidecarRef, + PartialDataColumnHeader, PartialDataColumnView, }; use types::kzg_ext::KzgCommitments; use types::{ Blob, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas, DataColumnSidecarList, EthSpec, Hash256, KzgCommitment, KzgProof, - SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlindedBeaconBlock, Slot, + PartialHeaderOrBid, SignedBeaconBlock, SignedBeaconBlockHeader, SignedBlindedBeaconBlock, Slot, }; /// Converts a blob ssz FixedVector to a reference to a fixed-size array @@ -167,7 +167,7 @@ pub fn validate_data_columns_with_commitments<'a, E: EthSpec>( /// Partial columns may have missing cells, indicated by a bitmap. We only verify present cells. pub fn validate_partial_data_columns<'a, E: EthSpec>( kzg: &Kzg, - data_column_iter: impl Iterator)>, + data_column_iter: impl Iterator)>, kzg_commitments: &[KzgCommitment], ) -> Result<(), (Option, KzgError)> { let mut cells = Vec::new(); @@ -176,14 +176,14 @@ pub fn validate_partial_data_columns<'a, E: EthSpec>( let mut commitments = Vec::new(); for (col_index, sidecar) in data_column_iter { - if sidecar.column.is_empty() { + if sidecar.column().is_empty() { return Err((Some(col_index), KzgError::KzgVerificationFailed)); } // Partial columns have a bitmap indicating present cells // We iterate over the bitmap and only process present cells - let mut present_iterator = sidecar.column.iter().zip(sidecar.kzg_proofs.iter()); - for (present, commitment) in sidecar.cells_present_bitmap.iter().zip(kzg_commitments) { + let mut present_iterator = sidecar.column().iter().zip(sidecar.kzg_proofs().iter()); + for (present, commitment) in sidecar.cells_present_bitmap().iter().zip(kzg_commitments) { if present { let (cell, proof) = present_iterator.next().ok_or(( Some(col_index), @@ -379,8 +379,9 @@ pub fn blobs_to_data_column_sidecars_gloas( /// Build data column sidecars from a signed beacon block and its blobs. #[instrument(skip_all, level = "debug", fields(blob_count = blobs_and_proofs.len()))] pub fn blobs_to_partial_data_columns( + block_root: Hash256, blobs_and_proofs: Vec, &[KzgProof])>>, - header: &PartialDataColumnHeader, + header_or_bid: &PartialHeaderOrBid, kzg: &Kzg, spec: &ChainSpec, ) -> Result>, DataColumnSidecarError> { @@ -412,8 +413,19 @@ pub fn blobs_to_partial_data_columns( }) .collect::, KzgError>>()?; - build_partial_data_columns(header, blob_cells_and_proofs_vec, spec) - .map_err(DataColumnSidecarError::BuildSidecarFailed) + match header_or_bid { + PartialHeaderOrBid::PartialHeader(header) => { + build_partial_data_columns_fulu(header, blob_cells_and_proofs_vec, spec) + .map_err(DataColumnSidecarError::BuildSidecarFailed) + } + PartialHeaderOrBid::Bid(bid) => build_partial_data_columns_gloas( + block_root, + bid.message.slot, + blob_cells_and_proofs_vec, + spec, + ) + .map_err(DataColumnSidecarError::BuildSidecarFailed), + } } pub fn compute_cells(blobs: &[&Blob], kzg: &Kzg) -> Result, KzgError> { @@ -570,14 +582,94 @@ pub(crate) fn build_data_column_sidecars_gloas( sidecars } -pub(crate) fn build_partial_data_columns( +pub(crate) fn build_partial_data_columns_fulu( header: &PartialDataColumnHeader, blob_cells_and_proofs_vec: Vec>, spec: &ChainSpec, ) -> Result>, String> { + if spec.fork_name_at_slot::(header.slot()).gloas_enabled() { + return Err("Attempting to construct Fulu partial data columns post-Gloas".to_owned()); + } + let number_of_columns = E::number_of_columns(); let max_blobs_per_block = spec.max_blobs_per_block(header.slot().epoch(E::slots_per_epoch())) as usize; + let (bitmap, columns, column_kzg_proofs) = build_partial_column_cells::( + blob_cells_and_proofs_vec, + number_of_columns, + max_blobs_per_block, + )?; + + let block_root = header.signed_block_header.message.canonical_root(); + + columns + .into_iter() + .zip(column_kzg_proofs) + .enumerate() + .map(|(index, (col, proofs))| { + Ok(types::data::PartialDataColumnFulu { + block_root, + index: index as u64, + sidecar: types::data::PartialDataColumnSidecarFulu { + cells_present_bitmap: bitmap.clone(), + column: VariableList::try_from(col) + .map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?, + kzg_proofs: VariableList::try_from(proofs) + .map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?, + header: None.into(), + }, + } + .into()) + }) + .collect() +} + +pub(crate) fn build_partial_data_columns_gloas( + beacon_block_root: Hash256, + slot: Slot, + blob_cells_and_proofs_vec: Vec>, + spec: &ChainSpec, +) -> Result>, String> { + if !spec.fork_name_at_slot::(slot).gloas_enabled() { + return Err("Attempting to construct Gloas partial data columns pre-Gloas".to_owned()); + } + + let number_of_columns = E::number_of_columns(); + let max_blobs_per_block = spec.max_blobs_per_block(slot.epoch(E::slots_per_epoch())) as usize; + let (bitmap, columns, column_kzg_proofs) = build_partial_column_cells::( + blob_cells_and_proofs_vec, + number_of_columns, + max_blobs_per_block, + )?; + + columns + .into_iter() + .zip(column_kzg_proofs) + .enumerate() + .map(|(index, (col, proofs))| { + Ok(types::data::PartialDataColumnGloas { + block_root: beacon_block_root, + slot, + index: index as u64, + sidecar: types::data::PartialDataColumnSidecarGloas { + cells_present_bitmap: bitmap.clone(), + column: VariableList::try_from(col) + .map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?, + kzg_proofs: VariableList::try_from(proofs) + .map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?, + }, + } + .into()) + }) + .collect() +} + +#[allow(clippy::type_complexity)] +fn build_partial_column_cells( + blob_cells_and_proofs_vec: Vec>, + number_of_columns: usize, + max_blobs_per_block: usize, +) -> Result<(CellBitmap, Vec>>, Vec>), String> { let mut bitmap = CellBitmap::::with_capacity(blob_cells_and_proofs_vec.len()).map_err(|_| { format!( @@ -625,30 +717,7 @@ pub(crate) fn build_partial_data_columns( } } - let block_root = header.signed_block_header.message.canonical_root(); - - let sidecars: Result>, String> = columns - .into_iter() - .zip(column_kzg_proofs) - .enumerate() - .map(|(index, (col, proofs))| { - let column = PartialDataColumn { - block_root, - index: index as u64, - sidecar: types::data::PartialDataColumnSidecar { - cells_present_bitmap: bitmap.clone(), - column: VariableList::try_from(col) - .map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?, - kzg_proofs: VariableList::try_from(proofs) - .map_err(|e| format!("MaxBlobCommitmentsPerBlock exceeded: {e:?}"))?, - header: None.into(), - }, - }; - Ok(column) - }) - .collect(); - - sidecars + Ok((bitmap, columns, column_kzg_proofs)) } // TODO(gloas) blob reconstruction will fail post gloas. We should just return `Blob`s diff --git a/beacon_node/beacon_chain/src/partial_data_column_assembler.rs b/beacon_node/beacon_chain/src/partial_data_column_assembler.rs index 0ce754c8a0b..99452921b57 100644 --- a/beacon_node/beacon_chain/src/partial_data_column_assembler.rs +++ b/beacon_node/beacon_chain/src/partial_data_column_assembler.rs @@ -91,7 +91,7 @@ impl PartialDataColumnAssembler { for partial in partials { let partial_column = partial.as_data_column(); - let column_index = partial_column.index; + let column_index = *partial_column.index(); let merged = if let Some(existing) = assembly.columns.get(&column_index) { let AssemblyColumn::Incomplete(existing) = existing else { @@ -100,7 +100,7 @@ impl PartialDataColumnAssembler { }; let column = existing.as_data_column(); - let old_len = column.sidecar.column.len(); + let old_len = column.sidecar().column().len(); // Merge with existing partial let merged = match existing.merge(&partial) { @@ -113,8 +113,8 @@ impl PartialDataColumnAssembler { let adding_cells = merged .as_data_column() - .sidecar - .column + .sidecar() + .column() .len() .saturating_sub(old_len); @@ -126,13 +126,13 @@ impl PartialDataColumnAssembler { merged } else { - added_cells += partial_column.sidecar.column.len(); + added_cells += partial_column.sidecar().column().len(); // First time seeing this column index for this block partial }; // Check if merged column is now complete by trying to convert into full - let column = if let Some(full_column) = merged.try_clone_full(&header) { + let column = if let Some(full_column) = merged.try_clone_full(Some(&header)) { full_columns.push(full_column.clone()); AssemblyColumn::Complete(full_column) } else { @@ -279,7 +279,7 @@ mod tests { use types::core::{EthSpec, Hash256, MinimalEthSpec, Slot}; use types::data::{ Cell, CellBitmap, DataColumnSidecar, DataColumnSidecarFulu, PartialDataColumn, - PartialDataColumnSidecar, + PartialDataColumnFulu, PartialDataColumnSidecarFulu, }; type E = MinimalEthSpec; @@ -348,16 +348,17 @@ mod tests { let header = include_header.then(|| make_header(total_blobs)).into(); - let partial = PartialDataColumn { + let partial: PartialDataColumn = PartialDataColumnFulu { block_root, index: column_index, - sidecar: PartialDataColumnSidecar { + sidecar: PartialDataColumnSidecarFulu { cells_present_bitmap: bitmap, column, kzg_proofs: proofs, header, }, - }; + } + .into(); KzgVerifiedCustodyPartialDataColumn::from_asserted_custody( KzgVerifiedPartialDataColumn::__new_for_testing(Arc::new(partial)), ) diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs index 2100a5fe9f7..5d575fc8165 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/mod.rs @@ -26,14 +26,15 @@ use std::sync::Arc; use tracing::{Span, debug, error, instrument}; use types::{ ChainSpec, ColumnIndex, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256, - PartialDataColumnSidecarRef, + PartialDataColumn, PartialDataColumnView, }; mod pending_column; mod pending_components; use crate::data_column_verification::{ - GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedCustodyPartialDataColumn, + KzgVerifiedDataColumn, KzgVerifiedPartialDataColumn, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -151,17 +152,17 @@ impl PendingPayloadCache { pub fn missing_cells_for_column_sidecar<'a>( &'_ self, data_column: &'a DataColumnSidecar, - ) -> Result>, MissingCellsError> { + ) -> Result>, MissingCellsError> { let block_root = data_column.block_root(); let column_index = *data_column.index(); self.peek_pending_components(&block_root, |components| { let Some(cached) = components.and_then(|c| c.verified_data_columns.get(&column_index)) else { - return data_column.try_filter_to_partial_ref(|_, _, _| Ok(true)); + return data_column.try_filter_to_partial_view(|_, _, _| Ok(true)); }; - data_column.try_filter_to_partial_ref(|cell_idx, cell, proof| { + data_column.try_filter_to_partial_view(|cell_idx, cell, proof| { match cached.cell_matches(cell_idx, cell, proof) { None => Ok(true), Some(true) => Ok(false), @@ -264,6 +265,75 @@ impl PendingPayloadCache { self.put_kzg_verified_custody_data_columns(block_root, &custody_columns) } + /// Merge KZG verified partial custody columns into the cache. Returns the columns that became + /// fully populated as a result of this merge. + #[instrument(skip_all, level = "trace")] + pub fn merge_partial_data_columns( + &self, + block_root: Hash256, + kzg_verified_partial_data_columns: &[KzgVerifiedCustodyPartialDataColumn], + ) -> Result>, AvailabilityCheckError> { + let bid = self + .get_bid(&block_root) + .ok_or(AvailabilityCheckError::MissingBid(block_root))?; + + let mut newly_complete = Vec::new(); + let pending_components = + self.update_pending_components(block_root, bid.clone(), |pending_components| { + newly_complete = pending_components + .merge_partial_data_columns(kzg_verified_partial_data_columns); + })?; + + let slot = bid.message.slot; + let full_columns = newly_complete + .into_iter() + .filter_map(|col_idx| { + let col = pending_components.verified_data_columns.get(&col_idx)?; + col.to_full_sidecar(col_idx, slot, block_root) + }) + .map(|data| { + KzgVerifiedCustodyDataColumn::from_asserted_custody( + KzgVerifiedDataColumn::from_execution_verified(data), + ) + }) + .collect(); + + Ok(full_columns) + } + + /// Returns the partial columns currently held for `block_root` so the caller can republish + /// them after a local `getBlobs` fetch, and marks the entry as locally-fetched so subsequent + /// merges don't trigger another republish wave. + /// + /// Returns an empty vec if the block is unknown to the cache. + #[instrument(skip_all, level = "trace")] + pub fn get_partials_and_mark_as_local_fetched( + &self, + block_root: Hash256, + ) -> Vec> { + let mut write_lock = self.availability_cache.write(); + let Some(components) = write_lock.get_mut(&block_root) else { + return Vec::new(); + }; + components + .take_incomplete_partials_and_mark_local() + .into_iter() + .map(|partial| { + let column = Arc::new(PartialDataColumn::Gloas(partial)); + KzgVerifiedCustodyPartialDataColumn::from_asserted_custody( + KzgVerifiedPartialDataColumn::from_execution_verified(column), + ) + }) + .collect() + } + + /// Returns whether `has_local_blobs` is set for `block_root`. + pub fn has_local_blobs(&self, block_root: &Hash256) -> bool { + self.peek_pending_components(block_root, |components| { + components.is_some_and(|c| c.has_local_blobs) + }) + } + /// Insert KZG verified columns into the cache. /// After insertion check if the envelope becomes available. pub fn put_kzg_verified_custody_data_columns( diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs index 890c17ba67e..9ea026d74d5 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_column.rs @@ -1,6 +1,7 @@ use kzg::KzgProof; use ssz_types::VariableList; use std::sync::Arc; +use types::data::{CellBitmap, PartialDataColumnGloas, PartialDataColumnSidecarGloas}; use types::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarGloas, EthSpec, Hash256, Slot}; #[derive(Clone)] @@ -32,6 +33,49 @@ impl PendingColumn { .map(|(c, p)| c == cell && p == proof) } + /// Returns `true` if all cells of this column are present. + pub fn is_complete(&self) -> bool { + self.cells.iter().all(|c| c.is_some()) + } + + /// Build a partial Gloas data column from the cells currently populated. Returns `None` if no + /// cells are present. + pub fn to_partial_gloas( + &self, + index: ColumnIndex, + slot: Slot, + block_root: Hash256, + ) -> Option> { + let total = self.cells.len(); + let mut bitmap = CellBitmap::::with_capacity(total).ok()?; + let mut column = Vec::with_capacity(total); + let mut kzg_proofs = Vec::with_capacity(total); + + for (idx, cell) in self.cells.iter().enumerate() { + let Some((cell, proof)) = cell.as_ref() else { + continue; + }; + bitmap.set(idx, true).ok()?; + column.push(cell.clone()); + kzg_proofs.push(*proof); + } + + if column.is_empty() { + return None; + } + + Some(PartialDataColumnGloas { + block_root, + slot, + index, + sidecar: PartialDataColumnSidecarGloas { + cells_present_bitmap: bitmap, + column: VariableList::try_from(column).ok()?, + kzg_proofs: VariableList::try_from(kzg_proofs).ok()?, + }, + }) + } + /// Returns a full `DataColumnSidecar` if all cells are present, or `None` if any are missing. pub fn to_full_sidecar( &self, diff --git a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs index e7b9009577a..378f86b2f04 100644 --- a/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs +++ b/beacon_node/beacon_chain/src/pending_payload_cache/pending_components.rs @@ -1,5 +1,7 @@ use crate::data_availability_checker::AvailabilityCheckError; -use crate::data_column_verification::KzgVerifiedCustodyDataColumn; +use crate::data_column_verification::{ + KzgVerifiedCustodyDataColumn, KzgVerifiedCustodyPartialDataColumn, +}; use crate::payload_envelope_verification::AvailabilityPendingExecutedEnvelope; use crate::payload_envelope_verification::AvailableEnvelope; use crate::payload_envelope_verification::AvailableExecutedEnvelope; @@ -9,6 +11,7 @@ use std::collections::HashMap; use std::sync::Arc; use tracing::{Span, debug, debug_span}; use types::DataColumnSidecar; +use types::data::PartialDataColumnGloas; use types::{ColumnIndex, EthSpec, Hash256, SignedExecutionPayloadBid}; /// This represents the components of a payload pending data availability. @@ -23,6 +26,9 @@ pub struct PendingComponents { /// A column entry in this map may only have some cells filled in (i.e. a partial data column) pub verified_data_columns: HashMap>, pub reconstruction_started: bool, + /// Set once we have fetched the blobs locally (via `getBlobs` from the EL). Suppresses + /// republishing partials that would race with the local fetch. + pub has_local_blobs: bool, pub(crate) span: Span, } @@ -80,7 +86,51 @@ impl PendingComponents { } } - // TODO(gloas): merge partial columns + /// Merges a given set of partial data columns into the cache. Returns the indices of columns + /// that became fully populated as a result of this merge. + pub(crate) fn merge_partial_data_columns( + &mut self, + kzg_verified_partial_data_columns: &[KzgVerifiedCustodyPartialDataColumn], + ) -> Vec { + let num_blobs_expected = self.num_blobs_expected(); + let mut newly_complete = Vec::new(); + for partial in kzg_verified_partial_data_columns { + let partial = partial.as_data_column(); + let col_index = *partial.index(); + let sidecar = partial.sidecar(); + let bitmap = sidecar.cells_present_bitmap(); + let column = sidecar.column(); + let proofs = sidecar.kzg_proofs(); + + let was_complete = self + .verified_data_columns + .get(&col_index) + .is_some_and(PendingColumn::is_complete); + + let col = self + .verified_data_columns + .entry(col_index) + .or_insert_with(|| PendingColumn::new_with_capacity(num_blobs_expected)); + + let mut storage_idx = 0; + for (blob_idx, present) in bitmap.iter().enumerate() { + if !present { + continue; + } + let (Some(cell), Some(proof)) = (column.get(storage_idx), proofs.get(storage_idx)) + else { + break; + }; + col.insert(blob_idx, cell, proof); + storage_idx += 1; + } + + if !was_complete && col.is_complete() { + newly_complete.push(col_index); + } + } + newly_complete + } /// Inserts an executed payload envelope into the cache. pub fn insert_executed_payload_envelope( @@ -156,10 +206,26 @@ impl PendingComponents { envelope: None, verified_data_columns: HashMap::new(), reconstruction_started: false, + has_local_blobs: false, span, } } + /// Returns the incomplete partial columns currently in the cache, suitable for re-publishing. + /// Sets `has_local_blobs` so subsequent merges won't trigger another republish wave. + pub(crate) fn take_incomplete_partials_and_mark_local( + &mut self, + ) -> Vec> { + self.has_local_blobs = true; + let slot = self.bid.message.slot; + let block_root = self.block_root; + self.verified_data_columns + .iter() + .filter(|(_, col)| !col.is_complete()) + .filter_map(|(idx, col)| col.to_partial_gloas(*idx, slot, block_root)) + .collect() + } + pub fn status_str(&self, num_expected_columns: usize) -> String { format!( "envelope {}, data_columns {}/{}", diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index e96c86b17f0..c4f0c245e80 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -15,7 +15,7 @@ use eth2::types::{ }; use execution_layer::{ProvenancedPayload, SubmitBlindedBlockResponse}; use futures::TryFutureExt; -use lighthouse_network::PubsubMessage; +use lighthouse_network::{PubsubMessage, PubsubPartialMessage}; use logging::crit; use network::NetworkMessage; use rand::prelude::SliceRandom; @@ -30,8 +30,8 @@ use tracing::{Span, debug, debug_span, error, field, info, instrument, warn}; use tree_hash::TreeHash; use types::{ AbstractExecPayload, BeaconBlockRef, BlobSidecar, BlobsList, BlockImportSource, - DataColumnSidecar, DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, - FullPayload, FullPayloadBellatrix, Hash256, KzgProofs, SignedBeaconBlock, + DataColumnSubnetId, EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, + FullPayloadBellatrix, Hash256, KzgProofs, PartialDataColumn, SignedBeaconBlock, SignedBlindedBeaconBlock, }; use warp::{Rejection, Reply, reply::Response}; @@ -517,18 +517,26 @@ pub(crate) fn publish_column_sidecars( debug!(indices = ?dropped_indices, "Dropping data columns from publishing"); } let mut full_messages = Vec::new(); - let mut partial_columns = Vec::new(); - let mut partial_header = None; + let mut partial_messages = Vec::new(); for data_col in data_column_sidecars { - if chain.config.enable_partial_columns - && let DataColumnSidecar::Fulu(fulu_data_col) = data_col.as_ref() - { - let mut partial = fulu_data_col.to_partial(); - if let Some(header) = partial.sidecar.header.take() { - partial_header = Some(header); + if chain.config.enable_partial_columns { + match data_col.to_partial() { + PartialDataColumn::Fulu(mut fulu) => match fulu.sidecar.header.take() { + Some(header) => partial_messages.push(PubsubPartialMessage::DataColumnFulu { + column: Arc::new(fulu), + header: Arc::new(header), + }), + None => { + crit!("Converting from full to partial yielded headerless partial"); + } + }, + PartialDataColumn::Gloas(gloas) => { + partial_messages.push(PubsubPartialMessage::DataColumnGloas { + column: Box::new(gloas), + }); + } } - partial_columns.push(Arc::new(partial)); } let subnet = DataColumnSubnetId::from_column_index(*data_col.index(), &chain.spec); @@ -545,21 +553,14 @@ pub(crate) fn publish_column_sidecars( } // Publish partial messages - if !partial_columns.is_empty() { - if let Some(header) = partial_header { - crate::utils::publish_network_message( - sender_clone, - NetworkMessage::PublishPartialColumns { - columns: partial_columns, - header: Arc::new(header), - }, - ) - .map_err(|_| { - BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)) - })?; - } else { - crit!("Unable to extract header from full columns") - } + if !partial_messages.is_empty() { + crate::utils::publish_network_message( + sender_clone, + NetworkMessage::PublishPartial { + messages: partial_messages, + }, + ) + .map_err(|_| BlockError::BeaconChainError(Box::new(BeaconChainError::UnableToPublish)))?; } Ok(()) diff --git a/beacon_node/lighthouse_network/src/lib.rs b/beacon_node/lighthouse_network/src/lib.rs index fdb6ff095e5..7719ee8540b 100644 --- a/beacon_node/lighthouse_network/src/lib.rs +++ b/beacon_node/lighthouse_network/src/lib.rs @@ -98,8 +98,8 @@ impl std::fmt::Display for ClearDialError<'_> { } pub use crate::types::{ - Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, Subnet, - SubnetDiscovery, decode_partial, + Enr, EnrSyncCommitteeBitfield, GossipTopic, NetworkGlobals, PubsubMessage, + PubsubPartialMessage, Subnet, SubnetDiscovery, decode_partial, }; pub use prometheus_client; diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 41d937e3245..9c2d7fefe8e 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -16,9 +16,9 @@ use crate::rpc::{ }; use crate::service::partial_column_header_tracker::PartialColumnHeaderTracker; use crate::types::{ - GossipEncoding, GossipKind, GossipTopic, OutgoingPartialColumn, SnappyTransform, Subnet, - SubnetDiscovery, all_topics_at_fork, core_topics_to_subscribe, is_fork_non_core_topic, - subnet_from_topic_hash, + GossipEncoding, GossipKind, GossipTopic, OutgoingPartialColumnFulu, OutgoingPartialColumnGloas, + PubsubPartialMessage, SnappyTransform, Subnet, SubnetDiscovery, all_topics_at_fork, + core_topics_to_subscribe, is_fork_non_core_topic, subnet_from_topic_hash, }; use crate::{Enr, NetworkGlobals, PubsubMessage, TopicHash, decode_partial, metrics}; use api_types::{AppRequestId, Response}; @@ -44,7 +44,7 @@ use std::time::Duration; use tracing::{debug, error, info, trace, warn}; use types::{ ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, ForkName, PartialDataColumn, - PartialDataColumnHeader, Slot, SubnetId, consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, + PartialDataColumnRef, Slot, SubnetId, consts::altair::SYNC_COMMITTEE_SUBNET_COUNT, }; use utils::{Context as ServiceContext, build_transport, strip_peer_id}; @@ -921,38 +921,40 @@ impl Network { } /// Publishes partial data column sidecars to the gossipsub network. - pub fn publish_partial( - &mut self, - columns: Vec>>, - header: Arc>, - ) { - if !self.network_globals.config.enable_partial_columns { - return; - } - - debug!( - count = columns.len(), - "Sending partial data column sidecars" - ); - - for column in columns { + pub fn publish_partial(&mut self, messages: Vec>) { + for message in messages { + // Determine topic hash, same logic for both variants. + let column: PartialDataColumnRef = match &message { + PubsubPartialMessage::DataColumnFulu { column, .. } => column.as_ref().into(), + PubsubPartialMessage::DataColumnGloas { column } => column.as_ref().into(), + }; let subnet = - DataColumnSubnetId::from_column_index(column.index, &self.fork_context.spec); + DataColumnSubnetId::from_column_index(*column.index(), &self.fork_context.spec); let topic = GossipTopic::new( GossipKind::DataColumnSidecar(subnet), GossipEncoding::default(), self.enr_fork_id.fork_digest, ); - let header_sent_set = self - .partial_column_header_tracker - .get_for_block(column.block_root); - let partial_message = OutgoingPartialColumn::new(column, &header, header_sent_set); let publish_topic: Topic = topic.clone().into(); - if let Err(e) = self - .gossipsub_mut() - .publish_partial(publish_topic, partial_message) - { + let result = match message { + PubsubPartialMessage::DataColumnFulu { column, header } => { + let header_sent_set = self + .partial_column_header_tracker + .get_for_block(column.block_root); + let partial_message = + OutgoingPartialColumnFulu::new(column, &header, header_sent_set); + self.gossipsub_mut() + .publish_partial(publish_topic, partial_message) + } + PubsubPartialMessage::DataColumnGloas { column } => { + let partial_message = OutgoingPartialColumnGloas::new(column); + self.gossipsub_mut() + .publish_partial(publish_topic, partial_message) + } + }; + + if let Err(e) = result { match e { PublishError::NoPeersSubscribedToTopic => { debug!( @@ -1441,10 +1443,10 @@ impl Network { } Ok(column) => { debug!( - block_root = %column.block_root, - index = column.index, + block_root = %column.block_root(), + index = column.index(), %peer_id, - cells_present = %column.sidecar.cells_present_bitmap, + cells_present = %column.sidecar().cells_present_bitmap(), "Decoded partial message" ); // Notify the network diff --git a/beacon_node/lighthouse_network/src/types/mod.rs b/beacon_node/lighthouse_network/src/types/mod.rs index d0173e5b9ac..9a15ab81a12 100644 --- a/beacon_node/lighthouse_network/src/types/mod.rs +++ b/beacon_node/lighthouse_network/src/types/mod.rs @@ -15,8 +15,8 @@ pub type Enr = discv5::enr::Enr; pub use eth2::lighthouse::sync_state::{BackFillState, CustodyBackFillState, SyncState}; pub use globals::NetworkGlobals; pub use partial::HeaderSentSet; -pub use partial::OutgoingPartialColumn; -pub use pubsub::{PubsubMessage, SnappyTransform, decode_partial}; +pub use partial::{OutgoingPartialColumnFulu, OutgoingPartialColumnGloas}; +pub use pubsub::{PubsubMessage, PubsubPartialMessage, SnappyTransform, decode_partial}; pub use subnet::{Subnet, SubnetDiscovery}; pub use topics::{ GossipEncoding, GossipKind, GossipTopic, TopicConfig, all_topics_at_fork, diff --git a/beacon_node/lighthouse_network/src/types/partial.rs b/beacon_node/lighthouse_network/src/types/partial.rs index f25ce9ec36f..29db373d98e 100644 --- a/beacon_node/lighthouse_network/src/types/partial.rs +++ b/beacon_node/lighthouse_network/src/types/partial.rs @@ -9,25 +9,27 @@ use std::sync::Arc; use tracing::{debug, error}; use types::core::{EthSpec, Hash256}; use types::data::{ - CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, - PartialDataColumnSidecar, PartialDataColumnSidecarRef, + CellBitmap, PartialDataColumnFulu, PartialDataColumnHeader, PartialDataColumnPartsMetadata, + PartialDataColumnSidecar, PartialDataColumnViewFulu, }; +use types::{PartialDataColumnGloas, PartialDataColumnGroupId, PartialDataColumnRef}; -const PARTIAL_COLUMNS_VERSION_BYTE: u8 = 0; +pub(crate) const PARTIAL_COLUMNS_VERSION_BYTE_FULU: u8 = 0; +pub(crate) const PARTIAL_COLUMNS_VERSION_BYTE_GLOAS: u8 = 1; pub type HeaderSentSet = Arc>>; #[derive(Debug, Clone)] -pub struct OutgoingPartialColumn { - partial_column: Arc>, +pub struct OutgoingPartialColumnFulu { + partial_column: Arc>, metadata: MaybeKnownMetadata, header_message: Vec, header_sent_set: HeaderSentSet, } -impl OutgoingPartialColumn { +impl OutgoingPartialColumnFulu { pub fn new( - partial_column: Arc>, + partial_column: Arc>, header: &PartialDataColumnHeader, header_sent_set: HeaderSentSet, ) -> Self { @@ -44,7 +46,7 @@ impl OutgoingPartialColumn { } .into(); - let header_message = PartialDataColumnSidecarRef { + let header_message = PartialDataColumnViewFulu { cells_present_bitmap: CellBitmap::::with_capacity( partial_column.sidecar.cells_present_bitmap.len(), ) @@ -55,7 +57,7 @@ impl OutgoingPartialColumn { } .as_ssz_bytes(); - OutgoingPartialColumn { + OutgoingPartialColumnFulu { partial_column, metadata, header_message, @@ -133,8 +135,8 @@ impl Metadata for MaybeKnownMetadata { .map_err(|_| PartialError::InvalidFormat)?; self.do_update(PartialDataColumnPartsMetadata { - available: sidecar.cells_present_bitmap.clone(), - requests: sidecar.cells_present_bitmap, + available: sidecar.cells_present_bitmap().clone(), + requests: sidecar.cells_present_bitmap().clone(), }) .map(|_| ()) } @@ -149,10 +151,10 @@ impl From> for MaybeKnownMetadata< } } -impl Partial for OutgoingPartialColumn { +impl Partial for OutgoingPartialColumnFulu { fn group_id(&self) -> Vec { let mut group_id = Vec::with_capacity(Hash256::len_bytes() + 1); - group_id.push(PARTIAL_COLUMNS_VERSION_BYTE); + group_id.push(PARTIAL_COLUMNS_VERSION_BYTE_FULU); group_id.extend_from_slice(self.partial_column.block_root.as_slice()); group_id } @@ -192,64 +194,131 @@ impl Partial for OutgoingPartialColumn { Some(metadata) => { // The peer is apparently aware of the header, make sure we track that: self.header_sent_set.lock().insert(peer_id); + action_from_present_metadata(peer_id, metadata, self.partial_column.as_ref().into()) + } + } + } +} + +fn action_from_present_metadata( + peer_id: PeerId, + metadata: &[u8], + partial_column: PartialDataColumnRef, +) -> Result { + let peer_metadata = PartialDataColumnPartsMetadata::::from_ssz_bytes(metadata) + .map_err(|_| PartialError::InvalidFormat)?; + let expected_len = partial_column.sidecar().cells_present_bitmap().len(); + if peer_metadata.available.len() != expected_len || peer_metadata.requests.len() != expected_len + { + return Err(PartialError::InvalidFormat); + } + + let need = !peer_metadata + .available + .is_subset(partial_column.sidecar().cells_present_bitmap()); + let want = peer_metadata.requests.difference(&peer_metadata.available); + + let send = partial_column + .sidecar() + .filter(|idx| want.get(idx).expect("Bound checked above")) + .map_err(|err| { + error!(?err, "Unexpected error filtering sidecar"); + PartialError::InvalidFormat + })? + .map(|sidecar| { + debug!( + peer=%peer_id, + group_id=%partial_column.block_root(), + column_index=partial_column.index(), + metadata=%peer_metadata, + sending=%sidecar.cells_present_bitmap(), + "Partial send: Sending" + ); + ( + sidecar.as_ssz_bytes(), + Box::new(MaybeKnownMetadata::::from( + PartialDataColumnPartsMetadata { + available: peer_metadata + .available + .union(sidecar.cells_present_bitmap()), + requests: peer_metadata.requests.union(sidecar.cells_present_bitmap()), + }, + )) as Box, + ) + }); + + if send.is_none() { + debug!( + peer=%peer_id, + group_id=%partial_column.block_root(), + column_index=partial_column.index(), + metadata=%peer_metadata, + "Partial send: Nothing to send" + ); + } + + Ok(PartialAction { need, send }) +} + +#[derive(Debug, Clone)] +pub struct OutgoingPartialColumnGloas { + partial_column: Box>, + group_id: Vec, + metadata: MaybeKnownMetadata, +} + +impl OutgoingPartialColumnGloas { + pub fn new(partial_column: Box>) -> Self { + // For now, always request all cells + let mut requests = partial_column.sidecar.cells_present_bitmap.clone(); + for idx in 0..requests.len() { + requests + .set(idx, true) + .expect("Bound asserted via `len` above"); + } + let metadata = PartialDataColumnPartsMetadata:: { + available: partial_column.sidecar.cells_present_bitmap.clone(), + requests, + } + .into(); + + let group_id = PartialDataColumnGroupId { + slot: partial_column.slot, + beacon_block_root: partial_column.block_root, + }; + let mut encoded_group_id = Vec::with_capacity(group_id.ssz_bytes_len() + 1); + encoded_group_id.push(PARTIAL_COLUMNS_VERSION_BYTE_GLOAS); + group_id.ssz_append(&mut encoded_group_id); + + OutgoingPartialColumnGloas { + partial_column, + group_id: encoded_group_id, + metadata, + } + } +} + +impl Partial for OutgoingPartialColumnGloas { + fn group_id(&self) -> Vec { + self.group_id.clone() + } + + fn metadata(&self) -> Box { + Box::new(self.metadata.clone()) + } - let peer_metadata = PartialDataColumnPartsMetadata::::from_ssz_bytes(metadata) - .map_err(|_| PartialError::InvalidFormat)?; - let expected_len = self.partial_column.sidecar.cells_present_bitmap.len(); - if peer_metadata.available.len() != expected_len - || peer_metadata.requests.len() != expected_len - { - return Err(PartialError::InvalidFormat); - } - - let need = !peer_metadata - .available - .is_subset(&self.partial_column.sidecar.cells_present_bitmap); - let want = peer_metadata.requests.difference(&peer_metadata.available); - - let send = self - .partial_column - .sidecar - .filter(|idx| want.get(idx).expect("Bound checked above")) - .map_err(|err| { - error!(?err, "Unexpected error filtering sidecar"); - PartialError::InvalidFormat - })? - .map(|sidecar| { - debug!( - peer=%peer_id, - group_id=%self.partial_column.block_root, - column_index=self.partial_column.index, - metadata=%peer_metadata, - sending=%sidecar.cells_present_bitmap, - "Partial send: Sending" - ); - ( - sidecar.as_ssz_bytes(), - Box::new(MaybeKnownMetadata::::from( - PartialDataColumnPartsMetadata { - available: peer_metadata - .available - .union(&sidecar.cells_present_bitmap), - requests: peer_metadata - .requests - .union(&sidecar.cells_present_bitmap), - }, - )) as Box, - ) - }); - - if send.is_none() { - debug!( - peer=%peer_id, - group_id=%self.partial_column.block_root, - column_index=self.partial_column.index, - metadata=%peer_metadata, - "Partial send: Nothing to send" - ); - } - - Ok(PartialAction { need, send }) + fn partial_action_from_metadata( + &self, + peer_id: PeerId, + metadata: Option<&[u8]>, + ) -> Result { + match metadata { + None | Some([]) => Ok(PartialAction { + need: false, + send: None, + }), + Some(metadata) => { + action_from_present_metadata(peer_id, metadata, self.partial_column.as_ref().into()) } } } @@ -265,6 +334,8 @@ mod tests { use types::block::{BeaconBlockHeader, SignedBeaconBlockHeader}; use types::core::{MinimalEthSpec, Slot}; use types::data::PartialDataColumnHeader; + use types::data::PartialDataColumnSidecarFulu; + use types::data::PartialDataColumnSidecarGloas; type E = MinimalEthSpec; @@ -300,16 +371,16 @@ mod tests { block_root: Hash256, total_blobs: usize, present_indices: &[usize], - ) -> Arc> { + ) -> Arc> { let mut bitmap = CellBitmap::::with_capacity(total_blobs).unwrap(); for &idx in present_indices { bitmap.set(idx, true).unwrap(); } - Arc::new(PartialDataColumn { + Arc::new(PartialDataColumnFulu { block_root, index: 0, - sidecar: PartialDataColumnSidecar { + sidecar: PartialDataColumnSidecarFulu { cells_present_bitmap: bitmap, column: present_indices .iter() @@ -328,6 +399,39 @@ mod tests { }) } + fn make_partial_column_gloas( + block_root: Hash256, + slot: Slot, + total_blobs: usize, + present_indices: &[usize], + ) -> Box> { + let mut bitmap = CellBitmap::::with_capacity(total_blobs).unwrap(); + for &idx in present_indices { + bitmap.set(idx, true).unwrap(); + } + + Box::new(PartialDataColumnGloas { + block_root, + slot, + index: 0, + sidecar: PartialDataColumnSidecarGloas { + cells_present_bitmap: bitmap, + column: present_indices + .iter() + .map(|&idx| make_cell(idx as u8)) + .collect::>() + .try_into() + .unwrap(), + kzg_proofs: present_indices + .iter() + .map(|_| types::KzgProof::empty()) + .collect::>() + .try_into() + .unwrap(), + }, + }) + } + fn random_peer_id() -> PeerId { let keypair = Keypair::generate_ed25519(); PeerId::from(keypair.public()) @@ -428,7 +532,7 @@ mod tests { let header = make_header(4); let partial = make_partial_column(root, 4, &[0, 1]); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); - let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); + let outgoing = OutgoingPartialColumnFulu::new(partial, &header, header_sent_set); let peer = random_peer_id(); @@ -448,7 +552,7 @@ mod tests { // We have cells [0, 2, 3] let partial = make_partial_column(root, 4, &[0, 2, 3]); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); - let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); + let outgoing = OutgoingPartialColumnFulu::new(partial, &header, header_sent_set); let peer = random_peer_id(); @@ -480,7 +584,83 @@ mod tests { // We have cells [0] let partial = make_partial_column(root, 4, &[0]); let header_sent_set: HeaderSentSet = Arc::new(Mutex::new(HashSet::new())); - let outgoing = OutgoingPartialColumn::new(partial, &header, header_sent_set); + let outgoing = OutgoingPartialColumnFulu::new(partial, &header, header_sent_set); + + let peer = random_peer_id(); + + // Peer has [0, 1, 2] — cells [1, 2] are unknown to us + let mut peer_available = CellBitmap::::with_capacity(4).unwrap(); + peer_available.set(0, true).unwrap(); + peer_available.set(1, true).unwrap(); + peer_available.set(2, true).unwrap(); + let peer_meta = PartialDataColumnPartsMetadata:: { + available: peer_available.clone(), + requests: peer_available, + }; + let encoded = peer_meta.as_ssz_bytes(); + + let action = outgoing + .partial_action_from_metadata(peer, Some(&encoded)) + .unwrap(); + assert!(action.need); + } + + // -- OutgoingPartialColumnGloas::partial_action_from_metadata tests -- + + #[test] + fn gloas_no_metadata_does_not_send() { + let root = Hash256::repeat_byte(2); + let partial = make_partial_column_gloas(root, Slot::new(7), 4, &[0, 1]); + let outgoing = OutgoingPartialColumnGloas::new(partial); + + let peer = random_peer_id(); + + let action = outgoing.partial_action_from_metadata(peer, None).unwrap(); + assert!(!action.need); + assert!(action.send.is_none()); + + let action_empty = outgoing + .partial_action_from_metadata(peer, Some(&[])) + .unwrap(); + assert!(!action_empty.need); + assert!(action_empty.send.is_none()); + } + + #[test] + fn gloas_metadata_filters_cells_to_send() { + let root = Hash256::repeat_byte(2); + // We have cells [0, 2, 3] + let partial = make_partial_column_gloas(root, Slot::new(7), 4, &[0, 2, 3]); + let outgoing = OutgoingPartialColumnGloas::new(partial); + + let peer = random_peer_id(); + + // Peer has [0, 1], wants [0, 1, 2, 3] + let mut peer_available = CellBitmap::::with_capacity(4).unwrap(); + peer_available.set(0, true).unwrap(); + peer_available.set(1, true).unwrap(); + let mut peer_request = CellBitmap::::with_capacity(4).unwrap(); + for i in 0..4 { + peer_request.set(i, true).unwrap(); + } + let peer_meta = PartialDataColumnPartsMetadata:: { + available: peer_available, + requests: peer_request, + }; + let encoded = peer_meta.as_ssz_bytes(); + + let action = outgoing + .partial_action_from_metadata(peer, Some(&encoded)) + .unwrap(); + assert!(action.send.is_some()); + } + + #[test] + fn gloas_metadata_sets_need_when_peer_has_unknown_cells() { + let root = Hash256::repeat_byte(2); + // We have cells [0] + let partial = make_partial_column_gloas(root, Slot::new(7), 4, &[0]); + let outgoing = OutgoingPartialColumnGloas::new(partial); let peer = random_peer_id(); diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index e5a703ff1e5..04a3b000040 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -1,5 +1,6 @@ //! Handles the encoding and decoding of pubsub messages. +use super::partial::{PARTIAL_COLUMNS_VERSION_BYTE_FULU, PARTIAL_COLUMNS_VERSION_BYTE_GLOAS}; use crate::types::{GossipEncoding, GossipKind, GossipTopic}; use gossipsub::TopicHash; use snap::raw::{Decoder, Encoder, decompress_len}; @@ -10,9 +11,10 @@ use types::{ AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, Hash256, LightClientFinalityUpdate, LightClientOptimisticUpdate, PartialDataColumn, - PartialDataColumnSidecar, PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof, - SignedAggregateAndProofBase, SignedAggregateAndProofElectra, SignedBeaconBlock, - SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, + PartialDataColumnFulu, PartialDataColumnGloas, PartialDataColumnGroupId, + PartialDataColumnHeader, PartialDataColumnSidecar, PayloadAttestationMessage, ProposerSlashing, + SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, + SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockElectra, SignedBeaconBlockFulu, SignedBeaconBlockGloas, SignedBlsToExecutionChange, SignedContributionAndProof, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, @@ -58,6 +60,17 @@ pub enum PubsubMessage { LightClientOptimisticUpdate(Box>), } +#[derive(Debug, Clone, PartialEq)] +pub enum PubsubPartialMessage { + DataColumnFulu { + column: Arc>, + header: Arc>, + }, + DataColumnGloas { + column: Box>, + }, +} + // Implements the `DataTransform` trait of gossipsub to employ snappy compression pub struct SnappyTransform { /// Sets the maximum size we allow gossipsub messages to decompress to. @@ -464,6 +477,45 @@ impl PubsubMessage { } } +impl PubsubPartialMessage { + /// Returns the kind of gossipsub topic associated with the message. + pub fn kind(&self) -> GossipKind { + let index = match self { + PubsubPartialMessage::DataColumnFulu { column, .. } => column.index, + PubsubPartialMessage::DataColumnGloas { column, .. } => column.index, + }; + // Partial messages are spec'd under the assumption that there is one column per subnet. + GossipKind::DataColumnSidecar(DataColumnSubnetId::new(index)) + } + + /// Build a [`PubsubPartialMessage`] from a [`PartialDataColumn`]. The provided `header` is only + /// used for the Fulu variant — for Gloas the slot and block root live inside the column itself. + pub fn from_partial( + partial: PartialDataColumn, + header: &Arc>, + ) -> Self { + match partial { + PartialDataColumn::Fulu(fulu) => PubsubPartialMessage::DataColumnFulu { + column: Arc::new(fulu), + header: header.clone(), + }, + PartialDataColumn::Gloas(gloas) => PubsubPartialMessage::DataColumnGloas { + column: Box::new(gloas), + }, + } + } + + /// Like [`from_partial`], but accepts an `Arc`. If the `Arc` has a unique + /// reference, the inner column is moved; otherwise it is cloned. + pub fn from_partial_arc( + partial: Arc>, + header: &Arc>, + ) -> Self { + let partial = Arc::try_unwrap(partial).unwrap_or_else(|arc| (*arc).clone()); + Self::from_partial(partial, header) + } +} + /// Decodes incoming partial data column sidecar from gossipsub partial protocol. /// Note: Currently, data columns are the only supported partial messages. In future this could /// return an enum. @@ -474,20 +526,39 @@ pub fn decode_partial( ) -> Result, String> { match topic.kind() { GossipKind::DataColumnSidecar(id) => { - if group.first() != Some(&0) { - return Err(format!("Unknown data column format: {:?}", group.first())); - } - let block_root = Hash256::from_ssz_bytes(&group[1..]) - .map_err(|e| format!("Error decoding group: {:?}", e))?; + // Partial messages are spec'd under the assumption that there is one column per subnet. + let index = **id; + let version = group + .first() + .ok_or_else(|| "Empty partial group id".to_string())?; let sidecar = PartialDataColumnSidecar::from_ssz_bytes(data) .map_err(|e| format!("Error decoding sidecar: {:?}", e))?; - let data_column = PartialDataColumn { - block_root, - // Partial messages are spec'd under the assumption that there is one column per subnet. - index: **id, - sidecar, - }; - Ok(data_column) + match (version, sidecar) { + (&PARTIAL_COLUMNS_VERSION_BYTE_FULU, PartialDataColumnSidecar::Fulu(sidecar)) => { + let block_root = Hash256::from_ssz_bytes(&group[1..]) + .map_err(|e| format!("Error decoding Fulu group: {:?}", e))?; + Ok(PartialDataColumnFulu { + block_root, + index, + sidecar, + } + .into()) + } + (&PARTIAL_COLUMNS_VERSION_BYTE_GLOAS, PartialDataColumnSidecar::Gloas(sidecar)) => { + let group_id = PartialDataColumnGroupId::from_ssz_bytes(&group[1..]) + .map_err(|e| format!("Error decoding Gloas group: {:?}", e))?; + Ok(PartialDataColumnGloas { + block_root: group_id.beacon_block_root, + slot: group_id.slot, + index, + sidecar, + } + .into()) + } + (version, _) => Err(format!( + "Partial data column version/payload mismatch: {version}" + )), + } } other => Err(format!("Partial message unsupported for topic: {other}")), } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index d34668b1387..f8ce0b7443f 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -36,7 +36,7 @@ use beacon_chain::{ use beacon_processor::{Work, WorkEvent}; use lighthouse_network::{ Client, GossipTopic, MessageAcceptance, MessageId, PeerAction, PeerId, PubsubMessage, - ReportSource, + PubsubPartialMessage, ReportSource, }; use logging::crit; use operation_pool::ReceivedPreCapella; @@ -53,11 +53,11 @@ use types::{ Attestation, AttestationData, AttestationRef, AttesterSlashing, BlobSidecar, ColumnIndex, DataColumnSidecar, DataColumnSubnetId, EthSpec, Hash256, IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, PartialDataColumn, - PartialDataColumnHeader, PayloadAttestationMessage, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, - SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, SignedProposerPreferences, - SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, - block::BlockImportSource, + PartialDataColumnHeader, PartialHeaderOrBid, PayloadAttestationMessage, ProposerSlashing, + SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, + SignedContributionAndProof, SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, + SignedProposerPreferences, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, + SyncCommitteeMessage, SyncSubnetId, block::BlockImportSource, }; use beacon_processor::work_reprocessing_queue::QueuedColumnReconstruction; @@ -838,7 +838,7 @@ impl NetworkBeaconProcessor { parent = None, level = "debug", skip_all, - fields(block_root = ?column.block_root, index = column.index), + fields(block_root = ?column.block_root(), index = column.index()), )] pub async fn process_gossip_partial_data_column_sidecar( self: &Arc, @@ -847,21 +847,28 @@ impl NetworkBeaconProcessor { seen_duration: Duration, topic: GossipTopic, ) { - let block_root = column.block_root; - let index = column.index; + let block_root = *column.block_root(); + let index = *column.index(); let result = self .chain .verify_partial_data_column_sidecar_for_gossip(column, seen_duration); - let header = match result { - PartialColumnVerificationResult::Ok { header, column } => { + // Tracks the slot + (optional) Fulu header so we can: + // - run delay metrics after error handling + // - trigger `getBlobs` if we just learned about a Fulu header + // For Gloas the bid is gossip-validated on its own path (and triggers `getBlobs` there), + // so a partial column never needs to re-trigger it. + let post_processing = match result { + PartialColumnVerificationResult::Ok { + column, + slot, + verified_header, + } => { metrics::inc_counter( &metrics::BEACON_PROCESSOR_GOSSIP_PARTIAL_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL, ); - let slot = header.as_header().slot(); - debug!( %slot, %block_root, @@ -884,15 +891,16 @@ impl NetworkBeaconProcessor { self.process_gossip_verified_partial_data_column( peer_id, column, - header.clone(), + verified_header.clone(), slot, ) .await; - Some(header) + Some((slot, verified_header)) } PartialColumnVerificationResult::ErrWithValidHeader { header, err } => { + let slot = header.as_header().slot(); self.handle_partial_verification_error(peer_id, err, block_root, index, topic); - Some(header) + Some((slot, Some(header))) } PartialColumnVerificationResult::Err(err) => { self.handle_partial_verification_error(peer_id, err, block_root, index, topic); @@ -900,8 +908,7 @@ impl NetworkBeaconProcessor { } }; - if let Some(header) = header { - let slot = header.as_header().slot(); + if let Some((slot, verified_header)) = post_processing { let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock); // Log metrics to track delay from other nodes on the network. metrics::observe_duration( @@ -909,12 +916,18 @@ impl NetworkBeaconProcessor { delay, ); - if !header.was_cached() { + if let Some(header) = verified_header + && !header.was_cached() + { debug!(block = %block_root, "Triggering getBlobs after receiving partial header"); // We want to publish immediately when this finishes let publish_blobs = true; - self.fetch_engine_blobs_and_publish(header.into_header(), block_root, publish_blobs) - .await + self.fetch_engine_blobs_and_publish( + PartialHeaderOrBid::PartialHeader(header.into_header()), + block_root, + publish_blobs, + ) + .await } } } @@ -1368,8 +1381,8 @@ impl NetworkBeaconProcessor { let data_column_slot = verified_data_column.slot(); let data_column_index = verified_data_column.index(); - // TODO(gloas): implement partial messages - if let DataColumnSidecar::Fulu(col) = verified_data_column.as_data_column() + let data_col = verified_data_column.as_data_column(); + if matches!(data_col, DataColumnSidecar::Fulu(_)) && self .chain .data_availability_checker @@ -1381,16 +1394,27 @@ impl NetworkBeaconProcessor { &[&data_column_index.to_string()], ); - let mut column = col.to_partial(); - let header = column.sidecar.header.take(); - if let Some(header) = header { - self.send_network_message(NetworkMessage::PublishPartialColumns { - columns: vec![Arc::new(column)], - header: Arc::new(header), + let message = match data_col.to_partial() { + PartialDataColumn::Fulu(mut fulu) => { + fulu.sidecar + .header + .take() + .map(|header| PubsubPartialMessage::DataColumnFulu { + column: Arc::new(fulu), + header: Arc::new(header), + }) + } + PartialDataColumn::Gloas(gloas) => Some(PubsubPartialMessage::DataColumnGloas { + column: Box::new(gloas), + }), + }; + if let Some(message) = message { + self.send_network_message(NetworkMessage::PublishPartial { + messages: vec![message], }); } else { crit!("Converting from full to partial yielded headerless partial") - }; + } } let result = self @@ -1455,12 +1479,15 @@ impl NetworkBeaconProcessor { } } - /// Process a gossip-verified partial data column by merging it in the assembler + /// Process a gossip-verified partial data column by merging it into the right per-fork store + /// (the Fulu assembler or the Gloas pending payload cache) via `process_gossip_partial_data_column`. + /// + /// `verified_header` is `Some` for Fulu and `None` for Gloas. async fn process_gossip_verified_partial_data_column( self: &Arc, _peer_id: PeerId, verified_partial: KzgVerifiedPartialDataColumn, - verified_header: GossipVerifiedPartialDataColumnHeader, + verified_header: Option>, slot: Slot, ) { let processing_start_time = Instant::now(); @@ -1507,7 +1534,7 @@ impl NetworkBeaconProcessor { .into_iter() .map(|partial| partial.into_inner()) .filter(|partial| { - !only_send_completed_partials || partial.sidecar.is_complete() + !only_send_completed_partials || partial.sidecar().is_complete() }) .collect::>(); @@ -1518,10 +1545,9 @@ impl NetworkBeaconProcessor { "Not publishing incomplete partials before getBlobs" ); } - self.send_network_message(NetworkMessage::PublishPartialColumns { - columns, - header: verified_header.into_header(), - }); + let messages = + build_partial_publish_messages(columns, verified_header.as_ref()); + self.send_network_message(NetworkMessage::PublishPartial { messages }); } Ok(avail) } @@ -1997,9 +2023,17 @@ impl NetworkBeaconProcessor { let current_span = Span::current(); self.executor.spawn( async move { - if let Ok(header) = PartialDataColumnHeader::try_from(block_clone.as_ref()) { + let header_or_bid = + if let Ok(bid) = block_clone.message().body().signed_execution_payload_bid() { + Some(PartialHeaderOrBid::Bid(Arc::new(bid.clone()))) + } else { + PartialDataColumnHeader::try_from(block_clone.as_ref()) + .ok() + .map(|h| PartialHeaderOrBid::PartialHeader(Arc::new(h))) + }; + if let Some(header_or_bid) = header_or_bid { self_clone - .fetch_engine_blobs_and_publish(Arc::new(header), block_root, publish_blobs) + .fetch_engine_blobs_and_publish(header_or_bid, block_root, publish_blobs) .await } } @@ -4267,3 +4301,33 @@ impl NetworkBeaconProcessor { } } } + +/// Build `PubsubPartialMessage`s for republishing partial columns. The Fulu header is attached +/// when present; Gloas columns carry slot/block root inside the column itself, so no header is +/// needed. Columns whose variant does not match the presence of a header are skipped (they +/// should not occur in practice since a single fork drives the merge). +fn build_partial_publish_messages( + columns: Vec>>, + verified_header: Option<&GossipVerifiedPartialDataColumnHeader>, +) -> Vec> { + columns + .into_iter() + .filter_map(|column| match verified_header { + Some(header) => Some(PubsubPartialMessage::from_partial_arc( + column, + &header.clone().into_header(), + )), + None => { + let inner = Arc::try_unwrap(column).unwrap_or_else(|arc| (*arc).clone()); + match inner { + PartialDataColumn::Gloas(gloas) => { + Some(PubsubPartialMessage::DataColumnGloas { + column: Box::new(gloas), + }) + } + PartialDataColumn::Fulu(_) => None, + } + } + }) + .collect() +} diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7bf969db106..9bb976a2583 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -20,7 +20,7 @@ use lighthouse_network::rpc::methods::{ }; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use lighthouse_network::{ - Client, GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage, + Client, GossipTopic, MessageId, NetworkGlobals, PeerId, PubsubMessage, PubsubPartialMessage, rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, }; use rand::prelude::SliceRandom; @@ -936,14 +936,14 @@ impl NetworkBeaconProcessor { pub async fn fetch_engine_blobs_and_publish( self: &Arc, - header: Arc>, + header_or_bid: PartialHeaderOrBid, block_root: Hash256, publish_blobs: bool, ) { if self.chain.config.disable_get_blobs { return; } - let epoch = header.slot().epoch(T::EthSpec::slots_per_epoch()); + let epoch = header_or_bid.slot().epoch(T::EthSpec::slots_per_epoch()); let custody_columns = self.chain.sampling_columns_for_epoch(epoch); let self_cloned = self.clone(); let publish_fn = move |blobs_or_data_column| { @@ -968,7 +968,7 @@ impl NetworkBeaconProcessor { match fetch_and_process_engine_blobs( self.chain.clone(), block_root, - header.clone(), + header_or_bid.clone(), custody_columns, publish_fn, ) @@ -1013,21 +1013,56 @@ impl NetworkBeaconProcessor { } } - // Publish partial columns without eager send - // TODO(gloas): implement publish partial columns without eager send - if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { - let columns = assembler.get_partials_and_mark_as_local_fetched(block_root, &header); - if !columns.is_empty() { - debug!(block = %block_root, "Publishing all partials after getBlobs"); - self.send_network_message(NetworkMessage::PublishPartialColumns { - columns: columns + // Publish the partial columns the relevant cache has accumulated for this block, now that + // we have fetched blobs locally and can mark them as such. Each fork uses a different + // backing store (Fulu: assembler, Gloas: pending payload cache). + match &header_or_bid { + PartialHeaderOrBid::PartialHeader(header) => { + if let Some(assembler) = self.chain.data_availability_checker.partial_assembler() { + let columns = + assembler.get_partials_and_mark_as_local_fetched(block_root, header); + if !columns.is_empty() { + debug!(block = %block_root, "Publishing all partials after getBlobs"); + let messages = columns + .into_iter() + .map(|partial| { + PubsubPartialMessage::from_partial_arc(partial.into_inner(), header) + }) + .collect(); + self.send_network_message(NetworkMessage::PublishPartial { messages }); + } else { + debug!(block = %block_root, "No partials to publish after getBlobs"); + } + } + } + PartialHeaderOrBid::Bid(_) => { + let columns = self + .chain + .pending_payload_cache + .get_partials_and_mark_as_local_fetched(block_root); + if !columns.is_empty() { + debug!(block = %block_root, "Publishing all partials after getBlobs"); + let messages = columns .into_iter() - .map(|partial| partial.into_inner()) - .collect(), - header, - }); - } else { - debug!(block = %block_root, "No partials to publish after getBlobs"); + .filter_map(|partial| { + // We constructed these as Gloas variants above, so the `Fulu` arm + // is unreachable in practice — fail closed and skip it. + let inner = Arc::try_unwrap(partial.into_inner()) + .unwrap_or_else(|arc| (*arc).clone()); + match inner { + PartialDataColumn::Gloas(gloas) => { + Some(PubsubPartialMessage::DataColumnGloas { + column: Box::new(gloas), + }) + } + PartialDataColumn::Fulu(_) => None, + } + }) + .collect(); + self.send_network_message(NetworkMessage::PublishPartial { messages }); + } else { + debug!(block = %block_root, "No partials to publish after getBlobs"); + } } } } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 988a68c9dd2..23f79485066 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -28,7 +28,10 @@ use store::KzgCommitment; use tracing::{debug, debug_span, error, info, instrument, warn}; use types::data::FixedBlobSidecarList; use types::kzg_ext::format_kzg_commitments; -use types::{BlockImportSource, DataColumnSidecarList, Epoch, Hash256}; +use types::{ + BlockImportSource, DataColumnSidecarList, Epoch, Hash256, PartialDataColumnHeader, + PartialHeaderOrBid, +}; /// Id associated to a batch processing request, either a sync batch or a parent lookup. #[derive(Clone, Debug, PartialEq)] @@ -218,14 +221,22 @@ impl NetworkBeaconProcessor { // Block is valid, we can now attempt fetching blobs from EL using version hashes // derived from kzg commitments from the block, without having to wait for all blobs // to be sent from the peers if we already have them. - if let Ok(header) = signed_beacon_block.as_ref().try_into() { + let header_or_bid = if let Ok(bid) = signed_beacon_block + .as_ref() + .message() + .body() + .signed_execution_payload_bid() + { + Some(PartialHeaderOrBid::Bid(Arc::new(bid.clone()))) + } else { + PartialDataColumnHeader::try_from(signed_beacon_block.as_ref()) + .ok() + .map(|h| PartialHeaderOrBid::PartialHeader(Arc::new(h))) + }; + if let Some(header_or_bid) = header_or_bid { let publish_blobs = false; - self.fetch_engine_blobs_and_publish( - Arc::new(header), - block_root, - publish_blobs, - ) - .await; + self.fetch_engine_blobs_and_publish(header_or_bid, block_root, publish_blobs) + .await; } } _ => {} diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ce54ffc38fd..ddcaaf8700f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -17,7 +17,7 @@ use lighthouse_network::rpc::InboundRequestId; use lighthouse_network::rpc::RequestType; use lighthouse_network::rpc::methods::RpcResponse; use lighthouse_network::service::Network; -use lighthouse_network::types::GossipKind; +use lighthouse_network::types::{GossipKind, PubsubPartialMessage}; use lighthouse_network::{ Context, PeerAction, PubsubMessage, ReportSource, Response, Subnet, rpc::{GoodbyeReason, RpcErrorResponse}, @@ -39,8 +39,8 @@ use tokio::time::Sleep; use tracing::{debug, error, info, trace, warn}; use typenum::Unsigned; use types::{ - EthSpec, ForkContext, PartialDataColumn, PartialDataColumnHeader, Slot, SubnetId, - SyncCommitteeSubscription, SyncSubnetId, ValidatorSubscription, + EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, + ValidatorSubscription, }; mod tests; @@ -84,9 +84,8 @@ pub enum NetworkMessage { /// Publish a list of messages to the gossipsub protocol. Publish { messages: Vec> }, /// Publish partial data column sidecars via the partial gossipsub protocol. - PublishPartialColumns { - columns: Vec>>, - header: Arc>, + PublishPartial { + messages: Vec>, }, /// Validates a received gossipsub message. This will propagate the message on the network. ValidationResult { @@ -683,8 +682,16 @@ impl NetworkService { ); self.libp2p.publish(messages); } - NetworkMessage::PublishPartialColumns { columns, header } => { - self.libp2p.publish_partial(columns, header); + NetworkMessage::PublishPartial { messages } => { + if !self.network_globals.config.enable_partial_columns { + return; + } + let mut topic_kinds = HashSet::new(); + for message in &messages { + topic_kinds.insert(message.kind()); + } + debug!(count = messages.len(), "Sending partial messages"); + self.libp2p.publish_partial(messages); } NetworkMessage::ReportPeer { peer_id, diff --git a/consensus/types/src/data/data_column_sidecar.rs b/consensus/types/src/data/data_column_sidecar.rs index 170aa996666..92ee368f55c 100644 --- a/consensus/types/src/data/data_column_sidecar.rs +++ b/consensus/types/src/data/data_column_sidecar.rs @@ -15,12 +15,16 @@ use superstruct::superstruct; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; +use crate::data::partial_data_column_sidecar::{ + PartialDataColumnFulu, PartialDataColumnGloas, PartialDataColumnSidecarFulu, + PartialDataColumnSidecarGloas, PartialDataColumnViewFulu, PartialDataColumnViewGloas, +}; use crate::{ block::{BLOB_KZG_COMMITMENTS_INDEX, BeaconBlockHeader, SignedBeaconBlockHeader}, core::{Epoch, EthSpec, Hash256, Slot}, data::{ - CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnSidecar, - PartialDataColumnSidecarError, PartialDataColumnSidecarRef, + CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnSidecarError, + PartialDataColumnView, }, fork::ForkName, kzg_ext::{KzgCommitments, KzgError}, @@ -142,10 +146,10 @@ impl DataColumnSidecar { /// The header will NOT be set. /// /// Uses the supplied filter to determine which cells to include in the partial sidecar. - pub fn try_filter_to_partial_ref( + pub fn try_filter_to_partial_view( &self, filter: F, - ) -> Result>, Err> + ) -> Result>, Err> where F: Fn(usize, &Cell, &KzgProof) -> Result, Err: From, @@ -173,13 +177,65 @@ impl DataColumnSidecar { return Ok(None); } - Ok(Some(PartialDataColumnSidecarRef { - cells_present_bitmap: new_bitmap, - column: new_column, - kzg_proofs: new_proofs, - header: None.into(), + Ok(Some(match self { + DataColumnSidecar::Fulu(_) => PartialDataColumnView::Fulu(PartialDataColumnViewFulu { + cells_present_bitmap: new_bitmap, + column: new_column, + kzg_proofs: new_proofs, + header: None.into(), + }), + DataColumnSidecar::Gloas(_) => { + PartialDataColumnView::Gloas(PartialDataColumnViewGloas { + cells_present_bitmap: new_bitmap, + column: new_column, + kzg_proofs: new_proofs, + }) + } })) } + + /// Convert this full data column into a verifiable partial data column. + pub fn to_partial(&self) -> PartialDataColumn { + let cell_count = self.column().len(); + let mut bitmap = + CellBitmap::::with_capacity(cell_count).expect("our column has the same bound"); + for idx in 0..cell_count { + bitmap + .set(idx, true) + .expect("The correct size is initialized right above"); + } + + let block_root = self.block_root(); + match self { + DataColumnSidecar::Fulu(fulu) => PartialDataColumn::Fulu(PartialDataColumnFulu { + block_root, + index: fulu.index, + sidecar: PartialDataColumnSidecarFulu { + cells_present_bitmap: bitmap, + column: fulu.column.clone(), + kzg_proofs: fulu.kzg_proofs.clone(), + header: Some(PartialDataColumnHeader { + kzg_commitments: fulu.kzg_commitments.clone(), + signed_block_header: fulu.signed_block_header.clone(), + kzg_commitments_inclusion_proof: fulu + .kzg_commitments_inclusion_proof + .clone(), + }) + .into(), + }, + }), + DataColumnSidecar::Gloas(gloas) => PartialDataColumn::Gloas(PartialDataColumnGloas { + block_root, + slot: gloas.slot, + index: gloas.index, + sidecar: PartialDataColumnSidecarGloas { + cells_present_bitmap: bitmap, + column: gloas.column.clone(), + kzg_proofs: gloas.kzg_proofs.clone(), + }, + }), + } + } } impl DataColumnSidecarFulu { @@ -248,36 +304,6 @@ impl DataColumnSidecarFulu { .as_ssz_bytes() .len() } - - /// Convert this full data column into a verifiable partial data column. - pub fn to_partial(&self) -> PartialDataColumn { - let cell_count = self.column.len(); - let mut bitmap = - CellBitmap::::with_capacity(cell_count).expect("our column has the same bound"); - for idx in 0..cell_count { - bitmap - .set(idx, true) - .expect("The correct size is initialized right above"); - } - - let block_root = self.block_root(); - - PartialDataColumn { - block_root, - index: self.index, - sidecar: PartialDataColumnSidecar { - cells_present_bitmap: bitmap, - column: self.column.clone(), - kzg_proofs: self.kzg_proofs.clone(), - header: Some(PartialDataColumnHeader { - kzg_commitments: self.kzg_commitments.clone(), - signed_block_header: self.signed_block_header.clone(), - kzg_commitments_inclusion_proof: self.kzg_commitments_inclusion_proof.clone(), - }) - .into(), - }, - } - } } impl DataColumnSidecarGloas { diff --git a/consensus/types/src/data/mod.rs b/consensus/types/src/data/mod.rs index 9c7eb426260..5efb4619495 100644 --- a/consensus/types/src/data/mod.rs +++ b/consensus/types/src/data/mod.rs @@ -19,8 +19,11 @@ pub use data_column_sidecar::{ }; pub use data_column_subnet_id::{DataColumnSubnetId, all_data_column_sidecar_subnets_from_spec}; pub use partial_data_column_sidecar::{ - CellBitmap, PartialDataColumn, PartialDataColumnHeader, PartialDataColumnPartsMetadata, - PartialDataColumnSidecar, PartialDataColumnSidecarError, PartialDataColumnSidecarRef, + CellBitmap, PartialDataColumn, PartialDataColumnFulu, PartialDataColumnGloas, + PartialDataColumnGroupId, PartialDataColumnHeader, PartialDataColumnPartsMetadata, + PartialDataColumnRef, PartialDataColumnSidecar, PartialDataColumnSidecarError, + PartialDataColumnSidecarFulu, PartialDataColumnSidecarGloas, PartialDataColumnView, + PartialDataColumnViewFulu, PartialDataColumnViewGloas, PartialHeaderOrBid, }; use crate::core::EthSpec; diff --git a/consensus/types/src/data/partial_data_column_sidecar.rs b/consensus/types/src/data/partial_data_column_sidecar.rs index c0e713b4b81..b0252e839db 100644 --- a/consensus/types/src/data/partial_data_column_sidecar.rs +++ b/consensus/types/src/data/partial_data_column_sidecar.rs @@ -1,7 +1,8 @@ use crate::{ + SignedExecutionPayloadBid, block::{BLOB_KZG_COMMITMENTS_INDEX, SignedBeaconBlock, SignedBeaconBlockHeader}, core::{EthSpec, Hash256, Slot}, - data::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarFulu}, + data::{Cell, ColumnIndex, DataColumnSidecar, DataColumnSidecarFulu, DataColumnSidecarGloas}, execution::AbstractExecPayload, kzg_ext::KzgCommitments, state::BeaconStateError, @@ -13,11 +14,29 @@ use ssz::BitList; use ssz_derive::{Decode, Encode}; use ssz_types::{FixedVector, ListEncodedOption, VariableList}; use std::fmt::Display; +use std::sync::Arc; +use superstruct::superstruct; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; pub type CellBitmap = BitList<::MaxBlobCommitmentsPerBlock>; +#[superstruct( + variants(Fulu, Gloas), + variant_attributes( + derive(Debug, Clone, Encode, Decode, TreeHash, Educe), + educe(PartialEq, Eq, Hash(bound = "E: EthSpec")), + cfg_attr( + feature = "arbitrary", + derive(arbitrary::Arbitrary), + arbitrary(bound = "E: EthSpec") + ), + ), + ref_attributes( + derive(Debug, PartialEq, TreeHash), + tree_hash(enum_behaviour = "transparent") + ) +)] #[cfg_attr( feature = "arbitrary", derive(arbitrary::Arbitrary), @@ -25,22 +44,32 @@ pub type CellBitmap = BitList<::MaxBlobCommitmentsPerBlock>; )] #[derive(Debug, Clone, Encode, Decode, TreeHash, Educe)] #[educe(PartialEq, Eq, Hash(bound = "E: EthSpec"))] +#[tree_hash(enum_behaviour = "transparent")] +#[ssz(enum_behaviour = "transparent")] pub struct PartialDataColumnSidecar { pub cells_present_bitmap: CellBitmap, pub column: VariableList, E::MaxBlobCommitmentsPerBlock>, pub kzg_proofs: VariableList, + #[superstruct(only(Fulu))] pub header: ListEncodedOption>, } /// Equivalent to `PartialDataColumnSidecar`, but containing references to the cells. This is done /// so that we can get a part of a sidecar without expensively cloning all the contents. +#[superstruct( + variants(Fulu, Gloas), + variant_attributes(derive(Debug, Clone, Encode),), + ref_attributes(derive(Debug),) +)] #[derive(Debug, Clone, Encode)] -pub struct PartialDataColumnSidecarRef<'a, E: EthSpec> { +#[ssz(enum_behaviour = "transparent")] +pub struct PartialDataColumnView<'a, E: EthSpec> { pub cells_present_bitmap: CellBitmap, // It is fine to use `Vec` here as we never decode directly into this type, and only create // this from the `PartialDataColumnSidecar` type above. This avoids a few ugly `expect` calls. pub column: Vec<&'a Cell>, pub kzg_proofs: Vec<&'a KzgProof>, + #[superstruct(only(Fulu))] pub header: ListEncodedOption<&'a PartialDataColumnHeader>, } @@ -52,24 +81,26 @@ pub enum PartialDataColumnSidecarError { ConflictingData, } -impl PartialDataColumnSidecar { +impl<'a, E: EthSpec> PartialDataColumnSidecarRef<'a, E> { pub fn is_complete(&self) -> bool { - self.cells_present_bitmap.num_set_bits() == self.cells_present_bitmap.len() + self.cells_present_bitmap().num_set_bits() == self.cells_present_bitmap().len() } - pub fn get(&self, idx: usize) -> Option<(&Cell, &KzgProof)> { - if !self.cells_present_bitmap.get(idx).unwrap_or(false) { + pub fn get(&self, idx: usize) -> Option<(&'a Cell, &'a KzgProof)> { + if !self.cells_present_bitmap().get(idx).unwrap_or(false) { return None; } let storage_idx = self - .cells_present_bitmap + .cells_present_bitmap() .iter() .take(idx) .filter(|b| *b) .count(); - self.column - .get(storage_idx) - .and_then(|cell| self.kzg_proofs.get(storage_idx).map(|proof| (cell, proof))) + self.column().get(storage_idx).and_then(|cell| { + self.kzg_proofs() + .get(storage_idx) + .map(|proof| (cell, proof)) + }) } /// Creates a reference to this sidecar containing only the blob indices for which the passed @@ -77,18 +108,18 @@ impl PartialDataColumnSidecar { pub fn filter( &self, filter: F, - ) -> Result>, PartialDataColumnSidecarError> + ) -> Result>, PartialDataColumnSidecarError> where F: Fn(usize) -> bool, { let len = self.verify_len()?; - let mut new_bitmap = self.cells_present_bitmap.clone(); + let mut new_bitmap = self.cells_present_bitmap().clone(); let mut new_column = Vec::with_capacity(len); let mut new_proofs = Vec::with_capacity(len); - let mut iter = self.column.iter().zip(self.kzg_proofs.iter()); + let mut iter = self.column().iter().zip(self.kzg_proofs().iter()); - for (blob_idx, present) in self.cells_present_bitmap.iter().enumerate() { + for (blob_idx, present) in self.cells_present_bitmap().iter().enumerate() { if present { let (cell, proof) = iter .next() @@ -110,23 +141,57 @@ impl PartialDataColumnSidecar { return Ok(None); } - Ok(Some(PartialDataColumnSidecarRef { - cells_present_bitmap: new_bitmap, - column: new_column, - kzg_proofs: new_proofs, - header: self.header.as_ref().into(), + Ok(Some(if let Ok(header) = self.header() { + PartialDataColumnViewFulu { + cells_present_bitmap: new_bitmap, + column: new_column, + kzg_proofs: new_proofs, + header: header.as_ref().into(), + } + .into() + } else { + PartialDataColumnViewGloas { + cells_present_bitmap: new_bitmap, + column: new_column, + kzg_proofs: new_proofs, + } + .into() })) } pub fn verify_len(&self) -> Result { - let len = self.cells_present_bitmap.num_set_bits(); - if len != self.kzg_proofs.len() || len != self.column.len() { + let len = self.cells_present_bitmap().num_set_bits(); + if len != self.kzg_proofs().len() || len != self.column().len() { return Err(PartialDataColumnSidecarError::InternallyInconsistent); } Ok(len) } } +impl PartialDataColumnSidecar { + pub fn is_complete(&self) -> bool { + self.to_ref().is_complete() + } + + pub fn get(&self, idx: usize) -> Option<(&Cell, &KzgProof)> { + self.to_ref().get(idx) + } + + pub fn filter( + &self, + filter: F, + ) -> Result>, PartialDataColumnSidecarError> + where + F: Fn(usize) -> bool, + { + self.to_ref().filter(filter) + } + + pub fn verify_len(&self) -> Result { + self.to_ref().verify_len() + } +} + #[cfg_attr( feature = "arbitrary", derive(arbitrary::Arbitrary), @@ -191,50 +256,130 @@ impl Display for PartialDataColumnPartsMetadata { } } +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +pub struct PartialDataColumnGroupId { + pub slot: Slot, + pub beacon_block_root: Hash256, +} + +#[superstruct( + variants(Fulu, Gloas), + variant_attributes(derive(Debug, Clone, PartialEq)), + ref_attributes(derive(Debug)) +)] #[derive(Debug, Clone, PartialEq)] pub struct PartialDataColumn { pub block_root: Hash256, + #[superstruct(only(Gloas))] + pub slot: Slot, pub index: ColumnIndex, - pub sidecar: PartialDataColumnSidecar, + #[superstruct(only(Fulu), partial_getter(rename = "sidecar_fulu"))] + pub sidecar: PartialDataColumnSidecarFulu, + #[superstruct(only(Gloas), partial_getter(rename = "sidecar_gloas"))] + pub sidecar: PartialDataColumnSidecarGloas, } impl PartialDataColumn { + pub fn sidecar(&self) -> PartialDataColumnSidecarRef<'_, E> { + self.to_ref().sidecar() + } + /// Equivalent to a call to `clone` followed by `try_into_full`, but returns early if conversion /// is not possible. pub fn try_clone_full( &self, - header: &PartialDataColumnHeader, + header: Option<&PartialDataColumnHeader>, ) -> Option> { - if !self.sidecar.is_complete() { + if !self.sidecar().is_complete() { return None; } - - Some(DataColumnSidecar::Fulu(DataColumnSidecarFulu { - index: self.index, - column: self.sidecar.column.clone(), - kzg_commitments: header.kzg_commitments.clone(), - kzg_proofs: self.sidecar.kzg_proofs.clone(), - signed_block_header: header.signed_block_header.clone(), - kzg_commitments_inclusion_proof: header.kzg_commitments_inclusion_proof.clone(), - })) + match self { + PartialDataColumn::Fulu(fulu) => { + let header = header?; + Some(DataColumnSidecar::Fulu(DataColumnSidecarFulu { + index: fulu.index, + column: fulu.sidecar.column.clone(), + kzg_commitments: header.kzg_commitments.clone(), + kzg_proofs: fulu.sidecar.kzg_proofs.clone(), + signed_block_header: header.signed_block_header.clone(), + kzg_commitments_inclusion_proof: header.kzg_commitments_inclusion_proof.clone(), + })) + } + PartialDataColumn::Gloas(gloas) => { + Some(DataColumnSidecar::Gloas(DataColumnSidecarGloas { + index: gloas.index, + column: gloas.sidecar.column.clone(), + kzg_proofs: gloas.sidecar.kzg_proofs.clone(), + slot: gloas.slot, + beacon_block_root: gloas.block_root, + })) + } + } } pub fn try_into_full( self, - header: &PartialDataColumnHeader, + header: Option<&PartialDataColumnHeader>, ) -> Option> { - if !self.sidecar.is_complete() { + if !self.sidecar().is_complete() { return None; } + match self { + PartialDataColumn::Fulu(fulu) => { + let header = header?; + Some(DataColumnSidecar::Fulu(DataColumnSidecarFulu { + index: fulu.index, + column: fulu.sidecar.column, + kzg_commitments: header.kzg_commitments.clone(), + kzg_proofs: fulu.sidecar.kzg_proofs, + signed_block_header: header.signed_block_header.clone(), + kzg_commitments_inclusion_proof: header.kzg_commitments_inclusion_proof.clone(), + })) + } + PartialDataColumn::Gloas(gloas) => { + Some(DataColumnSidecar::Gloas(DataColumnSidecarGloas { + index: gloas.index, + column: gloas.sidecar.column, + kzg_proofs: gloas.sidecar.kzg_proofs, + slot: gloas.slot, + beacon_block_root: gloas.block_root, + })) + } + } + } +} - Some(DataColumnSidecar::Fulu(DataColumnSidecarFulu { - index: self.index, - column: self.sidecar.column, - kzg_commitments: header.kzg_commitments.clone(), - kzg_proofs: self.sidecar.kzg_proofs, - signed_block_header: header.signed_block_header.clone(), - kzg_commitments_inclusion_proof: header.kzg_commitments_inclusion_proof.clone(), - })) +impl<'a, E: EthSpec> PartialDataColumnRef<'a, E> { + pub fn sidecar(&self) -> PartialDataColumnSidecarRef<'a, E> { + match self { + PartialDataColumnRef::Fulu(f) => PartialDataColumnSidecarRef::Fulu(&f.sidecar), + PartialDataColumnRef::Gloas(g) => PartialDataColumnSidecarRef::Gloas(&g.sidecar), + } + } +} + +// TODO(dknopik): good location? +/// This enum lives during the transitional period of Fulu and Gloas support for partial messages. +/// After Gloas has been activated on all supported networks, it can be removed in favor of the bid only. +#[derive(Debug, Clone)] +pub enum PartialHeaderOrBid { + PartialHeader(Arc>), + Bid(Arc>), +} + +impl PartialHeaderOrBid { + pub fn kzg_commitments(&self) -> &KzgCommitments { + match self { + PartialHeaderOrBid::PartialHeader(header) => &header.kzg_commitments, + PartialHeaderOrBid::Bid(bid) => &bid.message.blob_kzg_commitments, + } + } + + pub fn slot(&self) -> Slot { + match self { + PartialHeaderOrBid::PartialHeader(header) => header.slot(), + PartialHeaderOrBid::Bid(bid) => bid.message.slot, + } } } @@ -278,12 +423,13 @@ mod tests { .try_into() .unwrap(); - PartialDataColumnSidecar { + PartialDataColumnSidecarFulu { cells_present_bitmap: bitmap, column, kzg_proofs: proofs, header: None.into(), } + .into() } fn make_sidecar(total_blobs: usize, present_indices: &[usize]) -> PartialDataColumnSidecar { @@ -318,11 +464,11 @@ mod tests { fn filter_keeps_matching_cells() { let sidecar = make_sidecar(6, &[0, 2, 4]); let filtered = sidecar.filter(|idx| idx == 0 || idx == 4).unwrap().unwrap(); - assert_eq!(filtered.column.len(), 2); - assert_eq!(filtered.kzg_proofs.len(), 2); - assert!(filtered.cells_present_bitmap.get(0).unwrap()); - assert!(!filtered.cells_present_bitmap.get(2).unwrap()); - assert!(filtered.cells_present_bitmap.get(4).unwrap()); + assert_eq!(filtered.column().len(), 2); + assert_eq!(filtered.kzg_proofs().len(), 2); + assert!(filtered.cells_present_bitmap().get(0).unwrap()); + assert!(!filtered.cells_present_bitmap().get(2).unwrap()); + assert!(filtered.cells_present_bitmap().get(4).unwrap()); } #[test] @@ -340,9 +486,12 @@ mod tests { fn filter_preserves_all_when_all_match() { let sidecar = make_sidecar(6, &[0, 2, 4]); let filtered = sidecar.filter(|_| true).unwrap().unwrap(); - assert_eq!(filtered.column.len(), 3); - assert_eq!(filtered.kzg_proofs.len(), 3); - assert_eq!(filtered.cells_present_bitmap, sidecar.cells_present_bitmap); + assert_eq!(filtered.column().len(), 3); + assert_eq!(filtered.kzg_proofs().len(), 3); + assert_eq!( + filtered.cells_present_bitmap(), + sidecar.cells_present_bitmap() + ); // Also, check that the encoded version matches assert_eq!(filtered.as_ssz_bytes(), sidecar.as_ssz_bytes()); @@ -364,16 +513,24 @@ mod tests { // -- try_clone_full tests (on PartialDataColumn) -- + fn into_fulu(sidecar: PartialDataColumnSidecar) -> PartialDataColumnSidecarFulu { + match sidecar { + PartialDataColumnSidecar::Fulu(s) => s, + PartialDataColumnSidecar::Gloas(_) => panic!("expected Fulu sidecar"), + } + } + #[test] fn try_clone_full_succeeds_when_complete() { let sidecar = make_sidecar(3, &[0, 1, 2]); let header = make_header(3); - let partial = PartialDataColumn { + let partial: PartialDataColumn = PartialDataColumnFulu { block_root: Hash256::zero(), index: 5, - sidecar, - }; - let full = partial.try_clone_full(&header).unwrap(); + sidecar: into_fulu(sidecar), + } + .into(); + let full = partial.try_clone_full(Some(&header)).unwrap(); assert_eq!(*full.index(), 5); assert_eq!(full.column().len(), 3); } @@ -382,12 +539,13 @@ mod tests { fn try_clone_full_returns_none_when_incomplete() { let sidecar = make_sidecar(4, &[0, 2]); let header = make_header(4); - let partial = PartialDataColumn { + let partial: PartialDataColumn = PartialDataColumnFulu { block_root: Hash256::zero(), index: 0, - sidecar, - }; - assert!(partial.try_clone_full(&header).is_none()); + sidecar: into_fulu(sidecar), + } + .into(); + assert!(partial.try_clone_full(Some(&header)).is_none()); } // -- get tests --