diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 9dfb8304bc8..f532ef716e2 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -36,6 +36,7 @@ use rand::SeedableRng; use rand::rngs::{OsRng, StdRng}; use slasher::Slasher; use slasher_service::SlasherService; +use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -639,6 +640,10 @@ where network_globals: self.network_globals.clone(), beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()), sse_logging_components: runtime_context.sse_logging_components.clone(), + historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new( + NonZeroUsize::new(self.http_api_config.historical_committee_cache_size) + .unwrap_or(NonZeroUsize::MIN), + )), }); let exit = runtime_context.executor.exit(); diff --git a/beacon_node/http_api/src/beacon/states.rs b/beacon_node/http_api/src/beacon/states.rs index 52b05a807b6..1b765aa2276 100644 --- a/beacon_node/http_api/src/beacon/states.rs +++ b/beacon_node/http_api/src/beacon/states.rs @@ -1,4 +1,5 @@ use crate::StateId; +use crate::caches::{HistoricalCommitteeCache, HistoricalShufflingId}; use crate::task_spawner::{Priority, TaskSpawner}; use crate::utils::ResponseFilter; use crate::validator::pubkey_to_validator_index; @@ -13,7 +14,10 @@ use eth2::types::{ }; use ssz::Encode; use std::sync::Arc; -use types::{AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch}; +use types::{ + AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch, + RelativeEpochError, +}; use warp::filters::BoxedFilter; use warp::http::Response; use warp::hyper::Body; @@ -26,6 +30,8 @@ type BeaconStatesPath = BoxedFilter<( Arc>, )>; +type BeaconStatesCommitteesFilter = BoxedFilter<(Arc,)>; + // GET beacon/states/{state_id}/pending_consolidations pub fn get_beacon_state_pending_consolidations( beacon_states_path: BeaconStatesPath, @@ -337,17 +343,20 @@ pub fn get_beacon_state_sync_committees( // GET beacon/states/{state_id}/committees?slot,index,epoch pub fn get_beacon_state_committees( beacon_states_path: BeaconStatesPath, + beacon_states_committees_filter: BeaconStatesCommitteesFilter, ) -> ResponseFilter { beacon_states_path .clone() .and(warp::path("committees")) .and(warp::query::()) + .and(beacon_states_committees_filter) .and(warp::path::end()) .then( |state_id: StateId, task_spawner: TaskSpawner, chain: Arc>, - query: eth2::types::CommitteesQuery| { + query: eth2::types::CommitteesQuery, + historical_committee_cache: Arc| { task_spawner.blocking_json_task(Priority::P1, move || { let (data, execution_optimistic, finalized) = state_id .map_state_and_execution_optimistic_and_finalized( @@ -364,33 +373,33 @@ pub fn get_beacon_state_committees( let shuffling_id = if let Ok(Some(shuffling_decision_block)) = chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev) { - Some(AttestationShufflingId { - shuffling_epoch: epoch, - shuffling_decision_block, - }) + Some(HistoricalShufflingId::ShufflingId( + AttestationShufflingId { + shuffling_epoch: epoch, + shuffling_decision_block, + }, + )) + } else if epoch < chain.head().finalized_checkpoint().epoch { + // Use the case for finalized epochs + Some(HistoricalShufflingId::FinalizedEpoch(epoch)) } else { None }; // Attempt to read from the chain cache if there exists a // shuffling_id - let maybe_cached_shuffling = if let Some(shuffling_id) = - shuffling_id.as_ref() - { - chain - .shuffling_cache - .try_write_for(std::time::Duration::from_secs(1)) - .and_then(|mut cache_write| cache_write.get(shuffling_id)) - .and_then(|cache_item| cache_item.wait().ok()) - } else { - None - }; + let maybe_cached_shuffling = + if let Some(shuffling_id) = shuffling_id.as_ref() { + historical_committee_cache.get(shuffling_id) + } else { + None + }; let committee_cache = if let Some(shuffling) = maybe_cached_shuffling { shuffling } else { - let possibly_built_cache = match RelativeEpoch::from_epoch( + let committee_cache = match RelativeEpoch::from_epoch( current_epoch, epoch, ) { @@ -401,11 +410,19 @@ pub fn get_beacon_state_committees( { state.committee_cache(relative_epoch).cloned() } - _ => CommitteeCache::initialized( - state, - epoch, - &chain.spec, - ), + Ok(_) | Err(RelativeEpochError::EpochTooLow { .. }) => { + CommitteeCache::initialized( + state, + epoch, + &chain.spec, + ) + } + Err(RelativeEpochError::EpochTooHigh { .. }) => { + Err(BeaconStateError::EpochOutOfBounds) + } + Err(RelativeEpochError::ArithError(e)) => { + Err(BeaconStateError::ArithError(e)) + } } .map_err(|e| match e { BeaconStateError::EpochOutOfBounds => { @@ -419,22 +436,12 @@ pub fn get_beacon_state_committees( ), })?; - // Attempt to write to the beacon cache (only if the cache - // size is not the default value). - if chain.config.shuffling_cache_size - != beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE - && let Some(shuffling_id) = shuffling_id - && let Some(mut cache_write) = chain - .shuffling_cache - .try_write_for(std::time::Duration::from_secs(1)) - { - cache_write.insert_committee_cache( - shuffling_id, - &possibly_built_cache, - ); + if let Some(shuffling_id) = shuffling_id { + historical_committee_cache + .insert(shuffling_id, committee_cache.clone()); } - possibly_built_cache + committee_cache }; // Use either the supplied slot or all slots in the epoch. diff --git a/beacon_node/http_api/src/caches.rs b/beacon_node/http_api/src/caches.rs new file mode 100644 index 00000000000..d92571594a7 --- /dev/null +++ b/beacon_node/http_api/src/caches.rs @@ -0,0 +1,43 @@ +use lru::LruCache; +use parking_lot::Mutex; +use std::num::NonZeroUsize; +use std::sync::Arc; +use types::{AttestationShufflingId, CommitteeCache, Epoch}; + +/// See `shuffling_cache::DEFAULT_CACHE_SIZE` for rationale +pub const DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE: usize = 16; + +/// Indexes the `HistoricalCommitteeCache`. We can compute committees for very old epochs, and we +/// can't retrieve the decision root cheaply from a state. For those cases we allow the cache to +/// key those committees by finalized epoch. +#[derive(Eq, Hash, PartialEq)] +pub enum HistoricalShufflingId { + FinalizedEpoch(Epoch), + ShufflingId(AttestationShufflingId), +} + +/// Dedicated cache for attestation committees, used exclusively by the HTTP API. +/// +/// This may contain committees for finalized and unfinalized epochs. The name is slightly +/// missleading :) +pub struct HistoricalCommitteeCache { + committees: Mutex>>, +} + +impl HistoricalCommitteeCache { + pub fn new(size: NonZeroUsize) -> Self { + Self { + committees: Mutex::new(LruCache::new(size)), + } + } +} + +impl HistoricalCommitteeCache { + pub fn get(&self, id: &HistoricalShufflingId) -> Option> { + self.committees.lock().get(id).cloned() + } + + pub fn insert(&self, id: HistoricalShufflingId, cache: Arc) { + self.committees.lock().put(id, cache); + } +} diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index f31817c5ba7..74bf1ccd764 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -12,6 +12,7 @@ mod beacon; mod block_id; mod build_block_contents; mod builder_states; +mod caches; mod custody; mod database; mod light_client; @@ -40,6 +41,8 @@ use crate::beacon::execution_payload_envelope::{ post_beacon_execution_payload_envelope_ssz, }; use crate::beacon::pool::*; +use crate::caches::DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE; +pub use crate::caches::HistoricalCommitteeCache; use crate::light_client::{get_light_client_bootstrap, get_light_client_updates}; use crate::utils::{AnyVersionFilter, EthV1Filter}; use crate::validator::post_validator_liveness_epoch; @@ -132,6 +135,7 @@ pub struct Context { pub network_globals: Option>>, pub beacon_processor_send: Option>, pub sse_logging_components: Option, + pub historical_committee_cache: Arc, } /// Configuration for the HTTP server. @@ -148,6 +152,7 @@ pub struct Config { #[serde(with = "eth2::types::serde_status_code")] pub duplicate_block_status_code: StatusCode, pub target_peers: usize, + pub historical_committee_cache_size: usize, } impl Default for Config { @@ -163,6 +168,7 @@ impl Default for Config { enable_beacon_processor: true, duplicate_block_status_code: StatusCode::ACCEPTED, target_peers: 100, + historical_committee_cache_size: DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE, } } } @@ -416,6 +422,11 @@ pub fn serve( }) .boxed(); + let historical_committee_cache = ctx.historical_committee_cache.clone(); + let beacon_states_committees_filter = warp::any() + .map(move || historical_committee_cache.clone()) + .boxed(); + // Create a `warp` filter that provides access to the network sender channel. let network_tx = ctx .network_senders @@ -628,8 +639,10 @@ pub fn serve( states::get_beacon_state_validators_id(beacon_states_path.clone()); // GET beacon/states/{state_id}/committees?slot,index,epoch - let get_beacon_state_committees = - states::get_beacon_state_committees(beacon_states_path.clone()); + let get_beacon_state_committees = states::get_beacon_state_committees( + beacon_states_path.clone(), + beacon_states_committees_filter, + ); // GET beacon/states/{state_id}/sync_committees?epoch let get_beacon_state_sync_committees = diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 27e2a27d35c..f27a04d17a2 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{Config, Context}; +use crate::{Config, Context, caches::HistoricalCommitteeCache}; use beacon_chain::{ BeaconChain, BeaconChainTypes, custody_context::NodeCustodyType, @@ -22,10 +22,10 @@ use lighthouse_network::{ }; use network::{NetworkReceivers, NetworkSenders}; use sensitive_url::SensitiveUrl; -use std::future::Future; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use std::{future::Future, num::NonZeroUsize}; use store::MemoryStore; use task_executor::test_utils::TestRuntime; use types::{ChainSpec, EthSpec}; @@ -293,6 +293,9 @@ pub async fn create_api_server_with_config( network_globals: Some(network_globals), beacon_processor_send: Some(beacon_processor_send), sse_logging_components: None, + historical_committee_cache: Arc::new(HistoricalCommitteeCache::new( + NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(), + )), }); let (listening_socket, server) = diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 8ba2c0f3214..f10f9e3b455 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -215,6 +215,9 @@ pub fn get_config( if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? { client_config.chain.shuffling_cache_size = cache_size; + // Mantain backwards compatibility with users customizing `shuffling_cache_size` to tweak + // the behaviour of the HTTP API route `beacon/states/committees` + client_config.http_api.historical_committee_cache_size = cache_size; } if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {