Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 58 additions & 48 deletions validator_client/validator_services/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1660,54 +1660,8 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// Only download duties and push out additional block production events if we have some
// validators.
if !local_pubkeys.is_empty() {
let download_result = duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
})
.await;

match download_result {
Ok(response) => {
let dependent_root = response.dependent_root;

let relevant_duties = response
.data
.into_iter()
.filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey))
.collect::<Vec<_>>();

debug!(
%dependent_root,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);

if let Some((prior_dependent_root, _)) = duties_service
.proposers
.write()
.insert(current_epoch, (dependent_root, relevant_duties))
&& dependent_root != prior_dependent_root
{
warn!(
%prior_dependent_root,
%dependent_root,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
// Don't return early here, we still want to try and produce blocks using the cached values.
Err(e) => error!(
err = %e,
"Failed to download proposer duties"
),
for epoch in [current_epoch, current_epoch + 1] {
fetch_and_store_proposer_duties(duties_service, epoch, &local_pubkeys).await;
}

// Compute the block proposers for this slot again, now that we've received an update from
Expand Down Expand Up @@ -1750,6 +1704,62 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
Ok(())
}

async fn fetch_and_store_proposer_duties<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
epoch: Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) {
let download_result = duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node.get_validator_duties_proposer(epoch).await
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally we should use get_validator_duties_proposer_v2 here to avoid the issues with the dependent root in the v1 API being wrong. Using v1 we're going to get a bunch of false positive Proposer duties re-org (the dependent root will change but the shuffling won't).

Problem is, the v2 endpoint was only added recently, so using it would be a breaking change. Maybe we could try v2 and fallback to v1 if it's not available? I always hate writing that sort of logic though and it's prone to introduce timeout issues. Maybe using v2 by default and having a flag to disable it is cleaner and easier (and we can delete the flag eventually after Gloas).

})
.await;

match download_result {
Ok(response) => {
let dependent_root = response.dependent_root;

let relevant_duties = response
.data
.into_iter()
.filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey))
.collect::<Vec<_>>();

debug!(
%dependent_root,
%epoch,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);

if let Some((prior_dependent_root, _)) = duties_service
.proposers
.write()
.insert(epoch, (dependent_root, relevant_duties))
&& dependent_root != prior_dependent_root
{
warn!(
%prior_dependent_root,
%dependent_root,
%epoch,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
Err(e) => error!(
err = %e,
%epoch,
"Failed to download proposer duties"
),
}
}

/// Query the beacon node for ptc duties for any known validators.
async fn poll_beacon_ptc_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::duties_service::DutiesService;
use beacon_node_fallback::BeaconNodeFallback;
use eth2::types::ProposerData;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use types::{ChainSpec, Epoch, EthSpec, ForkName, ProposerPreferences};
use types::{ChainSpec, Epoch, EthSpec, ForkName, Hash256, ProposerPreferences};
use validator_store::ValidatorStore;

pub struct Inner<S, T> {
Expand Down Expand Up @@ -66,52 +68,83 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
let executor = self.executor.clone();

let interval_fut = async move {
let mut published_preferences: HashMap<Epoch, Hash256> = HashMap::new();

loop {
let Some(current_slot) = self.slot_clock.now() else {
error!("Failed to read slot clock");
sleep(slot_duration).await;
continue;
};

if !self
.chain_spec
.fork_name_at_slot::<S::E>(current_slot)
.gloas_enabled()
{
let duration_to_next_epoch = self
.slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch())
.unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32);
sleep(duration_to_next_epoch).await;
continue;
}

let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
let fork_name = self.chain_spec.fork_name_at_slot::<S::E>(current_slot);
self.publish_proposer_preferences(current_epoch, fork_name)

self.poll_and_publish_preferences(current_epoch, &mut published_preferences)
.await;

let duration_to_next_epoch = self
let duration_to_next_slot = self
.slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch())
.unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32);
sleep(duration_to_next_epoch).await;
.duration_to_next_slot()
.unwrap_or(slot_duration);
sleep(duration_to_next_slot).await;
}
};

executor.spawn(interval_fut, "proposer_preferences_service");
Ok(())
}

async fn publish_proposer_preferences(&self, current_epoch: Epoch, fork_name: ForkName) {
let (dependent_root, duties) = {
let proposers = self.duties_service.proposers.read();
match proposers.get(&current_epoch) {
Some((root, duties)) => (*root, duties.clone()),
None => return,
/// Publish proposer preferences for `current_epoch` and `current_epoch + 1`.
/// Will only publish preferences for a given epoch once per dependent root.
async fn poll_and_publish_preferences(
&self,
current_epoch: Epoch,
published_preferences: &mut HashMap<Epoch, Hash256>,
) {
for (epoch, fork_name) in [
(
current_epoch,
self.chain_spec.fork_name_at_epoch(current_epoch),
),
(
current_epoch + 1,
self.chain_spec.fork_name_at_epoch(current_epoch + 1),
),
] {
if !fork_name.gloas_enabled() {
continue;
}
};

let (dependent_root, duties) = {
let proposers = self.duties_service.proposers.read();
match proposers.get(&epoch) {
Some((root, duties)) => (*root, duties.clone()),
None => continue,
}
};

if published_preferences.get(&epoch) == Some(&dependent_root) {
continue;
}

if self
.publish_proposer_preferences(epoch, fork_name, dependent_root, duties)
.await
{
published_preferences.insert(epoch, dependent_root);
}
}

published_preferences.retain(|epoch, _| *epoch >= current_epoch);
}

async fn publish_proposer_preferences(
&self,
epoch: Epoch,
fork_name: ForkName,
dependent_root: Hash256,
duties: Vec<ProposerData>,
) -> bool {
let preferences_to_sign: Vec<_> = {
let mut result = vec![];
for duty in &duties {
Expand Down Expand Up @@ -144,11 +177,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
};

if preferences_to_sign.is_empty() {
return;
return false;
}

debug!(
%current_epoch,
%epoch,
count = preferences_to_sign.len(),
"Signing proposer preferences"
);
Expand All @@ -172,7 +205,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
}

if signed.is_empty() {
return;
return false;
}

let count = signed.len();
Expand Down Expand Up @@ -204,17 +237,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
match result {
Ok(()) => {
info!(
%current_epoch,
%epoch,
%count,
"Successfully published proposer preferences"
);
true
}
Err(e) => {
error!(
error = %e,
%current_epoch,
%epoch,
"Failed to publish proposer preferences"
);
false
}
}
}
Expand Down
Loading