From 27ac4f05a3913af16353ed35a791b3131287403d Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 20 May 2026 04:46:49 +0200 Subject: [PATCH 1/5] Move processing-result classification to the producer side MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reshape BlockProcessingResult from the AC-verdict-passthrough Ok/Err/Ignored enum to Imported(info) | ParentUnknown { parent_root } | Error { penalty, reason }. The producer (network_beacon_processor) translates beacon-chain Result into this shape via a new classify_processing_result(), so the consumer only has to resolve the symbolic WhichPeerToPenalize against an in-scope PeerGroup. - New WhichPeerToPenalize { BlockPeer, CustodyPeerForColumn(u64) } with an `apply(action, &peer_group, reason, cx)` helper. Penalty policy lives once in classify_processing_result instead of being duplicated across consumer arms. - Producer emits short `&'static str` identifiers for `info` and `reason` ("imported", "missing_components", "duplicate", "genesis", "internal_error", ...); the consumer branches on the "missing_components" info string to choose between retrying and completing the lookup. - Ignored becomes Error { penalty: None, reason: "processor_overloaded" } with a producer-side warn!; the consumer now retries it like any other non-penalized error and drops the lookup once SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS is exhausted. - DuplicateFullyImported and GenesisBlock map to Imported (which makes the consumer's "successfully imported" branch fall out naturally and removes the per-BlockError policy block — net -88 lines in mod.rs). - BlockError::ParentUnknown maps to ParentUnknown { parent_root }, so the consumer's parent-unknown branch reverts the request to AwaitingProcessing and keeps working for any R. Test rig: three tests construct the new variants directly. Extracted from #9155 (gloas-lookup-sync) onto bare sigp/unstable.
--- .../network_beacon_processor/sync_methods.rs | 130 +++++++++++++- .../network/src/sync/block_lookups/mod.rs | 160 +++++------------- beacon_node/network/src/sync/manager.rs | 69 +++++--- beacon_node/network/src/sync/tests/lookups.rs | 32 +++- 4 files changed, 233 insertions(+), 158 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 988a68c9dd2..63d6d77eb5c 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -4,11 +4,13 @@ use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ ChainId, - manager::{BlockProcessType, SyncMessage}, + manager::{BlockProcessType, BlockProcessingResult, SyncMessage, WhichPeerToPenalize}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; -use beacon_chain::data_availability_checker::AvailabilityCheckError; +use beacon_chain::data_availability_checker::{ + AvailabilityCheckError, AvailabilityCheckErrorCategory, +}; use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, @@ -90,10 +92,17 @@ impl NetworkBeaconProcessor { ); // A closure which will ignore the block. 
let ignore_fn = move || { + warn!( + ?process_type, + "Block processing task dropped, cpu might be overloaded" + ); // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { process_type, - result: crate::sync::manager::BlockProcessingResult::Ignored, + result: BlockProcessingResult::Error { + penalty: None, + reason: "processor_overloaded", + }, }); }; (process_fn, Box::new(ignore_fn)) @@ -233,8 +242,8 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { + result: classify_processing_result(result, &process_type, block_root), process_type, - result: result.into(), }); // Drop the handle to remove the entry from the cache @@ -344,8 +353,8 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { + result: classify_processing_result(result, &process_type, block_root), process_type, - result: result.into(), }); } @@ -421,8 +430,8 @@ impl NetworkBeaconProcessor { } self.send_sync_message(SyncMessage::BlockComponentProcessed { + result: classify_processing_result(result, &process_type, block_root), process_type, - result: result.into(), }); } @@ -1003,3 +1012,112 @@ impl NetworkBeaconProcessor { } } } + +/// Translate the beacon-chain processing outcome into a `BlockProcessingResult` the lookup state +/// machine can act on directly. The policy decisions about *whether* and *which peer-class* to +/// penalize live here, on the producer side, so consumers only need to resolve the symbolic +/// `WhichPeerToPenalize` to an actual peer id at penalty time. +fn classify_processing_result( + result: Result, + process_type: &BlockProcessType, + block_root: Hash256, +) -> BlockProcessingResult { + // Emit the full `BlockError` debug repr before consuming `result` so downstream debugging + // isn't limited to the symbolic `reason` strings carried by `BlockProcessingResult`. 
+ if let Err(e) = &result { + match e { + BlockError::BeaconChainError(_) | BlockError::InternalError(_) => { + error!(?block_root, ?process_type, error = ?e, "Internal error processing lookup component"); + } + BlockError::AvailabilityCheck(inner) + if inner.category() == AvailabilityCheckErrorCategory::Internal => + { + warn!(?block_root, ?process_type, error = ?e, "Internal availability check failure"); + } + _ => { + debug!(?block_root, ?process_type, error = ?e, "Lookup component processing failed"); + } + } + } + + let no_penalty = |reason| BlockProcessingResult::Error { + penalty: None, + reason, + }; + let block_peer_penalty = || BlockProcessingResult::Error { + penalty: Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + )), + reason: match process_type { + BlockProcessType::SingleBlock { .. } => "lookup_block_processing_failure", + BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => { + "lookup_data_processing_failure" + } + }, + }; + match result { + Ok(AvailabilityProcessingStatus::Imported(_)) => { + BlockProcessingResult::Imported("imported") + } + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + BlockProcessingResult::Imported("missing_components") + } + Err(BlockError::DuplicateFullyImported(_)) => BlockProcessingResult::Imported("duplicate"), + Err(BlockError::GenesisBlock) => BlockProcessingResult::Imported("genesis"), + Err(BlockError::ParentUnknown { parent_root, .. 
}) => { + BlockProcessingResult::ParentUnknown { parent_root } + } + Err(BlockError::BeaconChainError(_)) | Err(BlockError::InternalError(_)) => { + no_penalty("internal_error") + } + Err(BlockError::DuplicateImportStatusUnknown(_)) => no_penalty("duplicate_unknown_status"), + Err(BlockError::AvailabilityCheck(ref inner)) + if inner.category() == AvailabilityCheckErrorCategory::Internal => + { + no_penalty("availability_internal") + } + Err(BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { + no_penalty("execution_payload") + } + // Bad-column attribution: only meaningful for the data path, but classify uniformly — + // block-side processing won't produce this variant. + Err(BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn(( + Some(idx), + _, + )))) => BlockProcessingResult::Error { + penalty: Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::CustodyPeerForColumn(idx), + )), + reason: "lookup_data_processing_failure", + }, + // All remaining invalid blocks attribute to the block peer (which is also the data peer + // pre-Gloas). Listed explicitly to keep this match exhaustive so future `BlockError` + // variants force a compile error and a deliberate classification choice. + Err(BlockError::FutureSlot { .. }) + | Err(BlockError::StateRootMismatch { .. }) + | Err(BlockError::WouldRevertFinalizedSlot { .. }) + | Err(BlockError::NotFinalizedDescendant { .. }) + | Err(BlockError::BlockSlotLimitReached) + | Err(BlockError::IncorrectBlockProposer { .. }) + | Err(BlockError::UnknownValidator(_)) + | Err(BlockError::InvalidSignature(_)) + | Err(BlockError::BlockIsNotLaterThanParent { .. }) + | Err(BlockError::NonLinearParentRoots) + | Err(BlockError::NonLinearSlots) + | Err(BlockError::PerBlockProcessingError(_)) + | Err(BlockError::WeakSubjectivityConflict) + | Err(BlockError::InconsistentFork(_)) + | Err(BlockError::ExecutionPayloadError(_)) + | Err(BlockError::ParentExecutionPayloadInvalid { .. 
}) + | Err(BlockError::KnownInvalidExecutionPayload(_)) + | Err(BlockError::Slashable) + | Err(BlockError::AvailabilityCheck(_)) + | Err(BlockError::EnvelopeBlockRootUnknown(_)) + | Err(BlockError::OptimisticSyncNotSupported { .. }) + | Err(BlockError::BlobNotRequired(_)) + | Err(BlockError::InvalidBlobCount { .. }) + | Err(BlockError::BidParentRootMismatch { .. }) => block_peer_penalty(), + } +} diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 3929f74aa04..e67c2312068 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -27,17 +27,13 @@ use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERA use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; use crate::sync::SyncMessage; -use crate::sync::block_lookups::common::ResponseType; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; +use beacon_chain::BeaconChainTypes; use beacon_chain::block_verification_types::AsBlock; -use beacon_chain::data_availability_checker::{ - AvailabilityCheckError, AvailabilityCheckErrorCategory, -}; -use beacon_chain::{AvailabilityProcessingStatus, BeaconChainTypes, BlockError}; pub use common::RequestState; use fnv::FnvHashMap; +use lighthouse_network::PeerId; use lighthouse_network::service::api_types::SingleLookupReqId; -use lighthouse_network::{PeerAction, PeerId}; use lru_cache::LRUTimeCache; pub use single_block_lookup::{BlobRequestState, BlockRequestState, CustodyRequestState}; use std::collections::hash_map::Entry; @@ -109,7 +105,6 @@ pub type SingleLookupId = u32; enum Action { Retry, ParentUnknown { parent_root: Hash256 }, - Drop(/* reason: */ String), Continue, } @@ -588,126 +583,51 @@ impl BlockLookups { ); let action = match result { - BlockProcessingResult::Ok(AvailabilityProcessingStatus::Imported(_)) - | 
BlockProcessingResult::Err(BlockError::DuplicateFullyImported(..)) - | BlockProcessingResult::Err(BlockError::GenesisBlock) => { - // Successfully imported - request_state.on_processing_success()?; - Action::Continue - } - - BlockProcessingResult::Ok(AvailabilityProcessingStatus::MissingComponents { - .. - }) => { - // `on_processing_success` is called here to ensure the request state is updated prior to checking - // if both components have been processed. + BlockProcessingResult::Imported(info) => { + // `on_processing_success` is called here to ensure the request state is updated + // prior to checking if all components have been processed (relevant for + // MissingComponents). request_state.on_processing_success()?; - if lookup.all_components_processed() { - // We don't request for other block components until being sure that the block has - // data. If we request blobs / columns to a peer we are sure those must exist. - // Therefore if all components are processed and we still receive `MissingComponents` - // it indicates an internal bug. - return Err(LookupRequestError::MissingComponentsAfterAllProcessed); + if info == "missing_components" { + if lookup.all_components_processed() { + // We don't request for other block components until being sure that the + // block has data. If we request blobs / columns to a peer we are sure + // those must exist. Therefore if all components are processed and we + // still receive `MissingComponents` it indicates an internal bug. + return Err(LookupRequestError::MissingComponentsAfterAllProcessed); + } else { + Action::Retry + } } else { - // Continue request, potentially request blobs - Action::Retry + Action::Continue } } - BlockProcessingResult::Err(BlockError::DuplicateImportStatusUnknown(..)) => { - // This is unreachable because RPC blocks do not undergo gossip verification, and - // this error can *only* come from gossip verification. 
- error!(?block_root, "Single block lookup hit unreachable condition"); - Action::Drop("DuplicateImportStatusUnknown".to_owned()) + BlockProcessingResult::ParentUnknown { parent_root } => { + // `BlockError::ParentUnknown` is only returned when processing blocks. Reverts + // the status of this request to `AwaitingProcessing` holding the downloaded + // data. A future call to `continue_requests` will re-submit it once there are + // no pending parent requests. + request_state.revert_to_awaiting_processing()?; + Action::ParentUnknown { parent_root } } - BlockProcessingResult::Ignored => { - // Beacon processor signalled to ignore the block processing result. - // This implies that the cpu is overloaded. Drop the request. - warn!( + BlockProcessingResult::Error { penalty, reason } => { + // Retry on every processing error: `on_processing_failure` increments the + // per-component failure counter, so `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS` bounds the + // retry loop and eventually drops the lookup if the failure persists. Whether the + // peer should be downscored is the producer's call (encoded in `penalty`). + debug!( + ?block_root, component = ?R::response_type(), - "Lookup component processing ignored, cpu might be overloaded" + reason, + ?penalty, + "Lookup component processing failed; retrying" ); - Action::Drop("Block processing ignored".to_owned()) - } - BlockProcessingResult::Err(e) => { - match e { - BlockError::BeaconChainError(e) => { - // Internal error - error!(%block_root, error = ?e, "Beacon chain error processing lookup component"); - Action::Drop(format!("{e:?}")) - } - BlockError::ParentUnknown { parent_root, .. } => { - // Reverts the status of this request to `AwaitingProcessing` holding the - // downloaded data. A future call to `continue_requests` will re-submit it - // once there are no pending parent requests. - // Note: `BlockError::ParentUnknown` is only returned when processing - // blocks, not blobs. 
- request_state.revert_to_awaiting_processing()?; - Action::ParentUnknown { parent_root } - } - ref e @ BlockError::ExecutionPayloadError(ref epe) if !epe.penalize_peer() => { - // These errors indicate that the execution layer is offline - // and failed to validate the execution payload. Do not downscore peer. - debug!( - ?block_root, - error = ?e, - "Single block lookup failed. Execution layer is offline / unsynced / misconfigured" - ); - Action::Drop(format!("{e:?}")) - } - BlockError::AvailabilityCheck(e) - if e.category() == AvailabilityCheckErrorCategory::Internal => - { - // There errors indicate internal problems and should not downscore the peer - warn!(?block_root, error = ?e, "Internal availability check failure"); - - // Here we choose *not* to call `on_processing_failure` because this could result in a bad - // lookup state transition. This error invalidates both blob and block requests, and we don't know the - // state of both requests. Blobs may have already successfullly processed for example. - // We opt to drop the lookup instead. - Action::Drop(format!("{e:?}")) - } - other => { - debug!( - ?block_root, - component = ?R::response_type(), - error = ?other, - "Invalid lookup component" - ); - let peer_group = request_state.on_processing_failure()?; - let peers_to_penalize: Vec<_> = match other { - // Note: currenlty only InvalidColumn errors have index granularity, - // but future errors may follow the same pattern. Generalize this - // pattern with https://github.com/sigp/lighthouse/pull/6321 - BlockError::AvailabilityCheck( - AvailabilityCheckError::InvalidColumn((index_opt, _)), - ) => { - match index_opt { - Some(index) => peer_group.of_index(index as usize).collect(), - // If no index supplied this is an un-attributable fault. In practice - // this should never happen. 
- None => vec![], - } - } - _ => peer_group.all().collect(), - }; - for peer in peers_to_penalize { - cx.report_peer( - *peer, - PeerAction::MidToleranceError, - match R::response_type() { - ResponseType::Block => "lookup_block_processing_failure", - ResponseType::Blob => "lookup_blobs_processing_failure", - ResponseType::CustodyColumn => { - "lookup_custody_column_processing_failure" - } - }, - ); - } - - Action::Retry - } + let peer_group = request_state.on_processing_failure()?; + if let Some((action_kind, whom)) = penalty { + whom.apply(action_kind, &peer_group, reason, cx); } + Action::Retry } }; @@ -742,10 +662,6 @@ impl BlockLookups { ))) } } - Action::Drop(reason) => { - // Drop with noop - Err(LookupRequestError::Failed(reason)) - } Action::Continue => { // Drop this completed lookup only Ok(LookupResult::Completed) diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index 347b018a939..f41670382aa 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -49,9 +49,7 @@ use crate::sync::block_lookups::{ use crate::sync::custody_backfill_sync::CustodyBackFillSync; use crate::sync::network_context::{PeerGroup, RpcResponseResult}; use beacon_chain::block_verification_types::AsBlock; -use beacon_chain::{ - AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, BlockError, EngineState, -}; +use beacon_chain::{BeaconChain, BeaconChainTypes, EngineState}; use futures::StreamExt; use lighthouse_network::SyncInfo; use lighthouse_network::rpc::RPCError; @@ -205,11 +203,55 @@ impl BlockProcessType { } } +/// The classified outcome of submitting a block / blob / column for processing. The producer +/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this +/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically. 
#[derive(Debug)] pub enum BlockProcessingResult { - Ok(AvailabilityProcessingStatus), - Err(BlockError), - Ignored, + /// Data was imported (or already present, or otherwise satisfies the lookup). `info` is a + /// short stable identifier suitable for debug logs / metrics. + Imported(&'static str), + /// Block processing reported an unknown parent. The lookup re-arms itself behind a parent + /// lookup for `parent_root` rather than retrying or penalizing. + ParentUnknown { parent_root: Hash256 }, + /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored. + Error { + penalty: Option<(PeerAction, WhichPeerToPenalize)>, + reason: &'static str, + }, +} + +/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer +/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer +/// group for data processing) and `apply` selects from it. +#[derive(Debug, Clone, Copy)] +pub enum WhichPeerToPenalize { + /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer + /// or the blob peer — i.e. the peer responsible for the component as a whole). + BlockPeer, + /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. + CustodyPeerForColumn(u64), +} + +impl WhichPeerToPenalize { + /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s). + pub fn apply( + self, + action: PeerAction, + peer_group: &PeerGroup, + reason: &'static str, + cx: &mut SyncNetworkContext, + ) { + let peers: Vec = match self { + WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(), + WhichPeerToPenalize::CustodyPeerForColumn(idx) => { + peer_group.of_index(idx as usize).copied().collect() + } + }; + for peer in peers { + cx.report_peer(peer, action, reason); + } + } } /// The result of processing multiple blocks (a chain segment). 
@@ -1454,18 +1496,3 @@ impl SyncManager { } } } - -impl From> for BlockProcessingResult { - fn from(result: Result) -> Self { - match result { - Ok(status) => BlockProcessingResult::Ok(status), - Err(e) => BlockProcessingResult::Err(e), - } - } -} - -impl From for BlockProcessingResult { - fn from(e: BlockError) -> Self { - BlockProcessingResult::Err(e) - } -} diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index c1b2793491f..956ed01683c 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -6,13 +6,16 @@ use crate::network_beacon_processor::{ use crate::sync::block_lookups::{BlockLookupSummary, PARENT_DEPTH_TOLERANCE}; use crate::sync::{ SyncMessage, - manager::{BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager}, + manager::{ + BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager, + WhichPeerToPenalize, + }, }; use beacon_chain::blob_verification::KzgVerifiedBlob; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::custody_context::NodeCustodyType; use beacon_chain::{ - AvailabilityProcessingStatus, BlockError, EngineState, NotifyExecutionLayer, + AvailabilityProcessingStatus, EngineState, NotifyExecutionLayer, block_verification_types::{AsBlock, AvailableBlockData}, data_availability_checker::Availability, test_utils::{ @@ -2087,7 +2090,13 @@ async fn too_many_processing_failures(depth: usize) { r.build_chain_and_trigger_last_block(depth).await; // Simulate that a peer always returns empty r.simulate( - SimulateConfig::new().with_process_result(|| BlockError::BlockSlotLimitReached.into()), + SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { + penalty: Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + )), + reason: "lookup_block_processing_failure", + }), ) .await; // We register multiple penalties, the lookup fails and sync does 
not progress @@ -2135,15 +2144,21 @@ async fn unknown_parent_does_not_add_peers_to_itself() { } #[tokio::test] -/// Assert that if the beacon processor returns Ignored, the lookup is dropped +/// Assert that a non-attributable processing error (e.g. processor overloaded) is retried up to +/// `SINGLE_BLOCK_LOOKUP_MAX_ATTEMPTS`, no peer is penalized, and the lookup is then dropped. async fn test_single_block_lookup_ignored_response() { let mut r = TestRig::default(); r.build_chain_and_trigger_last_block(1).await; - // Send an Ignored response, the request should be dropped - r.simulate(SimulateConfig::new().with_process_result(|| BlockProcessingResult::Ignored)) - .await; + r.simulate( + SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { + penalty: None, + reason: "processor_overloaded", + }), + ) + .await; // The block was not actually imported r.assert_head_slot(0); + r.assert_no_penalties(); assert_eq!(r.created_lookups(), 1, "no created lookups"); assert_eq!(r.dropped_lookups(), 1, "no dropped lookups"); assert_eq!(r.completed_lookups(), 0, "some completed lookups"); @@ -2156,8 +2171,7 @@ async fn test_single_block_lookup_duplicate_response() { r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully r.simulate( - SimulateConfig::new() - .with_process_result(|| BlockError::DuplicateFullyImported(Hash256::ZERO).into()), + SimulateConfig::new().with_process_result(|| BlockProcessingResult::Imported("duplicate")), ) .await; // The block was not actually imported From ce02aa5ef14e32c5c6d99af45aaf33702eb878b0 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 20 May 2026 06:08:02 +0200 Subject: [PATCH 2/5] Allow private_interfaces on BlockComponentCouplingError to unbreak lint --- beacon_node/network/src/sync/network_context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 465e23998b9..096eaa28490 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -100,7 +100,7 @@ pub enum RpcResponseError { RpcError(#[allow(dead_code)] RPCError), VerifyError(LookupVerifyError), CustodyRequestError(#[allow(dead_code)] CustodyRequestError), - BlockComponentCouplingError(CouplingError), + BlockComponentCouplingError(#[allow(private_interfaces)] CouplingError), } #[derive(Debug, PartialEq, Eq)] From 76e344bb42538db1908da45579d6f47e57f00dd7 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Wed, 20 May 2026 18:19:46 -0600 Subject: [PATCH 3/5] Fixes --- .../beacon_chain/src/block_verification.rs | 6 +- .../src/data_availability_checker/error.rs | 3 +- .../src/network_beacon_processor/mod.rs | 5 +- .../network_beacon_processor/sync_methods.rs | 245 ++++++++++-------- .../network/src/sync/block_lookups/mod.rs | 34 ++- .../sync/block_lookups/single_block_lookup.rs | 3 - beacon_node/network/src/sync/manager.rs | 55 +--- beacon_node/network/src/sync/mod.rs | 1 + beacon_node/network/src/sync/tests/lookups.rs | 24 +- 9 files changed, 189 insertions(+), 187 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 24f971f736f..4304cf68156 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -93,7 +93,7 @@ use std::fs; use std::io::Write; use std::sync::Arc; use store::{Error as DBError, KeyValueStore}; -use strum::AsRefStr; +use strum::{AsRefStr, IntoStaticStr}; use task_executor::JoinHandle; use tracing::{Instrument, Span, debug, debug_span, error, info_span, instrument}; use types::{ @@ -115,7 +115,7 @@ const WRITE_BLOCK_PROCESSING_SSZ: bool = cfg!(feature = "write_ssz_files"); /// /// - The block is 
malformed/invalid (indicated by all results other than `BeaconChainError`. /// - We encountered an error whilst trying to verify the block (a `BeaconChainError`). -#[derive(Debug, AsRefStr)] +#[derive(Debug, AsRefStr, IntoStaticStr)] pub enum BlockError { /// The parent block was unknown. /// @@ -345,7 +345,7 @@ impl From for BlockError { /// Returned when block validation failed due to some issue verifying /// the execution payload. -#[derive(Debug)] +#[derive(Debug, IntoStaticStr)] pub enum ExecutionPayloadError { /// There's no eth1 connection (mandatory after merge) /// diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index ab69a629853..2653c848608 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -1,7 +1,8 @@ use kzg::{Error as KzgError, KzgCommitment}; +use strum::IntoStaticStr; use types::{BeaconStateError, ColumnIndex, Hash256}; -#[derive(Debug)] +#[derive(Debug, IntoStaticStr)] pub enum Error { InvalidBlobs(KzgError), MissingBid(Hash256), diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 7bf969db106..6f1bb54a037 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -32,7 +32,10 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{debug, error, instrument, trace, warn}; use types::*; -pub use sync_methods::ChainSegmentProcessId; +pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId}; +// Re-exported for test construction of `BlockProcessingResult::Error.penalty` tuples. 
+#[allow(unused_imports)] +pub use sync_methods::WhichPeerToPenalize; use types::data::FixedBlobSidecarList; pub type Error = TrySendError>; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 63d6d77eb5c..fff9cccf4a1 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -3,8 +3,8 @@ use crate::network_beacon_processor::{FUTURE_SLOT_TOLERANCE, NetworkBeaconProces use crate::sync::BatchProcessResult; use crate::sync::manager::CustodyBatchProcessResult; use crate::sync::{ - ChainId, - manager::{BlockProcessType, BlockProcessingResult, SyncMessage, WhichPeerToPenalize}, + ChainId, PeerGroup, SyncNetworkContext, + manager::{BlockProcessType, SyncMessage}, }; use beacon_chain::block_verification_types::LookupBlock; use beacon_chain::block_verification_types::{AsBlock, RangeSyncBlock}; @@ -22,6 +22,7 @@ use beacon_processor::{ }; use beacon_processor::{Work, WorkEvent}; use lighthouse_network::PeerAction; +use lighthouse_network::PeerId; use lighthouse_network::service::api_types::CustodyBackfillBatchId; use logging::crit; use std::sync::Arc; @@ -101,7 +102,7 @@ impl NetworkBeaconProcessor { process_type, result: BlockProcessingResult::Error { penalty: None, - reason: "processor_overloaded", + reason: "ignored_processor_overloaded".to_string(), }, }); }; @@ -242,7 +243,7 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - result: classify_processing_result(result, &process_type, block_root), + result: result.into(), process_type, }); @@ -353,7 +354,7 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - result: classify_processing_result(result, &process_type, block_root), + result: result.into(), process_type, }); } @@ -430,7 
+431,7 @@ impl NetworkBeaconProcessor { } self.send_sync_message(SyncMessage::BlockComponentProcessed { - result: classify_processing_result(result, &process_type, block_root), + result: result.into(), process_type, }); } @@ -1013,111 +1014,153 @@ impl NetworkBeaconProcessor { } } +/// The classified outcome of submitting a block / blob / column for processing. The producer +/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this +/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically. +#[derive(Debug)] +pub enum BlockProcessingResult { + /// Data was sent for processing and the producer treats this as a non-failure outcome for + /// the lookup. `fully_imported` is `true` when the lookup's block-and-data state is complete + /// (block imported, duplicate, or genesis); `false` when the block is valid but data + /// sidecars are still required (`MissingComponents`) and the lookup should keep fetching. + /// `info` is a short stable identifier suitable for debug logs / metrics. + Imported(bool, &'static str), + /// Block processing reported an unknown parent. The lookup re-arms itself behind a parent + /// lookup for `parent_root` rather than retrying or penalizing. + ParentUnknown { parent_root: Hash256 }, + /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored; + /// its third element is the `&'static str` passed to `report_peer` for scoring telemetry. + /// `reason` is a free-form description used for debug logs only. + Error { + penalty: Option<(PeerAction, WhichPeerToPenalize, &'static str)>, + reason: String, + }, +} + /// Translate the beacon-chain processing outcome into a `BlockProcessingResult` the lookup state /// machine can act on directly. 
The policy decisions about *whether* and *which peer-class* to /// penalize live here, on the producer side, so consumers only need to resolve the symbolic /// `WhichPeerToPenalize` to an actual peer id at penalty time. -fn classify_processing_result( - result: Result, - process_type: &BlockProcessType, - block_root: Hash256, -) -> BlockProcessingResult { - // Emit the full `BlockError` debug repr before consuming `result` so downstream debugging - // isn't limited to the symbolic `reason` strings carried by `BlockProcessingResult`. - if let Err(e) = &result { - match e { - BlockError::BeaconChainError(_) | BlockError::InternalError(_) => { - error!(?block_root, ?process_type, error = ?e, "Internal error processing lookup component"); - } - BlockError::AvailabilityCheck(inner) - if inner.category() == AvailabilityCheckErrorCategory::Internal => - { - warn!(?block_root, ?process_type, error = ?e, "Internal availability check failure"); +impl From> for BlockProcessingResult { + fn from(result: Result) -> Self { + // Emit the full `BlockError` debug repr before consuming `result` so downstream debugging + // isn't limited to the symbolic `reason` strings carried by `BlockProcessingResult`. + + /// Penalty against the block peer, using `err`'s variant name (via `IntoStaticStr`) as the + /// peer-scoring telemetry message. 
+ fn block_peer_penalty>( + err: E, + ) -> Option<(PeerAction, WhichPeerToPenalize, &'static str)> { + Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::BlockPeer, + err.into(), + )) + } + match result { + Ok(AvailabilityProcessingStatus::Imported(_)) => Self::Imported(true, "imported"), + Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { + Self::Imported(false, "missing_components") } - _ => { - debug!(?block_root, ?process_type, error = ?e, "Lookup component processing failed"); + Err(e) => { + let penalty = match &e { + BlockError::DuplicateFullyImported(_) => { + return Self::Imported(true, "duplicate"); + } + BlockError::GenesisBlock => return Self::Imported(true, "genesis"), + BlockError::ParentUnknown { parent_root, .. } => { + return Self::ParentUnknown { + parent_root: *parent_root, + }; + } + BlockError::BeaconChainError(_) | BlockError::InternalError(_) => None, + BlockError::DuplicateImportStatusUnknown(_) => None, + BlockError::AvailabilityCheck(inner) => match inner { + AvailabilityCheckError::InvalidColumn((Some(idx), _)) => Some(( + PeerAction::MidToleranceError, + WhichPeerToPenalize::CustodyPeerForColumn(*idx), + (&e).into(), + )), + inner => match inner.category() { + // Internal AC errors are our fault; do not penalize the peer. + AvailabilityCheckErrorCategory::Internal => None, + // Malicious AC errors (bad blobs, bad columns w/o index, kzg + // mismatch, etc.) are attributable to the data peer. + AvailabilityCheckErrorCategory::Malicious => block_peer_penalty(inner), + }, + }, + BlockError::ExecutionPayloadError(epe) => { + if epe.penalize_peer() { + block_peer_penalty(epe) + } else { + None + } + } + // All remaining invalid blocks attribute to the block peer (which is also the data + // peer pre-Gloas). Listed explicitly to keep this match exhaustive so future + // `BlockError` variants force a compile error and a deliberate classification choice. + BlockError::FutureSlot { .. 
} + | BlockError::StateRootMismatch { .. } + | BlockError::WouldRevertFinalizedSlot { .. } + | BlockError::NotFinalizedDescendant { .. } + | BlockError::BlockSlotLimitReached + | BlockError::IncorrectBlockProposer { .. } + | BlockError::UnknownValidator(_) + | BlockError::InvalidSignature(_) + | BlockError::BlockIsNotLaterThanParent { .. } + | BlockError::NonLinearParentRoots + | BlockError::NonLinearSlots + | BlockError::PerBlockProcessingError(_) + | BlockError::WeakSubjectivityConflict + | BlockError::InconsistentFork(_) + | BlockError::ParentExecutionPayloadInvalid { .. } + | BlockError::KnownInvalidExecutionPayload(_) + | BlockError::Slashable + | BlockError::EnvelopeBlockRootUnknown(_) + | BlockError::OptimisticSyncNotSupported { .. } + | BlockError::BlobNotRequired(_) + | BlockError::InvalidBlobCount { .. } + | BlockError::BidParentRootMismatch { .. } => block_peer_penalty(&e), + }; + Self::Error { + penalty, + reason: format!("{e:?}"), + } } } } +} - let no_penalty = |reason| BlockProcessingResult::Error { - penalty: None, - reason, - }; - let block_peer_penalty = || BlockProcessingResult::Error { - penalty: Some(( - PeerAction::MidToleranceError, - WhichPeerToPenalize::BlockPeer, - )), - reason: match process_type { - BlockProcessType::SingleBlock { .. } => "lookup_block_processing_failure", - BlockProcessType::SingleBlob { .. } | BlockProcessType::SingleCustodyColumn(_) => { - "lookup_data_processing_failure" +/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer +/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer +/// group for data processing) and `apply` selects from it. +#[derive(Debug, Clone, Copy)] +pub enum WhichPeerToPenalize { + /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer + /// or the blob peer — i.e. the peer responsible for the component as a whole). 
+ BlockPeer, + /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. + CustodyPeerForColumn(u64), +} + +impl WhichPeerToPenalize { + /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s) + /// with the given action and telemetry message. + pub fn apply( + self, + action: PeerAction, + peer_group: &PeerGroup, + msg: &'static str, + cx: &mut SyncNetworkContext, + ) { + let peers: Vec = match self { + WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(), + WhichPeerToPenalize::CustodyPeerForColumn(idx) => { + peer_group.of_index(idx as usize).copied().collect() } - }, - }; - match result { - Ok(AvailabilityProcessingStatus::Imported(_)) => { - BlockProcessingResult::Imported("imported") - } - Ok(AvailabilityProcessingStatus::MissingComponents(_, _)) => { - BlockProcessingResult::Imported("missing_components") - } - Err(BlockError::DuplicateFullyImported(_)) => BlockProcessingResult::Imported("duplicate"), - Err(BlockError::GenesisBlock) => BlockProcessingResult::Imported("genesis"), - Err(BlockError::ParentUnknown { parent_root, .. }) => { - BlockProcessingResult::ParentUnknown { parent_root } - } - Err(BlockError::BeaconChainError(_)) | Err(BlockError::InternalError(_)) => { - no_penalty("internal_error") - } - Err(BlockError::DuplicateImportStatusUnknown(_)) => no_penalty("duplicate_unknown_status"), - Err(BlockError::AvailabilityCheck(ref inner)) - if inner.category() == AvailabilityCheckErrorCategory::Internal => - { - no_penalty("availability_internal") - } - Err(BlockError::ExecutionPayloadError(ref epe)) if !epe.penalize_peer() => { - no_penalty("execution_payload") + }; + for peer in peers { + cx.report_peer(peer, action, msg); } - // Bad-column attribution: only meaningful for the data path, but classify uniformly — - // block-side processing won't produce this variant. 
- Err(BlockError::AvailabilityCheck(AvailabilityCheckError::InvalidColumn(( - Some(idx), - _, - )))) => BlockProcessingResult::Error { - penalty: Some(( - PeerAction::MidToleranceError, - WhichPeerToPenalize::CustodyPeerForColumn(idx), - )), - reason: "lookup_data_processing_failure", - }, - // All remaining invalid blocks attribute to the block peer (which is also the data peer - // pre-Gloas). Listed explicitly to keep this match exhaustive so future `BlockError` - // variants force a compile error and a deliberate classification choice. - Err(BlockError::FutureSlot { .. }) - | Err(BlockError::StateRootMismatch { .. }) - | Err(BlockError::WouldRevertFinalizedSlot { .. }) - | Err(BlockError::NotFinalizedDescendant { .. }) - | Err(BlockError::BlockSlotLimitReached) - | Err(BlockError::IncorrectBlockProposer { .. }) - | Err(BlockError::UnknownValidator(_)) - | Err(BlockError::InvalidSignature(_)) - | Err(BlockError::BlockIsNotLaterThanParent { .. }) - | Err(BlockError::NonLinearParentRoots) - | Err(BlockError::NonLinearSlots) - | Err(BlockError::PerBlockProcessingError(_)) - | Err(BlockError::WeakSubjectivityConflict) - | Err(BlockError::InconsistentFork(_)) - | Err(BlockError::ExecutionPayloadError(_)) - | Err(BlockError::ParentExecutionPayloadInvalid { .. }) - | Err(BlockError::KnownInvalidExecutionPayload(_)) - | Err(BlockError::Slashable) - | Err(BlockError::AvailabilityCheck(_)) - | Err(BlockError::EnvelopeBlockRootUnknown(_)) - | Err(BlockError::OptimisticSyncNotSupported { .. }) - | Err(BlockError::BlobNotRequired(_)) - | Err(BlockError::InvalidBlobCount { .. }) - | Err(BlockError::BidParentRootMismatch { .. 
}) => block_peer_penalty(), } } diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index e67c2312068..a9d87c4b607 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -23,9 +23,10 @@ use self::parent_chain::{NodeChain, compute_parent_chains}; pub use self::single_block_lookup::DownloadResult; use self::single_block_lookup::{LookupRequestError, LookupResult, SingleBlockLookup}; -use super::manager::{BlockProcessType, BlockProcessingResult, SLOT_IMPORT_TOLERANCE}; +use super::manager::{BlockProcessType, SLOT_IMPORT_TOLERANCE}; use super::network_context::{PeerGroup, RpcResponseError, SyncNetworkContext}; use crate::metrics; +use crate::network_beacon_processor::BlockProcessingResult; use crate::sync::SyncMessage; use crate::sync::block_lookups::parent_chain::find_oldest_fork_ancestor; use beacon_chain::BeaconChainTypes; @@ -583,24 +584,29 @@ impl BlockLookups { ); let action = match result { - BlockProcessingResult::Imported(info) => { + BlockProcessingResult::Imported(fully_imported, _info) => { // `on_processing_success` is called here to ensure the request state is updated - // prior to checking if all components have been processed (relevant for - // MissingComponents). + // prior to checking if all components have been processed (relevant for the + // `!fully_imported` case below). request_state.on_processing_success()?; - if info == "missing_components" { + if fully_imported { + Action::Continue + } else { + // Block processing returned `Ok(MissingComponents)`: the block is valid but + // data sidecars are still required to satisfy availability. The lookup must + // stay alive and re-issue component requests; completing it here would drop + // the lookup before the data is fetched. if lookup.all_components_processed() { - // We don't request for other block components until being sure that the - // block has data. 
If we request blobs / columns to a peer we are sure - // those must exist. Therefore if all components are processed and we - // still receive `MissingComponents` it indicates an internal bug. - return Err(LookupRequestError::MissingComponentsAfterAllProcessed); + // Defensive: if every component has been processed but the producer still + // sees missing components, the lookup state and the DA checker have + // diverged. Treat as an internal bug and drop the lookup. + return Err(LookupRequestError::Failed( + "missing components after all processed".to_owned(), + )); } else { Action::Retry } - } else { - Action::Continue } } BlockProcessingResult::ParentUnknown { parent_root } => { @@ -624,8 +630,8 @@ impl BlockLookups { "Lookup component processing failed; retrying" ); let peer_group = request_state.on_processing_failure()?; - if let Some((action_kind, whom)) = penalty { - whom.apply(action_kind, &peer_group, reason, cx); + if let Some((action_kind, whom, msg)) = penalty { + whom.apply(action_kind, &peer_group, msg, cx); } Action::Retry } diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 23bfd531f0f..9514eba2c94 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -42,9 +42,6 @@ pub enum LookupRequestError { BadState(String), /// Lookup failed for some other reason and should be dropped Failed(/* reason: */ String), - /// Received MissingComponents when all components have been processed. This should never - /// happen, and indicates some internal bug - MissingComponentsAfterAllProcessed, /// Attempted to retrieve a not known lookup id UnknownLookup, /// Received a download result for a different request id than the in-flight request. 
diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index f41670382aa..f33501f8a6e 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -40,7 +40,9 @@ use super::network_context::{ }; use super::peer_sync_info::{PeerSyncType, remote_sync_type}; use super::range_sync::{EPOCHS_PER_BATCH, RangeSync, RangeSyncType}; -use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor}; +use crate::network_beacon_processor::{ + BlockProcessingResult, ChainSegmentProcessId, NetworkBeaconProcessor, +}; use crate::service::NetworkMessage; use crate::status::ToStatusMessage; use crate::sync::block_lookups::{ @@ -203,57 +205,6 @@ impl BlockProcessType { } } -/// The classified outcome of submitting a block / blob / column for processing. The producer -/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this -/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically. -#[derive(Debug)] -pub enum BlockProcessingResult { - /// Data was imported (or already present, or otherwise satisfies the lookup). `info` is a - /// short stable identifier suitable for debug logs / metrics. - Imported(&'static str), - /// Block processing reported an unknown parent. The lookup re-arms itself behind a parent - /// lookup for `parent_root` rather than retrying or penalizing. - ParentUnknown { parent_root: Hash256 }, - /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored. - Error { - penalty: Option<(PeerAction, WhichPeerToPenalize)>, - reason: &'static str, - }, -} - -/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer -/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer -/// group for data processing) and `apply` selects from it. 
-#[derive(Debug, Clone, Copy)] -pub enum WhichPeerToPenalize { - /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer - /// or the blob peer — i.e. the peer responsible for the component as a whole). - BlockPeer, - /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. - CustodyPeerForColumn(u64), -} - -impl WhichPeerToPenalize { - /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s). - pub fn apply( - self, - action: PeerAction, - peer_group: &PeerGroup, - reason: &'static str, - cx: &mut SyncNetworkContext, - ) { - let peers: Vec = match self { - WhichPeerToPenalize::BlockPeer => peer_group.all().copied().collect(), - WhichPeerToPenalize::CustodyPeerForColumn(idx) => { - peer_group.of_index(idx as usize).copied().collect() - } - }; - for peer in peers { - cx.report_peer(peer, action, reason); - } - } -} - /// The result of processing multiple blocks (a chain segment). #[derive(Debug)] pub enum BatchProcessResult { diff --git a/beacon_node/network/src/sync/mod.rs b/beacon_node/network/src/sync/mod.rs index 054bab654c2..f121c1f1b7e 100644 --- a/beacon_node/network/src/sync/mod.rs +++ b/beacon_node/network/src/sync/mod.rs @@ -15,4 +15,5 @@ mod range_sync; mod tests; pub use manager::{BatchProcessResult, SyncMessage}; +pub use network_context::{PeerGroup, SyncNetworkContext}; pub use range_sync::ChainId; diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 956ed01683c..245accbd7d2 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1,15 +1,13 @@ use super::*; use crate::NetworkMessage; +use crate::network_beacon_processor::{BlockProcessingResult, WhichPeerToPenalize}; use crate::network_beacon_processor::{ ChainSegmentProcessId, InvalidBlockStorage, NetworkBeaconProcessor, }; use crate::sync::block_lookups::{BlockLookupSummary, 
PARENT_DEPTH_TOLERANCE}; use crate::sync::{ SyncMessage, - manager::{ - BatchProcessResult, BlockProcessType, BlockProcessingResult, SyncManager, - WhichPeerToPenalize, - }, + manager::{BatchProcessResult, BlockProcessType, SyncManager}, }; use beacon_chain::blob_verification::KzgVerifiedBlob; use beacon_chain::block_verification_types::LookupBlock; @@ -2094,8 +2092,9 @@ async fn too_many_processing_failures(depth: usize) { penalty: Some(( PeerAction::MidToleranceError, WhichPeerToPenalize::BlockPeer, + "lookup_block_processing_failure", )), - reason: "lookup_block_processing_failure", + reason: "lookup_block_processing_failure".to_string(), }), ) .await; @@ -2152,7 +2151,7 @@ async fn test_single_block_lookup_ignored_response() { r.simulate( SimulateConfig::new().with_process_result(|| BlockProcessingResult::Error { penalty: None, - reason: "processor_overloaded", + reason: "processor_overloaded".to_string(), }), ) .await; @@ -2171,7 +2170,8 @@ async fn test_single_block_lookup_duplicate_response() { r.build_chain_and_trigger_last_block(1).await; // Send a DuplicateFullyImported response, the lookup should complete successfully r.simulate( - SimulateConfig::new().with_process_result(|| BlockProcessingResult::Imported("duplicate")), + SimulateConfig::new() + .with_process_result(|| BlockProcessingResult::Imported(true, "duplicate")), ) .await; // The block was not actually imported @@ -2576,7 +2576,7 @@ async fn crypto_on_fail_with_invalid_block_signature() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_block_processing_failure"); + r.assert_penalties_of_type("InvalidSignature"); } } @@ -2594,7 +2594,7 @@ async fn crypto_on_fail_with_bad_blob_proposer_signature() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_blobs_processing_failure"); + r.assert_penalties_of_type("InvalidSignature"); } } @@ -2612,7 +2612,7 @@ async fn 
crypto_on_fail_with_bad_blob_kzg_proof() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_blobs_processing_failure"); + r.assert_penalties_of_type("InvalidBlobs"); } } @@ -2630,7 +2630,7 @@ async fn crypto_on_fail_with_bad_column_proposer_signature() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_custody_column_processing_failure"); + r.assert_penalties_of_type("InvalidSignature"); } } @@ -2648,6 +2648,6 @@ async fn crypto_on_fail_with_bad_column_kzg_proof() { r.assert_no_penalties(); } else { r.assert_failed_lookup_sync(); - r.assert_penalties_of_type("lookup_custody_column_processing_failure"); + r.assert_penalties_of_type("AvailabilityCheck"); } } From c5cf95e9c49a98274b7a92e0a2c64edb3bf2cdec Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 21 May 2026 06:22:06 +0200 Subject: [PATCH 4/5] Tidy --- .../src/network_beacon_processor/mod.rs | 3 +- .../network_beacon_processor/sync_methods.rs | 55 ++++++------------- .../network/src/sync/block_lookups/mod.rs | 23 +++----- .../src/sync/block_sidecar_coupling.rs | 2 +- .../network/src/sync/network_context.rs | 2 +- 5 files changed, 26 insertions(+), 59 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 6f1bb54a037..d259af9b52d 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -32,10 +32,9 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{debug, error, instrument, trace, warn}; use types::*; -pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId}; -// Re-exported for test construction of `BlockProcessingResult::Error.penalty` tuples. 
#[allow(unused_imports)] pub use sync_methods::WhichPeerToPenalize; +pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId}; use types::data::FixedBlobSidecarList; pub type Error = TrySendError>; diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index fff9cccf4a1..6a9d9890c50 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -243,8 +243,8 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - result: result.into(), process_type, + result: result.into(), }); // Drop the handle to remove the entry from the cache @@ -354,8 +354,8 @@ impl NetworkBeaconProcessor { // Sync handles these results self.send_sync_message(SyncMessage::BlockComponentProcessed { - result: result.into(), process_type, + result: result.into(), }); } @@ -431,8 +431,8 @@ impl NetworkBeaconProcessor { } self.send_sync_message(SyncMessage::BlockComponentProcessed { - result: result.into(), process_type, + result: result.into(), }); } @@ -1014,40 +1014,26 @@ impl NetworkBeaconProcessor { } } -/// The classified outcome of submitting a block / blob / column for processing. The producer -/// (`network_beacon_processor`) translates the raw beacon-chain `Result<_, BlockError>` into this -/// shape so the lookup state machine only has to resolve "which peer to penalize" symbolically. +/// The classified outcome of submitting a block / blob / column for processing, ready for the +/// lookup state machine to act on without re-inspecting `BlockError`. #[derive(Debug)] pub enum BlockProcessingResult { - /// Data was sent for processing and the producer treats this as a non-failure outcome for - /// the lookup. 
`fully_imported` is `true` when the lookup's block-and-data state is complete - /// (block imported, duplicate, or genesis); `false` when the block is valid but data - /// sidecars are still required (`MissingComponents`) and the lookup should keep fetching. - /// `info` is a short stable identifier suitable for debug logs / metrics. + /// `fully_imported` is true if the lookup is complete; false if `MissingComponents` (the + /// lookup must keep fetching). `info` is a stable label for logs / metrics. Imported(bool, &'static str), - /// Block processing reported an unknown parent. The lookup re-arms itself behind a parent - /// lookup for `parent_root` rather than retrying or penalizing. - ParentUnknown { parent_root: Hash256 }, + ParentUnknown { + parent_root: Hash256, + }, /// Processing failed. `penalty` is `Some` when an attributable peer should be downscored; - /// its third element is the `&'static str` passed to `report_peer` for scoring telemetry. - /// `reason` is a free-form description used for debug logs only. + /// the third tuple element is the `report_peer` telemetry msg. `reason` is for logs only. Error { penalty: Option<(PeerAction, WhichPeerToPenalize, &'static str)>, reason: String, }, } -/// Translate the beacon-chain processing outcome into a `BlockProcessingResult` the lookup state -/// machine can act on directly. The policy decisions about *whether* and *which peer-class* to -/// penalize live here, on the producer side, so consumers only need to resolve the symbolic -/// `WhichPeerToPenalize` to an actual peer id at penalty time. impl From> for BlockProcessingResult { fn from(result: Result) -> Self { - // Emit the full `BlockError` debug repr before consuming `result` so downstream debugging - // isn't limited to the symbolic `reason` strings carried by `BlockProcessingResult`. - - /// Penalty against the block peer, using `err`'s variant name (via `IntoStaticStr`) as the - /// peer-scoring telemetry message. 
fn block_peer_penalty>( err: E, ) -> Option<(PeerAction, WhichPeerToPenalize, &'static str)> { @@ -1082,10 +1068,7 @@ impl From> for BlockProcessingR (&e).into(), )), inner => match inner.category() { - // Internal AC errors are our fault; do not penalize the peer. AvailabilityCheckErrorCategory::Internal => None, - // Malicious AC errors (bad blobs, bad columns w/o index, kzg - // mismatch, etc.) are attributable to the data peer. AvailabilityCheckErrorCategory::Malicious => block_peer_penalty(inner), }, }, @@ -1096,9 +1079,8 @@ impl From> for BlockProcessingR None } } - // All remaining invalid blocks attribute to the block peer (which is also the data - // peer pre-Gloas). Listed explicitly to keep this match exhaustive so future - // `BlockError` variants force a compile error and a deliberate classification choice. + // Remaining invalid blocks: penalize the block peer. Listed explicitly so a + // new `BlockError` variant forces a compile error here. BlockError::FutureSlot { .. } | BlockError::StateRootMismatch { .. } | BlockError::WouldRevertFinalizedSlot { .. } @@ -1131,21 +1113,16 @@ impl From> for BlockProcessingR } } -/// Symbolic identifier for the peer(s) the lookup should resolve and downscore. The consumer -/// passes in the relevant `PeerGroup` (a singleton for block processing, the in-flight data peer -/// group for data processing) and `apply` selects from it. +/// Selector for which peer(s) in a `PeerGroup` to downscore. #[derive(Debug, Clone, Copy)] pub enum WhichPeerToPenalize { - /// All peers in the passed `PeerGroup` (typically a singleton constructed from the block peer - /// or the blob peer — i.e. the peer responsible for the component as a whole). + /// All peers in the group (block peer, or all data peers). BlockPeer, - /// The custody peer(s) that served a specific column index in the passed `PeerGroup`. + /// Only the peer(s) that served the given column index. 
CustodyPeerForColumn(u64), } impl WhichPeerToPenalize { - /// Resolve this symbolic identifier against `peer_group` and downscore the matching peer(s) - /// with the given action and telemetry message. pub fn apply( self, action: PeerAction, diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index a9d87c4b607..2dfead74100 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -586,27 +586,18 @@ impl BlockLookups { let action = match result { BlockProcessingResult::Imported(fully_imported, _info) => { // `on_processing_success` is called here to ensure the request state is updated - // prior to checking if all components have been processed (relevant for the - // `!fully_imported` case below). + // prior to checking if all components have been processed (relevant for + // MissingComponents). request_state.on_processing_success()?; if fully_imported { Action::Continue + } else if lookup.all_components_processed() { + return Err(LookupRequestError::Failed( + "missing components after all processed".to_owned(), + )); } else { - // Block processing returned `Ok(MissingComponents)`: the block is valid but - // data sidecars are still required to satisfy availability. The lookup must - // stay alive and re-issue component requests; completing it here would drop - // the lookup before the data is fetched. - if lookup.all_components_processed() { - // Defensive: if every component has been processed but the producer still - // sees missing components, the lookup state and the DA checker have - // diverged. Treat as an internal bug and drop the lookup. 
- return Err(LookupRequestError::Failed( - "missing components after all processed".to_owned(), - )); - } else { - Action::Retry - } + Action::Retry } } BlockProcessingResult::ParentUnknown { parent_root } => { diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index bb43396473f..c8cf7b68e3e 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -62,7 +62,7 @@ enum RangeBlockDataRequest { } #[derive(Debug)] -pub(crate) enum CouplingError { +pub enum CouplingError { InternalError(String), /// The peer we requested the columns from was faulty/malicious DataColumnPeerFailure { diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index 096eaa28490..465e23998b9 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -100,7 +100,7 @@ pub enum RpcResponseError { RpcError(#[allow(dead_code)] RPCError), VerifyError(LookupVerifyError), CustodyRequestError(#[allow(dead_code)] CustodyRequestError), - BlockComponentCouplingError(#[allow(private_interfaces)] CouplingError), + BlockComponentCouplingError(CouplingError), } #[derive(Debug, PartialEq, Eq)] From 18d76b936eb440d4497e46d82063c61dfb44dc86 Mon Sep 17 00:00:00 2001 From: dapplion <35266934+dapplion@users.noreply.github.com> Date: Thu, 21 May 2026 06:36:27 +0200 Subject: [PATCH 5/5] Tidy --- beacon_node/network/src/network_beacon_processor/mod.rs | 4 +--- beacon_node/network/src/sync/block_lookups/mod.rs | 4 ++++ beacon_node/network/src/sync/tests/lookups.rs | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index d259af9b52d..0b15fd030bd 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ 
b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -32,8 +32,6 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{debug, error, instrument, trace, warn}; use types::*; -#[allow(unused_imports)] -pub use sync_methods::WhichPeerToPenalize; pub use sync_methods::{BlockProcessingResult, ChainSegmentProcessId}; use types::data::FixedBlobSidecarList; @@ -41,7 +39,7 @@ pub type Error = TrySendError>; mod gossip_methods; mod rpc_methods; -mod sync_methods; +pub(crate) mod sync_methods; mod tests; pub(crate) const FUTURE_SLOT_TOLERANCE: u64 = 1; diff --git a/beacon_node/network/src/sync/block_lookups/mod.rs b/beacon_node/network/src/sync/block_lookups/mod.rs index 2dfead74100..a65abcd07ba 100644 --- a/beacon_node/network/src/sync/block_lookups/mod.rs +++ b/beacon_node/network/src/sync/block_lookups/mod.rs @@ -593,6 +593,10 @@ impl BlockLookups { if fully_imported { Action::Continue } else if lookup.all_components_processed() { + // We don't request for other block components until being sure that the block has + // data. If we request blobs / columns to a peer we are sure those must exist. + // Therefore if all components are processed and we still receive `MissingComponents` + // it indicates an internal bug. return Err(LookupRequestError::Failed( "missing components after all processed".to_owned(), )); diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 245accbd7d2..803db7eb6b3 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -1,6 +1,7 @@ use super::*; use crate::NetworkMessage; -use crate::network_beacon_processor::{BlockProcessingResult, WhichPeerToPenalize}; +use crate::network_beacon_processor::BlockProcessingResult; +use crate::network_beacon_processor::sync_methods::WhichPeerToPenalize; use crate::network_beacon_processor::{ ChainSegmentProcessId, InvalidBlockStorage, NetworkBeaconProcessor, };