Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
let (committee_opt, committees_per_slot) = chain.with_committee_cache(
attestation.data.target.root,
attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
|cached_shuffling, _| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let committee_opt = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.committee_index)
.map(|beacon_committee| beacon_committee.committee.to_vec());
Expand Down Expand Up @@ -1574,7 +1575,8 @@ where
return Err(Error::UnknownTargetRoot(target.root));
}

chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| {
chain.with_committee_cache(target.root, attestation_epoch, |cached_shuffling, _| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let committees_per_slot = committee_cache.committees_per_slot();

Ok(committee_cache
Expand Down
174 changes: 24 additions & 150 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ use crate::persisted_custody::persist_custody_context;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::shuffling_cache::{
CachedShuffling, ShufflingCache, get_ptcs_for_shuffling_epoch, with_cached_shuffling,
};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
Expand Down Expand Up @@ -472,7 +474,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
pub shuffling_cache: RwLock<ShufflingCache<T::EthSpec>>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
/// Caches a map of `validator_index -> validator_pubkey`.
Expand Down Expand Up @@ -1696,7 +1698,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (duties, dependent_root) = self.with_committee_cache(
head_block_root,
epoch,
|committee_cache, dependent_root| {
|cached_shuffling, dependent_root| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let duties = validator_indices
.iter()
.map(|validator_index| {
Expand Down Expand Up @@ -4912,9 +4915,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch());
let ptcs = get_ptcs_for_shuffling_epoch(state, shuffling_epoch, &self.spec)?;
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs);
self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, committee_cache);
.insert_committee_cache_with_ptc(shuffling_id, cached_shuffling);
}
}
Ok(())
Expand Down Expand Up @@ -6981,11 +6987,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}

/// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head
/// Runs the `map_fn` with the cached shuffling for `shuffling_epoch` from the chain with head
/// `head_block_root`. The `map_fn` will be supplied two values:
///
/// - `&CommitteeCache`: the committee cache that serves the given parameters.
/// - `Hash256`: the "shuffling decision root" which uniquely identifies the `CommitteeCache`.
/// - `&CachedShuffling`: the committee cache and optional PTCs that serve the given parameters.
/// - `Hash256`: the "shuffling decision root" which uniquely identifies the cached shuffling.
///
/// It's not necessary that `head_block_root` matches our current view of the chain, it can be
/// any block that is:
Expand All @@ -7002,12 +7008,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// ## Notes
///
/// This function exists in this odd "map" pattern because efficiently obtaining a committee
/// This function exists in this odd "map" pattern because efficiently obtaining a shuffling
/// can be complex. It might involve reading straight from the `beacon_chain.shuffling_cache`
/// or it might involve reading it from a state from the DB. Due to the complexities of
/// `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here.
///
/// If the committee for `(head_block_root, shuffling_epoch)` isn't found in the
/// If the shuffling for `(head_block_root, shuffling_epoch)` isn't found in the
/// `shuffling_cache`, we will read a state from disk and then update the `shuffling_cache`.
pub fn with_committee_cache<F, R>(
&self,
Expand All @@ -7016,149 +7022,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
map_fn: F,
) -> Result<R, Error>
where
F: Fn(&CommitteeCache, Hash256) -> Result<R, Error>,
F: Fn(&CachedShuffling<T::EthSpec>, Hash256) -> Result<R, Error>,
{
let head_block = self
.canonical_head
.fork_choice_read_lock()
.get_block(&head_block_root)
.ok_or(Error::MissingBeaconBlock(head_block_root))?;

let shuffling_id = BlockShufflingIds {
current: head_block.current_epoch_shuffling_id.clone(),
next: head_block.next_epoch_shuffling_id.clone(),
previous: None,
block_root: head_block.root,
}
.id_for_epoch(shuffling_epoch)
.ok_or_else(|| Error::InvalidShufflingId {
with_cached_shuffling(
&self.canonical_head,
&self.shuffling_cache,
&self.store,
&self.spec,
head_block_root,
shuffling_epoch,
head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()),
})?;

// Obtain the shuffling cache, timing how long we wait.
let mut shuffling_cache = {
let _ =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
self.shuffling_cache.write()
};

if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
// The shuffling cache is no longer required, drop the write-lock to allow concurrent
// access.
drop(shuffling_cache);

let committee_cache = cache_item.wait()?;
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
} else {
// Create an entry in the cache that "promises" this value will eventually be computed.
// This avoids the case where multiple threads attempt to produce the same value at the
// same time.
//
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
// promise from being created twice.
let sender = shuffling_cache.create_promise(shuffling_id.clone())?;

// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);

debug!(
shuffling_id = ?shuffling_epoch,
head_block_root = head_block_root.to_string(),
"Committee cache miss"
);

// If the block's state will be so far ahead of `shuffling_epoch` that even its
// previous epoch committee cache will be too new, then error. Callers of this function
// shouldn't be requesting such old shufflings for this `head_block_root`.
let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch());
if head_block_epoch > shuffling_epoch + 1 {
return Err(Error::InvalidStateForShuffling {
state_epoch: head_block_epoch,
shuffling_epoch,
});
}

let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);

// If the head of the chain can serve this request, use it.
//
// This code is a little awkward because we need to ensure that the head we read and
// the head we copy is identical. Taking one lock to read the head values and another
// to copy the head is liable to race-conditions.
let head_state_opt = self.with_head(|head| {
if head.beacon_block_root == head_block_root {
Ok(Some((head.beacon_state.clone(), head.beacon_state_root())))
} else {
Ok::<_, Error>(None)
}
})?;

// Compute the `target_slot` to advance the block's state to.
//
// Since there's a one-epoch look-ahead on the attester shuffling, it suffices to
// only advance into the first slot of the epoch prior to `shuffling_epoch`.
//
// If the `head_block` is already ahead of that slot, then we should load the state
// at that slot, as we've determined above that the `shuffling_epoch` cache will
// not be too far in the past.
let target_slot = std::cmp::max(
shuffling_epoch
.saturating_sub(1_u64)
.start_slot(T::EthSpec::slots_per_epoch()),
head_block.slot,
);

// If the head state is useful for this request, use it. Otherwise, read a state from
// disk that is advanced as close as possible to `target_slot`.
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
(state, state_root)
} else {
// We assume that the `Pending` state has the same shufflings as a `Full` state
// for the same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
(state, state_root)
};

metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);

// If the state is still in an earlier epoch, advance it to the `target_slot` so
// that its next epoch committee cache matches the `shuffling_epoch`.
if state.current_epoch() + 1 < shuffling_epoch {
// Advance the state into the required slot, using the "partial" method since the
// state roots are not relevant for the shuffling.
partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?;
}
metrics::stop_timer(state_skip_timer);

let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);

let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch)
.map_err(Error::IncorrectStateForAttestation)?;

state.build_committee_cache(relative_epoch, &self.spec)?;

let committee_cache = state.committee_cache(relative_epoch)?.clone();
let shuffling_decision_block = shuffling_id.shuffling_decision_block;

self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, &committee_cache);

metrics::stop_timer(committee_building_timer);

sender.send(committee_cache.clone());

map_fn(&committee_cache, shuffling_decision_block)
}
map_fn,
)
}

/// Dumps the entire canonical chain, from the head to genesis to a vector for analysis.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@ use super::Error;
use crate::beacon_chain::BeaconStore;
use crate::canonical_head::CanonicalHead;
use crate::observed_attesters::ObservedPayloadAttesters;
use crate::shuffling_cache::{ShufflingCache, with_cached_shuffling};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, metrics};
use bls::AggregateSignature;
use educe::Educe;
use eth2::types::{EventKind, ForkVersionedResponse};
use parking_lot::RwLock;
use safe_arith::SafeArith;
use slot_clock::SlotClock;
use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set;
use state_processing::state_advance::partial_state_advance;
use state_processing::per_block_processing::signature_sets::indexed_payload_attestation_signature_set_from_pubkeys;
use std::borrow::Cow;
use types::{ChainSpec, EthSpec, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot};
use types::{
ChainSpec, EthSpec, Hash256, IndexedPayloadAttestation, PTC, PayloadAttestationMessage, Slot,
};

