diff --git a/.cargo/audit.toml b/.cargo/audit.toml index bef88d3032..b2901331db 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -13,4 +13,6 @@ ignore = [ "RUSTSEC-2026-0002", # TODO remove once we migrate away from bincode, or it becomes maintained again. "RUSTSEC-2025-0141", + # TODO remove once termwiz/terminfo update to phf 0.13+, which drops the rand 0.8 dependency. + "RUSTSEC-2026-0097", ] diff --git a/.circleci/config.yml b/.circleci/config.yml index 460481af4e..aee3b17465 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -797,7 +797,7 @@ workflows: filters: branches: only: - - ci/reenable-check-logs + - fix/batch-triggers - canary - testnet - mainnet @@ -807,7 +807,7 @@ workflows: filters: branches: only: - - ci/reenable-check-logs + - fix/batch-triggers - canary - testnet - mainnet diff --git a/Cargo.lock b/Cargo.lock index b2e1d40d2e..384748aa05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,7 +157,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -168,7 +168,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -737,7 +737,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -1495,7 +1495,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4092,7 +4092,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - 
"windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -4151,7 +4151,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -6044,7 +6044,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -6281,7 +6281,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -6547,9 +6547,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.51.1" +version = "1.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" +checksum = "a91135f59b1cbf38c91e73cf3386fca9bb77915c45ce2771460c9d92f0f3d776" dependencies = [ "bytes", "libc", @@ -7374,7 +7374,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] diff --git a/node/bft/README.md b/node/bft/README.md index 1dd4a55e7b..acfa2bf18b 100644 --- a/node/bft/README.md +++ b/node/bft/README.md @@ -10,26 +10,44 @@ The `snarkos-node-bft` crate provides a node implementation for a BFT-based memo The primary is the coordinator, responsible for advancing rounds and broadcasting the anchor. -#### Triggering Round Advancement - -Each round runs until one of two conditions is met: -1. The coinbase target has been reached, or -2. 
The round has reached its timeout (currently set to 10 seconds) - -#### Advancing Rounds - -As described in the paper [Bullshark: The Partially Synchronous Version](https://arxiv.org/abs/2209.05633), -the BFT generally advances rounds when `n − f` vertices are delivered, however: -``` -The problem in advancing rounds whenever n − f vertices are delivered is that parties -might not vote for the anchor even if the party that broadcast it is just slightly slower -than the fastest n − f parties. To deal with this, the BFT integrates timeouts into -the DAG construction. If the first n − f vertices a party p gets in an even-numbered round r -do not include the anchor of round r, then p sets a timer and waits for the anchor -until the timer expires. Similarly, in an odd-numbered round, parties wait for either -f + 1 vertices that vote for the anchor, or 2f + 1 vertices that do not, or a timeout. -``` -Note that in this quote `2f + 1` should really be `n - f`. +#### Round Advancement + +A round advances once a quorum (`n - f`) of validators have submitted certificates for that round +and the following round-type-specific conditions are met: + +- **Even rounds**: the elected leader's certificate is present among the quorum, confirming the + leader was reachable. If the leader's certificate is absent, the node waits up to + `MAX_LEADER_CERTIFICATE_DELAY` before advancing without it. +- **Odd rounds**: at least `f + 1` certificates from the current round reference the previous + even round's leader certificate (availability threshold), or `n - f` do not (non-leader + quorum). If neither threshold is reached, the node again falls back to the timeout. + +In both cases the timeout is `MAX_LEADER_CERTIFICATE_DELAY` (currently 5 seconds), reset at the +start of each round. This follows the [Bullshark](https://arxiv.org/abs/2209.05633) protocol. 
+ +#### Batch Proposal + +Batch proposals are driven by a dedicated **`ProposalTask`** that runs in a loop and is the only place that calls `Primary::propose_batch()`. +This keeps proposal on a single execution path and avoids concurrent proposal attempts. Each loop iteration covers one full round and proceeds through three stages: + +**Stage 1 — Wait until ready to propose** + +The task blocks until all of the following conditions are satisfied: +1. The node is synced. If it is currently syncing, the task waits via `wait_for_synced_if_syncing()` before continuing. +2. `MIN_BATCH_DELAY` has elapsed since the start of the round, enforcing a minimum inter-proposal interval. +3. One of two events fires: + - **Ready signal** — `ProposalTask::signal()` is called from `try_increment_to_the_next_round()` when the primary successfully advances to a new round (e.g. after a leader certificate is committed). This is delivered via a `watch` channel. + - **`MAX_BATCH_DELAY` timeout** — If no signal arrives within `MAX_BATCH_DELAY` of the round start, the task proceeds anyway. This handles the case where the elected leader's certificate never arrives. + +A short `CREATE_BATCH_INTERVAL` heartbeat keeps the round-change check alive while waiting. + +**Stage 2 — Propose** + +The task calls `propose_batch()` in a loop until it returns `Ok(true)` (batch submitted). On `Ok(false)` or a transient error it retries every `CREATE_BATCH_INTERVAL`. If the round advances during retries, the task restarts from Stage 1. + +**Stage 3 — Wait for signatures** + +Once the batch is broadcast, the task periodically calls `propose_batch()` every `MAX_BATCH_DELAY` to rebroadcast to any validators that have not yet signed. It exits this stage as soon as the round advances (detected either via the ready signal or by polling `current_round()`). 
### Ledger Advancement diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 0ed2d338da..95cbb4c70e 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -14,7 +14,7 @@ // limitations under the License. use crate::{ - MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, + MAX_LEADER_CERTIFICATE_DELAY, helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now}, primary::{Primary, PrimaryCallback}, sync::SyncCallback, @@ -42,6 +42,8 @@ use colored::Colorize; use indexmap::{IndexMap, IndexSet}; #[cfg(feature = "locktick")] use locktick::parking_lot::RwLock; +#[cfg(feature = "locktick")] +use locktick::tokio::Mutex; #[cfg(not(feature = "locktick"))] use parking_lot::RwLock; use std::{ @@ -52,6 +54,8 @@ use std::{ atomic::{AtomicI64, Ordering}, }, }; +#[cfg(not(feature = "locktick"))] +use tokio::sync::Mutex; use tokio::sync::{OnceCell, oneshot}; #[derive(Clone)] @@ -66,6 +70,12 @@ pub struct BFT { leader_certificate_timer: Arc, /// The consensus sender. consensus_sender: Arc>>, + /// Ensures only one call to `commit_leader_certificate` runs at a time. + /// + /// Without this, a second certificate crossing the availability threshold while the consensus + /// callback for a prior commit is still in-flight would re-walk already-committed rounds + /// (because `last_committed_round` hasn't been updated yet), causing duplicate subdag commits. + commit_lock: Arc>, } impl BFT { @@ -98,6 +108,7 @@ impl BFT { leader_certificate: Default::default(), leader_certificate_timer: Default::default(), consensus_sender: Default::default(), + commit_lock: Default::default(), }) } @@ -207,7 +218,13 @@ impl BFT { #[async_trait::async_trait] impl PrimaryCallback for BFT { /// Notification that a new round has started. - fn update_to_next_round(&self, current_round: u64) -> bool { + /// + /// # Arguments + /// * `current_round` - the round the caller is in (to avoid race conditions) + /// + /// # Returns + /// `true` if the BFT moved to the next round. 
+ fn try_advance_to_next_round(&self, current_round: u64) -> bool { // Ensure the current round is at least the storage round (this is a sanity check). let storage_round = self.storage().current_round(); if current_round < storage_round { @@ -482,7 +499,7 @@ impl BFT { /// /// This is always true for a new BFT instance. fn is_timer_expired(&self) -> bool { - self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now() + self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now() } /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions: @@ -580,21 +597,40 @@ impl BFT { #[cfg(debug_assertions)] trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round()); + // Serialize all commits so that `last_committed_round` is up-to-date before the next call + // re-walks the DAG, preventing duplicate subdag commits. + let _commit_guard = self.commit_lock.lock().await; + // Fetch the leader round. let latest_leader_round = leader_certificate.round(); + // Determine the list of all previous leader certificates since the last committed round. // The order of the leader certificates is from **newest** to **oldest**. let mut leader_certificates = vec![leader_certificate.clone()]; + // Whether the consensus callback should be skipped (true when the round is already committed). + // When `latest_leader_round == last_committed_round` the round was already committed by a + // concurrent call that beat us to the lock, or by a prior session whose DAG state was + // reconstructed without populating `recently_committed`. In both cases we still re-run + // DFS + GC to ensure `recently_committed` and `gc_round` are populated, but we must NOT + // send a duplicate subdag to the consensus callback. + let skip_consensus; { - // Retrieve the leader round and the latest round we committed. 
- let leader_round = leader_certificate.round(); - // Read-lock the DAG. // We need to hold the lock, so we do not later fail to re-acquire it. let dag = self.dag.read(); + // Re-check under the lock: another call may have committed this round while we were waiting. + if latest_leader_round < dag.last_committed_round() { + trace!("Skipping already-committed leader round {latest_leader_round}"); + return Ok(()); + } + skip_consensus = latest_leader_round == dag.last_committed_round(); + + #[cfg(debug_assertions)] + trace!("Attempting to commit leader certificate for round {}...", latest_leader_round); + let mut current_certificate = leader_certificate; - for round in (dag.last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2) { + for round in (dag.last_committed_round() + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) { // Retrieve the previous committee for the leader round. let previous_committee_lookback = self.ledger().get_committee_lookback_for_round(round).with_context(|| { @@ -721,25 +757,28 @@ impl BFT { "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}", ); - // Trigger consensus. - if let Some(consensus_sender) = self.consensus_sender.get() { - // Initialize a callback sender and receiver. - let (callback_sender, callback_receiver) = oneshot::channel(); - // Send the subdag and transmissions to consensus. - consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?; - // Await the callback to continue. 
- match callback_receiver.await { - Ok(Ok(_)) => (), - Ok(Err(err)) => { - let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}")); - error!("{}", &flatten_error(err)); - return Ok(()); - } - Err(err) => { - let err: anyhow::Error = err.into(); - let err = err.context(format!("BFT failed to receive the callback for round {anchor_round}")); - error!("{}", flatten_error(err)); - return Ok(()); + // Trigger consensus (skipped if the round was already committed by a prior call). + if !skip_consensus { + if let Some(consensus_sender) = self.consensus_sender.get() { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the subdag and transmissions to consensus. + consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?; + // Await the callback to continue. + match callback_receiver.await { + Ok(Ok(_)) => (), + Ok(Err(err)) => { + let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}")); + error!("{}", &flatten_error(err)); + return Ok(()); + } + Err(err) => { + let err: anyhow::Error = err.into(); + let err = + err.context(format!("BFT failed to receive the callback for round {anchor_round}")); + error!("{}", flatten_error(err)); + return Ok(()); + } } } } @@ -881,7 +920,7 @@ impl BFT { mod tests { use crate::{ BFT, - MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, + MAX_LEADER_CERTIFICATE_DELAY, PrimaryCallback, helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round}, sync::SyncCallback, @@ -1114,9 +1153,7 @@ mod tests { assert!(!result); } // Wait for the timer to expire. 
- let leader_certificate_timeout = - std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000); - std::thread::sleep(leader_certificate_timeout); + std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY); // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round. let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2); if bft_timer.is_timer_expired() { diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index ed2ea72e3b..a406ddb354 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -17,7 +17,7 @@ use crate::helpers::Telemetry; use crate::{ CONTEXT, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, MEMORY_POOL_PORT, Worker, events::{DisconnectReason, EventCodec, PrimaryPing}, @@ -99,9 +99,9 @@ use tokio_stream::StreamExt; use tokio_util::codec::Framed; /// The maximum interval of events to cache. -const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds +const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds /// The maximum interval of requests to cache. -const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds +const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds /// The maximum number of connection attempts in an interval. #[cfg(not(test))] diff --git a/node/bft/src/helpers/pending.rs b/node/bft/src/helpers/pending.rs index 5dd43224b3..a4466eed52 100644 --- a/node/bft/src/helpers/pending.rs +++ b/node/bft/src/helpers/pending.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use crate::{Gateway, MAX_FETCH_TIMEOUT_IN_MS};
+use crate::{Gateway, MAX_FETCH_TIMEOUT};
 use snarkos_node_bft_ledger_service::LedgerService;
 use snarkos_node_network::PeerPoolHandling;
 use snarkvm::{
@@ -36,8 +36,8 @@
 use time::OffsetDateTime;
 use tokio::sync::oneshot;

 /// The maximum number of seconds to wait before expiring a callback.
-/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds.
-pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64;
+/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT` when converting to seconds.
+pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT.as_millis().div_ceil(1000) as i64;

 /// Returns the maximum number of redundant requests for the number of validators in the specified round.
 pub fn max_redundant_requests(ledger: Arc>, round: u64) -> Result {
diff --git a/node/bft/src/helpers/timestamp.rs b/node/bft/src/helpers/timestamp.rs
index fca6978584..536fb7a593 100644
--- a/node/bft/src/helpers/timestamp.rs
+++ b/node/bft/src/helpers/timestamp.rs
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
+use crate::MAX_TIMESTAMP_DELTA;
 use snarkvm::prelude::{Result, bail};
 use time::OffsetDateTime;

@@ -36,7 +36,7 @@
 pub fn to_utc_datetime(timestamp: i64) -> OffsetDateTime {
 /// Sanity checks the timestamp for liveness.
 pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
     // Ensure the timestamp is within range.
- if timestamp > (now() + MAX_TIMESTAMP_DELTA_IN_SECS) { + if timestamp > (now() + MAX_TIMESTAMP_DELTA.as_secs() as i64) { bail!("Timestamp {timestamp} is too far in the future") } Ok(()) @@ -45,17 +45,17 @@ pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> { #[cfg(test)] mod prop_tests { use super::*; - use crate::MAX_TIMESTAMP_DELTA_IN_SECS; + use crate::MAX_TIMESTAMP_DELTA; use proptest::prelude::*; use test_strategy::proptest; fn any_valid_timestamp() -> BoxedStrategy { - (Just(now()), 0..MAX_TIMESTAMP_DELTA_IN_SECS).prop_map(|(now, delta)| now + delta).boxed() + (Just(now()), 0..(MAX_TIMESTAMP_DELTA.as_secs() as i64)).prop_map(|(now, delta)| now + delta).boxed() } fn any_invalid_timestamp() -> BoxedStrategy { - (Just(now()), MAX_TIMESTAMP_DELTA_IN_SECS..).prop_map(|(now, delta)| now + delta).boxed() + (Just(now()), (MAX_TIMESTAMP_DELTA.as_secs() as i64)..).prop_map(|(now, delta)| now + delta).boxed() } #[proptest] diff --git a/node/bft/src/lib.rs b/node/bft/src/lib.rs index 4ff4e409ff..3cdaaf997f 100644 --- a/node/bft/src/lib.rs +++ b/node/bft/src/lib.rs @@ -29,6 +29,8 @@ pub use snarkos_node_bft_events as events; pub use snarkos_node_bft_ledger_service as ledger_service; pub use snarkos_node_bft_storage_service as storage_service; +use std::time::Duration; + pub mod helpers; mod bft; @@ -51,24 +53,40 @@ pub const CONTEXT: &str = "[MemoryPool]"; /// The port on which the memory pool listens for incoming connections. pub const MEMORY_POOL_PORT: u16 = 5000; // port -/// The maximum number of milliseconds to wait before proposing a batch. -pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms -/// The minimum number of seconds to wait before proposing a batch. -pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds -/// The maximum number of milliseconds to wait before timing out on a fetch. -pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms -/// The maximum number of seconds allowed for the leader to send their certificate. 
-pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds
-/// The maximum number of seconds before the timestamp is considered expired.
-pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
+/// The maximum time to wait before proposing a batch.
+pub const MAX_BATCH_DELAY: Duration = Duration::from_millis(2500);
+
+/// The minimum time that needs to elapse between two consecutive batch proposals.
+/// This creates a lower bound on the block interval, and ensures the network will not be overwhelmed with too many blocks/certificates.
+pub const MIN_BATCH_DELAY: Duration = Duration::from_secs(1);
+
+/// The time a primary waits between attempts to create a new batch (only relevant after `MIN_BATCH_DELAY` has passed).
+/// This only serves as a failsafe in case the task does not get woken up through other means.
+/// Lowering it too much would be wasteful.
+pub const CREATE_BATCH_INTERVAL: Duration = Duration::from_millis(250);
+
+/// The maximum time to wait before timing out on a fetch.
+/// TODO(kaimast): directly multiply by constant once the `const_trait_impl` feature is stable.
+pub const MAX_FETCH_TIMEOUT: Duration = Duration::from_millis(3 * (MAX_BATCH_DELAY.as_millis() as u64));
+
+/// The maximum time allowed for the leader to send their certificate.
+/// After this time, the node will consider the leader as failed and try to advance the round without it.
+pub const MAX_LEADER_CERTIFICATE_DELAY: Duration = Duration::from_millis(2 * (MAX_BATCH_DELAY.as_millis() as u64));
+
+/// The maximum difference allowed between our local time and a certificate's timestamp, for the node to sign the certificate.
+/// This prevents malicious actors from proposing certificates with timestamps that are too old,
+/// or too far in the future.
+pub const MAX_TIMESTAMP_DELTA: Duration = Duration::from_secs(10);
+
 /// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // worker(s) /// The interval at which each primary broadcasts a ping to every other node. /// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly. -pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms +pub const PRIMARY_PING_INTERVAL: Duration = Duration::from_millis(2 * (MAX_BATCH_DELAY.as_millis() as u64)); + /// The interval at which each worker broadcasts a ping to every other node. -pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms +pub const WORKER_PING_INTERVAL: Duration = Duration::from_millis(4 * (MAX_BATCH_DELAY.as_millis() as u64)); /// A helper macro to spawn a blocking task. #[macro_export] diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index eaa428802f..880f5b30ec 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -13,15 +13,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod proposal_task; +pub use proposal_task::ProposalTask; + use crate::{ Gateway, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, + MAX_LEADER_CERTIFICATE_DELAY, MAX_WORKERS, - MIN_BATCH_DELAY_IN_SECS, - PRIMARY_PING_IN_MS, + MIN_BATCH_DELAY, + PRIMARY_PING_INTERVAL, Sync, Transport, - WORKER_PING_IN_MS, + WORKER_PING_INTERVAL, Worker, events::{BatchPropose, BatchSignature, Event}, helpers::{ @@ -72,34 +76,39 @@ use indexmap::{IndexMap, IndexSet}; #[cfg(feature = "locktick")] use locktick::{ parking_lot::{Mutex, RwLock}, - tokio::Mutex as TMutex, + tokio::RwLock as TRwLock, }; #[cfg(not(feature = "locktick"))] use parking_lot::{Mutex, RwLock}; #[cfg(not(feature = "serial"))] use rayon::prelude::*; -#[cfg(feature = "metrics")] -use std::time::Instant; use std::{ collections::{HashMap, HashSet}, future::Future, net::SocketAddr, + pin::Pin, sync::{Arc, OnceLock}, - time::Duration, + time::Instant, }; #[cfg(not(feature = "locktick"))] -use tokio::sync::Mutex as TMutex; -use 
tokio::task::JoinHandle;
+use tokio::sync::RwLock as TRwLock;
+use tokio::{sync::Notify, task::JoinHandle};

-/// A helper type for an optional proposed batch.
-pub type ProposedBatch = RwLock>>;
+/// A helper type to keep track of the latest proposed batch.
+pub(crate) type ProposedBatch = RwLock>>;

 /// This callback trait allows listening to changes in the Primary, such as round advancement.
-/// This is currently used by BFT.
+/// This is implemented by [`BFT`].
 #[async_trait::async_trait]
 pub trait PrimaryCallback: Send + std::marker::Sync {
-    /// Notifies that a new round has started.
-    fn update_to_next_round(&self, current_round: u64) -> bool;
+    /// Asks the callback if we can move to the next round.
+    ///
+    /// # Arguments
+    /// * `current_round` - the round the Primary is in (to avoid race conditions)
+    ///
+    /// # Returns
+    /// `true` if we moved to the next round.
+    fn try_advance_to_next_round(&self, current_round: u64) -> bool;

     /// Add a certificated that was created by the primary or received from a peer.
     async fn add_new_certificate(&self, certificate: BatchCertificate) -> Result<()>;
@@ -119,24 +128,38 @@
 pub struct Primary {
     ledger: Arc>,
     /// The workers.
     workers: Arc>>>,
+    /// The primary callback (used by [`BFT`]).
     primary_callback: Arc>>>,
+    /// The batch proposal, if the primary is currently proposing a batch.
     proposed_batch: Arc>,
-    /// The timestamp of the most recent proposed batch.
-    latest_proposed_batch_timestamp: Arc>,
+    /// The instant at which the current batch was proposed (used to measure certification latency).
     /// (used for higher precision in the metrics compared to the batch timestamp)
     #[cfg(feature = "metrics")]
     batch_propose_start: Arc>>,
+
+    /// Holds the most recent round and timestamp that the primary proposed a batch for.
+    /// TODO(kaimast): avoid using an async lock here, so this can be merged with the `proposed_batch`,
+    /// to have a unified `primary_state` field.
+    latest_proposal_timestamp: Arc>>,
+
     /// The recently-signed batch proposals.
     signed_proposals: Arc>>,
+    /// The handles for all background tasks spawned by this primary.
     handles: Arc>>>,
-    /// The lock for propose_batch. It holds the most recent round that was proposed for.
-    propose_lock: Arc>,
+    /// The node configuration directory.
     node_data_dir: NodeDataDir,
+
+    /// Manages proposal readiness state and drives the batch proposal loop.
+    proposal_task: ProposalTask,
+
+    /// Used to wake up the dedicated round-increment task, if we may be able to advance to the next round.
+    /// This is used so that the timeout for round advancement is reset on every round increment.
+    round_increment_notify: Arc,
 }

 impl Primary {
@@ -176,16 +199,17 @@
             gateway,
             storage,
             ledger,
+            node_data_dir,
             workers: Default::default(),
             primary_callback: Default::default(),
             proposed_batch: Default::default(),
-            latest_proposed_batch_timestamp: Default::default(),
             #[cfg(feature = "metrics")]
             batch_propose_start: Default::default(),
+            latest_proposal_timestamp: Default::default(),
             signed_proposals: Default::default(),
             handles: Default::default(),
-            propose_lock: Default::default(),
-            node_data_dir,
+            proposal_task: Default::default(),
+            round_increment_notify: Default::default(),
         })
     }

@@ -200,26 +224,9 @@
         let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
             proposal_cache.into();

-        // Verify that the proposal cache is not too far ahead of the ledger.
-        // If the cache round exceeds the ledger round by more than MAX_GC_ROUNDS, the ledger
-        // snapshot is too old to recover from the cached state. The operator must restore a
-        // more recent ledger snapshot before restarting the node.
- let ledger_round = self.ledger.latest_round(); - let max_gc_rounds = BatchHeader::::MAX_GC_ROUNDS as u64; - if latest_certificate_round > ledger_round.saturating_add(max_gc_rounds) { - bail!( - "The proposal cache (round {latest_certificate_round}) is more than {max_gc_rounds} \ - rounds ahead of the ledger (round {ledger_round}). \ - Please restore a more recent ledger snapshot before restarting the node." - ); - } - - // Write the proposed batch. + *self.latest_proposal_timestamp.write().await = Some((latest_certificate_round, now())); *self.proposed_batch.write() = proposed_batch; - // Write the signed proposals. *self.signed_proposals.write() = signed_proposals; - // Writ the propose lock. - *self.propose_lock.lock().await = latest_certificate_round; // Update the storage with the pending certificates. for certificate in pending_certificates { @@ -340,11 +347,6 @@ impl Primary { pub fn workers(&self) -> &[Worker] { self.workers.get().expect("Primary is not running yet") } - - /// Returns the batch proposal of our primary, if one currently exists. - pub fn proposed_batch(&self) -> &Arc> { - &self.proposed_batch - } } impl Primary { @@ -398,7 +400,16 @@ impl Primary { } } -impl Primary { +#[async_trait::async_trait] +impl proposal_task::BatchPropose for Primary { + fn current_round(&self) -> u64 { + Primary::current_round(self) + } + + fn wait_for_synced_if_syncing(&self) -> Option> { + self.sync.wait_for_synced_if_syncing() + } + /// Proposes the batch for the current round. /// /// This method performs the following steps: @@ -406,9 +417,17 @@ impl Primary { /// 2. Sign the batch. /// 3. Set the batch proposal in the primary. /// 4. Broadcast the batch header to all validators for signing. - pub async fn propose_batch(&self) -> Result<()> { - // This function isn't re-entrant. - let mut lock_guard = self.propose_lock.lock().await; + /// + /// # Returns + /// - `Ok(true)` if the batch was proposed. 
+    /// - `Ok(false)` if the batch was not proposed for a benign reason, e.g., the timestamp is too soon after the previous certificate.
+    /// - `Err(err)` if an unexpected error occurred.
+    async fn propose_batch(&self) -> Result {
+        // Ensure there are no concurrent executions of this function.
+        //
+        // Note, in the current design, this function is only invoked from the batch proposal task, and it is technically
+        // not possible for there to be concurrent invocations of the function, but we keep this lock for now.
+        let mut lock_guard = self.latest_proposal_timestamp.write().await;

         // Check if the proposed batch has expired, and clear it if it has expired.
         if let Err(err) = self
             .with_context(|| "Failed to check the proposed batch for expiration")
         {
             warn!("{}", flatten_error(&err));
-            return Ok(());
+            return Ok(false);
         }

         // Retrieve the current round.
@@ -430,14 +449,16 @@
         ensure!(round > 0, "Round 0 cannot have transaction batches");

         // If the current storage round is below the latest proposal round, then return early.
-        if round < *lock_guard {
-            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
-            return Ok(());
+        if let Some((latest_round, _)) = &*lock_guard
+            && round < *latest_round
+        {
+            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {latest_round}");
+            return Ok(false);
         }

         // If there is a batch being proposed already,
         // rebroadcast the batch header to the non-signers, and return early.
-        if let Some(proposal) = self.proposed_batch.read().as_ref() {
+        if let Some(proposal) = &*self.proposed_batch.read() {
             // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
if round < proposal.round() || proposal @@ -450,7 +471,7 @@ impl Primary { "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.", proposal.round(), ); - return Ok(()); + return Ok(false); } // Construct the event. // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers. @@ -474,34 +495,30 @@ impl Primary { } } debug!("Proposed batch for round {} is still valid", proposal.round()); - return Ok(()); + return Ok(false); } #[cfg(feature = "metrics")] metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64); // Ensure that the primary does not create a new proposal too quickly. - if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) { - debug!( - "{}", - flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}"))) - ); - return Ok(()); + if let Some((_, latest_timestamp)) = &*lock_guard + && !self.check_own_proposal_timestamp(previous_round, *latest_timestamp, now())? + { + return Ok(false); } // Ensure the primary has not proposed a batch for this round before. if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) { // If a BFT sender was provided, attempt to advance the current round. if let Some(cb) = &*self.primary_callback.get_ref() { - match cb.update_to_next_round(self.current_round()) { - // 'is_ready' is true if the primary is ready to propose a batch for the next round. + match cb.try_advance_to_next_round(self.current_round()) { true => (), // continue, - // 'is_ready' is false if the primary is not ready to propose a batch for the next round. - false => return Ok(()), + false => return Ok(false), } } debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed()); - return Ok(()); + return Ok(false); } // Determine if the current round has been proposed. 
@@ -509,9 +526,11 @@ impl Primary { // good for network reliability and should not be prevented for the already existing proposed_batch. // If a certificate already exists for the current round, an attempt should be made to advance the // round as early as possible. - if round == *lock_guard { + if let Some((latest_round, _)) = &*lock_guard + && *latest_round == round + { debug!("Primary is safely skipping a batch proposal - round {round} already proposed"); - return Ok(()); + return Ok(false); } // Retrieve the committee to check against. @@ -529,7 +548,7 @@ impl Primary { "(please connect to more validators)".dimmed() ); trace!("Primary is connected to {} validators", connected_validators.len() - 1); - return Ok(()); + return Ok(false); } } @@ -558,7 +577,7 @@ impl Primary { "Primary is safely skipping a batch proposal for round {round} {}", format!("(previous round {previous_round} has not reached quorum)").dimmed() ); - return Ok(()); + return Ok(false); } // Initialize the map of transmissions. @@ -700,11 +719,11 @@ impl Primary { // Determine the current timestamp. let current_timestamp = now(); - *lock_guard = round; - /* Proceeding to sign & propose the batch. */ info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len()); + // Update the latest proposed round and timestamp. + *lock_guard = Some((round, current_timestamp)); // Retrieve the private key. let private_key = *self.gateway.account().private_key(); // Retrieve the committee ID. @@ -733,20 +752,22 @@ impl Primary { error!("{}", flatten_error(err.context("Failed to reinsert transmissions"))); } })?; + // Broadcast the batch to all validators for signing. self.gateway.broadcast(Event::BatchPropose(batch_header.into())); - // Set the timestamp of the latest proposed batch. - *self.latest_proposed_batch_timestamp.write() = proposal.timestamp(); + // Store the proposal. 
+ *self.proposed_batch.write() = Some(proposal); // Record the wall-clock time at which the batch was proposed. #[cfg(feature = "metrics")] { *self.batch_propose_start.lock() = Some(Instant::now()); } - // Set the proposed batch. - *self.proposed_batch.write() = Some(proposal); - Ok(()) + + Ok(true) } +} +impl Primary { /// Processes a batch propose from a peer. /// /// This method performs the following steps: @@ -856,7 +877,7 @@ impl Primary { // Compute the previous round. let previous_round = batch_round.saturating_sub(1); // Ensure that the peer did not propose a batch too quickly. - if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) { + if let Err(err) = self.check_peer_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) { // Proceed to disconnect the validator. self.gateway.disconnect(peer_ip); return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'"))); @@ -1229,9 +1250,9 @@ impl Primary { // Determine if we are currently proposing a round that is relevant. // Note: This is important, because while our peers have advanced, // they may not be proposing yet, and thus still able to sign our proposed batch. - let should_advance = match &*self.proposed_batch.read() { + let should_advance = match &*self.latest_proposal_timestamp.read().await { // We advance if the proposal round is less than the current round that was just certified. - Some(proposal) => proposal.round() < certificate_round, + Some((latest_round, _)) => *latest_round < certificate_round, // If there's no proposal, we consider advancing. None => true, }; @@ -1242,7 +1263,7 @@ impl Primary { // Determine whether to advance to the next round. if is_quorum && should_advance && certificate_round >= current_round { // If we have reached the quorum threshold and the round should advance, then proceed to the next round. 
- self.try_increment_to_the_next_round(current_round + 1).await?; + self.round_increment_notify.notify_one(); } Ok(()) } @@ -1253,8 +1274,8 @@ impl Primary { /// /// For each receiver in the `primary_receiver` struct, there will be a dedicated task /// that awaits new data and handles it accordingly. - /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically - /// tries to move the the next round of batches. + /// Additionally, this spawns a task that periodically issues PrimaryPings and one that + /// tries to move to the next round when triggered (e.g. after a certificate is stored) or on a timeout. /// /// This function is called exactly once, in `Self::run()`. fn start_handlers(&self, primary_receiver: PrimaryReceiver) { @@ -1272,7 +1293,7 @@ impl Primary { self.spawn(async move { loop { // Sleep briefly. - tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await; + tokio::time::sleep(PRIMARY_PING_INTERVAL).await; // Retrieve the block locators. let self__ = self_.clone(); @@ -1360,7 +1381,7 @@ impl Primary { let self_ = self.clone(); self.spawn(async move { loop { - tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await; + tokio::time::sleep(WORKER_PING_INTERVAL).await; // If the primary is not synced, then do not broadcast the worker ping(s). if !self_.sync.is_synced() { trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed()); @@ -1373,35 +1394,10 @@ impl Primary { } }); - // Start the batch proposer. + // Start the batch proposal task. + let proposal_task = self.proposal_task.clone(); let self_ = self.clone(); - self.spawn(async move { - loop { - // Sleep briefly, but longer than if there were no batch. - tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await; - let current_round = self_.current_round(); - // If the primary is not synced, then do not propose a batch. 
- if !self_.sync.is_synced() { - debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed()); - continue; - } - // A best-effort attempt to skip the scheduled batch proposal if - // round progression already triggered one. - if self_.propose_lock.try_lock().is_err() { - trace!( - "Skipping batch proposal for round {current_round} {}", - "(node is already proposing)".dimmed() - ); - continue; - }; - // If there is no proposed batch, attempt to propose a batch. - // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path, - // and only one batch needs to be proposed at a time. - if let Err(e) = self_.propose_batch().await { - warn!("Cannot propose a batch - {e}"); - } - } - }); + self.spawn(async move { proposal_task.run(self_).await }); // Start the proposed batch handler. let self_ = self.clone(); @@ -1412,6 +1408,7 @@ impl Primary { trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed()); continue; } + // Spawn a task to process the proposed batch. let self_ = self_.clone(); tokio::spawn(async move { @@ -1479,44 +1476,58 @@ impl Primary { } }); - // This task periodically tries to move to the next round. - // - // Note: This is necessary to ensure that the primary is not stuck on a previous round - // despite having received enough certificates to advance to the next round. + // This task tries to move to the next round when triggered (e.g. after a certificate is stored) + // or after a timeout, so we are not stuck on a previous round despite having quorum. let self_ = self.clone(); self.spawn(async move { loop { - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await; - // If the primary is not synced, then do not increment to the next round. - if !self_.sync.is_synced() { - trace!("Skipping round increment {}", "(node is syncing)".dimmed()); - continue; - } - // Attempt to increment to the next round. 
+ let round_start = Instant::now(); let current_round = self_.current_round(); - let next_round = current_round.saturating_add(1); - // Determine if the quorum threshold is reached for the current round. - let is_quorum_threshold_reached = { - // Retrieve the certificate authors for the current round. - let authors = self_.storage.get_certificate_authors_for_round(current_round); - // If there are no certificates, then skip this check. - if authors.is_empty() { - continue; + + // Inner loop: wait and try to increment while we're still in the same round. + while self_.current_round() == current_round { + let mut futures: Vec + Send>>> = + vec![Box::pin(self_.round_increment_notify.notified())]; + + if let Some(remaining_delay) = MAX_BATCH_DELAY.checked_sub(round_start.elapsed()) + && !remaining_delay.is_zero() + { + futures.push(Box::pin(tokio::time::sleep(remaining_delay))); + } + // Always ensure a wakeup no later than MAX_LEADER_CERTIFICATE_DELAY so that + // try_advance_to_next_round is called after the leader-certificate timer + // expires, even when no further certificates arrive (e.g. an even round where + // the elected leader was absent and quorum was reached without their cert). + futures.push(Box::pin(tokio::time::sleep(MAX_LEADER_CERTIFICATE_DELAY))); + if !self_.sync.is_synced() { + futures.push(Box::pin(self_.sync.wait_for_synced())); } - // Retrieve the committee lookback for the current round. 
- let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else { - warn!("Failed to retrieve the committee lookback for round {current_round}"); + let _ = futures::future::select_all(futures).await; + + if !self_.sync.is_synced() { + trace!("Skipping round increment {}", "(node is syncing)".dimmed()); continue; + } + + let next_round = current_round.saturating_add(1); + let is_quorum_threshold_reached = { + let authors = self_.storage.get_certificate_authors_for_round(current_round); + if authors.is_empty() { + continue; + } + let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) + else { + warn!("Failed to retrieve the committee lookback for round {current_round}"); + continue; + }; + committee_lookback.is_quorum_threshold_reached(&authors) }; - // Check if the quorum threshold is reached for the current round. - committee_lookback.is_quorum_threshold_reached(&authors) - }; - // Attempt to increment to the next round if the quorum threshold is reached. - if is_quorum_threshold_reached { - debug!("Quorum threshold reached for round {current_round}"); - if let Err(err) = self_.try_increment_to_the_next_round(next_round).await { - warn!("{}", flatten_error(err.context("Failed to increment to the next round"))); + + if is_quorum_threshold_reached { + debug!("Quorum threshold reached for round {current_round}"); + if let Err(err) = self_.try_increment_to_the_next_round(next_round).await { + warn!("{}", flatten_error(err.context("Failed to increment to the next round"))); + } } } } @@ -1615,7 +1626,7 @@ impl Primary { if current_round < next_round { // If a BFT sender was provided, send the current round to the BFT. let is_ready = if let Some(cb) = self.primary_callback.get() { - cb.update_to_next_round(current_round) + cb.try_advance_to_next_round(current_round) } // Otherwise, handle the Narwhal case. else { @@ -1625,15 +1636,12 @@ impl Primary { true }; - // Log whether the next round is ready. 
- match is_ready { - true => debug!("Primary is ready to propose the next round"), - false => debug!("Primary is not ready to propose the next round"), - } - - // If the node is ready, propose a batch for the next round. - if is_ready { - self.propose_batch().await?; + // Notify the proposal task if the new round is ready. + if is_ready && self.is_synced() { + debug!("Primary is ready to propose the next round"); + self.proposal_task.signal(); + } else { + debug!("Primary is not ready to propose the next round"); } } Ok(()) @@ -1666,37 +1674,62 @@ impl Primary { /// Ensure the primary is not creating batch proposals too frequently. /// This checks that the certificate timestamp for the previous round is within the expected range. - fn check_proposal_timestamp(&self, previous_round: u64, author: Address, timestamp: i64) -> Result<()> { + fn check_peer_proposal_timestamp(&self, previous_round: u64, author: Address, timestamp: i64) -> Result<()> { + ensure!(author != self.gateway.account().address(), "Peer cannot propose a batch that is authored by myself"); + // Retrieve the timestamp of the previous timestamp to check against. let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) { - // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. + // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` seconds ago. Some(certificate) => certificate.timestamp(), - None => match self.gateway.account().address() == author { - // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago. - true => *self.latest_proposed_batch_timestamp.read(), - // If we do not see a previous certificate for the author, then proceed optimistically. - false => return Ok(()), - }, + // If we do not see a previous certificate for the author, then proceed optimistically. 
+            None => return Ok(()),
         };
         // Determine the elapsed time since the previous timestamp.
         let elapsed = timestamp
             .checked_sub(previous_timestamp)
             .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
-        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
-        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
+        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` seconds ago.
+        match elapsed < MIN_BATCH_DELAY.as_secs() as i64 {
             true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
             false => Ok(()),
         }
     }

+    /// Ensure the primary is not creating batch proposals too frequently.
+    /// This checks that the certificate timestamp for the previous round is within the expected range.
+    ///
+    /// # Returns
+    /// - `Ok(true)` if the timestamp allows a new proposal.
+    /// - `Ok(false)` if the timestamp is valid but too soon after the previous proposal.
+    /// - `Err(err)` if an unexpected error occurred, such as the timestamp being before the previous certificate.
+    fn check_own_proposal_timestamp(
+        &self,
+        previous_round: u64,
+        previous_timestamp: i64,
+        timestamp: i64,
+    ) -> Result<bool> {
+        // Determine the elapsed time since the previous timestamp.
+        let elapsed = timestamp
+            .checked_sub(previous_timestamp)
+            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
+
+        Ok(elapsed >= MIN_BATCH_DELAY.as_secs() as i64)
+    }
+
     /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
     async fn store_and_broadcast_certificate(&self, proposal: &Proposal, committee: &Committee) -> Result<()> {
         // Create the batch certificate and transmissions.
         let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
+        // Convert the transmissions into a HashMap.
// Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety. let transmissions = transmissions.into_iter().collect::>(); + + // Store some metadata about the certified batch. + let round = certificate.round(); + let num_transmissions = certificate.transmission_ids().len(); + // Store the certified batch. let (storage, certificate_) = (self.storage.clone(), certificate.clone()); spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?; @@ -1704,23 +1737,26 @@ impl Primary { // If a BFT sender was provided, send the certificate to the BFT. if let Some(cb) = self.primary_callback.get() { // Await the callback to continue. - cb.add_new_certificate(certificate.clone()) - .await - .with_context(|| "Failed to add new certificate from primary")?; + cb.add_new_certificate(certificate.clone()).await.with_context(|| { + format!("Failed to insert our newly certified batch for round {round} into the DAG") + })?; } // Broadcast the certified batch to all validators. - self.gateway.broadcast(Event::BatchCertified(certificate.clone().into())); + self.gateway.broadcast(Event::BatchCertified(certificate.into())); + // Log the certified batch. - let num_transmissions = certificate.transmission_ids().len(); - let round = certificate.round(); info!("Our batch with {num_transmissions} transmissions for round {round} was certified!"); + // Record the certification latency (time from batch proposal to certification). #[cfg(feature = "metrics")] if let Some(start) = self.batch_propose_start.lock().take() { metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64()); } - // Increment to the next round. - self.try_increment_to_the_next_round(round + 1).await + + // Wake up the round increment task to re-check quorum. + self.round_increment_notify.notify_one(); + + Ok(()) } /// Inserts the missing transmissions from the proposal into the workers. 
@@ -1797,6 +1833,8 @@ impl Primary { if let Some(cb) = self.primary_callback.get() { cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?; } + // Wake the round-increment task to re-check quorum. + self.round_increment_notify.notify_one(); } Ok(()) } @@ -1840,7 +1878,9 @@ impl Primary { // If our primary is far behind the peer, update our committee to the batch round. if is_behind_schedule || is_peer_far_in_future { // If the batch round is greater than the current committee round, update the committee. - self.try_increment_to_the_next_round(batch_round).await?; + self.try_increment_to_the_next_round(batch_round) + .await + .with_context(|| "Failed to fast forward current round")?; } // Ensure the primary has all of the transmissions. @@ -2023,7 +2063,10 @@ impl Primary { let proposal_cache = { let proposal = self.proposed_batch.write().take(); let signed_proposals = self.signed_proposals.read().clone(); - let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await); + let latest_round = proposal + .as_ref() + .map(Proposal::round) + .unwrap_or(self.latest_proposal_timestamp.read().await.map(|(round, _)| round).unwrap_or(0)); let pending_certificates = self.storage.get_pending_certificates(); ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates) }; @@ -2037,7 +2080,8 @@ impl Primary { #[cfg(test)] mod tests { - use super::*; + use super::{proposal_task::BatchPropose as _, *}; + use snarkos_node_bft_ledger_service::MockLedgerService; use snarkos_node_bft_storage_service::BFTMemoryService; use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators}; @@ -2356,7 +2400,7 @@ mod tests { store_certificate_chain(&primary, &accounts, round, &mut rng); // Sleep for a while to ensure the primary is ready to propose the next round. 
- tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; + tokio::time::sleep(MIN_BATCH_DELAY).await; // Generate a solution and a transaction. let (solution_id, solution) = sample_unconfirmed_solution(&mut rng); @@ -2423,7 +2467,7 @@ mod tests { } // Sleep for a while to ensure the primary is ready to propose the next round. - tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await; + tokio::time::sleep(MIN_BATCH_DELAY).await; // Advance to the next round. assert!(primary.storage.increment_to_next_round(round).is_ok()); @@ -2488,7 +2532,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2527,7 +2571,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2567,7 +2611,7 @@ mod tests { // Create a valid proposal with an author that isn't the primary. 
let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2604,7 +2648,7 @@ mod tests { let round = 1; let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2650,7 +2694,7 @@ mod tests { // Create a valid proposal with an author that isn't the primary. let peer_account = &accounts[1]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( &peer_account.1, primary.ledger.current_committee().unwrap(), @@ -2706,7 +2750,7 @@ mod tests { .get_certificate_for_round_with_author(round - 1, peer_account.1.address()) .expect("No previous proposal exists") .timestamp(); - let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1; + let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY.as_secs() as i64) - 1; let proposal = create_test_proposal( &peer_account.1, @@ -2760,7 +2804,7 @@ mod tests { let peer_account = &accounts[2]; let peer_ip = peer_account.0; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng); @@ -2817,15 +2861,21 @@ mod tests { primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap(); // Set the proposal lock to a round ahead of the storage. 
- let old_proposal_lock_round = *primary.propose_lock.lock().await; - *primary.propose_lock.lock().await = round + 1; + let (old_proposal_round, old_proposal_timestamp) = primary + .latest_proposal_timestamp + .read() + .await + .map(|(round, timestamp)| (round, timestamp)) + .unwrap_or((0, 0)); + *primary.latest_proposal_timestamp.write().await = + Some((round + 1, old_proposal_timestamp + MIN_BATCH_DELAY.as_secs() as i64)); // Propose a batch and enforce that it fails. assert!(primary.propose_batch().await.is_ok()); assert!(primary.proposed_batch.read().is_none()); // Set the proposal lock back to the old round. - *primary.propose_lock.lock().await = old_proposal_lock_round; + *primary.latest_proposal_timestamp.write().await = Some((old_proposal_round, old_proposal_timestamp)); // Try to propose a batch again. This time, it should succeed. assert!(primary.propose_batch().await.is_ok()); @@ -2870,7 +2920,7 @@ mod tests { // Create a valid proposal. let round = 1; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), @@ -2894,6 +2944,8 @@ mod tests { // Check the certificate was created and stored by the primary. assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); + // Manually attempt round advancement (because the handler is not running). + primary.try_increment_to_the_next_round(round + 1).await.unwrap(); // Check the round was incremented. assert_eq!(primary.current_round(), round + 1); } @@ -2933,6 +2985,8 @@ mod tests { // Check the certificate was created and stored by the primary. assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address())); + // Manually attempt round advancement (because the handler is not running). 
+ primary.try_increment_to_the_next_round(round + 1).await.unwrap(); // Check the round was incremented. assert_eq!(primary.current_round(), round + 1); } @@ -2945,7 +2999,7 @@ mod tests { // Create a valid proposal. let round = 1; - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), @@ -2983,7 +3037,7 @@ mod tests { let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng); // Create a valid proposal. - let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64; + let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64; let proposal = create_test_proposal( primary.gateway.account(), primary.ledger.current_committee().unwrap(), diff --git a/node/bft/src/primary/proposal_task.rs b/node/bft/src/primary/proposal_task.rs new file mode 100644 index 0000000000..bc15204345 --- /dev/null +++ b/node/bft/src/primary/proposal_task.rs @@ -0,0 +1,515 @@ +// Copyright (c) 2019-2026 Provable Inc. +// This file is part of the snarkOS library. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::{CREATE_BATCH_INTERVAL, MAX_BATCH_DELAY, MIN_BATCH_DELAY}; + +use anyhow::Result; +use colored::Colorize; +use futures::future::BoxFuture; +use snarkvm::{prelude::Network, utilities::flatten_error}; +use std::{marker::PhantomData, sync::Arc}; +use tokio::{ + sync::watch, + time::{Instant, sleep, sleep_until}, +}; +use tracing::{debug, warn}; + +/// Abstracts over batch-proposal operations, allowing the proposal loop to be tested without a +/// real primary. +#[async_trait::async_trait] +pub(super) trait BatchPropose: Send + Sync { + /// Returns the current consensus round. + fn current_round(&self) -> u64; + + /// Returns `None` if the node is already synced; otherwise returns a future that resolves + /// once sync completes. + fn wait_for_synced_if_syncing(&self) -> Option>; + + /// Attempts to propose a batch. + /// + /// Returns `Ok(true)` when a batch was successfully proposed, `Ok(false)` to retry, and + /// `Err` on an unexpected error. + async fn propose_batch(&self) -> Result; +} + +/// Manages batch proposal readiness and drives the batch proposal loop. +/// +/// Holds the readiness state and the logic for the proposal task. The actual task is started by +/// calling [`Self::run`] inside a spawned future (see [`Primary::start_handlers`]). +pub struct ProposalTask { + inner: Arc, + _phantom: PhantomData, +} + +/// Manual `Clone` impl so that `N: Clone` is not required. +impl Clone for ProposalTask { + fn clone(&self) -> Self { + Self { inner: Arc::clone(&self.inner), _phantom: PhantomData } + } +} + +/// The inner state of a [`ProposalTask`], shared via `Arc`. +struct ProposalTaskInner { + /// Tracks whether the primary is ready to propose a new batch. + /// + /// Initialized to `true` so round 1 can be proposed immediately without an explicit signal. + /// Set to `true` by [`ProposalTask::signal`] when a new round starts, + /// and reset to `false` after a batch is successfully proposed. 
+ ready: watch::Sender, +} + +impl Default for ProposalTask { + fn default() -> Self { + let (ready, _) = watch::channel(true); + Self { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData } + } +} + +impl ProposalTask { + /// Signals that the primary is ready to propose a new batch for the current round. + /// + /// Should be called from [`Primary::try_increment_to_the_next_round`] whenever the primary + /// successfully advances to a new round. + pub fn signal(&self) { + self.inner.ready.send_replace(true); + } + + /// Runs the batch proposal loop. This is intended to be spawned as a long-running task. + /// + /// Each iteration covers one full round (wait → propose → wait for signatures). + /// The three stages are implemented as separate methods; see their doc-comments for details. + pub(super) async fn run(self, primary: P) { + let mut ready_rx = self.inner.ready.subscribe(); + + loop { + let round = primary.current_round(); + // TODO(kaimast): the round_start time should be based on the timestamp of the + // previous batch, not the current wall-clock time. + let round_start = Instant::now(); + + if !Self::wait_until_proposal_ready(&primary, &mut ready_rx, round, round_start).await { + continue; // round changed; restart + } + + if !Self::propose(&primary, round).await { + continue; // round changed; restart + } + + // Reset readiness so the next round waits for an explicit signal. + self.inner.ready.send_replace(false); + + Self::wait_for_signatures(&primary, &mut ready_rx, round).await; + } + } + + /// Stage 1: Wait until conditions are met to propose a batch. + /// + /// Blocks until sync is complete, MIN_BATCH_DELAY has elapsed since `round_start`, and either + /// `signal()` fires (leader cert arrived) or MAX_BATCH_DELAY expires without one. + /// + /// Returns `true` if ready to propose, `false` if the round changed (caller should restart). 
+ async fn wait_until_proposal_ready( + primary: &P, + ready_rx: &mut watch::Receiver, + round: u64, + round_start: Instant, + ) -> bool { + loop { + if primary.current_round() != round { + return false; + } + + // A node cannot propose while it is syncing. + if let Some(fut) = primary.wait_for_synced_if_syncing() { + fut.await; + // Re-check round after sync completes. + continue; + } + + // Enforce the minimum inter-proposal delay. + // This is a no-op once the deadline has already passed. + sleep_until(round_start + MIN_BATCH_DELAY).await; + + // Wait for a readiness signal, the MAX_BATCH_DELAY deadline, or a short heartbeat + // that lets the round-change check at the top of the loop fire regularly. + tokio::select! { + _ = sleep_until(round_start + MAX_BATCH_DELAY) => { + debug!("Did not receive leader certificate within MAX_BATCH_DELAY"); + return true; + }, + _ = Self::wait_until_ready(ready_rx) => { + return true; + }, + _ = sleep(CREATE_BATCH_INTERVAL) => { + debug!("Skipping batch proposal for round {round} {}", "(not ready yet)".dimmed()); + } + }; + } + } + + /// Stage 2: Propose a batch. + /// + /// Calls `propose_batch()` with CREATE_BATCH_INTERVAL retries until it returns `Ok(true)` + /// (batch submitted to the network). + /// + /// Returns `true` if the batch was submitted, `false` if the round changed (caller should restart). + async fn propose(primary: &P, round: u64) -> bool { + let mut attempt = 1u32; + loop { + if primary.current_round() != round { + return false; + } + + if attempt > 1 { + sleep(CREATE_BATCH_INTERVAL).await; + debug!("Retrying batch proposal for round {round} (attempt #{attempt})"); + } + + // Note: Do NOT spawn a task around this function call. Proposing a batch is a + // critical path, and only one batch needs to be proposed at a time. 
+            match primary.propose_batch().await {
+                Ok(true) => return true, // batch submitted; proceed to Stage 3
+                Ok(false) => {}          // not ready yet; retry
+                Err(err) => {
+                    warn!("{}", flatten_error(err.context("Cannot propose a batch")));
+                }
+            }
+
+            attempt += 1;
+        }
+    }
+
+    /// Stage 3: Wait for the proposed batch to collect enough signatures.
+    ///
+    /// Periodically rebroadcasts the batch to non-signers (via `propose_batch`) at most once per
+    /// MAX_BATCH_DELAY until the round advances. Returns when the round changes.
+    async fn wait_for_signatures(primary: &P, ready_rx: &mut watch::Receiver<bool>, round: u64) {
+        loop {
+            if primary.current_round() != round {
+                return;
+            }
+
+            // Wait for the rebroadcast interval or an explicit round-advance signal,
+            // whichever comes first.
+            tokio::select! {
+                _ = Self::wait_until_ready(ready_rx) => return, // round advanced
+                _ = sleep(MAX_BATCH_DELAY) => {}
+            }
+
+            if primary.current_round() != round {
+                return;
+            }
+
+            // Rebroadcast to non-signers (`propose_batch` handles this internally).
+            match primary.propose_batch().await {
+                Ok(_) => {}
+                Err(err) => {
+                    warn!("{}", flatten_error(err.context("Cannot rebroadcast a batch")));
+                }
+            }
+        }
+    }
+
+    /// Waits until the readiness watch channel holds `true`. Returns immediately if it already does.
+    ///
+    /// Spurious wakeups (e.g. from a reset to `false`) are handled by re-checking the value in a loop.
+    async fn wait_until_ready(receiver: &mut watch::Receiver<bool>) {
+        loop {
+            // Fetch the `is_ready` value and return if it is true.
+            if *receiver.borrow_and_update() {
+                return;
+            }
+
+            // Block until the `is_ready` value changes, or the channel is closed.
+            if receiver.changed().await.is_err() {
+                return;
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use snarkvm::prelude::MainnetV0;
+    use std::{
+        sync::{
+            Arc,
+            atomic::{AtomicU32, Ordering},
+        },
+        time::Duration,
+    };
+    use tokio::sync::Notify;
+
+    /// A minimal [`BatchPropose`] implementation for testing.
+ /// + /// Always reports round 1 and synced. Records how many times [`propose_batch`] is called and + /// fires a [`Notify`] on each call. + struct DummyProposer { + propose_count: Arc, + proposed_notify: Arc, + } + + #[async_trait::async_trait] + impl BatchPropose for DummyProposer { + fn current_round(&self) -> u64 { + 1 + } + + fn wait_for_synced_if_syncing(&self) -> Option> { + None + } + + async fn propose_batch(&self) -> Result { + self.propose_count.fetch_add(1, Ordering::SeqCst); + self.proposed_notify.notify_one(); + + Ok(true) + } + } + + /// A [`BatchPropose`] implementation that returns round 1 on the very first call to + /// `current_round`, then round 2 for all subsequent calls. + /// + /// This simulates the round advancing between the outer-loop capture and the inner-loop + /// condition check, without any real-time waiting or time mocking. + struct RoundAdvancingProposer { + current_round_calls: Arc, + propose_count: Arc, + } + + #[async_trait::async_trait] + impl BatchPropose for RoundAdvancingProposer { + fn current_round(&self) -> u64 { + let n = self.current_round_calls.fetch_add(1, Ordering::SeqCst); + if n == 0 { 1 } else { 2 } + } + + fn wait_for_synced_if_syncing(&self) -> Option> { + None + } + + async fn propose_batch(&self) -> Result { + self.propose_count.fetch_add(1, Ordering::SeqCst); + Ok(true) + } + } + + /// A [`BatchPropose`] implementation that returns `Ok(false)` a fixed number of times before + /// succeeding. 
+ struct RetryProposer { + retries_before_success: u32, + propose_count: Arc, + proposed_notify: Arc, + } + + #[async_trait::async_trait] + impl BatchPropose for RetryProposer { + fn current_round(&self) -> u64 { + 1 + } + + fn wait_for_synced_if_syncing(&self) -> Option> { + None + } + + async fn propose_batch(&self) -> Result { + let count = self.propose_count.fetch_add(1, Ordering::SeqCst) + 1; + if count <= self.retries_before_success { + Ok(false) + } else { + self.proposed_notify.notify_one(); + Ok(true) + } + } + } + + /// Signals the proposal task and verifies that `propose_batch` is called on the dummy. + #[tokio::test] + async fn test_proposal_task_calls_propose_batch_on_signal() { + // Start with the task not ready so the initial signal is the trigger. + let (ready, _) = watch::channel(false); + let task = ProposalTask:: { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }; + + let proposed_notify = Arc::new(Notify::new()); + let propose_count = Arc::new(AtomicU32::new(0)); + + let proposer = DummyProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() }; + + let task_for_spawn = task.clone(); + tokio::spawn(task_for_spawn.run(proposer)); + + // Before signalling, propose_batch should not have been called. + sleep(Duration::from_millis(50)).await; + assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called before signal"); + + // Signal readiness — the proposal loop should wake up and call propose_batch. + task.signal(); + + tokio::time::timeout(std::time::Duration::from_secs(5), proposed_notify.notified()) + .await + .expect("propose_batch was not called within 5 seconds after signal"); + + assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch was not called"); + } + + /// When the round advances between iterations, `propose_batch` is not called for the old round. 
+ /// + /// `RoundAdvancingProposer` returns round 1 on the first `current_round()` call (outer-loop + /// capture) and round 2 on every subsequent call. The inner-loop condition therefore fails + /// immediately — no time mocking needed. + #[tokio::test] + async fn test_proposal_task_exits_on_round_advancement() { + let propose_count = Arc::new(AtomicU32::new(0)); + let proposer = RoundAdvancingProposer { + current_round_calls: Arc::new(AtomicU32::new(0)), + propose_count: propose_count.clone(), + }; + + // Start not-ready so the task parks in round 2's inner loop without proposing round 1. + let (ready, _) = watch::channel(false); + let task = ProposalTask:: { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }; + + tokio::spawn(task.run(proposer)); + + // Yield once: the task runs through round 1 (inner loop exits immediately because + // current_round() already returns 2) and then parks in round 2's inner loop. + tokio::task::yield_now().await; + + assert_eq!(propose_count.load(Ordering::SeqCst), 0, "propose_batch called despite round advancement"); + } + + /// Tests the following scenario + /// + /// 1. A batch was already certified for the current round, so readiness is `false`. + /// 2. `signal()` is **never** called externally — the BFT cannot advance the round until + /// `propose_batch()` is called (which internally checks the leader-certificate timer). + #[test_log::test(tokio::test)] + async fn test_proposal_task_advances_without_leader_cert() { + // Start NOT ready: simulates a batch that was already certified for the round but the + // round has not yet advanced (the even-round leader cert was missing — e.g. the elected + // leader was one of the freshly-reset minority validators). 
+ let (ready, _) = watch::channel(false); + let task = ProposalTask:: { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }; + + let proposed_notify = Arc::new(Notify::new()); + let propose_count = Arc::new(AtomicU32::new(0)); + + // A proposer that stays on round 1 and returns Ok(true) on every call to + // propose_batch(), simulating try_advance_to_next_round finding the leader-certificate + // timer expired and advancing the round without an external signal(). + struct NoSignalProposer { + propose_count: Arc, + proposed_notify: Arc, + } + + #[async_trait::async_trait] + impl BatchPropose for NoSignalProposer { + fn current_round(&self) -> u64 { + 1 + } + + fn wait_for_synced_if_syncing(&self) -> Option> { + None + } + + async fn propose_batch(&self) -> Result { + self.propose_count.fetch_add(1, Ordering::SeqCst); + self.proposed_notify.notify_one(); + Ok(true) + } + } + + let proposer = + NoSignalProposer { propose_count: propose_count.clone(), proposed_notify: proposed_notify.clone() }; + + // signal() is intentionally never called — the task must retry on its own. + tokio::spawn(task.run(proposer)); + + // Allow enough time for MAX_BATCH_DELAY (2.5 s) to elapse plus the CREATE_BATCH_INTERVAL + // (250 ms) retry window. Use 10 s to give generous headroom on slow CI machines. + tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified()) + .await + .expect("propose_batch was not called"); + + assert!(propose_count.load(Ordering::SeqCst) >= 1, "propose_batch should have been called at least once"); + } + + /// After the leader-certificate timer fires (MAX_BATCH_DELAY elapses without an explicit + /// `signal()`), the task should still retry `propose_batch` when it returns `Ok(false)` and + /// eventually succeed once it returns `Ok(true)`. + /// + /// This models the real primary: when a round is already certified but the round has not yet + /// advanced (e.g. 
the elected leader was a freshly-reset minority validator), `propose_batch` + /// returns `Ok(false)` until `try_advance_to_next_round` can make progress. + #[test_log::test(tokio::test)] + async fn test_proposal_task_retries_after_leader_timeout() { + const RETRIES: u32 = 2; + + // Start NOT ready — no external signal will be sent. The task must wait for + // MAX_BATCH_DELAY to fire, then retry until propose_batch succeeds. + let (ready, _) = watch::channel(false); + let task = ProposalTask:: { inner: Arc::new(ProposalTaskInner { ready }), _phantom: PhantomData }; + + let proposed_notify = Arc::new(Notify::new()); + let propose_count = Arc::new(AtomicU32::new(0)); + let proposer = RetryProposer { + retries_before_success: RETRIES, + propose_count: propose_count.clone(), + proposed_notify: proposed_notify.clone(), + }; + + // signal() is intentionally never called — the leader timeout arm must trigger. + tokio::spawn(task.run(proposer)); + + // Allow enough time for MAX_BATCH_DELAY (2.5 s) plus RETRIES × CREATE_BATCH_INTERVAL (250 ms each). + // Use 10 s to give generous headroom on slow CI machines. + tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified()) + .await + .expect("propose_batch did not succeed within 10 seconds after leader timeout"); + + // Stage 3 may make additional rebroadcast calls after success, so use >. + assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1); + } + + /// When `propose_batch` returns `Ok(false)`, the task retries within the same round until + /// it succeeds. + #[tokio::test] + async fn test_proposal_task_retries_on_false() { + const RETRIES: u32 = 2; + + // Default starts ready, so no signal needed. 
+ let task = ProposalTask::::default(); + + let proposed_notify = Arc::new(Notify::new()); + let propose_count = Arc::new(AtomicU32::new(0)); + let proposer = RetryProposer { + retries_before_success: RETRIES, + propose_count: propose_count.clone(), + proposed_notify: proposed_notify.clone(), + }; + + tokio::spawn(task.run(proposer)); + + // The task internally waits MIN_BATCH_DELAY before the first attempt; allow up to 10s. + tokio::time::timeout(std::time::Duration::from_secs(10), proposed_notify.notified()) + .await + .expect("propose_batch did not succeed within 10 seconds"); + + // Stage 3 may make additional rebroadcast calls after success, so use >. + assert!(propose_count.load(Ordering::SeqCst) > RETRIES, "expected at least {} total attempts", RETRIES + 1); + } +} diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index bf8bfab8cc..447cb55e31 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -15,7 +15,7 @@ use crate::{ Gateway, - MAX_FETCH_TIMEOUT_IN_MS, + MAX_FETCH_TIMEOUT, Transport, events::{CertificateRequest, CertificateResponse, Event}, helpers::{Pending, Storage, SyncReceiver, fmt_id, max_redundant_requests}, @@ -23,7 +23,6 @@ use crate::{ spawn_blocking, }; -use snarkos_node_network::PeerPoolHandling; use snarkos_node_sync::{BftSyncMode, BlockSync, InsertBlockResponseError, Ping, locators::BlockLocators}; use snarkos_utilities::CallbackHandle; @@ -136,6 +135,18 @@ impl Sync { } } + /// Waits until the node is synced (has connected peers and is block-synced). + /// Returns immediately if already synced. + pub async fn wait_for_synced(&self) { + self.block_sync.wait_for_synced().await; + } + + /// Returns `None` if the node is already synced. + /// Otherwise, returns a future that completes once the node becomes synced. + pub fn wait_for_synced_if_syncing(&self) -> Option> { + self.block_sync.wait_for_synced_if_syncing() + } + /// Initializes the sync module and sync the storage with the ledger at bootup. 
pub fn initialize(&self, sync_callback: Option>>) -> Result<()> { // If a callback was provided, set it. @@ -205,7 +216,7 @@ impl Sync { self.spawn(async move { loop { // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + tokio::time::sleep(MAX_FETCH_TIMEOUT).await; // Remove the expired pending transmission requests. let self__ = self_.clone(); @@ -984,12 +995,6 @@ impl Sync { impl Sync { /// Returns `true` if the node is synced and has connected peers. pub fn is_synced(&self) -> bool { - // Ensure the validator is connected to other validators, - // not just clients. - if self.gateway.number_of_connected_peers() == 0 { - return false; - } - self.block_sync.is_block_synced() } @@ -1045,7 +1050,7 @@ impl Sync { } // Wait for the certificate to be fetched. // TODO (raychu86): Consider making the timeout dynamic based on network traffic and/or the number of validators. - tokio::time::timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver) + tokio::time::timeout(MAX_FETCH_TIMEOUT, callback_receiver) .await .with_context(|| format!("Unable to fetch batch certificate {} (timeout)", fmt_id(certificate_id)))? 
.with_context(|| format!("Unable to fetch batch certificate {}", fmt_id(certificate_id))) diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index e20d7d9eeb..aeb033b16b 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -16,7 +16,7 @@ #[cfg(not(test))] use crate::Gateway; use crate::{ - MAX_FETCH_TIMEOUT_IN_MS, + MAX_FETCH_TIMEOUT, MAX_WORKERS, ProposedBatch, Transport, @@ -43,7 +43,7 @@ use locktick::parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock}; use rand::seq::IteratorRandom; -use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; +use std::{future::Future, net::SocketAddr, sync::Arc}; use tokio::{sync::oneshot, task::JoinHandle, time::timeout}; /// A worker's main role is maintaining a queue of verified ("ready") transmissions, @@ -448,7 +448,7 @@ impl Worker { self.spawn(async move { loop { // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + tokio::time::sleep(MAX_FETCH_TIMEOUT).await; // Remove the expired pending certificate requests. let self__ = self_.clone(); @@ -533,9 +533,9 @@ impl Worker { self.format_transmission_id(transmission_id) ); } - // Wait for the transmission to be fetched. - let transmission = timeout(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS), callback_receiver) + // Wait for the transmission to be fetched. 
+ let transmission = timeout(MAX_FETCH_TIMEOUT, callback_receiver) .await .with_context(|| { format!("Unable to fetch transmission {} (timeout)", self.format_transmission_id(transmission_id)) @@ -618,7 +618,7 @@ mod tests { use bytes::Bytes; use mockall::mock; use rand::RngExt; - use std::{io, ops::Range}; + use std::{io, ops::Range, time::Duration}; type CurrentNetwork = snarkvm::prelude::MainnetV0; diff --git a/node/bft/tests/bft_e2e.rs b/node/bft/tests/bft_e2e.rs index 93115a3215..84e732e1fa 100644 --- a/node/bft/tests/bft_e2e.rs +++ b/node/bft/tests/bft_e2e.rs @@ -21,11 +21,11 @@ mod components; use crate::common::primary::{TestNetwork, TestNetworkConfig}; use deadline::deadline; use itertools::Itertools; -use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS; +use snarkos_node_bft::MAX_FETCH_TIMEOUT; use std::time::Duration; use tokio::time::sleep; -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "long-running e2e test"] async fn test_state_coherence() { const N: u16 = 4; @@ -37,8 +37,6 @@ async fn test_state_coherence() { bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: Some(0), log_connections: true, }) }) @@ -50,7 +48,7 @@ async fn test_state_coherence() { std::future::pending::<()>().await; } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "fails"] async fn test_resync() { // Start N nodes, connect them and start the cannons for each. @@ -62,8 +60,6 @@ async fn test_resync() { bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. 
- log_level: Some(0), log_connections: false, }) }) @@ -83,7 +79,6 @@ async fn test_resync() { bft: true, connect_all: false, fire_transmissions: None, - log_level: None, log_connections: false, }); spare_network.start().await; @@ -100,7 +95,7 @@ async fn test_resync() { deadline!(Duration::from_secs(20), move || { network_clone.is_round_reached(RECOVERY_ROUND) }); } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_quorum_threshold() { // Start N nodes but don't connect them. const N: u16 = 4; @@ -112,8 +107,6 @@ async fn test_quorum_threshold() { bft: true, connect_all: false, fire_transmissions: None, - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -129,7 +122,7 @@ async fn test_quorum_threshold() { // Start the cannons for node 0. network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(MAX_FETCH_TIMEOUT).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -140,7 +133,7 @@ async fn test_quorum_threshold() { network.connect_validators(0, 1).await; network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(MAX_FETCH_TIMEOUT).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -158,7 +151,7 @@ async fn test_quorum_threshold() { deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) }); } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_quorum_break() { // Start N nodes, connect them and start the cannons for each. const N: u16 = 4; @@ -169,8 +162,6 @@ async fn test_quorum_break() { bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. 
- log_level: None, log_connections: true, }) }) @@ -192,7 +183,8 @@ async fn test_quorum_break() { assert!(network.is_halted().await); } -#[tokio::test(flavor = "multi_thread")] +/// Tests that the all validators agree on the same leader for every even round. +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_leader_election_consistency() { // The minimum and maximum rounds to check for leader consistency. // From manual experimentation, the minimum round that works is 4. @@ -209,8 +201,6 @@ async fn test_leader_election_consistency() { bft: true, connect_all: true, fire_transmissions: Some(CANNON_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -249,11 +239,11 @@ async fn test_leader_election_consistency() { println!("Found {} validators with a leader ({} out of sync)", leaders.len(), validators.len() - leaders.len()); // Assert that all leaders are equal - assert!(leaders.iter().all_equal()); + assert!(leaders.iter().all_equal(), "Leaders are not equal: {leaders:?} for round {target_round}"); } } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "run multiple times to see failure"] async fn test_transient_break() { // Start N nodes, connect them and start the cannons for each. @@ -265,8 +255,6 @@ async fn test_transient_break() { bft: true, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. 
- log_level: Some(6), log_connections: false, }) }) diff --git a/node/bft/tests/common/primary.rs b/node/bft/tests/common/primary.rs index 54cceb3d99..11301e0e2b 100644 --- a/node/bft/tests/common/primary.rs +++ b/node/bft/tests/common/primary.rs @@ -16,13 +16,13 @@ use crate::common::{ CurrentNetwork, TranslucentLedgerService, - utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions, initialize_logger}, + utils::{fire_unconfirmed_solutions, fire_unconfirmed_transactions}, }; use snarkos_account::Account; use snarkos_node_bft::{ BFT, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, MEMORY_POOL_PORT, Primary, helpers::{PrimarySender, Storage, init_primary_channels}, @@ -78,8 +78,6 @@ pub struct TestNetworkConfig { pub connect_all: bool, /// If `Some(i)` is set, the cannons will fire every `i` milliseconds. pub fire_transmissions: Option, - /// The log level to use for the test. - pub log_level: Option, /// If this is set to `true`, the number of connections is logged every 5 seconds. pub log_connections: bool, } @@ -140,10 +138,6 @@ impl TestNetwork { pub fn new(config: TestNetworkConfig) -> Self { let mut rng = TestRng::default(); - if let Some(log_level) = config.log_level { - initialize_logger(log_level); - } - let (accounts, committee) = new_test_committee(config.num_nodes, &mut rng); let bonded_balances: IndexMap<_, _> = committee .members() @@ -259,9 +253,10 @@ impl TestNetwork { pub async fn connect_validators(&self, first_id: u16, second_id: u16) { let first_validator = self.validators.get(&first_id).unwrap(); let second_validator_ip = self.validators.get(&second_id).unwrap().primary.gateway().local_ip(); - let _ = first_validator.primary.gateway().connect(second_validator_ip); - // Give the connection time to be established. 
- sleep(Duration::from_millis(100)).await; + let gateway = first_validator.primary.gateway(); + let handle = gateway.connect(second_validator_ip).expect("connection attempt failed"); + // Await the full TCP + handshake completion instead of relying on a fixed sleep. + handle.await.unwrap().expect("connecting validators failed"); } // Connects all nodes to each other. @@ -323,7 +318,7 @@ impl TestNetwork { // Checks if all the nodes have stopped progressing. pub async fn is_halted(&self) -> bool { let halt_round = self.validators.values().map(|v| v.primary.current_round()).max().unwrap(); - sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS * 2)).await; + sleep(MAX_BATCH_DELAY * 2).await; self.validators.values().all(|v| v.primary.current_round() <= halt_round) } diff --git a/node/bft/tests/narwhal_e2e.rs b/node/bft/tests/narwhal_e2e.rs index 177db6a159..608d379184 100644 --- a/node/bft/tests/narwhal_e2e.rs +++ b/node/bft/tests/narwhal_e2e.rs @@ -17,14 +17,14 @@ mod common; use crate::common::primary::{TestNetwork, TestNetworkConfig}; -use snarkos_node_bft::MAX_FETCH_TIMEOUT_IN_MS; +use snarkos_node_bft::MAX_FETCH_TIMEOUT; use std::time::Duration; use deadline::deadline; use tokio::time::sleep; -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] #[ignore = "long-running e2e test"] async fn test_state_coherence() { const N: u16 = 4; @@ -36,8 +36,6 @@ async fn test_state_coherence() { bft: false, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: Some(0), log_connections: true, }) }) @@ -52,7 +50,7 @@ async fn test_state_coherence() { std::future::pending::<()>().await; } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_quorum_threshold() { // Start N nodes but don't connect them. 
const N: u16 = 4; @@ -64,8 +62,6 @@ async fn test_quorum_threshold() { bft: false, connect_all: false, fire_transmissions: None, - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -81,7 +77,7 @@ async fn test_quorum_threshold() { // Start the cannons for node 0. network.fire_transmissions_at(0, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(MAX_FETCH_TIMEOUT).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -92,7 +88,7 @@ async fn test_quorum_threshold() { network.connect_validators(0, 1).await; network.fire_transmissions_at(1, TRANSMISSION_INTERVAL_MS); - sleep(Duration::from_millis(MAX_FETCH_TIMEOUT_IN_MS)).await; + sleep(MAX_FETCH_TIMEOUT).await; // Check each node is still at round 1. for validator in network.validators.values() { @@ -110,7 +106,7 @@ async fn test_quorum_threshold() { deadline!(Duration::from_secs(20), move || { net.is_round_reached(TARGET_ROUND) }); } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_quorum_break() { // Start N nodes, connect them and start the cannons for each. const N: u16 = 4; @@ -121,8 +117,6 @@ async fn test_quorum_break() { bft: false, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. - log_level: None, log_connections: true, }) }) @@ -144,7 +138,7 @@ async fn test_quorum_break() { assert!(network.is_halted().await); } -#[tokio::test(flavor = "multi_thread")] +#[test_log::test(tokio::test(flavor = "multi_thread"))] async fn test_storage_coherence() { // Start N nodes, connect them and start the cannons for each. const N: u16 = 4; @@ -155,8 +149,6 @@ async fn test_storage_coherence() { bft: false, connect_all: true, fire_transmissions: Some(TRANSMISSION_INTERVAL_MS), - // Set this to Some(0..=4) to see the logs. 
- log_level: None, log_connections: true, }) }) diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index 871b355b11..71bf06c9ab 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -27,7 +27,7 @@ extern crate snarkos_node_metrics as metrics; use snarkos_account::Account; use snarkos_node_bft::{ BFT, - MAX_BATCH_DELAY_IN_MS, + MAX_BATCH_DELAY, Primary, helpers::{ ConsensusReceiver, @@ -64,8 +64,11 @@ use locktick::parking_lot::{Mutex, RwLock}; use lru::LruCache; #[cfg(not(feature = "locktick"))] use parking_lot::{Mutex, RwLock}; -use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration}; -use tokio::{sync::oneshot, task::JoinHandle}; +use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc}; +use tokio::{ + sync::{Notify, oneshot}, + task::JoinHandle, +}; #[cfg(feature = "metrics")] use std::collections::HashMap; @@ -113,6 +116,8 @@ pub struct Consensus { ping: Arc>, /// The block sync logic. block_sync: Arc>, + /// Notifies when a block is committed, and relays it to the primary. + block_commit_notify: Arc, } impl Consensus { @@ -163,6 +168,7 @@ impl Consensus { transmissions_tracker: Default::default(), handles: Default::default(), ping: ping.clone(), + block_commit_notify: Arc::new(Notify::new()), }; info!("Starting the consensus instance..."); @@ -501,13 +507,19 @@ impl Consensus { // Process the unconfirmed transactions in the memory pool. // - // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted - // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions. + // This loop wakes up either when a block is committed (signaled via notify) or after a timeout. + // When a block is committed, the BFT workers have freed capacity for more transmissions. + // The timeout serves as a fallback for startup or idle periods when blocks are not being produced. 
+ // + // TODO(kaimast): wake up the loop after a proposal is created, not only when a block commits. let self_ = self.clone(); self.spawn(async move { loop { - // Sleep briefly. - tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await; + // Wait for either a block commit notification or timeout. + tokio::select! { + _ = self_.block_commit_notify.notified() => {} + _ = tokio::time::sleep(MAX_BATCH_DELAY) => {} + } // Process the unconfirmed transactions in the memory pool. if let Err(err) = self_.process_unconfirmed_transactions().await { warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions"))); @@ -538,12 +550,14 @@ impl Consensus { let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") }; // If the block failed to advance, reinsert the transmissions into the memory pool. - if result.is_err() { - // On failure, reinsert the transmissions into the memory pool. - self.reinsert_transmissions(transmissions).await; + match result { + Ok(true) => { + // On success, notify that the BFT workers have freed capacity for more transmissions. + self.block_commit_notify.notify_one(); + } + Ok(false) | Err(_) => self.reinsert_transmissions(transmissions).await, } - // Send the callback **after** advancing to the next block. - // Note: We must await the block to be advanced before sending the callback. + callback.send(result).ok(); } @@ -551,7 +565,7 @@ impl Consensus { /// /// # Returns /// - `Ok(true)` if the ledger was advanced to the next block. - /// - `Ok(false)` if the block may be valide but the ledger already advanced. + /// - `Ok(false)` if the block may be valid but the ledger already advanced. /// - `Err(anyhow::Error)` for incorrect blocks and any other error. fn try_advance_to_next_block( &self, @@ -568,10 +582,18 @@ impl Consensus { // Create the candidate next block. 
let ledger_update = self.ledger.begin_ledger_update()?; - let block = match ledger_update - .prepare_advance_to_next_quorum_block(subdag, transmissions) - .and_then(|block| ledger_update.check_next_block(block)) - { + let prepare_instant = std::time::Instant::now(); + let block = match ledger_update.prepare_advance_to_next_quorum_block(subdag, transmissions) { + Ok(block) => block, + Err(err) => return Err(err.into_anyhow()), + }; + let prepare_elapsed = prepare_instant.elapsed(); + trace!("prepare_advance_to_next_quorum_block took {:.3}s", prepare_elapsed.as_secs_f64()); + #[cfg(feature = "metrics")] + metrics::histogram(metrics::consensus::PREPARE_ADVANCE_SECS, prepare_elapsed.as_secs_f64()); + + let check_instant = std::time::Instant::now(); + let block = match ledger_update.check_next_block(block) { Ok(block) => block, Err(CheckBlockError::BlockAlreadyExists { .. }) => { debug!("The given block hash already exists in the ledger"); @@ -587,11 +609,20 @@ impl Consensus { } Err(err) => return Err(err.into_anyhow()), }; + let check_elapsed = check_instant.elapsed(); + trace!("check_next_block took {:.3}s", check_elapsed.as_secs_f64()); + #[cfg(feature = "metrics")] + metrics::histogram(metrics::consensus::CHECK_NEXT_BLOCK_SECS, check_elapsed.as_secs_f64()); let block_height = block.height(); // Advance to the next block. + let advance_instant = std::time::Instant::now(); ledger_update.advance_to_next_block(&block)?; + let advance_elapsed = advance_instant.elapsed(); + trace!("advance_to_next_block took {:.3}s", advance_elapsed.as_secs_f64()); + #[cfg(feature = "metrics")] + metrics::histogram(metrics::consensus::ADVANCE_TO_NEXT_BLOCK_SECS, advance_elapsed.as_secs_f64()); #[cfg(feature = "telemetry")] // Fetch the committee lookback for the latest round. @@ -648,34 +679,36 @@ impl Consensus { // If telemetry is enabled, update participation scores. 
#[cfg(feature = "telemetry")] - match latest_committee { - Ok(latest_committee) => { - // Retrieve the individual participation scores. - let participation_scores = self - .bft() - .primary() - .gateway() - .validator_telemetry() - .get_participation_scores(&latest_committee); - - // Export the certificate and signature participation scores as individual gauges. - for (address, (certificate_score, signature_score)) in participation_scores { - let address_str = address.to_string(); - metrics::gauge_label( - metrics::consensus::VALIDATOR_CERTIFICATE_PARTICIPATION, - "validator_address", - address_str.clone(), - certificate_score, - ); - metrics::gauge_label( - metrics::consensus::VALIDATOR_SIGNATURE_PARTICIPATION, - "validator_address", - address_str, - signature_score, - ); + { + match latest_committee { + Ok(latest_committee) => { + // Retrieve the individual participation scores. + let participation_scores = self + .bft() + .primary() + .gateway() + .validator_telemetry() + .get_participation_scores(&latest_committee); + + // Export the certificate and signature participation scores as individual gauges. + for (address, (certificate_score, signature_score)) in participation_scores { + let address_str = address.to_string(); + metrics::gauge_label( + metrics::consensus::VALIDATOR_CERTIFICATE_PARTICIPATION, + "validator_address", + address_str.clone(), + certificate_score, + ); + metrics::gauge_label( + metrics::consensus::VALIDATOR_SIGNATURE_PARTICIPATION, + "validator_address", + address_str, + signature_score, + ); + } } + Err(err) => warn!("{}", flatten_error(err.context("Failed to get latest committee for telemetry"))), } - Err(err) => warn!("{}", flatten_error(err.context("Failed to get latest committee for telemetry"))), } } @@ -690,7 +723,7 @@ impl Consensus { match self.reinsert_transmission(transmission_id, transmission).await { Ok(true) => {} Ok(false) => debug!( - "Unable to reinsert transmission {}.{} into the memory pool. 
Already exists.", + "Unable to reinsert transmission {}:{} into the memory pool. Already exists.", fmt_id(transmission_id), fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed() ), diff --git a/node/metrics/src/names.rs b/node/metrics/src/names.rs index f30837dff0..da70215f71 100644 --- a/node/metrics/src/names.rs +++ b/node/metrics/src/names.rs @@ -99,6 +99,12 @@ pub mod consensus { pub const COMMITTED_CERTIFICATES: &str = "snarkos_consensus_committed_certificates_total"; pub const BLOCK_LATENCY: &str = "snarkos_consensus_block_latency_secs"; pub const BLOCK_LAG: &str = "snarkos_consensus_block_lag_ms"; + /// Time spent in prepare_advance_to_next_quorum_block (block construction). + pub const PREPARE_ADVANCE_SECS: &str = "snarkos_consensus_prepare_advance_secs"; + /// Time spent in check_next_block. + pub const CHECK_NEXT_BLOCK_SECS: &str = "snarkos_consensus_check_next_block_secs"; + /// Time spent in advance_to_next_block (ledger write). + pub const ADVANCE_TO_NEXT_BLOCK_SECS: &str = "snarkos_consensus_advance_to_next_block_secs"; pub const UNCONFIRMED_TRANSACTIONS: &str = "snarkos_consensus_unconfirmed_transactions_total"; pub const UNCONFIRMED_SOLUTIONS: &str = "snarkos_consensus_unconfirmed_solutions_total"; pub const TRANSMISSION_LATENCY: &str = "snarkos_consensus_transmission_latency"; diff --git a/node/router/tests/cleanups.rs b/node/router/tests/cleanups.rs index 2058ae4394..8e30109340 100644 --- a/node/router/tests/cleanups.rs +++ b/node/router/tests/cleanups.rs @@ -97,6 +97,12 @@ async fn test_connection_cleanups() { // Register final heap use. let heap_after_loop = PEAK_ALLOC.current_usage(); - // Final heap use should equal that after the first connection. - assert_eq!(heap_after_one_conn.unwrap(), heap_after_loop); + // Final heap use should be close to that after the first connection. + // We allow up to 1 KiB of growth to accommodate small per-task allocations that + // tokio's runtime retains internally across connections (e.g. 
sharded queue state). + let heap_growth = heap_after_loop.saturating_sub(heap_after_one_conn.unwrap()); + assert!( + heap_growth <= 1024, + "heap grew by {heap_growth} bytes after {NUM_CONNECTIONS} connections — possible memory leak" + ); } diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index fd76998ccb..22bdb9fa08 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -17,6 +17,7 @@ use crate::{ helpers::{PeerPair, PrepareSyncRequest, SyncRequest}, locators::BlockLocators, }; +use futures::future::BoxFuture; use snarkos_node_bft_ledger_service::{BeginLedgerUpdateError, LedgerService}; use snarkos_node_network::ConnectionMode; use snarkos_node_router::messages::DataBlocks; @@ -30,6 +31,7 @@ use snarkvm::{ }; use anyhow::{Context, Result, anyhow, bail, ensure}; +use futures::FutureExt; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; #[cfg(feature = "locktick")] @@ -232,6 +234,9 @@ pub struct BlockSync { /// Tracks failed requests that need to be re-issued. failed_requests: Mutex>, + + /// Condition variable that wakes up waiting tasks when the node is synced. + synced_notify: Notify, } impl BlockSync { @@ -253,6 +258,7 @@ impl BlockSync { metrics: Default::default(), prepare_requests_lock: Default::default(), failed_requests: Default::default(), + synced_notify: Default::default(), } } @@ -260,6 +266,9 @@ impl BlockSync { /// or block request has been fully processed (either successfully or unsuccessfully). /// /// Used by the outgoing task. + /// + /// # Concurrency + /// Only one task can wait on this at a time. pub async fn wait_for_peer_update(&self) { self.peer_notify.notified().await } @@ -267,6 +276,9 @@ impl BlockSync { /// Blocks until there is a new response to a block request. /// /// Used by the incoming task. + /// + /// # Concurrency + /// Only one task can wait on this at a time. 
pub async fn wait_for_block_responses(&self) { self.response_notify.notified().await } @@ -277,6 +289,56 @@ impl BlockSync { self.sync_state.read().is_block_synced() } + /// This future blocks until the node is synced. + /// + /// # Concurrency + /// Multiple tasks can wait on this at the same time safely. + pub async fn wait_for_synced(&self) { + loop { + let mut fut = std::pin::pin!(self.synced_notify.notified()); + + { + let sync_state = self.sync_state.read(); + if sync_state.is_block_synced() { + return; + } + + // Register this task as waiting before dropping the lock. + fut.as_mut().enable(); + } + + fut.await; + } + } + + /// Similar to [`Self::wait_for_synced`] but returns `None` if the node is already synced. + /// Otherwise, it will return a future that behaves like `wait_for_synced`. + /// + /// # Concurrency + /// * This method is atomic, unlike calling `is_synced` and `wait_for_synced` sequentially. + /// * Multiple tasks can wait on this at the same time safely. + pub fn wait_for_synced_if_syncing(&self) -> Option<BoxFuture<'_, ()>> { + let mut notified = Box::pin(self.synced_notify.notified()); + + { + let sync_state = self.sync_state.read(); + if sync_state.is_block_synced() { + return None; + } + + // Register this task as waiting before dropping the lock. + notified.as_mut().enable(); + } + + Some( + async move { + notified.await; + self.wait_for_synced().await; + } + .boxed(), + ) + } + /// Returns the number of blocks the node is behind the greatest peer height, /// or `None` if no peers are connected yet. #[inline] @@ -873,11 +935,22 @@ impl BlockSync { } // -- Finally, update sync state and notify the sync loop about the change.
-- - if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() { - self.sync_state.write().set_greatest_peer_height(greatest_peer_height); + let is_synced = if let Some(greatest_peer_height) = + self.locators.read().values().map(|l| l.latest_locator_height()).max() + { + let mut sync_state = self.sync_state.write(); + sync_state.set_greatest_peer_height(greatest_peer_height); + sync_state.is_block_synced() } else { error!("Got new block locators but greatest peer height is zero."); + false + }; + + // For the unlikely case a peer's height gets lowered. + if is_synced { + self.synced_notify.notify_waiters(); } + // Even if the greatest peer height did not change, we still received new block locators // that the sync loop might need to proceed. self.peer_notify.notify_one(); @@ -901,12 +974,25 @@ impl BlockSync { // Remove all block requests to the peer. self.remove_block_requests_to_peer(peer_ip); - // Update sync state, because the greatest peer height may have decreased. - if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() { - self.sync_state.write().set_greatest_peer_height(greatest_peer_height); - } else { - // There are no more peers left. - self.sync_state.write().clear_greatest_peer_height(); + let synced = { + // Do not lock sync state and locators at the same time. + let max_height = self.locators.read().values().map(|l| l.latest_locator_height()).max(); + let mut sync_state = self.sync_state.write(); + + // Update sync state, because the greatest peer height may have decreased. + if let Some(greatest_peer_height) = max_height { + sync_state.set_greatest_peer_height(greatest_peer_height); + } else { + // There are no more peers left. + sync_state.clear_greatest_peer_height(); + } + + sync_state.is_block_synced() + }; + + // For the case where the maximum peer height gets lowered. 
+ if synced { + self.synced_notify.notify_waiters(); } // Notify the sync loop that something changed. @@ -1117,15 +1203,19 @@ impl BlockSync { /// This is a no-op if `new_height` is equal or less to the current sync height. pub fn set_sync_height(&self, new_height: u32) { // Scope state lock to avoid locking state and metrics at the same time. - let fully_synced = { + let (synced, fully_synced) = { let mut state = self.sync_state.write(); state.set_sync_height(new_height); - !state.can_issue_new_block_requests() + (state.is_block_synced(), !state.can_issue_new_block_requests()) }; if fully_synced { self.metrics.mark_fully_synced(); } + + if synced { + self.synced_notify.notify_waiters(); + } } /// Inserts a block request for the given height. @@ -1699,6 +1789,7 @@ mod tests { common_ancestors: RwLock::new(sync.common_ancestors.read().clone()), requests: RwLock::new(sync.requests.read().clone()), sync_state: RwLock::new(sync.sync_state.read().clone()), + synced_notify: Notify::new(), advance_with_sync_blocks_lock: Default::default(), metrics: Default::default(), prepare_requests_lock: Default::default(),