Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
let (committee_opt, committees_per_slot) = chain.with_committee_cache(
attestation.data.target.root,
attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
|cached_shuffling, _| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let committee_opt = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.committee_index)
.map(|beacon_committee| beacon_committee.committee.to_vec());
Expand Down Expand Up @@ -1574,7 +1575,8 @@ where
return Err(Error::UnknownTargetRoot(target.root));
}

chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| {
chain.with_committee_cache(target.root, attestation_epoch, |cached_shuffling, _| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let committees_per_slot = committee_cache.committees_per_slot();

Ok(committee_cache
Expand Down
183 changes: 33 additions & 150 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ use crate::persisted_custody::persist_custody_context;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::shuffling_cache::{CachedPTCs, CachedShuffling, ShufflingCache, with_cached_shuffling};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
Expand Down Expand Up @@ -472,7 +472,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
pub shuffling_cache: RwLock<ShufflingCache<T::EthSpec>>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
/// Caches a map of `validator_index -> validator_pubkey`.
Expand Down Expand Up @@ -1696,7 +1696,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (duties, dependent_root) = self.with_committee_cache(
head_block_root,
epoch,
|committee_cache, dependent_root| {
|cached_shuffling, dependent_root| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let duties = validator_indices
.iter()
.map(|validator_index| {
Expand Down Expand Up @@ -4906,15 +4907,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<(), BlockError> {
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch());

let shuffling_is_cached = self.shuffling_cache.read().contains(&shuffling_id);

// Skip priming the cache for `shuffling_epoch` if it is Gloas but the state is not:
// we do not have the PTCs on hand in this case.
if self
.spec
.fork_name_at_epoch(shuffling_epoch)
.gloas_enabled()
&& !state.fork_name_unchecked().gloas_enabled()
{
continue;
}

if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
let ptcs = CachedPTCs::from_state(state, shuffling_epoch, &self.spec)?;
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs);
self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, committee_cache);
.insert_committee_cache(shuffling_id, cached_shuffling)?;
}
}
Ok(())
Expand Down Expand Up @@ -6981,11 +6996,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}

/// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head
/// Runs the `map_fn` with the cached shuffling for `shuffling_epoch` from the chain with head
/// `head_block_root`. The `map_fn` will be supplied two values:
///
/// - `&CommitteeCache`: the committee cache that serves the given parameters.
/// - `Hash256`: the "shuffling decision root" which uniquely identifies the `CommitteeCache`.
/// - `&CachedShuffling`: the committee cache and optional PTCs that serve the given parameters.
/// - `Hash256`: the "shuffling decision root" which uniquely identifies the cached shuffling.
///
/// It's not necessary that `head_block_root` matches our current view of the chain, it can be
/// any block that is:
Expand All @@ -7002,12 +7017,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// ## Notes
///
/// This function exists in this odd "map" pattern because efficiently obtaining a committee
/// This function exists in this odd "map" pattern because efficiently obtaining a shuffling
/// can be complex. It might involve reading straight from the `beacon_chain.shuffling_cache`
/// or it might involve reading it from a state from the DB. Due to the complexities of
/// `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here.
///
/// If the committee for `(head_block_root, shuffling_epoch)` isn't found in the
/// If the shuffling for `(head_block_root, shuffling_epoch)` isn't found in the
/// `shuffling_cache`, we will read a state from disk and then update the `shuffling_cache`.
pub fn with_committee_cache<F, R>(
&self,
Expand All @@ -7016,149 +7031,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
map_fn: F,
) -> Result<R, Error>
where
F: Fn(&CommitteeCache, Hash256) -> Result<R, Error>,
F: Fn(&CachedShuffling<T::EthSpec>, Hash256) -> Result<R, Error>,
{
let head_block = self
.canonical_head
.fork_choice_read_lock()
.get_block(&head_block_root)
.ok_or(Error::MissingBeaconBlock(head_block_root))?;

let shuffling_id = BlockShufflingIds {
current: head_block.current_epoch_shuffling_id.clone(),
next: head_block.next_epoch_shuffling_id.clone(),
previous: None,
block_root: head_block.root,
}
.id_for_epoch(shuffling_epoch)
.ok_or_else(|| Error::InvalidShufflingId {
with_cached_shuffling(
&self.canonical_head,
&self.shuffling_cache,
&self.store,
&self.spec,
head_block_root,
shuffling_epoch,
head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()),
})?;

// Obtain the shuffling cache, timing how long we wait.
let mut shuffling_cache = {
let _ =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
self.shuffling_cache.write()
};

if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
// The shuffling cache is no longer required, drop the write-lock to allow concurrent
// access.
drop(shuffling_cache);

let committee_cache = cache_item.wait()?;
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
} else {
// Create an entry in the cache that "promises" this value will eventually be computed.
// This avoids the case where multiple threads attempt to produce the same value at the
// same time.
//
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
// promise from being created twice.
let sender = shuffling_cache.create_promise(shuffling_id.clone())?;

// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);

debug!(
shuffling_id = ?shuffling_epoch,
head_block_root = head_block_root.to_string(),
"Committee cache miss"
);

// If the block's state will be so far ahead of `shuffling_epoch` that even its
// previous epoch committee cache will be too new, then error. Callers of this function
// shouldn't be requesting such old shufflings for this `head_block_root`.
let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch());
if head_block_epoch > shuffling_epoch + 1 {
return Err(Error::InvalidStateForShuffling {
state_epoch: head_block_epoch,
shuffling_epoch,
});
}

let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);

// If the head of the chain can serve this request, use it.
//
// This code is a little awkward because we need to ensure that the head we read and
// the head we copy is identical. Taking one lock to read the head values and another
// to copy the head is liable to race-conditions.
let head_state_opt = self.with_head(|head| {
if head.beacon_block_root == head_block_root {
Ok(Some((head.beacon_state.clone(), head.beacon_state_root())))
} else {
Ok::<_, Error>(None)
}
})?;

// Compute the `target_slot` to advance the block's state to.
//
// Since there's a one-epoch look-ahead on the attester shuffling, it suffices to
// only advance into the first slot of the epoch prior to `shuffling_epoch`.
//
// If the `head_block` is already ahead of that slot, then we should load the state
// at that slot, as we've determined above that the `shuffling_epoch` cache will
// not be too far in the past.
let target_slot = std::cmp::max(
shuffling_epoch
.saturating_sub(1_u64)
.start_slot(T::EthSpec::slots_per_epoch()),
head_block.slot,
);

// If the head state is useful for this request, use it. Otherwise, read a state from
// disk that is advanced as close as possible to `target_slot`.
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
(state, state_root)
} else {
// We assume that the `Pending` state has the same shufflings as a `Full` state
// for the same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
(state, state_root)
};

metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);

// If the state is still in an earlier epoch, advance it to the `target_slot` so
// that its next epoch committee cache matches the `shuffling_epoch`.
if state.current_epoch() + 1 < shuffling_epoch {
// Advance the state into the required slot, using the "partial" method since the
// state roots are not relevant for the shuffling.
partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?;
}
metrics::stop_timer(state_skip_timer);

let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);

let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch)
.map_err(Error::IncorrectStateForAttestation)?;

state.build_committee_cache(relative_epoch, &self.spec)?;

let committee_cache = state.committee_cache(relative_epoch)?.clone();
let shuffling_decision_block = shuffling_id.shuffling_decision_block;

self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, &committee_cache);

metrics::stop_timer(committee_building_timer);

sender.send(committee_cache.clone());

map_fn(&committee_cache, shuffling_decision_block)
}
map_fn,
)
}

/// Dumps the entire canonical chain, from the head to genesis to a vector for analysis.
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ pub enum BeaconChainError {
state_epoch: Epoch,
shuffling_epoch: Epoch,
},
MissingPtcForGloasShuffling {
shuffling_epoch: Epoch,
},
SyncDutiesError(BeaconStateError),
InconsistentForwardsIter {
request_slot: Slot,
Expand Down Expand Up @@ -251,6 +254,13 @@ pub enum BeaconChainError {
request_epoch: Epoch,
cache_epoch: Epoch,
},
AttesterCachePtcOutOfBounds {
slot: Slot,
epoch: Epoch,
},
AttesterCacheNoPtcPreGloas {
slot: Slot,
},
SkipProposerPreparation,
FailedColumnCustodyInfoUpdate,
}
Expand Down
Loading
Loading