Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use rand::SeedableRng;
use rand::rngs::{OsRng, StdRng};
use slasher::Slasher;
use slasher_service::SlasherService;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -639,6 +640,10 @@ where
network_globals: self.network_globals.clone(),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(),
historical_committee_cache: Arc::new(http_api::HistoricalCommitteeCache::new(
NonZeroUsize::new(self.http_api_config.historical_committee_cache_size)
.unwrap_or(NonZeroUsize::MIN),
)),
});

let exit = runtime_context.executor.exit();
Expand Down
81 changes: 44 additions & 37 deletions beacon_node/http_api/src/beacon/states.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::StateId;
use crate::caches::{HistoricalCommitteeCache, HistoricalShufflingId};
use crate::task_spawner::{Priority, TaskSpawner};
use crate::utils::ResponseFilter;
use crate::validator::pubkey_to_validator_index;
Expand All @@ -13,7 +14,10 @@ use eth2::types::{
};
use ssz::Encode;
use std::sync::Arc;
use types::{AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch};
use types::{
AttestationShufflingId, BeaconStateError, CommitteeCache, EthSpec, RelativeEpoch,
RelativeEpochError,
};
use warp::filters::BoxedFilter;
use warp::http::Response;
use warp::hyper::Body;
Expand All @@ -26,6 +30,8 @@ type BeaconStatesPath<T> = BoxedFilter<(
Arc<BeaconChain<T>>,
)>;

type BeaconStatesCommitteesFilter = BoxedFilter<(Arc<HistoricalCommitteeCache>,)>;

