Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ use std::fs;
use std::io::Write;
use std::sync::Arc;
use store::{Error as DBError, KeyValueStore};
use strum::AsRefStr;
use strum::{AsRefStr, IntoStaticStr};
use task_executor::JoinHandle;
use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument};
use types::{
Expand All @@ -115,7 +115,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files");
///
/// - The block is malformed/invalid (indicated by all results other than `BeaconChainError`.
/// - We encountered an error whilst trying to verify the block (a `BeaconChainError`).
#[derive(Debug, AsRefStr)]
#[derive(Debug, AsRefStr, IntoStaticStr)]
pub enum BlockError {
/// The parent block was unknown.
///
Expand Down Expand Up @@ -345,7 +345,7 @@ impl From<AvailabilityCheckError> for BlockError {

/// Returned when block validation failed due to some issue verifying
/// the execution payload.
#[derive(Debug)]
#[derive(Debug, IntoStaticStr)]
pub enum ExecutionPayloadError {
/// There's no eth1 connection (mandatory after merge)
///
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use kzg::{Error as KzgError, KzgCommitment};
use strum::IntoStaticStr;
use types::{BeaconStateError, ColumnIndex, Hash256};

#[derive(Debug)]
#[derive(Debug, IntoStaticStr)]
pub enum Error {
InvalidBlobs(KzgError),
MissingBid(Hash256),
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ use tokio::sync::mpsc::{self, error::TrySendError};
use tracing::{debug, error, instrument, trace, warn};
use types::*;

pub use sync_methods::ChainSegmentProcessId;
pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId};
use types::data::FixedBlobSidecarList;

pub type Error<T> = TrySendError<BeaconWorkEvent<T>>;

mod gossip_methods;
mod rpc_methods;
mod sync_methods;
pub(crate) mod sync_methods;
mod tests;

pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1;
Expand Down
144 changes: 141 additions & 3 deletions beacon_node/network/src/network_beacon_processor/sync_methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces
use crate::sync::BatchProcessResult;
use crate::sync::manager::CustodyBatchProcessResult;
use crate::sync::{
ChainId,
ChainId, PeerGroup, SyncNetworkContext,
manager::{BlockProcessType, SyncMessage},
};
use beacon_chain::block_verification_types::LookupBlock;
use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock};
use beacon_chain::data_availability_checker::AvailabilityCheckError;
use beacon_chain::data_availability_checker::{
AvailabilityCheckError, AvailabilityCheckErrorCategory,
};
use beacon_chain::historical_data_columns::HistoricalDataColumnError;
use beacon_chain::{
AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult,
Expand All @@ -20,6 +22,7 @@ use beacon_processor::{
};
use beacon_processor::{Work, WorkEvent};
use lighthouse_network::PeerAction;
use lighthouse_network::PeerId;
use lighthouse_network::service::api_types::CustodyBackfillBatchId;
use logging::crit;
use std::sync::Arc;
Expand Down Expand Up @@ -90,10 +93,17 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);
// A closure which will ignore the block.
let ignore_fn = move || {
warn!(
?process_type,
"Block processing task dropped, cpu might be overloaded"
);
// Sync handles these results
self.send_sync_message(SyncMessage::BlockComponentProcessed {
process_type,
result: crate::sync::manager::BlockProcessingResult::Ignored,
result: BlockProcessingResult::Error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we just retrying when the cpu is overloaded? does it simplify things somehow? seems wasteful to retry something we already have and just failed to process

penalty: None,
reason: "ignored_processor_overloaded".to_string(),
},
});
};
(process_fn, Box::new(ignore_fn))
Expand Down Expand Up @@ -1003,3 +1013,131 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}
}

/// The classified outcome of submitting a block / blob / column for processing, ready for the
/// lookup state machine to act on without re-inspecting `BlockError`.
#[derive(Debug)]
pub enum BlockProcessingResult {
/// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the
/// lookup must keep fetching). `info` is a stable label for logs / metrics.
Imported(bool, &'static str),
ParentUnknown {
parent_root: Hash256,
},
/// Processing failed. `penalty` is `Some` when an attributable peer should be downscored;
/// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only.
Error {
penalty: Option<(PeerAction, WhichPeerToPenalize, &'static str)>,
reason: String,
},
}

impl From<Result<AvailabilityProcessingStatus, BlockError>> for BlockProcessingResult {
fn from(result: Result<AvailabilityProcessingStatus, BlockError>) -> Self {
fn block_peer_penalty<E: Into<&'static str>>(
err: E,
) -> Option<(PeerAction, WhichPeerToPenalize, &'static str)> {
Some((
PeerAction::MidToleranceError,
WhichPeerToPenalize::BlockPeer,
err.into(),
))
}
match result {
Ok(AvailabilityProcessingStatus::Imported(_)) => Self::Imported(true, "imported"),
Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => {
Self::Imported(false, "missing_components")
}
Err(e) => {
let penalty = match &e {
BlockError::DuplicateFullyImported(_) => {
return Self::Imported(true, "duplicate");
}
BlockError::GenesisBlock => return Self::Imported(true, "genesis"),
BlockError::ParentUnknown { parent_root, .. } => {
return Self::ParentUnknown {
parent_root: *parent_root,
};
}
BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None,
BlockError::DuplicateImportStatusUnknown(_) => None,
BlockError::AvailabilityCheck(inner) => match inner {
AvailabilityCheckError::InvalidColumn((Some(idx), _)) => Some((
Copy link
Copy Markdown
Member

@jimmygchen jimmygchen May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note here - if column_index is None, it falls to the Malicious category below and penalize all column peers - on unstable, no peer is penalised:

// If no index supplied this is an un-attributable fault. In practice
// this should never happen.
None => vec![],
}

However, it doesn't look like the None column index case is reachable?

return Err((

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The None column index case reaches block_peer_penalty, which penalizes all column peers it seems, the enum variant and mapping here is slightly confusing - we're using BlockPeer variant to penalise all column peers it seems? I could be wrong

WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(),

PeerAction::MidToleranceError,
WhichPeerToPenalize::CustodyPeerForColumn(*idx),
(&e).into(),
)),
inner => match inner.category() {
AvailabilityCheckErrorCategory::Internal => None,
AvailabilityCheckErrorCategory::Malicious => block_peer_penalty(inner),
},
},
BlockError::ExecutionPayloadError(epe) => {
if epe.penalize_peer() {
block_peer_penalty(epe)
} else {
None
}
}
// Remaining invalid blocks: penalize the block peer. Listed explicitly so a
// new `BlockError` variant forces a compile error here.
BlockError::FutureSlot { .. }
| BlockError::StateRootMismatch { .. }
| BlockError::WouldRevertFinalizedSlot { .. }
| BlockError::NotFinalizedDescendant { .. }
| BlockError::BlockSlotLimitReached
| BlockError::IncorrectBlockProposer { .. }
| BlockError::UnknownValidator(_)
| BlockError::InvalidSignature(_)
| BlockError::BlockIsNotLaterThanParent { .. }
| BlockError::NonLinearParentRoots
| BlockError::NonLinearSlots
| BlockError::PerBlockProcessingError(_)
| BlockError::WeakSubjectivityConflict
| BlockError::InconsistentFork(_)
| BlockError::ParentExecutionPayloadInvalid { .. }
| BlockError::KnownInvalidExecutionPayload(_)
| BlockError::Slashable
| BlockError::EnvelopeBlockRootUnknown(_)
| BlockError::OptimisticSyncNotSupported { .. }
| BlockError::BlobNotRequired(_)
| BlockError::InvalidBlobCount { .. }
| BlockError::BidParentRootMismatch { .. } => block_peer_penalty(&e),
};
Self::Error {
penalty,
reason: format!("{e:?}"),
}
}
}
}
}

/// Selector for which peer(s) in a `PeerGroup` to downscore.
#[derive(Debug, Clone, Copy)]
pub enum WhichPeerToPenalize {
/// All peers in the group (block peer, or all data peers).
BlockPeer,
/// Only the peer(s) that served the given column index.
CustodyPeerForColumn(u64),
}

impl WhichPeerToPenalize {
pub fn apply<T: BeaconChainTypes>(
self,
action: PeerAction,
peer_group: &PeerGroup,
msg: &'static str,
cx: &mut SyncNetworkContext<T>,
) {
let peers: Vec<PeerId> = match self {
WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(),
WhichPeerToPenalize::CustodyPeerForColumn(idx) => {
peer_group.of_index(idx as usize).copied().collect()
}
};
for peer in peers {
cx.report_peer(peer, action, msg);
}
}
}
Loading
Loading