Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 42 additions & 18 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3311,28 +3311,32 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.await
}

/// Process a gossip-verified partial data column by attempting to merge it in the assembler.
/// Returns the merge result which indicates if a column was completed.
/// Process a gossip-verified partial data column by attempting to merge it into the appropriate
/// store for its fork (the Fulu assembler, or the Gloas pending payload cache). Returns the
/// merge result, which indicates whether any column was completed.
///
/// `verified_header` must be `Some` for Fulu partials (the assembler needs it) and `None`
/// for Gloas partials.
#[instrument(skip_all, level = "debug")]
pub async fn process_gossip_partial_data_column(
self: &Arc<Self>,
verified_partial: KzgVerifiedPartialDataColumn<T::EthSpec>,
verified_header: GossipVerifiedPartialDataColumnHeader<T::EthSpec>,
verified_header: Option<GossipVerifiedPartialDataColumnHeader<T::EthSpec>>,
slot: Slot,
) -> Result<ProcessedPartialColumnStatus<T::EthSpec>, BlockError> {
let block_root = verified_partial.block_root();
let partial = verified_partial.as_data_column();
let index_str = partial.index.to_string();
let index_str = partial.index().to_string();
metrics::inc_counter_vec_by(
&metrics::BEACON_PARTIAL_MESSAGE_CELLS_RECEIVED_TOTAL,
&[index_str.as_str()],
partial.sidecar.column.len() as u64,
partial.sidecar().column().len() as u64,
);

// Check if we have custody of this column
let sampling_columns =
self.sampling_columns_for_epoch(slot.epoch(T::EthSpec::slots_per_epoch()));
let verified_partial = if sampling_columns.contains(&partial.index) {
let verified_partial = if sampling_columns.contains(partial.index()) {
KzgVerifiedCustodyPartialDataColumn::from_asserted_custody(verified_partial)
} else {
return Ok(None);
Expand All @@ -3342,19 +3346,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Err(BlockError::DuplicateFullyImported(block_root));
}

let Some(assembler) = self.data_availability_checker.partial_assembler() else {
// Partial messages are apparently not activated
return Ok(None);
};
let added_cells = verified_partial.as_data_column().sidecar().column().len();

// Merge the partial into the assembler
let merge_result = assembler
.merge_partials(
block_root,
vec![verified_partial],
verified_header.into_header(),
)
.ok_or_else(|| BlockError::InternalError("No assembly found for block".to_string()))?;
let merge_result = match verified_header {
Some(header) => {
// Fulu: merge via the partial assembler.
let Some(assembler) = self.data_availability_checker.partial_assembler() else {
// Partial messages are apparently not activated
return Ok(None);
};

assembler
.merge_partials(block_root, vec![verified_partial], header.into_header())
.ok_or_else(|| {
BlockError::InternalError("No assembly found for block".to_string())
})?
}
None => {
// Gloas: merge directly into the pending payload cache. We don't track updated
// partials there yet, so we hand back the input partial alone as the "updated"
// set; full columns come from the cache merge.
let full_columns = self
.pending_payload_cache
.merge_partial_data_columns(block_root, std::slice::from_ref(&verified_partial))
.map_err(BlockError::from)?;

PartialMergeResult {
added_cells,
local_blobs: self.pending_payload_cache.has_local_blobs(&block_root),
full_columns,
updated_partials: vec![verified_partial],
}
}
};

metrics::inc_counter_vec_by(
&metrics::BEACON_PARTIAL_MESSAGE_USEFUL_CELLS_TOTAL,
Expand Down
22 changes: 11 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use types::data::{BlobIdentifier, FixedBlobSidecarList, PartialDataColumn};
use types::{
BlobSidecar, BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar,
DataColumnSidecarList, Epoch, EthSpec, Hash256, PartialDataColumnSidecarError,
PartialDataColumnSidecarRef, SignedBeaconBlock, Slot, new_non_zero_usize,
PartialDataColumnView, SignedBeaconBlock, Slot, new_non_zero_usize,
};

mod error;
Expand Down Expand Up @@ -194,7 +194,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
pub fn missing_cells_for_column_sidecar<'a>(
&'_ self,
data_column: &'a DataColumnSidecar<T::EthSpec>,
) -> Result<Option<PartialDataColumnSidecarRef<'a, T::EthSpec>>, MissingCellsError> {
) -> Result<Option<PartialDataColumnView<'a, T::EthSpec>>, MissingCellsError> {
let block_root = data_column.block_root();
let column_index = *data_column.index();

Expand All @@ -220,8 +220,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
if let Some(assembler) = &self.partial_assembler {
match assembler.get_partial(&block_root, column_index) {
Some(AssemblyColumn::Incomplete(cached_partial)) => {
return data_column.try_filter_to_partial_ref(|idx, cell, proof| {
match cached_partial.as_data_column().sidecar.get(idx) {
return data_column.try_filter_to_partial_view(|idx, cell, proof| {
match cached_partial.as_data_column().sidecar().get(idx) {
None => Ok(true),
Some((cached_cell, cached_proof)) => {
if cell == cached_cell && proof == cached_proof {
Expand All @@ -244,17 +244,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}
}
// No cached data, all cells are "missing" (new data we want)
data_column.try_filter_to_partial_ref(|_, _, _| Ok(true))
data_column.try_filter_to_partial_view(|_, _, _| Ok(true))
}

/// Filter out all cells that are already cached for the given `block_root`.
/// Returns input for kzg verification, or None if all cells are already cached.
pub fn missing_cells_for_partial_column_sidecar<'a>(
&'_ self,
partial_data_column: &'a PartialDataColumn<T::EthSpec>,
) -> Result<Option<PartialDataColumnSidecarRef<'a, T::EthSpec>>, MissingCellsError> {
let column_index = partial_data_column.index;
let block_root = partial_data_column.block_root;
) -> Result<Option<PartialDataColumnView<'a, T::EthSpec>>, MissingCellsError> {
let column_index = *partial_data_column.index();
let block_root = *partial_data_column.block_root();

// Check DA checker cache first - if we have a full column cached, nothing is missing.
if self
Expand All @@ -270,8 +270,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
if let Some(assembler) = &self.partial_assembler {
match assembler.get_partial(&block_root, column_index) {
Some(AssemblyColumn::Incomplete(cached_partial)) => {
return Ok(partial_data_column.sidecar.filter(|idx| {
cached_partial.as_data_column().sidecar.get(idx).is_none()
return Ok(partial_data_column.sidecar().filter(|idx| {
cached_partial.as_data_column().sidecar().get(idx).is_none()
})?);
}
// This can happen if the column has been marked as completed already but has not
Expand All @@ -285,7 +285,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
}
}
// No cached data, all cells are "missing" (new data we want)
Ok(partial_data_column.sidecar.filter(|_| true)?)
Ok(partial_data_column.sidecar().filter(|_| true)?)
}

/// Get a blob from the availability cache.
Expand Down
Loading
Loading