// GET beacon/states/{state_id}/pending_consolidations
pub fn get_beacon_state_pending_consolidations<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
Expand Down Expand Up @@ -337,17 +343,20 @@ pub fn get_beacon_state_sync_committees<T: BeaconChainTypes>(
// GET beacon/states/{state_id}/committees?slot,index,epoch
pub fn get_beacon_state_committees<T: BeaconChainTypes>(
beacon_states_path: BeaconStatesPath<T>,
beacon_states_committees_filter: BeaconStatesCommitteesFilter,
) -> ResponseFilter {
beacon_states_path
.clone()
.and(warp::path("committees"))
.and(warp::query::<eth2::types::CommitteesQuery>())
.and(beacon_states_committees_filter)
.and(warp::path::end())
.then(
|state_id: StateId,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
query: eth2::types::CommitteesQuery| {
query: eth2::types::CommitteesQuery,
historical_committee_cache: Arc<HistoricalCommitteeCache>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let (data, execution_optimistic, finalized) = state_id
.map_state_and_execution_optimistic_and_finalized(
Expand All @@ -364,33 +373,33 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
let shuffling_id = if let Ok(Some(shuffling_decision_block)) =
chain.block_root_at_slot(decision_slot, WhenSlotSkipped::Prev)
{
Some(AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
})
Some(HistoricalShufflingId::ShufflingId(
AttestationShufflingId {
shuffling_epoch: epoch,
shuffling_decision_block,
},
))
} else if epoch < chain.head().finalized_checkpoint().epoch {
// Use the case for finalized epochs
Some(HistoricalShufflingId::FinalizedEpoch(epoch))
} else {
None
};

// Attempt to read from the chain cache if there exists a
// shuffling_id
let maybe_cached_shuffling = if let Some(shuffling_id) =
shuffling_id.as_ref()
{
chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
.and_then(|mut cache_write| cache_write.get(shuffling_id))
.and_then(|cache_item| cache_item.wait().ok())
} else {
None
};
let maybe_cached_shuffling =
if let Some(shuffling_id) = shuffling_id.as_ref() {
historical_committee_cache.get(shuffling_id)
} else {
None
};

let committee_cache =
if let Some(shuffling) = maybe_cached_shuffling {
shuffling
} else {
let possibly_built_cache = match RelativeEpoch::from_epoch(
let committee_cache = match RelativeEpoch::from_epoch(
current_epoch,
epoch,
) {
Expand All @@ -401,11 +410,19 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
{
state.committee_cache(relative_epoch).cloned()
}
_ => CommitteeCache::initialized(
state,
epoch,
&chain.spec,
),
Ok(_) | Err(RelativeEpochError::EpochTooLow { .. }) => {
Comment thread
michaelsproul marked this conversation as resolved.
CommitteeCache::initialized(
state,
epoch,
&chain.spec,
)
}
Err(RelativeEpochError::EpochTooHigh { .. }) => {
Err(BeaconStateError::EpochOutOfBounds)
}
Err(RelativeEpochError::ArithError(e)) => {
Err(BeaconStateError::ArithError(e))
}
}
.map_err(|e| match e {
BeaconStateError::EpochOutOfBounds => {
Expand All @@ -419,22 +436,12 @@ pub fn get_beacon_state_committees<T: BeaconChainTypes>(
),
})?;

// Attempt to write to the beacon cache (only if the cache
// size is not the default value).
if chain.config.shuffling_cache_size
!= beacon_chain::shuffling_cache::DEFAULT_CACHE_SIZE
&& let Some(shuffling_id) = shuffling_id
&& let Some(mut cache_write) = chain
.shuffling_cache
.try_write_for(std::time::Duration::from_secs(1))
{
cache_write.insert_committee_cache(
shuffling_id,
&possibly_built_cache,
);
if let Some(shuffling_id) = shuffling_id {
Comment thread
michaelsproul marked this conversation as resolved.
Comment thread
michaelsproul marked this conversation as resolved.
historical_committee_cache
.insert(shuffling_id, committee_cache.clone());
}

possibly_built_cache
committee_cache
};

// Use either the supplied slot or all slots in the epoch.
Expand Down
43 changes: 43 additions & 0 deletions beacon_node/http_api/src/caches.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use lru::LruCache;
use parking_lot::Mutex;
use std::num::NonZeroUsize;
use std::sync::Arc;
use types::{AttestationShufflingId, CommitteeCache, Epoch};

/// See `shuffling_cache::DEFAULT_CACHE_SIZE` for rationale
pub const DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE: usize = 16;

/// Indexes the `HistoricalCommitteeCache`. We can compute committees for very old epochs, and we
/// can't retrieve the decision root cheaply from a state. For those cases we allow the cache to
/// key those committees by finalized epoch.
#[derive(Eq, Hash, PartialEq)]
pub enum HistoricalShufflingId {
FinalizedEpoch(Epoch),
ShufflingId(AttestationShufflingId),
}

/// Dedicated cache for attestation committees, used exclusively by the HTTP API.
///
/// This may contain committees for finalized and unfinalized epochs. The name is slightly
/// missleading :)
pub struct HistoricalCommitteeCache {
committees: Mutex<LruCache<HistoricalShufflingId, Arc<CommitteeCache>>>,
}

impl HistoricalCommitteeCache {
pub fn new(size: NonZeroUsize) -> Self {
Self {
committees: Mutex::new(LruCache::new(size)),
}
}
}

impl HistoricalCommitteeCache {
pub fn get(&self, id: &HistoricalShufflingId) -> Option<Arc<CommitteeCache>> {
self.committees.lock().get(id).cloned()
}

pub fn insert(&self, id: HistoricalShufflingId, cache: Arc<CommitteeCache>) {
self.committees.lock().put(id, cache);
}
}
17 changes: 15 additions & 2 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod beacon;
mod block_id;
mod build_block_contents;
mod builder_states;
mod caches;
mod custody;
mod database;
mod light_client;
Expand Down Expand Up @@ -40,6 +41,8 @@ use crate::beacon::execution_payload_envelope::{
post_beacon_execution_payload_envelope_ssz,
};
use crate::beacon::pool::*;
use crate::caches::DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE;
pub use crate::caches::HistoricalCommitteeCache;
use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::utils::{AnyVersionFilter, EthV1Filter};
use crate::validator::post_validator_liveness_epoch;
Expand Down Expand Up @@ -132,6 +135,7 @@ pub struct Context<T: BeaconChainTypes> {
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub sse_logging_components: Option<SSELoggingComponents>,
pub historical_committee_cache: Arc<HistoricalCommitteeCache>,
}

/// Configuration for the HTTP server.
Expand All @@ -148,6 +152,7 @@ pub struct Config {
#[serde(with = "eth2::types::serde_status_code")]
pub duplicate_block_status_code: StatusCode,
pub target_peers: usize,
pub historical_committee_cache_size: usize,
}

impl Default for Config {
Expand All @@ -163,6 +168,7 @@ impl Default for Config {
enable_beacon_processor: true,
duplicate_block_status_code: StatusCode::ACCEPTED,
target_peers: 100,
historical_committee_cache_size: DEFAULT_HISTORICAL_COMMITTEE_CACHE_SIZE,
}
}
}
Expand Down Expand Up @@ -416,6 +422,11 @@ pub fn serve<T: BeaconChainTypes>(
})
.boxed();

let historical_committee_cache = ctx.historical_committee_cache.clone();
let beacon_states_committees_filter = warp::any()
.map(move || historical_committee_cache.clone())
.boxed();

// Create a `warp` filter that provides access to the network sender channel.
let network_tx = ctx
.network_senders
Expand Down Expand Up @@ -628,8 +639,10 @@ pub fn serve<T: BeaconChainTypes>(
states::get_beacon_state_validators_id(beacon_states_path.clone());

// GET beacon/states/{state_id}/committees?slot,index,epoch
let get_beacon_state_committees =
states::get_beacon_state_committees(beacon_states_path.clone());
let get_beacon_state_committees = states::get_beacon_state_committees(
beacon_states_path.clone(),
beacon_states_committees_filter,
);

// GET beacon/states/{state_id}/sync_committees?epoch
let get_beacon_state_sync_committees =
Expand Down
7 changes: 5 additions & 2 deletions beacon_node/http_api/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Config, Context};
use crate::{Config, Context, caches::HistoricalCommitteeCache};
use beacon_chain::{
BeaconChain, BeaconChainTypes,
custody_context::NodeCustodyType,
Expand All @@ -22,10 +22,10 @@ use lighthouse_network::{
};
use network::{NetworkReceivers, NetworkSenders};
use sensitive_url::SensitiveUrl;
use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use std::{future::Future, num::NonZeroUsize};
use store::MemoryStore;
use task_executor::test_utils::TestRuntime;
use types::{ChainSpec, EthSpec};
Expand Down Expand Up @@ -293,6 +293,9 @@ pub async fn create_api_server_with_config<T: BeaconChainTypes>(
network_globals: Some(network_globals),
beacon_processor_send: Some(beacon_processor_send),
sse_logging_components: None,
historical_committee_cache: Arc::new(HistoricalCommitteeCache::new(
NonZeroUsize::new(http_config.historical_committee_cache_size).unwrap(),
)),
Comment thread
michaelsproul marked this conversation as resolved.
});

let (listening_socket, server) =
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ pub fn get_config<E: EthSpec>(

if let Some(cache_size) = clap_utils::parse_optional(cli_args, "shuffling-cache-size")? {
client_config.chain.shuffling_cache_size = cache_size;
// Mantain backwards compatibility with users customizing `shuffling_cache_size` to tweak
// the behaviour of the HTTP API route `beacon/states/committees`
client_config.http_api.historical_committee_cache_size = cache_size;
}

if let Some(batches) = clap_utils::parse_optional(cli_args, "blob-publication-batches")? {
Expand Down
Loading