pub struct GossipVerificationContext<'a, T: BeaconChainTypes> {
pub slot_clock: &'a T::SlotClock,
pub spec: &'a ChainSpec,
pub observed_payload_attesters: &'a RwLock<ObservedPayloadAttesters<T::EthSpec>>,
pub canonical_head: &'a CanonicalHead<T>,
pub shuffling_cache: &'a RwLock<ShufflingCache<T::EthSpec>>,
pub validator_pubkey_cache: &'a RwLock<ValidatorPubkeyCache<T>>,
pub store: &'a BeaconStore<T>,
pub genesis_validators_root: Hash256,
}

/// A `PayloadAttestationMessage` that has been verified for propagation on the gossip network.
Expand Down Expand Up @@ -76,56 +79,19 @@ impl<T: BeaconChainTypes> VerifiedPayloadAttestationMessage<T> {
return Err(Error::UnknownHeadBlock { beacon_block_root });
}

// Get head state for PTC computation. If the cached head state is too stale
// (e.g. during liveness failures with many skipped slots), fall back to loading
// a more recent state from the store and advancing it if necessary.
let head = ctx.canonical_head.cached_head();
let head_state = &head.snapshot.beacon_state;

let message_epoch = slot.epoch(T::EthSpec::slots_per_epoch());
let state_epoch = head_state.current_epoch();

// get_ptc can serve epochs in [state_epoch - 1, state_epoch + min_seed_lookahead].
// If the message epoch is beyond that range, the head state is stale.
let advanced_state = if message_epoch
> state_epoch
.safe_add(ctx.spec.min_seed_lookahead)
.map_err(BeaconChainError::from)?
{
let head_block_root = head.head_block_root();
let target_slot = message_epoch.start_slot(T::EthSpec::slots_per_epoch());

let (state_root, mut state) = ctx
.store
.get_advanced_hot_state(
head_block_root,
target_slot,
head.snapshot.beacon_state_root(),
)
.map_err(BeaconChainError::from)?
.ok_or(BeaconChainError::MissingBeaconState(
head.snapshot.beacon_state_root(),
))?;

if state
.current_epoch()
.safe_add(ctx.spec.min_seed_lookahead)
.map_err(BeaconChainError::from)?
< message_epoch
{
partial_state_advance(&mut state, Some(state_root), target_slot, ctx.spec)
.map_err(BeaconChainError::from)?;
}

Some(state)
} else {
None
};

let state = advanced_state.as_ref().unwrap_or(head_state);
let ptc = with_cached_shuffling(
ctx.canonical_head,
ctx.shuffling_cache,
ctx.store,
ctx.spec,
beacon_block_root,
message_epoch,
|cached_shuffling, _| Ok::<_, Error>(cached_shuffling.ptc_for_slot(slot).cloned()),
)?
.ok_or(Error::MissingPTC { slot })?;

// [REJECT] `validator_index` is within `get_ptc(state, data.slot)`.
let ptc = state.get_ptc(slot, ctx.spec)?;
if !ptc.0.contains(&(validator_index as usize)) {
return Err(Error::NotInPTC {
validator_index,
Expand All @@ -145,11 +111,13 @@ impl<T: BeaconChainTypes> VerifiedPayloadAttestationMessage<T> {
{
// [REJECT] The signature is valid with respect to the `validator_index`.
let pubkey_cache = ctx.validator_pubkey_cache.read();
let signature_set = indexed_payload_attestation_signature_set(
state,
let fork = ctx.spec.fork_at_epoch(message_epoch);
let signature_set = indexed_payload_attestation_signature_set_from_pubkeys(
|validator_index| pubkey_cache.get(validator_index).map(Cow::Borrowed),
&indexed_payload_attestation.signature,
&indexed_payload_attestation,
&fork,
ctx.genesis_validators_root,
ctx.spec,
)
.map_err(|_| Error::UnknownValidatorIndex(validator_index))?;
Expand Down Expand Up @@ -204,8 +172,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
spec: &self.spec,
observed_payload_attesters: &self.observed_payload_attesters,
canonical_head: &self.canonical_head,
shuffling_cache: &self.shuffling_cache,
validator_pubkey_cache: &self.validator_pubkey_cache,
store: &self.store,
genesis_validators_root: self.genesis_validators_root,
}
}

Expand Down
Loading
Loading