f54ed3c
feat(node/consensus): wake up unconfirmed transmission task when a ne…
kaimast Jan 9, 2026
48ed8d7
misc(node/bft): use `test_log` to only print logs of failed tests
kaimast Feb 8, 2026
4321348
refactor(node/bft): introduce dedicated batch proposal task
kaimast Jan 11, 2026
d91d565
refactor(node/bft): use Duration for timing constants
kaimast Mar 5, 2026
0c0b6b6
feat(node/bft): wake up primary when sync is finished
kaimast Mar 9, 2026
695130c
refactor(node/bft): isolate and test proposal logic
kaimast Mar 24, 2026
90e070c
fix(node/bft): prevent concurrent subDAG commit attempts
kaimast Mar 26, 2026
1949178
fix(node/bft): wake up on proposal task on max leader delay
kaimast Mar 30, 2026
cb2caa2
ci: enable reset tests for this branch
kaimast Mar 30, 2026
cb9f143
fix(node/bft): ensure `recently_committed` and `gc_round` are updated
kaimast Mar 30, 2026
da71e92
chore: run cargo update
kaimast Apr 1, 2026
9dbd343
Merge remote-tracking branch 'origin/staging' into feat/batch-triggers
kaimast Apr 6, 2026
0d65b5f
misc(node): fix comments
kaimast Apr 6, 2026
3a10cef
fix(node/bft): avoid busy wait
kaimast Apr 8, 2026
942daae
fix(node/bft): use higher precision when multiplying duration constants
kaimast Apr 7, 2026
6d7e32e
refactor(node/bft): simplify proposal_task logic
kaimast Apr 14, 2026
204c2fd
chore: run cargo update
kaimast Apr 14, 2026
e1ca8de
refactor(node/consensus): make Consensus::reinsert_transmissions infa…
kaimast Apr 14, 2026
89d420d
Merge remote-tracking branch 'origin/staging' into feat/batch-triggers
kaimast Apr 14, 2026
1d9e329
refactor(node/bft): replace `Notify` with `sync::watch`
kaimast Apr 15, 2026
350131a
fix(node/router): fix failing cleanup test with newer version of tokio
kaimast Apr 15, 2026
0fe77ae
fix
kaimast Apr 17, 2026
15f7881
fix(node/bft): handle batch rebroadcasts correctly
kaimast Apr 21, 2026
f1af61f
build: add exception for RUSTSEC-2026-0097
kaimast Apr 21, 2026
4 changes: 2 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -797,7 +797,7 @@ workflows:
filters:
branches:
only:
- ci/reenable-check-logs
- fix/batch-triggers
- canary
- testnet
- mainnet
@@ -807,7 +807,7 @@
filters:
branches:
only:
- ci/reenable-check-logs
- fix/batch-triggers
- canary
- testnet
- mainnet
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions node/bft/README.md
@@ -31,6 +31,18 @@ f + 1 vertices that vote for the anchor, or 2f + 1 vertices that do not, or a ti
```
Note that in this quote `2f + 1` should really be `n - f`.
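The correction is simple arithmetic: with `n` validators tolerating `f` faults, the quorum threshold is `n − f`. Only in the minimal case `n = 3f + 1` does this equal the quoted `2f + 1`; for larger committees the two diverge:

```latex
% Quorum threshold (n - f) vs. the quoted 2f + 1,
% for n validators tolerating f Byzantine faults.
\[
  n = 3f + 1 \;\Longrightarrow\; n - f = 2f + 1,
  \qquad
  n > 3f + 1 \;\Longrightarrow\; n - f > 2f + 1 .
\]
```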

#### Batch Proposal

Batch proposals are driven by a dedicated **batch proposal task** that runs in a loop and is the only place that calls `propose_batch()`. This keeps proposal on a single execution path and avoids concurrent proposal attempts.

Each iteration of the inner loop waits for the first of these to fire before calling `propose_batch()`:

1. **Ready notification** (`is_ready_notify`) — When the primary advances to a new round (e.g. after a certificate is committed, or in the Narwhal case when storage increments the round), it signals readiness via `is_ready_notify`. The task wakes up and, if the node is synced, calls `propose_batch()`.
2. **Delay timeout** — If not enough time has elapsed since the last proposal, the task arms a timer for `MAX_BATCH_DELAY − elapsed` and retries when it fires.
3. **Sync completion** — If the node is currently syncing, the task waits for the sync state to change to `Synced`. This lets it wake up as soon as sync finishes, without polling.

The primary tracks the latest proposed **(round, timestamp)** pair in `latest_proposed_batch`. This state serves three purposes: it avoids proposing the same round twice, it rate-limits the primary's own proposals (by checking against the previous proposal's timestamp), and it decides whether to advance the round when a certificate is received. Peer proposal timestamps are validated separately, so the primary also rejects batches proposed too soon after a peer's previous proposal.

### Ledger Advancement

The BFT module also advances the ledger as new certificates are added to the DAG. There are two different ways the ledger can advance.
97 changes: 67 additions & 30 deletions node/bft/src/bft.rs
@@ -14,7 +14,7 @@
// limitations under the License.

use crate::{
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
MAX_LEADER_CERTIFICATE_DELAY,
helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
primary::{Primary, PrimaryCallback},
sync::SyncCallback,
@@ -42,6 +42,8 @@ use colored::Colorize;
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(feature = "locktick")]
use locktick::tokio::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{
@@ -52,6 +54,8 @@ use std::{
atomic::{AtomicI64, Ordering},
},
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex;
use tokio::sync::{OnceCell, oneshot};

#[derive(Clone)]
@@ -66,6 +70,12 @@ pub struct BFT<N: Network> {
leader_certificate_timer: Arc<AtomicI64>,
/// The consensus sender.
consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
/// Ensures only one call to `commit_leader_certificate` runs at a time.
///
/// Without this, a second certificate crossing the availability threshold while the consensus
/// callback for a prior commit is still in-flight would re-walk already-committed rounds
/// (because `last_committed_round` hasn't been updated yet), causing duplicate subdag commits.
commit_lock: Arc<Mutex<()>>,
}

impl<N: Network> BFT<N> {
Expand Down Expand Up @@ -98,6 +108,7 @@ impl<N: Network> BFT<N> {
leader_certificate: Default::default(),
leader_certificate_timer: Default::default(),
consensus_sender: Default::default(),
commit_lock: Default::default(),
})
}

@@ -207,7 +218,13 @@
#[async_trait::async_trait]
impl<N: Network> PrimaryCallback<N> for BFT<N> {
/// Notification that a new round has started.
fn update_to_next_round(&self, current_round: u64) -> bool {
///
/// # Arguments
/// * `current_round` - the round the caller is in (to avoid race conditions)
///
/// # Returns
/// `true` if the BFT moved to the next round.
fn try_advance_to_next_round(&self, current_round: u64) -> bool {
// Ensure the current round is at least the storage round (this is a sanity check).
let storage_round = self.storage().current_round();
if current_round < storage_round {
@@ -482,7 +499,7 @@ impl<N: Network> BFT<N> {
///
/// This is always true for a new BFT instance.
fn is_timer_expired(&self) -> bool {
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now()
}

/// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
@@ -580,21 +597,40 @@ impl<N: Network> BFT<N> {
#[cfg(debug_assertions)]
trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());

// Serialize all commits so that `last_committed_round` is up-to-date before the next call
// re-walks the DAG, preventing duplicate subdag commits.
let _commit_guard = self.commit_lock.lock().await;

// Fetch the leader round.
let latest_leader_round = leader_certificate.round();

// Determine the list of all previous leader certificates since the last committed round.
// The order of the leader certificates is from **newest** to **oldest**.
let mut leader_certificates = vec![leader_certificate.clone()];
// Whether the consensus callback should be skipped (true when the round is already committed).
// When `latest_leader_round == last_committed_round` the round was already committed by a
// concurrent call that beat us to the lock, or by a prior session whose DAG state was
// reconstructed without populating `recently_committed`. In both cases we still re-run
// DFS + GC to ensure `recently_committed` and `gc_round` are populated, but we must NOT
// send a duplicate subdag to the consensus callback.
let skip_consensus;
{
// Retrieve the leader round and the latest round we committed.
let leader_round = leader_certificate.round();

// Read-lock the DAG.
// Hold the read lock for the remainder of this block, so the DAG state cannot change between the checks below and the walk.
let dag = self.dag.read();

// Re-check under the lock: another call may have committed this round while we were waiting.
if latest_leader_round < dag.last_committed_round() {
trace!("Skipping already-committed leader round {latest_leader_round}");
return Ok(());
}
skip_consensus = latest_leader_round == dag.last_committed_round();

#[cfg(debug_assertions)]
trace!("Attempting to commit leader certificate for round {}...", latest_leader_round);

let mut current_certificate = leader_certificate;
for round in (dag.last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2) {
for round in (dag.last_committed_round() + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) {
// Retrieve the previous committee for the leader round.
let previous_committee_lookback =
self.ledger().get_committee_lookback_for_round(round).with_context(|| {
@@ -721,25 +757,28 @@ impl<N: Network> BFT<N> {
"BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
);

// Trigger consensus.
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(_)) => (),
Ok(Err(err)) => {
let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
error!("{}", &flatten_error(err));
return Ok(());
}
Err(err) => {
let err: anyhow::Error = err.into();
let err = err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
error!("{}", flatten_error(err));
return Ok(());
// Trigger consensus (skipped if the round was already committed by a prior call).
if !skip_consensus {
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(_)) => (),
Ok(Err(err)) => {
let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
error!("{}", &flatten_error(err));
return Ok(());
}
Err(err) => {
let err: anyhow::Error = err.into();
let err =
err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
error!("{}", flatten_error(err));
return Ok(());
}
}
}
}
@@ -881,7 +920,7 @@ impl<N: Network> BFT<N> {
mod tests {
use crate::{
BFT,
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
MAX_LEADER_CERTIFICATE_DELAY,
PrimaryCallback,
helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
sync::SyncCallback,
@@ -1114,9 +1153,7 @@ mod tests {
assert!(!result);
}
// Wait for the timer to expire.
let leader_certificate_timeout =
std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
std::thread::sleep(leader_certificate_timeout);
std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY);
// Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
if bft_timer.is_timer_expired() {
6 changes: 3 additions & 3 deletions node/bft/src/gateway.rs
@@ -17,7 +17,7 @@
use crate::helpers::Telemetry;
use crate::{
CONTEXT,
MAX_BATCH_DELAY_IN_MS,
MAX_BATCH_DELAY,
MEMORY_POOL_PORT,
Worker,
events::{DisconnectReason, EventCodec, PrimaryPing},
@@ -99,9 +99,9 @@ use tokio_stream::StreamExt;
use tokio_util::codec::Framed;

/// The maximum interval of events to cache.
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
const CACHE_EVENTS_INTERVAL: i64 = MAX_BATCH_DELAY.as_secs() as i64; // seconds
/// The maximum interval of requests to cache.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
const CACHE_REQUESTS_INTERVAL: i64 = MAX_BATCH_DELAY.as_secs() as i64; // seconds

/// The maximum number of connection attempts in an interval.
#[cfg(not(test))]
6 changes: 3 additions & 3 deletions node/bft/src/helpers/pending.rs
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{Gateway, MAX_FETCH_TIMEOUT_IN_MS};
use crate::{Gateway, MAX_FETCH_TIMEOUT};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::PeerPoolHandling;
use snarkvm::{
@@ -36,8 +36,8 @@ use time::OffsetDateTime;
use tokio::sync::oneshot;

/// The maximum number of seconds to wait before expiring a callback.
/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds.
pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64;
/// Round up so that `MAX_FETCH_TIMEOUT` is not truncated when converting to seconds.
pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT.as_millis().div_ceil(1000) as i64;

/// Returns the maximum number of redundant requests for the number of validators in the specified round.
pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> Result<usize> {
10 changes: 5 additions & 5 deletions node/bft/src/helpers/timestamp.rs
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
use crate::MAX_TIMESTAMP_DELTA;
use snarkvm::prelude::{Result, bail};

use time::OffsetDateTime;
@@ -36,7 +36,7 @@ pub fn to_utc_datetime(timestamp: i64) -> OffsetDateTime {
/// Sanity checks the timestamp for liveness.
pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
// Ensure the timestamp is within range.
if timestamp > (now() + MAX_TIMESTAMP_DELTA_IN_SECS) {
if timestamp > (now() + MAX_TIMESTAMP_DELTA.as_secs() as i64) {
bail!("Timestamp {timestamp} is too far in the future")
}
Ok(())
@@ -45,17 +45,17 @@ pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
#[cfg(test)]
mod prop_tests {
use super::*;
use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
use crate::MAX_TIMESTAMP_DELTA;

use proptest::prelude::*;
use test_strategy::proptest;

fn any_valid_timestamp() -> BoxedStrategy<i64> {
(Just(now()), 0..MAX_TIMESTAMP_DELTA_IN_SECS).prop_map(|(now, delta)| now + delta).boxed()
(Just(now()), 0..(MAX_TIMESTAMP_DELTA.as_secs() as i64)).prop_map(|(now, delta)| now + delta).boxed()
}

fn any_invalid_timestamp() -> BoxedStrategy<i64> {
(Just(now()), MAX_TIMESTAMP_DELTA_IN_SECS..).prop_map(|(now, delta)| now + delta).boxed()
(Just(now()), (MAX_TIMESTAMP_DELTA.as_secs() as i64)..).prop_map(|(now, delta)| now + delta).boxed()
}

#[proptest]
42 changes: 30 additions & 12 deletions node/bft/src/lib.rs
@@ -29,6 +29,8 @@ pub use snarkos_node_bft_events as events;
pub use snarkos_node_bft_ledger_service as ledger_service;
pub use snarkos_node_bft_storage_service as storage_service;

use std::time::Duration;

pub mod helpers;

mod bft;
@@ -51,24 +53,40 @@ pub const CONTEXT: &str = "[MemoryPool]";
/// The port on which the memory pool listens for incoming connections.
pub const MEMORY_POOL_PORT: u16 = 5000; // port

/// The maximum number of milliseconds to wait before proposing a batch.
pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms
/// The minimum number of seconds to wait before proposing a batch.
pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds
/// The maximum number of milliseconds to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms
/// The maximum number of seconds allowed for the leader to send their certificate.
pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds
/// The maximum number of seconds before the timestamp is considered expired.
pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
/// The maximum time to wait before proposing a batch.
pub const MAX_BATCH_DELAY: Duration = Duration::from_millis(2500);

/// The minimum time that needs to elapse between two consecutive batch proposals.
/// This creates a lower bound on the block interval, and ensures the network will not be overwhelmed with too many blocks/certificates.
pub const MIN_BATCH_DELAY: Duration = Duration::from_secs(1);

/// The time a primary waits between attempts to create a new batch (only relevant after `MIN_BATCH_DELAY` has passed).
/// This only serves as a failsafe in case the task does not get woken up through other means.
/// Lowering it too much would be wasteful.
pub const CREATE_BATCH_INTERVAL: Duration = Duration::from_millis(250);

/// The maximum time to wait before timing out on a fetch.
/// TODO(kaimast): directly multiply by the constant once the `const_trait_impl` feature is stable.
pub const MAX_FETCH_TIMEOUT: Duration = Duration::from_millis(3 * (MAX_BATCH_DELAY.as_millis() as u64));

/// The maximum time allowed for the leader to send their certificate.
/// After this time, the node will consider the leader as failed and try to advance the round without it.
pub const MAX_LEADER_CERTIFICATE_DELAY: Duration = Duration::from_millis(2 * (MAX_BATCH_DELAY.as_millis() as u64));

/// The maximum difference allowed between our local time and a certificate's timestamp, for the node to sign the certificate.
/// This prevents malicious actors from proposing certificates with timestamps that are too far in the past or the future.
pub const MAX_TIMESTAMP_DELTA: Duration = Duration::from_secs(10);

/// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // worker(s)

/// The interval at which each primary broadcasts a ping to every other node.
/// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly.
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
pub const PRIMARY_PING_INTERVAL: Duration = Duration::from_millis(2 * (MAX_BATCH_DELAY.as_millis() as u64));

/// The interval at which each worker broadcasts a ping to every other node.
pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms
pub const WORKER_PING_INTERVAL: Duration = Duration::from_millis(4 * (MAX_BATCH_DELAY.as_millis() as u64));

/// A helper macro to spawn a blocking task.
#[macro_export]