diff --git a/node/bft/README.md b/node/bft/README.md index 1dd4a55e7b..db426f3ed4 100644 --- a/node/bft/README.md +++ b/node/bft/README.md @@ -31,6 +31,18 @@ f + 1 vertices that vote for the anchor, or 2f + 1 vertices that do not, or a ti ``` Note that in this quote `2f + 1` should really be `n - f`. +#### Batch Proposal + +Batch proposals are driven by a dedicated **batch proposal task** that runs in a loop and is the only place that calls `propose_batch()`. This keeps proposing on a single execution path and avoids concurrent proposal attempts. + +Each iteration of the inner loop waits for the first of these to fire before calling `propose_batch()`: + +1. **Ready notification** (`is_ready_notify`) — When the primary advances to a new round (e.g. after a certificate is committed, or in the Narwhal case when storage increments the round), it signals readiness via `is_ready_notify`. The task wakes up and, if the node is synced, calls `propose_batch()`. +2. **Delay timeout** — If `MAX_BATCH_DELAY` has not yet elapsed since the round started, the task sets a timer for the remaining `MAX_BATCH_DELAY − elapsed`. +3. **Sync completion** — If the node is currently syncing, it waits for the state to change to `Synced`. This lets the task wake up as soon as sync finishes without polling. + +The primary tracks the latest proposed **(round, timestamp)** in `latest_proposed_batch`. This state is used to: avoid proposing the same round twice; rate-limit the primary's own proposals (via a dedicated check against the previous proposal timestamp); and decide whether to advance when a certificate is received. Peer proposal timestamps are validated separately so that the primary does not accept batches proposed too soon after a peer's previous proposal.
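+
+A condensed sketch of the task's inner loop (simplified from `primary.rs`: `tokio::select!` stands in for the `select_all` over boxed futures used in the source, and the sync-completion branch is folded into the `is_synced` re-check):
+
+```rust
+while self.current_round() == current_round {
+    // Wake on whichever fires first: the ready notification or the delay timer.
+    let timeout = MAX_BATCH_DELAY.saturating_sub(round_start.elapsed()).max(CREATE_BATCH_INTERVAL);
+    tokio::select! {
+        _ = self.is_ready_notify.notified() => {}
+        _ = tokio::time::sleep(timeout) => {}
+    }
+    if !self.sync.is_synced() {
+        continue; // still syncing; wait again
+    }
+    match self.propose_batch().await {
+        Ok(true) => break,  // proposed; the outer loop moves on to the next round
+        Ok(false) => {}     // benign skip; retry within this round
+        Err(err) => warn!("Cannot propose a batch: {err}"),
+    }
+}
+```
+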
### Ledger Advancement The BFT module also advances the ledger as new certificates are added to the DAG. There are two different ways the ledger can advance. diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 9c3bbf0bbc..439bdadb67 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -14,7 +14,7 @@ // limitations under the License. use crate::{ - MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, + MAX_LEADER_CERTIFICATE_DELAY, helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now}, primary::{Primary, PrimaryCallback}, sync::SyncCallback, @@ -207,7 +207,13 @@ impl BFT { #[async_trait::async_trait] impl<N: Network> PrimaryCallback<N> for BFT<N> { /// Notification that a new round has started. - fn update_to_next_round(&self, current_round: u64) -> bool { + /// + /// # Arguments + /// * `current_round` - the round the caller is in (to avoid race conditions) + /// + /// # Returns + /// `true` if the BFT moved to the next round. + fn try_advance_to_next_round(&self, current_round: u64) -> bool { // Ensure the current round is at least the storage round (this is a sanity check). let storage_round = self.storage().current_round(); if current_round < storage_round { @@ -482,7 +488,7 @@ impl BFT { /// /// This is always true for a new BFT instance. fn is_timer_expired(&self) -> bool { - self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now() + self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now() } /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions: @@ -877,7 +883,7 @@ impl BFT { mod tests { use crate::{ BFT, - MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, + MAX_LEADER_CERTIFICATE_DELAY, PrimaryCallback, helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round}, sync::SyncCallback, @@ -1109,9 +1115,7 @@ mod tests { assert!(!result); } // Wait for the timer to expire. - let leader_certificate_timeout = - std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000); - std::thread::sleep(leader_certificate_timeout); + std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY); // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round. let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); if bft_timer.is_timer_expired() { diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 8fbdd0bbfa..d68618c8e5 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -17,7 +17,7 @@ use crate::helpers::Telemetry; use crate::{ CONTEXT, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, MEMORY_POOL_PORT, Worker, events::{DisconnectReason, EventCodec, PrimaryPing}, @@ -102,9 +102,9 @@ use tokio_stream::StreamExt; use tokio_util::codec::Framed; /// The maximum interval of events to cache. -const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds +const CACHE_EVENTS_INTERVAL: i64 = MAX_BATCH_DELAY.as_secs() as i64; // seconds /// The maximum interval of requests to cache. -const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds +const CACHE_REQUESTS_INTERVAL: i64 = MAX_BATCH_DELAY.as_secs() as i64; // seconds /// The maximum number of connection attempts in an interval. #[cfg(not(test))] diff --git a/node/bft/src/helpers/pending.rs b/node/bft/src/helpers/pending.rs index c57dc1a83d..9ffafbb85f 100644 --- a/node/bft/src/helpers/pending.rs +++ b/node/bft/src/helpers/pending.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::MAX_FETCH_TIMEOUT_IN_MS; +use crate::MAX_FETCH_TIMEOUT; use snarkos_node_bft_ledger_service::LedgerService; use snarkvm::{ console::network::{Network, consensus_config_value}, @@ -35,8 +35,8 @@ use time::OffsetDateTime; use tokio::sync::oneshot; /// The maximum number of seconds to wait before expiring a callback. -/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds. -pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64; +/// We round up so that we don't truncate `MAX_FETCH_TIMEOUT` when converting to seconds. +pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT.as_millis().div_ceil(1000) as i64;
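+// e.g. MAX_FETCH_TIMEOUT = 7500 ms, so div_ceil yields 8 s, whereas a plain `as_secs()` would floor it to 7 s.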
/// Returns the maximum number of redundant requests for the number of validators in the specified round. pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> Result<usize> { diff --git a/node/bft/src/helpers/timestamp.rs b/node/bft/src/helpers/timestamp.rs index fca6978584..536fb7a593 100644 --- a/node/bft/src/helpers/timestamp.rs +++ b/node/bft/src/helpers/timestamp.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::MAX_TIMESTAMP_DELTA_IN_SECS; +use crate::MAX_TIMESTAMP_DELTA; use snarkvm::prelude::{Result, bail}; use time::OffsetDateTime; @@ -36,7 +36,7 @@ pub fn to_utc_datetime(timestamp: i64) -> OffsetDateTime { /// Sanity checks the timestamp for liveness. pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> { // Ensure the timestamp is within range. - if timestamp > (now() + MAX_TIMESTAMP_DELTA_IN_SECS) { + if timestamp > (now() + MAX_TIMESTAMP_DELTA.as_secs() as i64) { bail!("Timestamp {timestamp} is too far in the future") } Ok(()) } @@ -45,17 +45,17 @@ pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> { #[cfg(test)] mod prop_tests { use super::*; - use crate::MAX_TIMESTAMP_DELTA_IN_SECS; + use crate::MAX_TIMESTAMP_DELTA; use proptest::prelude::*; use test_strategy::proptest; fn any_valid_timestamp() -> BoxedStrategy<i64> { - (Just(now()), 0..MAX_TIMESTAMP_DELTA_IN_SECS).prop_map(|(now, delta)| now + delta).boxed() + (Just(now()), 0..(MAX_TIMESTAMP_DELTA.as_secs() as i64)).prop_map(|(now, delta)| now + delta).boxed() } fn any_invalid_timestamp() -> BoxedStrategy<i64> { - (Just(now()), MAX_TIMESTAMP_DELTA_IN_SECS..).prop_map(|(now, delta)| now + delta).boxed() + (Just(now()), (MAX_TIMESTAMP_DELTA.as_secs() as i64)..).prop_map(|(now, delta)| now + delta).boxed() } #[proptest] diff --git a/node/bft/src/lib.rs b/node/bft/src/lib.rs index 4ff4e409ff..9b0210b0aa 100644 --- a/node/bft/src/lib.rs +++ b/node/bft/src/lib.rs @@ -29,6 +29,8 @@ pub use snarkos_node_bft_events as events; pub use snarkos_node_bft_ledger_service as ledger_service; pub use snarkos_node_bft_storage_service as storage_service; +use std::time::Duration; + pub mod helpers; mod bft; @@ -51,24 +53,34 @@ pub const CONTEXT: &str = "[MemoryPool]"; /// The port on which the memory pool listens for incoming connections. pub const MEMORY_POOL_PORT: u16 = 5000; // port -/// The maximum number of milliseconds to wait before proposing a batch. -pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms -/// The minimum number of seconds to wait before proposing a batch. -pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds -/// The maximum number of milliseconds to wait before timing out on a fetch. -pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms -/// The maximum number of seconds allowed for the leader to send their certificate. -pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds -/// The maximum number of seconds before the timestamp is considered expired. -pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds +/// The maximum time to wait before proposing a batch. +pub const MAX_BATCH_DELAY: Duration = Duration::from_millis(2500); + +/// The minimum time that needs to elapse between two consecutive batch proposals. +/// This creates a lower bound on the block interval, and ensures the network will not be overwhelmed with too many blocks/certificates. +pub const MIN_BATCH_DELAY: Duration = Duration::from_secs(1); + +/// The time a primary waits between attempts to create a new batch (only relevant after `MIN_BATCH_DELAY` has passed).
+/// This only serves as a failsafe in case the task does not get woken up through other means. +/// Lowering it too much will most likely waste resources by waking the task when there is nothing new to propose. +pub const CREATE_BATCH_INTERVAL: Duration = Duration::from_millis(250); + +/// The maximum time to wait before timing out on a fetch. +pub const MAX_FETCH_TIMEOUT: Duration = Duration::from_millis(3 * MAX_BATCH_DELAY.as_millis() as u64); +/// The maximum time allowed for the leader to send their certificate. +/// After this time, the node will consider the leader as failed and try to advance the round without it. +pub const MAX_LEADER_CERTIFICATE_DELAY: Duration = Duration::from_millis(2 * MAX_BATCH_DELAY.as_millis() as u64); +/// The maximum difference allowed between our local time and a certificate's timestamp, for the node to sign the certificate. +/// This prevents malicious actors from proposing certificates with timestamps that are too far in the future. +pub const MAX_TIMESTAMP_DELTA: Duration = Duration::from_secs(10); /// The maximum number of workers that can be spawned. pub const MAX_WORKERS: u8 = 1; // worker(s) /// The interval at which each primary broadcasts a ping to every other node. /// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly. -pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms +pub const PRIMARY_PING_INTERVAL: Duration = Duration::from_millis(2 * MAX_BATCH_DELAY.as_millis() as u64); /// The interval at which each worker broadcasts a ping to every other node. -pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms +pub const WORKER_PING_INTERVAL: Duration = Duration::from_millis(4 * MAX_BATCH_DELAY.as_millis() as u64);
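+
+// Deriving these intervals in milliseconds avoids silent truncation: `MAX_BATCH_DELAY.as_secs()`
+// would round 2500 ms down to 2 s. A compile-time assertion can guard the derivation, e.g.:
+const _: () = assert!(MAX_FETCH_TIMEOUT.as_millis() == 3 * MAX_BATCH_DELAY.as_millis());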
/// A helper macro to spawn a blocking task. #[macro_export] diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 57e3b424bc..90de9ee526 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -14,14 +14,15 @@ // limitations under the License. use crate::{ + CREATE_BATCH_INTERVAL, Gateway, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, MAX_WORKERS, - MIN_BATCH_DELAY_IN_SECS, - PRIMARY_PING_IN_MS, + MIN_BATCH_DELAY, + PRIMARY_PING_INTERVAL, Sync, Transport, - WORKER_PING_IN_MS, + WORKER_PING_INTERVAL, Worker, events::{BatchPropose, BatchSignature, Event}, helpers::{ @@ -70,7 +71,7 @@ use indexmap::{IndexMap, IndexSet}; #[cfg(feature = "locktick")] use locktick::{ parking_lot::{Mutex, RwLock}, - tokio::Mutex as TMutex, + tokio::RwLock as TRwLock, }; #[cfg(not(feature = "locktick"))] use parking_lot::{Mutex, RwLock}; @@ -80,22 +81,29 @@ use std::{ collections::{HashMap, HashSet}, future::Future, net::SocketAddr, + pin::Pin, sync::{Arc, OnceLock}, - time::Duration, + time::Instant, }; #[cfg(not(feature = "locktick"))] -use tokio::sync::Mutex as TMutex; +use tokio::sync::RwLock as TRwLock; use tokio::{sync::Notify, task::JoinHandle}; -/// A helper type for an optional proposed batch. -pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>; +/// A helper type to keep track of the latest proposed batch. +pub(crate) type ProposedBatch<N> = RwLock<Option<Proposal<N>>>; /// This callback trait allows listening to changes in the Primary, such as round advancement. -/// This is currently used by BFT. +/// This is implemented by [`BFT`]. #[async_trait::async_trait] pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync { - /// Notifies that a new round has started. - fn update_to_next_round(&self, current_round: u64) -> bool; + /// Asks the callback whether we can move to the next round. + /// + /// # Arguments + /// * `current_round` - the round the Primary is in (to avoid race conditions) + /// + /// # Returns + /// `true` if we moved to the next round. + fn try_advance_to_next_round(&self, current_round: u64) -> bool; /// Add a certificate that was created by the primary or received from a peer. async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>; @@ -115,20 +123,33 @@ pub struct Primary { ledger: Arc<dyn LedgerService<N>>, /// The workers. workers: Arc<OnceLock<Vec<Worker<N>>>>, + /// The primary callback (used by [`BFT`]). primary_callback: Arc>>>, + /// The batch proposal, if the primary is currently proposing a batch. proposed_batch: Arc<ProposedBatch<N>>, - /// The timestamp of the most recent proposed batch. - latest_proposed_batch_timestamp: Arc<RwLock<i64>>, + + /// Holds the most recent round and timestamp that the primary proposed a batch for. + /// TODO(kaimast): avoid using an async lock here, so this can be merged with `proposed_batch` + /// to form a unified `primary_state` field. + latest_proposed_batch: Arc<TRwLock<Option<(u64, i64)>>>, + /// The recently-signed batch proposals. signed_proposals: Arc<RwLock<SignedProposals<N>>>, + /// The handles for all background tasks spawned by this primary. handles: Arc<Mutex<Vec<JoinHandle<()>>>>, - /// The lock for propose_batch. It holds the most recent round that was proposed for. - propose_lock: Arc<TMutex<u64>>, + /// The node configuration directory. node_data_dir: NodeDataDir, + + /// Allows notifying the proposal task when a new batch can be proposed. + is_ready_notify: Arc<Notify>, + + /// Used to wake up the dedicated round-increment task when we may be able to advance to the next round. + /// This also ensures the timeout for round advancement is reset on every round increment. + round_increment_notify: Arc<Notify>, } impl Primary { @@ -168,14 +189,15 @@ impl Primary { gateway, storage, ledger, + node_data_dir, workers: Default::default(), primary_callback: Default::default(), proposed_batch: Default::default(), - latest_proposed_batch_timestamp: Default::default(), + latest_proposed_batch: Default::default(), signed_proposals: Default::default(), handles: Default::default(), - propose_lock: Default::default(), - node_data_dir, + is_ready_notify: Default::default(), + round_increment_notify: Default::default(), }) } @@ -190,26 +212,9 @@ impl Primary { let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) = proposal_cache.into(); - // Verify that the proposal cache is not too far ahead of the ledger. - // If the cache round exceeds the ledger round by more than MAX_GC_ROUNDS, the ledger - // snapshot is too old to recover from the cached state. The operator must restore a - // more recent ledger snapshot before restarting the node. - let ledger_round = self.ledger.latest_round(); - let max_gc_rounds = BatchHeader::<N>::MAX_GC_ROUNDS as u64; - if latest_certificate_round > ledger_round.saturating_add(max_gc_rounds) { - bail!( - "The proposal cache (round {latest_certificate_round}) is more than {max_gc_rounds} \ - rounds ahead of the ledger (round {ledger_round}). \ - Please restore a more recent ledger snapshot before restarting the node." - ); - } - - // Write the proposed batch. + *self.latest_proposed_batch.write().await = Some((latest_certificate_round, now())); *self.proposed_batch.write() = proposed_batch; - // Write the signed proposals. *self.signed_proposals.write() = signed_proposals; - // Writ the propose lock. - *self.propose_lock.lock().await = latest_certificate_round; // Update the storage with the pending certificates.
for certificate in pending_certificates { @@ -330,11 +335,6 @@ impl Primary { pub fn workers(&self) -> &[Worker<N>] { self.workers.get().expect("Primary is not running yet") } - - /// Returns the batch proposal of our primary, if one currently exists. - pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> { - &self.proposed_batch - } } impl Primary { @@ -396,9 +396,17 @@ impl Primary { /// 2. Sign the batch. /// 3. Set the batch proposal in the primary. /// 4. Broadcast the batch header to all validators for signing. - pub async fn propose_batch(&self) -> Result<()> { - // This function isn't re-entrant. - let mut lock_guard = self.propose_lock.lock().await; + /// + /// # Returns + /// - `Ok(true)` if the batch was proposed. + /// - `Ok(false)` if the batch was not proposed for a benign reason, e.g., the timestamp is too soon after the previous certificate. + /// - `Err(err)` if an unexpected error occurred. + pub async fn propose_batch(&self) -> Result<bool> { + // Ensure there are no concurrent executions of this function. + // + // Note: in the current design, this function is only invoked from the batch proposal task, so concurrent + // invocations are technically not possible, but we keep this lock for now. + let mut lock_guard = self.latest_proposed_batch.write().await; // Check if the proposed batch has expired, and clear it if it has expired. if let Err(err) = self @@ -406,7 +414,7 @@ impl Primary { .with_context(|| "Failed to check the proposed batch for expiration") { warn!("{}", flatten_error(&err)); - return Ok(()); + return Ok(false); } // Retrieve the current round. @@ -420,14 +428,16 @@ impl Primary { ensure!(round > 0, "Round 0 cannot have transaction batches"); // If the current storage round is below the latest proposal round, then return early. - if round < *lock_guard { - warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard); - return Ok(()); + if let Some((latest_round, _)) = &*lock_guard + && round < *latest_round + { + warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {latest_round}"); + return Ok(false); } // If there is a batch being proposed already, // rebroadcast the batch header to the non-signers, and return early. - if let Some(proposal) = self.proposed_batch.read().as_ref() { + if let Some(proposal) = &*self.proposed_batch.read() { // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this. if round < proposal.round() || proposal @@ -440,7 +450,7 @@ impl Primary { "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.", proposal.round(), ); - return Ok(()); + return Ok(false); } // Construct the event. // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers. @@ -464,34 +474,30 @@ impl Primary { } } debug!("Proposed batch for round {} is still valid", proposal.round()); - return Ok(()); + return Ok(false); } #[cfg(feature = "metrics")] metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64); // Ensure that the primary does not create a new proposal too quickly.
- if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) { - debug!( - "{}", - flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}"))) - ); - return Ok(()); + if let Some((_, latest_timestamp)) = &*lock_guard + && !self.check_own_proposal_timestamp(previous_round, *latest_timestamp, now())? + { + return Ok(false); } // Ensure the primary has not proposed a batch for this round before. if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) { // If a primary callback was provided, attempt to advance the current round. if let Some(cb) = &*self.primary_callback.get_ref() { - match cb.update_to_next_round(self.current_round()) { - // 'is_ready' is true if the primary is ready to propose a batch for the next round. + match cb.try_advance_to_next_round(self.current_round()) { true => (), // continue with the proposal - // 'is_ready' is false if the primary is not ready to propose a batch for the next round. - false => return Ok(()), + false => return Ok(false), } } debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed()); - return Ok(()); + return Ok(false); } // Determine if the current round has been proposed. @@ -499,9 +505,11 @@ impl Primary { // good for network reliability and should not be prevented for the already existing proposed_batch. // If a certificate already exists for the current round, an attempt should be made to advance the // round as early as possible. - if round == *lock_guard { + if let Some((latest_round, _)) = &*lock_guard + && *latest_round == round + { debug!("Primary is safely skipping a batch proposal - round {round} already proposed"); - return Ok(()); + return Ok(false); } // Retrieve the committee to check against. @@ -519,7 +527,7 @@ impl Primary { "(please connect to more validators)".dimmed() ); trace!("Primary is connected to {} validators", connected_validators.len() - 1); - return Ok(()); + return Ok(false); } } @@ -548,7 +556,7 @@ impl Primary { "Primary is safely skipping a batch proposal for round {round} {}", format!("(previous round {previous_round} has not reached quorum)").dimmed() ); - return Ok(()); + return Ok(false); } // Initialize the map of transmissions. @@ -690,11 +698,11 @@ impl Primary { // Determine the current timestamp. let current_timestamp = now(); - *lock_guard = round; - /* Proceeding to sign & propose the batch. */ info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len()); + // Update the latest proposed round and timestamp. + *lock_guard = Some((round, current_timestamp)); // Retrieve the private key. let private_key = *self.gateway.account().private_key(); // Retrieve the committee ID. @@ -723,13 +731,14 @@ impl Primary { error!("{}", flatten_error(err.context("Failed to reinsert transmissions"))); } })?; + + // Store the proposal. + *self.proposed_batch.write() = Some(proposal); + // Broadcast the batch to all validators for signing. self.gateway.broadcast(Event::BatchPropose(batch_header.into())); - // Set the timestamp of the latest proposed batch. - *self.latest_proposed_batch_timestamp.write() = proposal.timestamp(); - // Set the proposed batch. - *self.proposed_batch.write() = Some(proposal); - Ok(()) + + Ok(true) } /// Processes a batch propose from a peer. @@ -841,7 +850,7 @@ impl Primary { // Compute the previous round. let previous_round = batch_round.saturating_sub(1); // Ensure that the peer did not propose a batch too quickly.
- if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) { + if let Err(err) = self.check_peer_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) { // Proceed to disconnect the validator. self.gateway.disconnect(peer_ip); return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'"))); @@ -1214,9 +1223,9 @@ impl Primary { // Determine if we are currently proposing a round that is relevant. // Note: This is important, because while our peers have advanced, // they may not be proposing yet, and thus still able to sign our proposed batch. - let should_advance = match &*self.proposed_batch.read() { + let should_advance = match &*self.latest_proposed_batch.read().await { // We advance if the proposal round is less than the current round that was just certified. - Some(proposal) => proposal.round() < certificate_round, + Some((latest_round, _)) => *latest_round < certificate_round, // If there's no proposal, we consider advancing. None => true, }; @@ -1227,7 +1236,7 @@ impl Primary { // Determine whether to advance to the next round. if is_quorum && should_advance && certificate_round >= current_round { // If we have reached the quorum threshold and the round should advance, then proceed to the next round. - self.try_increment_to_the_next_round(current_round + 1).await?; + self.round_increment_notify.notify_one(); } Ok(()) } @@ -1238,8 +1247,8 @@ impl Primary { /// /// For each receiver in the `primary_receiver` struct, there will be a dedicated task /// that awaits new data and handles it accordingly. - /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically - /// tries to move the the next round of batches. + /// Additionally, this spawns a task that periodically issues PrimaryPings and one that + /// tries to move to the next round when triggered (e.g. after a certificate is stored) or on a timeout. /// /// This function is called exactly once, in `Self::run()`. fn start_handlers(&self, primary_receiver: PrimaryReceiver) { @@ -1257,7 +1266,7 @@ impl Primary { self.spawn(async move { loop { // Sleep briefly. - tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await; + tokio::time::sleep(PRIMARY_PING_INTERVAL).await; // Retrieve the block locators. let self__ = self_.clone(); @@ -1345,7 +1354,7 @@ impl Primary { let self_ = self.clone(); self.spawn(async move { loop { - tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await; + tokio::time::sleep(WORKER_PING_INTERVAL).await; // If the primary is not synced, then do not broadcast the worker ping(s). if !self_.sync.is_synced() { trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed()); @@ -1358,32 +1367,55 @@ impl Primary { } }); - // Start the batch proposer. + // Start the batch proposal task. let self_ = self.clone(); self.spawn(async move { + // Each outer loop iteration represents one new proposed batch. loop { - // Sleep briefly, but longer than if there were no batch. - tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await; + let round_start = Instant::now(); let current_round = self_.current_round(); - // If the primary is not synced, then do not propose a batch. - if !self_.sync.is_synced() { - debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed()); - continue; - } - // A best-effort attempt to skip the scheduled batch proposal if - // round progression already triggered one. 
- if self_.propose_lock.try_lock().is_err() { - trace!( - "Skipping batch proposal for round {current_round} {}", - "(node is already proposing)".dimmed() - ); - continue; - }; - // If there is no proposed batch, attempt to propose a batch. - // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path, - // and only one batch needs to be proposed at a time. - if let Err(e) = self_.propose_batch().await { - warn!("Cannot propose a batch - {e}"); + + // The inner loop makes repeated attempts to propose a batch for the current round, + // until a proposal succeeds, the timeout is triggered, or the round is advanced by some other means. + while self_.current_round() == current_round { + // Assemble a list of conditions that will trigger the primary to attempt batch creation. + let mut futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = vec![]; + + // Always wait for the ready notification. + futures.push(Box::pin(self_.is_ready_notify.notified())); + + // If the maximum batch delay has not been reached, wait for it. + // Otherwise, add a small timeout to avoid busy waiting. + // + // TODO(kaimast): ideally, we do not need the minimum timeout at all after MAX_BATCH_DELAY, + // but that would need more testing to ensure this loop never gets stuck. + let timeout = MAX_BATCH_DELAY.saturating_sub(round_start.elapsed()).max(CREATE_BATCH_INTERVAL); + futures.push(Box::pin(tokio::time::sleep(timeout))); + + // If the node is not synced yet, wait for block sync to complete. + if let Some(fut) = self_.sync.wait_for_synced_if_syncing() { + futures.push(fut); + } + + // Wait until one of the conditions is met. + futures::future::select_all(futures).await; + + // If the primary is not synced, then do not propose a batch. + if !self_.sync.is_synced() { + debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed()); + continue; + } + + // If there is no proposed batch, attempt to propose a batch. + // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path, + // and only one batch needs to be proposed at a time. + match self_.propose_batch().await { + Ok(true) => break, + Ok(false) => (), // retry + Err(err) => { + warn!("{}", flatten_error(err.context("Cannot propose a batch"))); + } + } } } }); @@ -1397,6 +1429,7 @@ impl Primary { trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed()); continue; } + // Spawn a task to process the proposed batch. let self_ = self_.clone(); tokio::spawn(async move { @@ -1464,44 +1497,53 @@ impl Primary { } }); - // This task periodically tries to move to the next round. - // - // Note: This is necessary to ensure that the primary is not stuck on a previous round - // despite having received enough certificates to advance to the next round. + // This task tries to move to the next round when triggered (e.g. after a certificate is stored) + // or after a timeout, so we are not stuck on a previous round despite having quorum. let self_ = self.clone(); self.spawn(async move { loop { - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await; - // If the primary is not synced, then do not increment to the next round. - if !self_.sync.is_synced() { - trace!("Skipping round increment {}", "(node is syncing)".dimmed()); - continue; - } - // Attempt to increment to the next round.
+ let round_start = Instant::now(); let current_round = self_.current_round(); - let next_round = current_round.saturating_add(1); - // Determine if the quorum threshold is reached for the current round. - let is_quorum_threshold_reached = { - // Retrieve the certificate authors for the current round. - let authors = self_.storage.get_certificate_authors_for_round(current_round); - // If there are no certificates, then skip this check. - if authors.is_empty() { - continue; + + // Inner loop: wait and try to increment while we're still in the same round. + while self_.current_round() == current_round { + let mut futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> = + vec![Box::pin(self_.round_increment_notify.notified())]; + + if let Some(remaining_delay) = MAX_BATCH_DELAY.checked_sub(round_start.elapsed()) + && !remaining_delay.is_zero() + { + futures.push(Box::pin(tokio::time::sleep(remaining_delay))); } - // Retrieve the committee lookback for the current round. - let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else { - warn!("Failed to retrieve the committee lookback for round {current_round}"); + if !self_.sync.is_synced() { + futures.push(Box::pin(self_.sync.wait_for_synced())); + } + let _ = futures::future::select_all(futures).await; + + if !self_.sync.is_synced() { + trace!("Skipping round increment {}", "(node is syncing)".dimmed()); continue; + } + + let next_round = current_round.saturating_add(1); + let is_quorum_threshold_reached = { + let authors = self_.storage.get_certificate_authors_for_round(current_round); + if authors.is_empty() { + continue; + } + let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) + else { + warn!("Failed to retrieve the committee lookback for round {current_round}"); + continue; + }; + committee_lookback.is_quorum_threshold_reached(&authors) }; - // Check if the quorum threshold is reached for the current round. - committee_lookback.is_quorum_threshold_reached(&authors) - }; - // Attempt to increment to the next round if the quorum threshold is reached. - if is_quorum_threshold_reached { - debug!("Quorum threshold reached for round {current_round}"); - if let Err(err) = self_.try_increment_to_the_next_round(next_round).await { - warn!("{}", flatten_error(err.context("Failed to increment to the next round"))); + + if is_quorum_threshold_reached { + debug!("Quorum threshold reached for round {current_round}"); + if let Err(err) = self_.try_increment_to_the_next_round(next_round).await { + warn!("{}", flatten_error(err.context("Failed to increment to the next round"))); + } } } } @@ -1600,7 +1642,7 @@ impl Primary { if current_round < next_round { // If a primary callback was provided, send the current round to the BFT. let is_ready = if let Some(cb) = self.primary_callback.get() { - cb.update_to_next_round(current_round) + cb.try_advance_to_next_round(current_round) } // Otherwise, handle the Narwhal case. else { @@ -1610,15 +1652,12 @@ impl Primary { true }; - // Log whether the next round is ready. - match is_ready { - true => debug!("Primary is ready to propose the next round"), - false => debug!("Primary is not ready to propose the next round"), - } - - // If the node is ready, propose a batch for the next round. - if is_ready { - self.propose_batch().await?; + // Notify the proposal task if the new round is ready.
+ if is_ready && self.is_synced() { + debug!("Primary is ready to propose the next round"); + self.is_ready_notify.notify_one(); + } else { + debug!("Primary is not ready to propose the next round"); } } Ok(()) } @@ -1651,37 +1690,62 @@ impl Primary { /// Ensure the primary is not creating batch proposals too frequently. /// This checks that the certificate timestamp for the previous round is within the expected range. - fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> { + fn check_peer_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> { + ensure!(author != self.gateway.account().address(), "Peer cannot propose a batch that is authored by myself"); + // Retrieve the timestamp of the previous certificate to check against. let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) { - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` ago. Some(certificate) => certificate.timestamp(), - None => match self.gateway.account().address() == author { - // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. - true => *self.latest_proposed_batch_timestamp.read(), - // If we do not see a previous certificate for the author, then proceed optimistically. - false => return Ok(()), - }, + // If we do not see a previous certificate for the author, then proceed optimistically. + None => return Ok(()), }; // Determine the elapsed time since the previous timestamp. let elapsed = timestamp .checked_sub(previous_timestamp) .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?; - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` ago. match elapsed < MIN_BATCH_DELAY.as_secs() as i64 { true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"), false => Ok(()), } } + /// Ensure the primary is not creating batch proposals too frequently. + /// This checks that the new timestamp is not too soon after the primary's own previous proposal. + /// + /// # Returns + /// - `Ok(true)` if the timestamp allows a new proposal. + /// - `Ok(false)` if the timestamp is valid but too soon after the previous proposal. + /// - `Err(err)` if an unexpected error occurred, such as the timestamp being before the previous certificate. + fn check_own_proposal_timestamp( + &self, + previous_round: u64, + previous_timestamp: i64, + timestamp: i64, + ) -> Result<bool> { + // Determine the elapsed time since the previous timestamp. + let elapsed = timestamp + .checked_sub(previous_timestamp) + .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?; + + Ok(elapsed >= MIN_BATCH_DELAY.as_secs() as i64) + }
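+ // For example, with `MIN_BATCH_DELAY` = 1 s and a previous proposal at t = 100:
+ // `check_own_proposal_timestamp` returns `Ok(false)` for a new timestamp of 100 (elapsed 0 s),
+ // `Ok(true)` for 101 or later, and an error for 99 (the timestamp moved backwards).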
/// Stores the certified batch and broadcasts it to all validators. async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> { // Create the batch certificate and transmissions. let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?; + // Convert the transmissions into a HashMap. // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety. let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>(); + + // Record the round and transmission count before the certificate is moved. + let round = certificate.round(); + let num_transmissions = certificate.transmission_ids().len(); + // Store the certified batch. let (storage, certificate_) = (self.storage.clone(), certificate.clone()); spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?; @@ -1689,18 +1753,20 @@ impl Primary { // If a primary callback was provided, send the certificate to the BFT. if let Some(cb) = self.primary_callback.get() { // Await the callback to continue. - cb.add_new_certificate(certificate.clone()) - .await - .with_context(|| "Failed to add new certificate from primary")?; + cb.add_new_certificate(certificate.clone()).await.with_context(|| { + format!("Failed to insert our newly certified batch for round {round} into the DAG") + })?; } // Broadcast the certified batch to all validators. - self.gateway.broadcast(Event::BatchCertified(certificate.clone().into())); + self.gateway.broadcast(Event::BatchCertified(certificate.into())); + // Log the certified batch. - let num_transmissions = certificate.transmission_ids().len(); - let round = certificate.round(); info!("Our batch with {num_transmissions} transmissions for round {round} was certified!"); - // Increment to the next round. - self.try_increment_to_the_next_round(round + 1).await + + // Wake up the round increment task to re-check quorum. + self.round_increment_notify.notify_one(); + + Ok(()) } /// Inserts the missing transmissions from the proposal into the workers. @@ -1777,6 +1843,8 @@ impl Primary { if let Some(cb) = self.primary_callback.get() { cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?; } + // Wake the round-increment task to re-check quorum. + self.round_increment_notify.notify_one(); } Ok(()) } @@ -1820,7 +1888,9 @@ impl Primary { // If our primary is far behind the peer, update our committee to the batch round. if is_behind_schedule || is_peer_far_in_future { // If the batch round is greater than the current committee round, update the committee. - self.try_increment_to_the_next_round(batch_round).await?; + self.try_increment_to_the_next_round(batch_round) + .await + .with_context(|| "Failed to fast forward current round")?; } // Ensure the primary has all of the transmissions. @@ -2003,7 +2073,10 @@ impl Primary { let proposal_cache = { let proposal = self.proposed_batch.write().take(); let signed_proposals = self.signed_proposals.read().clone(); - let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await); + let latest_round = proposal + .as_ref() + .map(Proposal::round) + .unwrap_or(self.latest_proposed_batch.read().await.map(|(round, _)| round).unwrap_or(0)); let pending_certificates = self.storage.get_pending_certificates(); ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates) }; @@ -2337,7 +2410,7 @@ mod tests { store_certificate_chain(&primary, &accounts, round, &mut rng); // Sleep for a while to ensure the primary is ready to propose the next round.
- tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; + tokio::time::sleep(MIN_BATCH_DELAY).await; // Generate a solution and a transaction. let (solution_id, solution) = sample_unconfirmed_solution(&mut rng); @@ -2404,7 +2477,7 @@ mod tests { } // Sleep for a while to ensure the primary is ready to propose the next round. - tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; + tokio::time::sleep(MIN_BATCH_DELAY).await; // Advance to the next round. assert!(primary.storage.increment_to_next_round(round).is_ok()); @@ -2469,7 +2542,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2508,7 +2581,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2548,7 +2621,7 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2585,7 +2658,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2631,7 +2704,7 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2687,7 +2760,7 @@ mod tests { .get_certificate_for_round_with_author(round - 1, peer_account.1.address()) .expect("No previous proposal exists") .timestamp(); - let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1; + let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY.as_secs() as i64) - 1; let proposal = create_test_proposal( &peer_account.1, @@ -2741,7 +2814,7 @@ mod tests { let peer_account = &accounts[2]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng); @@ -2798,15 +2871,17 @@ mod tests { primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); // Set the proposal lock to a round ahead of the storage. 
- let old_proposal_lock_round = *primary.propose_lock.lock().await; - *primary.propose_lock.lock().await = round + 1; + let (old_proposal_round, old_proposal_timestamp) = + primary.latest_proposed_batch.read().await.map(|(round, timestamp)| (round, timestamp)).unwrap_or((0, 0)); + *primary.latest_proposed_batch.write().await = + Some((round + 1, old_proposal_timestamp + MIN_BATCH_DELAY.as_secs() as i64)); // Propose a batch and enforce that it fails. assert!(primary.propose_batch().await.is_ok()); assert!(primary.proposed_batch.read().is_none()); // Set the proposal lock back to the old round. - *primary.propose_lock.lock().await = old_proposal_lock_round; + *primary.latest_proposed_batch.write().await = Some((old_proposal_round, old_proposal_timestamp)); // Try to propose a batch again. This time, it should succeed. assert!(primary.propose_batch().await.is_ok()); @@ -2851,7 +2926,7 @@ mod tests { // Create a valid proposal. let round = 1; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), @@ -2875,6 +2950,8 @@ mod tests { // Check the certificate was created and stored by the primary. assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); + // Manually attempt round advancement (because the handler is not running). + primary.try_increment_to_the_next_round(round + 1).await.unwrap(); // Check the round was incremented. assert_eq!(primary.current_round(), round + 1); } @@ -2914,6 +2991,8 @@ mod tests { // Check the certificate was created and stored by the primary. assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); + // Manually attempt round advancement (because the handler is not running). + primary.try_increment_to_the_next_round(round + 1).await.unwrap(); // Check the round was incremented. assert_eq!(primary.current_round(), round + 1); } @@ -2926,7 +3005,7 @@ mod tests { // Create a valid proposal. let round = 1; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), @@ -2964,7 +3043,7 @@ mod tests { let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); // Create a valid proposal. 
- let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index e1cab24b23..5636b67a3e 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -15,7 +15,7 @@ use crate::{ Gateway, - MAX_FETCH_TIMEOUT_IN_MS, + MAX_FETCH_TIMEOUT, Transport, events::{CertificateRequest, CertificateResponse, Event}, helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests}, @@ -23,7 +23,6 @@ use crate::{ spawn_blocking, }; -use snarkos_node_network::PeerPoolHandling; use snarkos_node_sync::{BftSyncMode, BlockSync, InsertBlockResponseError, Ping, locators::BlockLocators}; use snarkos_utilities::CallbackHandle; @@ -53,7 +52,10 @@ use std::{ }; #[cfg(not(feature = "locktick"))] use tokio::sync::Mutex as TMutex; -use tokio::{sync::oneshot, task::JoinHandle}; +use tokio::{ + sync::{Notify, oneshot}, + task::JoinHandle, +}; /// This callback trait allows listening to synchronization updates, such as discovering new `BatchCertificate`s. /// This is currently used by BFT. @@ -105,6 +107,8 @@ pub struct Sync { /// /// Whenever a new block is added to this map, BlockSync::set_sync_height needs to be called. pending_blocks: Arc>>>, + /// Notified after sync progress when the node is synced; used by [`Self::wait_for_synced`]. + synced_notify: Arc<Notify>, } impl Sync { @@ -133,9 +137,22 @@ impl Sync { handles: Default::default(), response_lock: Default::default(), pending_blocks: Default::default(), + synced_notify: Default::default(), } } + /// Waits until the node is block-synced. + /// Returns immediately if already synced. + pub async fn wait_for_synced(&self) { + self.block_sync.wait_for_synced().await; + } + + /// Returns `None` if the node is already synced. + /// Otherwise, returns a future that completes once the node becomes synced. + pub fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'static, ()>> { + self.block_sync.wait_for_synced_if_syncing() + } + /// Initializes the sync module and syncs the storage with the ledger at bootup. pub fn initialize(&self, sync_callback: Option<Arc<dyn SyncCallback<N>>>) -> Result<()> { // If a callback was provided, set it. @@ -205,7 +222,7 @@ impl Sync { self.spawn(async move { loop { // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + tokio::time::sleep(MAX_FETCH_TIMEOUT).await; // Remove the expired pending transmission requests. let self__ = self_.clone(); @@ -493,6 +510,11 @@ impl Sync { } }; + // When we are synced, wake waiters in [`Self::wait_for_synced`]. + if self.is_synced() { + self.synced_notify.notify_waiters(); + } + if let Some(ping) = &ping && new_blocks { @@ -984,12 +1006,6 @@ impl Sync { impl Sync { /// Returns `true` if the node is block-synced. pub fn is_synced(&self) -> bool { - // Ensure the validator is connected to other validators, - // not just clients. - if self.gateway.number_of_connected_peers() == 0 { - return false; - } - self.block_sync.is_block_synced() }
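The notifiers above rely on different `tokio::sync::Notify` semantics: `notify_one` stores a permit when no task is waiting (so a wake-up sent between waits is not lost), while `notify_waiters` wakes only tasks that are already registered and stores nothing; this is why waking via `synced_notify` pairs with an `is_synced()` re-check in the waiting tasks. A minimal, self-contained illustration (not from the source):

```rust
use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());

    // `notify_one` stores a permit: a notification sent before anyone waits
    // is consumed by the next `notified().await`, so the wake-up is not lost.
    notify.notify_one();
    notify.notified().await; // completes immediately

    // `notify_waiters` stores no permit: only already-registered waiters wake.
    let notified = notify.notified();
    tokio::pin!(notified);
    notified.as_mut().enable(); // register as a waiter without awaiting yet
    notify.notify_waiters();
    notified.await; // completes because registration happened before the call
}
```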
@@ -1041,7 +1057,7 @@ } // Wait for the certificate to be fetched. // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators. - tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver) + tokio::time::timeout(MAX_FETCH_TIMEOUT, callback_receiver) .await .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))? .with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id))) diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index ba9265f1ba..120e8ca3a2 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -14,7 +14,7 @@ // limitations under the License. use crate::{ - MAX_FETCH_TIMEOUT_IN_MS, + MAX_FETCH_TIMEOUT, MAX_WORKERS, ProposedBatch, Transport, @@ -40,7 +40,7 @@ use locktick::parking_lot::{Mutex, RwLock}; #[cfg(not(feature = "locktick"))] use parking_lot::{Mutex, RwLock}; use rand::seq::IteratorRandom; -use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{future::Future, net::SocketAddr, sync::Arc}; use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; /// A worker's main role is maintaining a queue of verified ("ready") transmissions, @@ -441,7 +441,7 @@ impl Worker { self.spawn(async move { loop { // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + tokio::time::sleep(MAX_FETCH_TIMEOUT).await; // Remove the expired pending certificate requests. let self__ = self_.clone(); @@ -519,9 +519,9 @@ impl Worker { self.format_transmission_id(transmission_id) ); } - // Wait for the transmission to be fetched. - let transmission = timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver) + // Wait for the transmission to be fetched. + let transmission = timeout(MAX_FETCH_TIMEOUT, callback_receiver) .await .with_context(|| { format!("Unable to fetch transmission {} (timeout)", self.format_transmission_id(transmission_id)) @@ -603,7 +603,7 @@ mod tests { use bytes::Bytes; use mockall::mock; - use std::{io, ops::Range}; + use std::{io, ops::Range, time::Duration}; type CurrentNetwork = snarkvm::prelude::MainnetV0; diff --git a/node/bft/tests/bft_e2e.rs b/node/bft/tests/bft_e2e.rs index 93115a3215..84e732e1fa 100644 --- a/node/bft/tests/bft_e2e.rs +++ b/node/bft/tests/bft_e2e.rs @@ -21,11 +21,11 @@ mod components; use crate::common::primary::{TestNetwork, TestNetworkConfig}; use deadline::deadline; use itertools::Itertools; -use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS; +use snarkos_node_bft::MAX_FETCH_TIMEOUT; use std::time::Duration; use tokio::time::sleep; -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "long-running e2e test"] async fn test_state_coherence() { const N: u16 = 4; @@ -37,8 +37,6 @@ async fn test_state_coherence() { bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: Some(0), log_connections: true, }) }) @@ -50,7 +48,7 @@ async fn test_state_coherence() { std::future::pending::<()>().await; } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "fails"] async fn test_resync() { // Start N nodes, connect them and start the cannons for each. @@ -62,8 +60,6 @@ async fn test_resync() { bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs.
- log_level: Some(0), log_connections: false, }) }) @@ -83,7 +79,6 @@ bft: true, connect_all: false, fire_transmissions: None, - log_level: None, log_connections: false, }); spare_network.start().await; @@ -100,7 +95,7 @@ deadline!(Duration::from_secs(20), move || { network_clone.is_round_reached(RECOVERY_ROUND) }); } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_quorum_threshold() { // Start N nodes but don't connect them. const N: u16 = 4; @@ -112,8 +107,6 @@ bft: true, connect_all: false, fire_transmissions: None, - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -129,7 +122,7 @@ // Start the cannons for node 0. network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(MAX_FETCH_TIMEOUT).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -140,7 +133,7 @@ network.connect_validators(0, 1).await; network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(MAX_FETCH_TIMEOUT).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -158,7 +151,7 @@ deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) }); } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_quorum_break() { // Start N nodes, connect them and start the cannons for each. const N: u16 = 4; @@ -169,8 +162,6 @@ bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -192,7 +183,8 @@ assert!(network.is_halted().await); } -#[tokio::test(flavor = "multi_thread")] +/// Tests that all validators agree on the same leader for every even round. +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_leader_election_consistency() { // The minimum and maximum rounds to check for leader consistency. // From manual experimentation, the minimum round that works is 4. @@ -209,8 +201,6 @@ bft: true, connect_all: true, fire_transmissions: Some(CANNON_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -249,11 +239,11 @@ println!("Found {} validators with a leader ({} out of sync)", leaders.len(), validators.len() - leaders.len()); // Assert that all leaders are equal - assert!(leaders.iter().all_equal()); + assert!(leaders.iter().all_equal(), "Leaders are not equal: {leaders:?} for round {target_round}"); } } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "run multiple times to see failure"] async fn test_transient_break() { // Start N nodes, connect them and start the cannons for each. @@ -265,8 +255,6 @@ bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs.
- log_level: Some(6), log_connections: false, }) }) diff --git a/node/bft/tests/common/primary.rs b/node/bft/tests/common/primary.rs index bc29763f65..ff41e4d194 100644 --- a/node/bft/tests/common/primary.rs +++ b/node/bft/tests/common/primary.rs @@ -16,13 +16,13 @@ use crate::common::{ CurrentNetwork, TranslucentLedgerService, - utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger}, + utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions}, }; use snarkos_account::Account; use snarkos_node_bft::{ BFT, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, MEMORY_POOL_PORT, Primary, helpers::{PrimarySender, Storage, init_primary_channels}, @@ -78,8 +78,6 @@ pub struct TestNetworkConfig { pub connect_all: bool, /// If `Some(i)` is set, the cannons will fire every `i` milliseconds. pub fire_transmissions: Option, - /// The log level to use for the test. - pub log_level: Option, /// If this is set to `true`, the number of connections is logged every 5 seconds. pub log_connections: bool, } @@ -140,10 +138,6 @@ impl TestNetwork { pub fn new(config: TestNetworkConfig) -> Self { let mut rng = TestRng::default(); - if let Some(log_level) = config.log_level { - initialize_logger(log_level); - } - let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng); let bonded_balances: IndexMap<_, _> = committee .members() @@ -259,9 +253,10 @@ impl TestNetwork { pub async fn connect_validators(&self, first_id: u16, second_id: u16) { let first_validator = self.validators.get(&first_id).unwrap(); let second_validator_ip = self.validators.get(&second_id).unwrap().primary.gateway().local_ip(); - let _ = first_validator.primary.gateway().connect(second_validator_ip); - // Give the connection time to be established. - sleep(Duration::from_millis(100)).await; + let gateway = first_validator.primary.gateway(); + let handle = gateway.connect(second_validator_ip).expect("connection attempt failed"); + // Await the full TCP + handshake completion instead of relying on a fixed sleep. + handle.await.unwrap().expect("connecting validators failed"); } // Connects all nodes to each other. @@ -323,7 +318,7 @@ impl TestNetwork { // Checks if all the nodes have stopped progressing. pub async fn is_halted(&self) -> bool { let halt_round = self.validators.values().map(|v| v.primary.current_round()).max().unwrap(); - sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await; + sleep(MAX_BATCH_DELAY * 2).await; self.validators.values().all(|v| v.primary.current_round() <= halt_round) } diff --git a/node/bft/tests/narwhal_e2e.rs b/node/bft/tests/narwhal_e2e.rs index 177db6a159..608d379184 100644 --- a/node/bft/tests/narwhal_e2e.rs +++ b/node/bft/tests/narwhal_e2e.rs @@ -17,14 +17,14 @@ mod common; use crate::common::primary::{TestNetwork, TestNetworkConfig}; -use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS; +use snarkos_node_bft::MAX_FETCH_TIMEOUT; use std::time::Duration; use deadline::deadline; use tokio::time::sleep; -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "long-running e2e test"] async fn test_state_coherence() { const N: u16 = 4; @@ -36,8 +36,6 @@ async fn test_state_coherence() { bft: false, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. 
diff --git a/node/bft/tests/narwhal_e2e.rs b/node/bft/tests/narwhal_e2e.rs
index 177db6a159..608d379184 100644
--- a/node/bft/tests/narwhal_e2e.rs
+++ b/node/bft/tests/narwhal_e2e.rs
@@ -17,14 +17,14 @@ mod common;

 use crate::common::primary::{TestNetwork, TestNetworkConfig};

-use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS;
+use snarkos_node_bft::MAX_FETCH_TIMEOUT;

 use std::time::Duration;

 use deadline::deadline;
 use tokio::time::sleep;

-#[tokio::test(flavor = "multi_thread")]
+#[test_log::test(tokio::test(flavor = "multi_thread"))]
 #[ignore = "long-running e2e test"]
 async fn test_state_coherence() {
     const N: u16 = 4;
@@ -36,8 +36,6 @@ async fn test_state_coherence() {
         bft: false,
         connect_all: true,
         fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
-        // Set this to Some(0..=4) to see the logs.
-        log_level: Some(0),
         log_connections: true,
     })
 })
@@ -52,7 +50,7 @@ async fn test_state_coherence() {
     std::future::pending::<()>().await;
 }

-#[tokio::test(flavor = "multi_thread")]
+#[test_log::test(tokio::test(flavor = "multi_thread"))]
 async fn test_quorum_threshold() {
     // Start N nodes but don't connect them.
     const N: u16 = 4;
@@ -64,8 +62,6 @@ async fn test_quorum_threshold() {
         bft: false,
         connect_all: false,
         fire_transmissions: None,
-        // Set this to Some(0..=4) to see the logs.
-        log_level: None,
         log_connections: true,
     })
 })
@@ -81,7 +77,7 @@ async fn test_quorum_threshold() {

     // Start the cannons for node 0.
     network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS);
-    sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
+    sleep(MAX_FETCH_TIMEOUT).await;

     // Check each node is still at round 1.
     for validator in network.validators.values() {
@@ -92,7 +88,7 @@ async fn test_quorum_threshold() {
     network.connect_validators(0, 1).await;
     network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS);

-    sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await;
+    sleep(MAX_FETCH_TIMEOUT).await;

     // Check each node is still at round 1.
     for validator in network.validators.values() {
@@ -110,7 +106,7 @@ async fn test_quorum_threshold() {
     deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) });
 }

-#[tokio::test(flavor = "multi_thread")]
+#[test_log::test(tokio::test(flavor = "multi_thread"))]
 async fn test_quorum_break() {
     // Start N nodes, connect them and start the cannons for each.
     const N: u16 = 4;
@@ -121,8 +117,6 @@ async fn test_quorum_break() {
         bft: false,
         connect_all: true,
         fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
-        // Set this to Some(0..=4) to see the logs.
-        log_level: None,
         log_connections: true,
     })
 })
@@ -144,7 +138,7 @@ async fn test_quorum_break() {
     assert!(network.is_halted().await);
 }

-#[tokio::test(flavor = "multi_thread")]
+#[test_log::test(tokio::test(flavor = "multi_thread"))]
 async fn test_storage_coherence() {
     // Start N nodes, connect them and start the cannons for each.
     const N: u16 = 4;
@@ -155,8 +149,6 @@ async fn test_storage_coherence() {
         bft: false,
         connect_all: true,
         fire_transmissions: Some(TRANSMISSION_INTERVAL_MS),
-        // Set this to Some(0..=4) to see the logs.
-        log_level: None,
         log_connections: true,
     })
 })
diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs
index 04d50800a4..1774b00030 100644
--- a/node/consensus/src/lib.rs
+++ b/node/consensus/src/lib.rs
@@ -27,7 +27,7 @@ extern crate snarkos_node_metrics as metrics;
 use snarkos_account::Account;
 use snarkos_node_bft::{
     BFT,
-    MAX_BATCH_DELAY_IN_MS,
+    MAX_BATCH_DELAY,
     Primary,
     helpers::{
         ConsensusReceiver,
@@ -64,8 +64,11 @@ use locktick::parking_lot::{Mutex, RwLock};
 use lru::LruCache;
 #[cfg(not(feature = "locktick"))]
 use parking_lot::{Mutex, RwLock};
-use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
-use tokio::{sync::oneshot, task::JoinHandle};
+use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc};
+use tokio::{
+    sync::{Notify, oneshot},
+    task::JoinHandle,
+};

 #[cfg(feature = "metrics")]
 use std::collections::HashMap;
@@ -113,6 +116,8 @@ pub struct Consensus {
     ping: Arc>,
     /// The block sync logic.
     block_sync: Arc>,
+    /// Notifies when a block is committed, and relays it to the primary.
+    block_commit_notify: Arc<Notify>,
 }

 impl Consensus {
@@ -163,6 +168,7 @@ impl Consensus {
             transmissions_tracker: Default::default(),
             handles: Default::default(),
             ping: ping.clone(),
+            block_commit_notify: Arc::new(Notify::new()),
         };

         info!("Starting the consensus instance...");
@@ -501,13 +507,17 @@ impl Consensus {

         // Process the unconfirmed transactions in the memory pool.
         //
-        // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted
-        // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions.
+        // This loop wakes up either when a block is committed (signaled via notify) or after a timeout.
+        // When a block is committed, the BFT workers have freed capacity for more transmissions.
+        // The timeout serves as a fallback for startup or idle periods when blocks are not being produced.
         let self_ = self.clone();
         self.spawn(async move {
             loop {
-                // Sleep briefly.
-                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
+                // Wait for either a block commit notification or a timeout.
+                tokio::select! {
+                    _ = self_.block_commit_notify.notified() => {}
+                    _ = tokio::time::sleep(MAX_BATCH_DELAY) => {}
+                }
                 // Process the unconfirmed transactions in the memory pool.
                 if let Err(err) = self_.process_unconfirmed_transactions().await {
                     warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
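The notify-or-timeout wakeup above is a small, reusable pattern. A self-contained sketch, with illustrative names and an arbitrary 250 ms fallback:

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Notify;

// Consumer: normally sleeps, but is woken early whenever progress is signaled.
async fn worker(notify: Arc<Notify>) {
    loop {
        tokio::select! {
            // Woken as soon as a commit is signaled...
            _ = notify.notified() => {}
            // ...or after the fallback timeout during idle periods.
            _ = tokio::time::sleep(Duration::from_millis(250)) => {}
        }
        // Drain pending work here.
    }
}

// Producer: `notify_one` stores a permit, so a signal sent while the worker
// is busy processing (not yet awaiting) is not lost.
fn signal_commit(notify: &Notify) {
    notify.notify_one();
}
```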
@@ -538,12 +548,23 @@ impl Consensus {
         let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };

         // If the block failed to advance, reinsert the transmissions into the memory pool.
-        if result.is_err() {
-            // On failure, reinsert the transmissions into the memory pool.
-            self.reinsert_transmissions(transmissions).await;
+        match result {
+            Ok(true) => {
+                // On success, notify that the BFT workers have freed capacity for more transmissions.
+                self.block_commit_notify.notify_one();
+            }
+            Ok(false) | Err(_) => {
+                if let Err(err) = self.reinsert_transmissions(transmissions).await {
+                    error!(
+                        "{}",
+                        flatten_error(
+                            err.context("Failed to reinsert transmissions after unsuccessful block advancement")
+                        )
+                    );
+                }
+            }
         }

-        // Send the callback **after** advancing to the next block.
-        // Note: We must await the block to be advanced before sending the callback.
+
         callback.send(result).ok();
     }
@@ -551,7 +572,7 @@ impl Consensus {
     ///
     /// # Returns
     /// - `Ok(true)` if the ledger was advanced to the next block.
-    /// - `Ok(false)` if the block may be valide but the ledger already advanced.
+    /// - `Ok(false)` if the block may be valid but the ledger already advanced.
     /// - `Err(anyhow::Error)` for incorrect blocks and any other error.
     fn try_advance_to_next_block(
         &self,
@@ -568,10 +589,18 @@ impl Consensus {
         // Create the candidate next block.
         let ledger_update = self.ledger.begin_ledger_update()?;
-        let block = match ledger_update
-            .prepare_advance_to_next_quorum_block(subdag, transmissions)
-            .and_then(|block| ledger_update.check_next_block(block))
-        {
+        let prepare_instant = std::time::Instant::now();
+        let block = match ledger_update.prepare_advance_to_next_quorum_block(subdag, transmissions) {
+            Ok(block) => block,
+            Err(err) => return Err(err.into_anyhow()),
+        };
+        let prepare_elapsed = prepare_instant.elapsed();
+        trace!("prepare_advance_to_next_quorum_block took {:.3}s", prepare_elapsed.as_secs_f64());
+        #[cfg(feature = "metrics")]
+        metrics::histogram(metrics::consensus::PREPARE_ADVANCE_SECS, prepare_elapsed.as_secs_f64());
+
+        let check_instant = std::time::Instant::now();
+        let block = match ledger_update.check_next_block(block) {
             Ok(block) => block,
             Err(CheckBlockError::BlockAlreadyExists { .. }) => {
                 debug!("The given block hash already exists in the ledger");
@@ -587,11 +616,20 @@ impl Consensus {
             }
             Err(err) => return Err(err.into_anyhow()),
         };
+        let check_elapsed = check_instant.elapsed();
+        trace!("check_next_block took {:.3}s", check_elapsed.as_secs_f64());
+        #[cfg(feature = "metrics")]
+        metrics::histogram(metrics::consensus::CHECK_NEXT_BLOCK_SECS, check_elapsed.as_secs_f64());

         let block_height = block.height();

         // Advance to the next block.
+        let advance_instant = std::time::Instant::now();
         ledger_update.advance_to_next_block(&block)?;
+        let advance_elapsed = advance_instant.elapsed();
+        trace!("advance_to_next_block took {:.3}s", advance_elapsed.as_secs_f64());
+        #[cfg(feature = "metrics")]
+        metrics::histogram(metrics::consensus::ADVANCE_TO_NEXT_BLOCK_SECS, advance_elapsed.as_secs_f64());

         #[cfg(feature = "telemetry")]
         // Fetch the latest committee.
@@ -648,27 +686,29 @@ impl Consensus {

         // If telemetry is enabled, update participation scores.
         #[cfg(feature = "telemetry")]
-        match latest_committee {
-            Ok(latest_committee) => {
-                // Retrieve the latest participation scores.
-                let participation_scores = self
-                    .bft()
-                    .primary()
-                    .gateway()
-                    .validator_telemetry()
-                    .get_participation_scores(&latest_committee);
-
-                // Log the participation scores.
-                for (address, participation_score) in participation_scores {
-                    metrics::histogram_label(
-                        metrics::consensus::VALIDATOR_PARTICIPATION,
-                        "validator_address",
-                        address.to_string(),
-                        participation_score,
-                    )
+        {
+            match latest_committee {
+                Ok(latest_committee) => {
+                    // Retrieve the latest participation scores.
+                    let participation_scores = self
+                        .bft()
+                        .primary()
+                        .gateway()
+                        .validator_telemetry()
+                        .get_participation_scores(&latest_committee);
+
+                    // Log the participation scores.
+                    for (address, participation_score) in participation_scores {
+                        metrics::histogram_label(
+                            metrics::consensus::VALIDATOR_PARTICIPATION,
+                            "validator_address",
+                            address.to_string(),
+                            participation_score,
+                        )
+                    }
                 }
+                Err(err) => warn!("{}", flatten_error(err.context("Failed to get latest committee for telemetry"))),
             }
-            Err(err) => warn!("{}", flatten_error(err.context("Failed to get latest committee for telemetry"))),
         }
     }
@@ -676,14 +716,14 @@ impl Consensus {
     }

     /// Reinserts the given transmissions into the memory pool.
-    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
+    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) -> Result<()> {
         // Iterate over the transmissions.
         for (transmission_id, transmission) in transmissions.into_iter() {
             // Reinsert the transmission into the memory pool.
             match self.reinsert_transmission(transmission_id, transmission).await {
                 Ok(true) => {}
                 Ok(false) => debug!(
-                    "Unable to reinsert transmission {}.{} into the memory pool. Already exists.",
+                    "Unable to reinsert transmission {}:{} into the memory pool. Already exists.",
                     fmt_id(transmission_id),
                     fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                 ),
@@ -697,6 +737,8 @@ impl Consensus {
                 }
             }
         }
+
+        Ok(())
     }

     /// Reinserts the given transmission into the memory pool.
diff --git a/node/metrics/src/names.rs b/node/metrics/src/names.rs
index faace75150..71050a51fd 100644
--- a/node/metrics/src/names.rs
+++ b/node/metrics/src/names.rs
@@ -47,8 +47,15 @@ pub(super) const GAUGE_NAMES: [&str; 28] = [
     tcp::TCP_TASKS,
 ];

-pub(super) const HISTOGRAM_NAMES: [&str; 4] =
-    [bft::COMMIT_ROUNDS_LATENCY, consensus::CERTIFICATE_COMMIT_LATENCY, consensus::BLOCK_LATENCY, consensus::BLOCK_LAG];
+pub(super) const HISTOGRAM_NAMES: [&str; 7] = [
+    bft::COMMIT_ROUNDS_LATENCY,
+    consensus::CERTIFICATE_COMMIT_LATENCY,
+    consensus::BLOCK_LATENCY,
+    consensus::BLOCK_LAG,
+    consensus::PREPARE_ADVANCE_SECS,
+    consensus::CHECK_NEXT_BLOCK_SECS,
+    consensus::ADVANCE_TO_NEXT_BLOCK_SECS,
+];

 pub mod bft {
     pub const COMMIT_ROUNDS_LATENCY: &str = "snarkos_bft_commit_rounds_latency_secs"; // <-- This one doesn't even make sense.
@@ -84,6 +91,12 @@ pub mod consensus {
     pub const COMMITTED_CERTIFICATES: &str = "snarkos_consensus_committed_certificates_total";
     pub const BLOCK_LATENCY: &str = "snarkos_consensus_block_latency_secs";
     pub const BLOCK_LAG: &str = "snarkos_consensus_block_lag_ms";
+    /// Time spent in prepare_advance_to_next_quorum_block (block construction).
+    pub const PREPARE_ADVANCE_SECS: &str = "snarkos_consensus_prepare_advance_secs";
+    /// Time spent in check_next_block.
+    pub const CHECK_NEXT_BLOCK_SECS: &str = "snarkos_consensus_check_next_block_secs";
+    /// Time spent in advance_to_next_block (ledger write).
+    pub const ADVANCE_TO_NEXT_BLOCK_SECS: &str = "snarkos_consensus_advance_to_next_block_secs";
     pub const UNCONFIRMED_TRANSACTIONS: &str = "snarkos_consensus_unconfirmed_transactions_total";
     pub const UNCONFIRMED_SOLUTIONS: &str = "snarkos_consensus_unconfirmed_solutions_total";
     pub const TRANSMISSION_LATENCY: &str = "snarkos_consensus_transmission_latency";
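These three histograms are fed by the stage timers added to `try_advance_to_next_block`. The underlying pattern is generic; a condensed sketch with a hypothetical `record_stage` helper (the histogram call in the comment assumes the `metrics` crate facade):

```rust
use std::time::Instant;

// Hypothetical helper: time one pipeline stage and record it under `name`,
// so slow stages (block construction, validation, ledger write) can be told
// apart on a dashboard.
fn record_stage<T>(name: &'static str, f: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let result = f();
    let elapsed = start.elapsed().as_secs_f64();
    tracing::trace!("{name} took {elapsed:.3}s");
    // With the `metrics` facade this would be, e.g.:
    // metrics::histogram!(name).record(elapsed);
    result
}
```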
diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs
index 19610ded1c..9ac6b2bc41 100644
--- a/node/sync/src/block_sync.rs
+++ b/node/sync/src/block_sync.rs
@@ -17,6 +17,7 @@ use crate::{
     helpers::{PeerPair, PrepareSyncRequest, SyncRequest},
     locators::BlockLocators,
 };
+use futures::future::BoxFuture;
 use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService};
 use snarkos_node_router::messages::DataBlocks;
 use snarkos_node_sync_communication_service::CommunicationService;
@@ -29,6 +30,7 @@ use snarkvm::{
 };

 use anyhow::{Context, Result, anyhow, bail, ensure};
+use futures::FutureExt;
 use indexmap::{IndexMap, IndexSet};
 use itertools::Itertools;
 #[cfg(feature = "locktick")]
@@ -228,6 +230,9 @@ pub struct BlockSync {

     /// Tracks failed requests that need to be re-issued.
     failed_requests: Mutex>,
+
+    /// Condition variable that wakes up waiting tasks when the node is synced.
+    synced_notify: Notify,
 }

 impl BlockSync {
@@ -248,6 +253,7 @@ impl BlockSync {
             metrics: Default::default(),
             prepare_requests_lock: Default::default(),
             failed_requests: Default::default(),
+            synced_notify: Default::default(),
         }
     }

@@ -255,6 +261,9 @@ impl BlockSync {
     /// or block request has been fully processed (either successfully or unsuccessfully).
     ///
     /// Used by the outgoing task.
+    ///
+    /// # Concurrency
+    /// Only one task can wait on this at a time.
     pub async fn wait_for_peer_update(&self) {
         self.peer_notify.notified().await
     }
@@ -262,6 +271,9 @@ impl BlockSync {
     /// Blocks until there is a new response to a block request.
     ///
     /// Used by the incoming task.
+    ///
+    /// # Concurrency
+    /// Only one task can wait on this at a time.
     pub async fn wait_for_block_responses(&self) {
         self.response_notify.notified().await
     }
@@ -272,6 +284,56 @@ impl BlockSync {
         self.sync_state.read().is_block_synced()
     }

+    /// This future blocks until the node is synced.
+    ///
+    /// # Concurrency
+    /// Multiple tasks can wait on this at the same time safely.
+    pub async fn wait_for_synced(&self) {
+        loop {
+            let mut fut = std::pin::pin!(self.synced_notify.notified());
+
+            {
+                let sync_state = self.sync_state.read();
+                if sync_state.is_block_synced() {
+                    return;
+                }
+
+                // Register this task as waiting before dropping the lock.
+                fut.as_mut().enable();
+            }
+
+            fut.await;
+        }
+    }
+
+    /// Similar to [`Self::wait_for_synced`], but returns `None` if the node is already synced.
+    /// Otherwise, it returns a future that behaves like `wait_for_synced`.
+    ///
+    /// # Concurrency
+    /// * This method is atomic, unlike calling `is_synced` and `wait_for_synced` sequentially.
+    /// * Multiple tasks can wait on this at the same time safely.
+    pub fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> {
+        let mut notified = Box::pin(self.synced_notify.notified());
+
+        {
+            let sync_state = self.sync_state.read();
+            if sync_state.is_block_synced() {
+                return None;
+            }
+
+            // Register this task as waiting before dropping the lock.
+            notified.as_mut().enable();
+        }
+
+        Some(
+            async move {
+                notified.await;
+                self.wait_for_synced().await;
+            }
+            .boxed(),
+        )
+    }
+
     /// Returns the number of blocks the node is behind the greatest peer height,
     /// or `None` if no peers are connected yet.
     #[inline]
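The `enable()`-under-lock dance in `wait_for_synced` is what rules out a lost wakeup: the waiter registers interest while still holding the state lock, so a `notify_waiters()` fired between the predicate check and the `.await` is still observed. A self-contained sketch of the same guard, assuming a boolean predicate behind a `parking_lot::RwLock` (names are illustrative):

```rust
use parking_lot::RwLock;
use tokio::sync::Notify;

struct SyncFlag {
    synced: RwLock<bool>,
    notify: Notify,
}

impl SyncFlag {
    async fn wait(&self) {
        loop {
            let mut notified = std::pin::pin!(self.notify.notified());
            {
                let synced = self.synced.read();
                if *synced {
                    return;
                }
                // Register interest BEFORE releasing the lock: a notification
                // fired after this point is captured rather than lost.
                notified.as_mut().enable();
            }
            notified.await;
            // Loop to re-check the predicate; `notify_waiters` carries no payload.
        }
    }

    fn mark_synced(&self) {
        *self.synced.write() = true;
        // Wakes every waiter registered at this moment.
        self.notify.notify_waiters();
    }
}
```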
@@ -865,11 +927,22 @@ impl BlockSync {
         }

         // -- Finally, update sync state and notify the sync loop about the change. --
-        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
-            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
+        let is_synced = if let Some(greatest_peer_height) =
+            self.locators.read().values().map(|l| l.latest_locator_height()).max()
+        {
+            let mut sync_state = self.sync_state.write();
+            sync_state.set_greatest_peer_height(greatest_peer_height);
+            sync_state.is_block_synced()
         } else {
             error!("Got new block locators but greatest peer height is zero.");
+            false
+        };
+
+        // For the unlikely case that a peer's height gets lowered.
+        if is_synced {
+            self.synced_notify.notify_waiters();
         }
+
         // Even if the greatest peer height did not change, we still received new block locators
         // that the sync loop might need to proceed.
         self.peer_notify.notify_one();
@@ -893,12 +966,25 @@ impl BlockSync {
         // Remove all block requests to the peer.
         self.remove_block_requests_to_peer(peer_ip);

-        // Update sync state, because the greatest peer height may have decreased.
-        if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
-            self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
-        } else {
-            // There are no more peers left.
-            self.sync_state.write().clear_greatest_peer_height();
+        let synced = {
+            // Do not lock sync state and locators at the same time.
+            let max_height = self.locators.read().values().map(|l| l.latest_locator_height()).max();
+            let mut sync_state = self.sync_state.write();
+
+            // Update sync state, because the greatest peer height may have decreased.
+            if let Some(greatest_peer_height) = max_height {
+                sync_state.set_greatest_peer_height(greatest_peer_height);
+            } else {
+                // There are no more peers left.
+                sync_state.clear_greatest_peer_height();
+            }
+
+            sync_state.is_block_synced()
+        };
+
+        // For the case where the maximum peer height gets lowered.
+        if synced {
+            self.synced_notify.notify_waiters();
         }

         // Notify the sync loop that something changed.
@@ -1109,15 +1195,19 @@ impl BlockSync {
     /// This is a no-op if `new_height` is equal to or less than the current sync height.
     pub fn set_sync_height(&self, new_height: u32) {
         // Scope state lock to avoid locking state and metrics at the same time.
-        let fully_synced = {
+        let (synced, fully_synced) = {
             let mut state = self.sync_state.write();
             state.set_sync_height(new_height);
-            !state.can_issue_new_block_requests()
+            (state.is_block_synced(), !state.can_issue_new_block_requests())
         };

         if fully_synced {
             self.metrics.mark_fully_synced();
         }
+
+        if synced {
+            self.synced_notify.notify_waiters();
+        }
     }

     /// Inserts a block request for the given height.
@@ -1690,6 +1780,7 @@ mod tests {
             common_ancestors: RwLock::new(sync.common_ancestors.read().clone()),
             requests: RwLock::new(sync.requests.read().clone()),
             sync_state: RwLock::new(sync.sync_state.read().clone()),
+            synced_notify: Notify::new(),
            advance_with_sync_blocks_lock: Default::default(),
             metrics: Default::default(),
             prepare_requests_lock: Default::default(),
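Two `Notify` idioms coexist in this changeset: `peer_notify` and `response_notify` use `notify_one`, which stores a permit and is documented above as single-consumer, while `synced_notify` uses `notify_waiters`, which wakes only already-registered tasks and stores nothing. A small sketch of the difference, made deterministic via `Notified::enable` (a runnable toy, not snarkOS code):

```rust
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Notify::new();

    // `notify_one` stores a permit even with no waiter present, so this
    // later `notified().await` returns immediately.
    notify.notify_one();
    notify.notified().await;

    // `notify_waiters` stores NO permit: only tasks that have already
    // registered are woken. `enable()` performs that registration.
    let mut waiting = std::pin::pin!(notify.notified());
    waiting.as_mut().enable(); // registered from this point on
    notify.notify_waiters();
    waiting.await; // completes, because registration preceded the call
}
```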