Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,8 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> {
let (committee_opt, committees_per_slot) = chain.with_committee_cache(
attestation.data.target.root,
attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()),
|committee_cache, _| {
|cached_shuffling, _| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let committee_opt = committee_cache
.get_beacon_committee(attestation.data.slot, attestation.committee_index)
.map(|beacon_committee| beacon_committee.committee.to_vec());
Expand Down Expand Up @@ -1574,7 +1575,8 @@ where
return Err(Error::UnknownTargetRoot(target.root));
}

chain.with_committee_cache(target.root, attestation_epoch, |committee_cache, _| {
chain.with_committee_cache(target.root, attestation_epoch, |cached_shuffling, _| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let committees_per_slot = committee_cache.committees_per_slot();

Ok(committee_cache
Expand Down
185 changes: 35 additions & 150 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ use crate::persisted_custody::persist_custody_context;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::pre_finalization_cache::PreFinalizationBlockCache;
use crate::proposer_preferences_verification::proposer_preference_cache::GossipVerifiedProposerPreferenceCache;
use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::shuffling_cache::{
CachedShuffling, ShufflingCache, get_ptcs_for_shuffling_epoch, with_cached_shuffling,
};
use crate::sync_committee_verification::{
Error as SyncCommitteeError, VerifiedSyncCommitteeMessage, VerifiedSyncContribution,
};
Expand Down Expand Up @@ -472,7 +474,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// HTTP server is enabled.
pub event_handler: Option<ServerSentEventHandler<T::EthSpec>>,
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub shuffling_cache: RwLock<ShufflingCache>,
pub shuffling_cache: RwLock<ShufflingCache<T::EthSpec>>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub beacon_proposer_cache: Arc<Mutex<BeaconProposerCache>>,
/// Caches a map of `validator_index -> validator_pubkey`.
Expand Down Expand Up @@ -1696,7 +1698,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let (duties, dependent_root) = self.with_committee_cache(
head_block_root,
epoch,
|committee_cache, dependent_root| {
|cached_shuffling, dependent_root| {
let committee_cache = cached_shuffling.committee_cache.as_ref();
let duties = validator_indices
.iter()
.map(|validator_index| {
Expand Down Expand Up @@ -4906,15 +4909,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<(), BlockError> {
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;
let shuffling_epoch = relative_epoch.into_epoch(state.current_epoch());

let shuffling_is_cached = self.shuffling_cache.read().contains(&shuffling_id);

// Skip priming the cache for `shuffling_epoch` if it is Gloas but the state is not:
// we do not have the PTCs on hand in this case.
if self
.spec
.fork_name_at_epoch(shuffling_epoch)
.gloas_enabled()
&& !state.fork_name_unchecked().gloas_enabled()
{
continue;
}

if !shuffling_is_cached {
state.build_committee_cache(relative_epoch, &self.spec)?;
let committee_cache = state.committee_cache(relative_epoch)?;
let ptcs = get_ptcs_for_shuffling_epoch(state, shuffling_epoch, &self.spec)?;
let cached_shuffling = CachedShuffling::new(committee_cache.clone(), ptcs);
self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, committee_cache);
.insert_committee_cache_with_ptcs(shuffling_id, cached_shuffling, &self.spec)?;
}
}
Ok(())
Expand Down Expand Up @@ -6981,11 +6998,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
)
}

/// Runs the `map_fn` with the committee cache for `shuffling_epoch` from the chain with head
/// Runs the `map_fn` with the cached shuffling for `shuffling_epoch` from the chain with head
/// `head_block_root`. The `map_fn` will be supplied two values:
///
/// - `&CommitteeCache`: the committee cache that serves the given parameters.
/// - `Hash256`: the "shuffling decision root" which uniquely identifies the `CommitteeCache`.
/// - `&CachedShuffling`: the committee cache and optional PTCs that serve the given parameters.
/// - `Hash256`: the "shuffling decision root" which uniquely identifies the cached shuffling.
///
/// It's not necessary that `head_block_root` matches our current view of the chain, it can be
/// any block that is:
Expand All @@ -7002,12 +7019,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// ## Notes
///
/// This function exists in this odd "map" pattern because efficiently obtaining a committee
/// This function exists in this odd "map" pattern because efficiently obtaining a shuffling
/// can be complex. It might involve reading straight from the `beacon_chain.shuffling_cache`
/// or it might involve reading it from a state from the DB. Due to the complexities of
/// `RwLock`s on the shuffling cache, a simple `Cow` isn't suitable here.
///
/// If the committee for `(head_block_root, shuffling_epoch)` isn't found in the
/// If the shuffling for `(head_block_root, shuffling_epoch)` isn't found in the
/// `shuffling_cache`, we will read a state from disk and then update the `shuffling_cache`.
pub fn with_committee_cache<F, R>(
&self,
Expand All @@ -7016,149 +7033,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
map_fn: F,
) -> Result<R, Error>
where
F: Fn(&CommitteeCache, Hash256) -> Result<R, Error>,
F: Fn(&CachedShuffling<T::EthSpec>, Hash256) -> Result<R, Error>,
{
let head_block = self
.canonical_head
.fork_choice_read_lock()
.get_block(&head_block_root)
.ok_or(Error::MissingBeaconBlock(head_block_root))?;

let shuffling_id = BlockShufflingIds {
current: head_block.current_epoch_shuffling_id.clone(),
next: head_block.next_epoch_shuffling_id.clone(),
previous: None,
block_root: head_block.root,
}
.id_for_epoch(shuffling_epoch)
.ok_or_else(|| Error::InvalidShufflingId {
with_cached_shuffling(
&self.canonical_head,
&self.shuffling_cache,
&self.store,
&self.spec,
head_block_root,
shuffling_epoch,
head_block_epoch: head_block.slot.epoch(T::EthSpec::slots_per_epoch()),
})?;

// Obtain the shuffling cache, timing how long we wait.
let mut shuffling_cache = {
let _ =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_SHUFFLING_CACHE_WAIT_TIMES);
self.shuffling_cache.write()
};

if let Some(cache_item) = shuffling_cache.get(&shuffling_id) {
// The shuffling cache is no longer required, drop the write-lock to allow concurrent
// access.
drop(shuffling_cache);

let committee_cache = cache_item.wait()?;
map_fn(&committee_cache, shuffling_id.shuffling_decision_block)
} else {
// Create an entry in the cache that "promises" this value will eventually be computed.
// This avoids the case where multiple threads attempt to produce the same value at the
// same time.
//
// Creating the promise whilst we hold the `shuffling_cache` lock will prevent the same
// promise from being created twice.
let sender = shuffling_cache.create_promise(shuffling_id.clone())?;

// Drop the shuffling cache to avoid holding the lock for any longer than
// required.
drop(shuffling_cache);

debug!(
shuffling_id = ?shuffling_epoch,
head_block_root = head_block_root.to_string(),
"Committee cache miss"
);

// If the block's state will be so far ahead of `shuffling_epoch` that even its
// previous epoch committee cache will be too new, then error. Callers of this function
// shouldn't be requesting such old shufflings for this `head_block_root`.
let head_block_epoch = head_block.slot.epoch(T::EthSpec::slots_per_epoch());
if head_block_epoch > shuffling_epoch + 1 {
return Err(Error::InvalidStateForShuffling {
state_epoch: head_block_epoch,
shuffling_epoch,
});
}

let state_read_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_READ_TIMES);

// If the head of the chain can serve this request, use it.
//
// This code is a little awkward because we need to ensure that the head we read and
// the head we copy is identical. Taking one lock to read the head values and another
// to copy the head is liable to race-conditions.
let head_state_opt = self.with_head(|head| {
if head.beacon_block_root == head_block_root {
Ok(Some((head.beacon_state.clone(), head.beacon_state_root())))
} else {
Ok::<_, Error>(None)
}
})?;

// Compute the `target_slot` to advance the block's state to.
//
// Since there's a one-epoch look-ahead on the attester shuffling, it suffices to
// only advance into the first slot of the epoch prior to `shuffling_epoch`.
//
// If the `head_block` is already ahead of that slot, then we should load the state
// at that slot, as we've determined above that the `shuffling_epoch` cache will
// not be too far in the past.
let target_slot = std::cmp::max(
shuffling_epoch
.saturating_sub(1_u64)
.start_slot(T::EthSpec::slots_per_epoch()),
head_block.slot,
);

// If the head state is useful for this request, use it. Otherwise, read a state from
// disk that is advanced as close as possible to `target_slot`.
let (mut state, state_root) = if let Some((state, state_root)) = head_state_opt {
(state, state_root)
} else {
// We assume that the `Pending` state has the same shufflings as a `Full` state
// for the same block. Analysis: https://hackmd.io/@dapplion/gloas_dependant_root
let (state_root, state) = self
.store
.get_advanced_hot_state(head_block_root, target_slot, head_block.state_root)?
.ok_or(Error::MissingBeaconState(head_block.state_root))?;
(state, state_root)
};

metrics::stop_timer(state_read_timer);
let state_skip_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_STATE_SKIP_TIMES);

// If the state is still in an earlier epoch, advance it to the `target_slot` so
// that its next epoch committee cache matches the `shuffling_epoch`.
if state.current_epoch() + 1 < shuffling_epoch {
// Advance the state into the required slot, using the "partial" method since the
// state roots are not relevant for the shuffling.
partial_state_advance(&mut state, Some(state_root), target_slot, &self.spec)?;
}
metrics::stop_timer(state_skip_timer);

let committee_building_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_COMMITTEE_BUILDING_TIMES);

let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), shuffling_epoch)
.map_err(Error::IncorrectStateForAttestation)?;

state.build_committee_cache(relative_epoch, &self.spec)?;

let committee_cache = state.committee_cache(relative_epoch)?.clone();
let shuffling_decision_block = shuffling_id.shuffling_decision_block;

self.shuffling_cache
.write()
.insert_committee_cache(shuffling_id, &committee_cache);

metrics::stop_timer(committee_building_timer);

sender.send(committee_cache.clone());

map_fn(&committee_cache, shuffling_decision_block)
}
map_fn,
)
}

/// Dumps the entire canonical chain, from the head to genesis to a vector for analysis.
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ pub enum BeaconChainError {
state_epoch: Epoch,
shuffling_epoch: Epoch,
},
MissingPtcForGloasShuffling {
shuffling_epoch: Epoch,
},
SyncDutiesError(BeaconStateError),
InconsistentForwardsIter {
request_slot: Slot,
Expand Down
Loading
Loading