24 commits
f54ed3c
feat(node/consensus): wake up unconfirmed transmission task when a ne…
kaimast Jan 9, 2026
48ed8d7
misc(node/bft): use `test_log` to only print logs of failed tests
kaimast Feb 8, 2026
4321348
refactor(node/bft): introduce dedicated batch proposal task
kaimast Jan 11, 2026
d91d565
refactor(node/bft): use Duration for timing constants
kaimast Mar 5, 2026
0c0b6b6
feat(node/bft): wake up primary when sync is finished
kaimast Mar 9, 2026
695130c
refactor(node/bft): isolate and test proposal logic
kaimast Mar 24, 2026
90e070c
fix(node/bft): prevent concurrent subDAG commit attempts
kaimast Mar 26, 2026
1949178
fix(node/bft): wake up on proposal task on max leader delay
kaimast Mar 30, 2026
cb2caa2
ci: enable reset tests for this branch
kaimast Mar 30, 2026
cb9f143
fix(node/bft): ensure `recently_committed` and `gc_round` are updated
kaimast Mar 30, 2026
da71e92
chore: run cargo update
kaimast Apr 1, 2026
9dbd343
Merge remote-tracking branch 'origin/staging' into feat/batch-triggers
kaimast Apr 6, 2026
0d65b5f
misc(node): fix comments
kaimast Apr 6, 2026
3a10cef
fix(node/bft): avoid busy wait
kaimast Apr 8, 2026
942daae
fix(node/bft): use higher precision when multiplying duration constants
kaimast Apr 7, 2026
6d7e32e
refactor(node/bft): simplify proposal_task logic
kaimast Apr 14, 2026
204c2fd
chore: run cargo update
kaimast Apr 14, 2026
e1ca8de
refactor(node/consensus): make Consensus::reinsert_transmissions infa…
kaimast Apr 14, 2026
89d420d
Merge remote-tracking branch 'origin/staging' into feat/batch-triggers
kaimast Apr 14, 2026
1d9e329
refactor(node/bft): replace `Notify` with `sync::watch`
kaimast Apr 15, 2026
350131a
fix(node/router): fix failing cleanup test with newer version of tokio
kaimast Apr 15, 2026
0fe77ae
fix
kaimast Apr 17, 2026
15f7881
fix(node/bft): handle batch rebroadcasts correctly
kaimast Apr 21, 2026
f1af61f
build: add exception for RUSTSEC-2026-0097
kaimast Apr 21, 2026
2 changes: 2 additions & 0 deletions .cargo/audit.toml
@@ -13,4 +13,6 @@ ignore = [
"RUSTSEC-2026-0002",
# TODO remove once we migrate away from bincode, or it becomes maintained again.
"RUSTSEC-2025-0141",
# TODO remove once termwiz/terminfo update to phf 0.13+, which drops the rand 0.8 dependency.
"RUSTSEC-2026-0097",
]
4 changes: 2 additions & 2 deletions .circleci/config.yml
@@ -797,7 +797,7 @@ workflows:
filters:
branches:
only:
- ci/reenable-check-logs
- fix/batch-triggers
- canary
- testnet
- mainnet
@@ -807,7 +807,7 @@
filters:
branches:
only:
- ci/reenable-check-logs
- fix/batch-triggers
- canary
- testnet
- mainnet
22 changes: 11 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default.

58 changes: 38 additions & 20 deletions node/bft/README.md
@@ -10,26 +10,44 @@ The `snarkos-node-bft` crate provides a node implementation for a BFT-based memo

The primary is the coordinator, responsible for advancing rounds and broadcasting the anchor.

#### Triggering Round Advancement

Each round runs until one of two conditions is met:
1. The coinbase target has been reached, or
2. The round has reached its timeout (currently set to 10 seconds)

#### Advancing Rounds

As described in the paper [Bullshark: The Partially Synchronous Version](https://arxiv.org/abs/2209.05633),
the BFT generally advances rounds when `n − f` vertices are delivered, however:
```
The problem in advancing rounds whenever n − f vertices are delivered is that parties
might not vote for the anchor even if the party that broadcast it is just slightly slower
than the fastest n − f parties. To deal with this, the BFT integrates timeouts into
the DAG construction. If the first n − f vertices a party p gets in an even-numbered round r
do not include the anchor of round r, then p sets a timer and waits for the anchor
until the timer expires. Similarly, in an odd-numbered round, parties wait for either
f + 1 vertices that vote for the anchor, or 2f + 1 vertices that do not, or a timeout.
```
Note that in this quote `2f + 1` should really be `n - f`.
#### Round Advancement

A round advances once a quorum (`n - f`) of validators has submitted certificates for that round
and the condition for the round's type is met:

- **Even rounds**: the elected leader's certificate is present among the quorum, confirming the
leader was reachable. If the leader's certificate is absent, the node waits up to
`MAX_LEADER_CERTIFICATE_DELAY` before advancing without it.
- **Odd rounds**: at least `f + 1` certificates from the current round reference the previous
even round's leader certificate (availability threshold), or `n - f` do not (non-leader
quorum). If neither threshold is reached, the node again falls back to the timeout.

In both cases the timeout is `MAX_LEADER_CERTIFICATE_DELAY` (currently 5 seconds), reset at the
start of each round. This follows the [Bullshark](https://arxiv.org/abs/2209.05633) protocol.

#### Batch Proposal

Batch proposals are driven by a dedicated **`ProposalTask`** that runs in a loop and is the only place that calls `Primary::propose_batch()`.
This keeps proposals on a single execution path and avoids concurrent proposal attempts. Each loop iteration covers one full round and proceeds through three stages:

**Stage 1 — Wait until ready to propose**

The task blocks until all of the following conditions are satisfied:
1. The node is synced. If it is currently syncing, the task waits via `wait_for_synced_if_syncing()` before continuing.
2. `MIN_BATCH_DELAY` has elapsed since the start of the round, enforcing a minimum inter-proposal interval.
3. One of two events fires:
- **Ready signal** — `ProposalTask::signal()` is called from `try_increment_to_the_next_round()` when the primary successfully advances to a new round (e.g. after a leader certificate is committed). This is delivered via a `watch` channel.
- **`MAX_BATCH_DELAY` timeout** — If no signal arrives within `MAX_BATCH_DELAY` of the round start, the task proceeds anyway. This handles the case where the elected leader's certificate never arrives.

A short `CREATE_BATCH_INTERVAL` heartbeat keeps the round-change check alive while waiting.

**Stage 2 — Propose**

The task calls `propose_batch()` in a loop until it returns `Ok(true)` (batch submitted). On `Ok(false)` or a transient error it retries every `CREATE_BATCH_INTERVAL`. If the round advances during retries, the task restarts from Stage 1.

**Stage 3 — Wait for signatures**

Once the batch is broadcast, the task periodically calls `propose_batch()` every `MAX_BATCH_DELAY` to rebroadcast to any validators that have not yet signed. It exits this stage as soon as the round advances (detected either via the ready signal or by polling `current_round()`).
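Put together, the loop looks roughly like the sketch below. This is a condensed approximation rather than the crate's actual code: the `PrimaryApi` trait, the constant values, and the `watch::Receiver<()>` payload are assumptions made for illustration.

```rust
use std::time::Duration;

use tokio::{
    select,
    sync::watch,
    time::{Instant, sleep, sleep_until},
};

// Illustrative values; the real constants are defined in the crate.
const MIN_BATCH_DELAY: Duration = Duration::from_secs(1);
const MAX_BATCH_DELAY: Duration = Duration::from_secs(2);
const CREATE_BATCH_INTERVAL: Duration = Duration::from_millis(200);

/// Hypothetical minimal view of the primary; the real `Primary` is richer.
trait PrimaryApi {
    fn current_round(&self) -> u64;
    async fn wait_for_synced_if_syncing(&self);
    async fn propose_batch(&self) -> anyhow::Result<bool>;
}

async fn proposal_loop<P: PrimaryApi>(primary: P, mut ready: watch::Receiver<()>) {
    loop {
        // Each iteration covers one full round.
        let round = primary.current_round();
        let round_start = Instant::now();

        // Stage 1: wait until ready to propose.
        primary.wait_for_synced_if_syncing().await;
        sleep_until(round_start + MIN_BATCH_DELAY).await; // minimum inter-proposal interval
        loop {
            select! {
                // Ready signal, e.g. from `try_increment_to_the_next_round()`.
                _ = ready.changed() => break,
                // Fallback: proceed once MAX_BATCH_DELAY has passed since the round start.
                _ = sleep_until(round_start + MAX_BATCH_DELAY) => break,
                // Heartbeat: notice round changes made elsewhere.
                _ = sleep(CREATE_BATCH_INTERVAL) => {
                    if primary.current_round() != round { break; }
                }
            }
        }
        if primary.current_round() != round {
            continue; // the round advanced while waiting; restart from Stage 1
        }

        // Stage 2: retry until a batch is actually submitted (`Ok(true)`).
        while !primary.propose_batch().await.unwrap_or(false) {
            sleep(CREATE_BATCH_INTERVAL).await;
            if primary.current_round() != round { break; }
        }

        // Stage 3: rebroadcast to validators that have not yet signed,
        // until the round advances.
        while primary.current_round() == round {
            select! {
                _ = ready.changed() => {}
                _ = sleep(MAX_BATCH_DELAY) => { let _ = primary.propose_batch().await; }
            }
        }
    }
}
```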

### Ledger Advancement

97 changes: 67 additions & 30 deletions node/bft/src/bft.rs
@@ -14,7 +14,7 @@
// limitations under the License.

use crate::{
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
MAX_LEADER_CERTIFICATE_DELAY,
helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
primary::{Primary, PrimaryCallback},
sync::SyncCallback,
@@ -42,6 +42,8 @@ use colored::Colorize;
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::parking_lot::RwLock;
#[cfg(feature = "locktick")]
use locktick::tokio::Mutex;
#[cfg(not(feature = "locktick"))]
use parking_lot::RwLock;
use std::{
@@ -52,6 +54,8 @@
atomic::{AtomicI64, Ordering},
},
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex;
use tokio::sync::{OnceCell, oneshot};

#[derive(Clone)]
Expand All @@ -66,6 +70,12 @@ pub struct BFT<N: Network> {
leader_certificate_timer: Arc<AtomicI64>,
/// The consensus sender.
consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
/// Ensures only one call to `commit_leader_certificate` runs at a time.
///
/// Without this, a second certificate crossing the availability threshold while the consensus
/// callback for a prior commit is still in-flight would re-walk already-committed rounds
/// (because `last_committed_round` hasn't been updated yet), causing duplicate subdag commits.
commit_lock: Arc<Mutex<()>>,
}

impl<N: Network> BFT<N> {
@@ -98,6 +108,7 @@
leader_certificate: Default::default(),
leader_certificate_timer: Default::default(),
consensus_sender: Default::default(),
commit_lock: Default::default(),
})
}

@@ -207,7 +218,13 @@
#[async_trait::async_trait]
impl<N: Network> PrimaryCallback<N> for BFT<N> {
/// Notification that a new round has started.
fn update_to_next_round(&self, current_round: u64) -> bool {
///
/// # Arguments
/// * `current_round` - the round the caller is in (to avoid race conditions)
///
/// # Returns
/// `true` if the BFT moved to the next round.
fn try_advance_to_next_round(&self, current_round: u64) -> bool {
// Ensure the current round is at least the storage round (this is a sanity check).
let storage_round = self.storage().current_round();
if current_round < storage_round {
@@ -482,7 +499,7 @@
///
/// This is always true for a new BFT instance.
fn is_timer_expired(&self) -> bool {
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now()
}

/// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
@@ -580,21 +597,40 @@
#[cfg(debug_assertions)]
trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());

// Serialize all commits so that `last_committed_round` is up-to-date before the next call
// re-walks the DAG, preventing duplicate subdag commits.
let _commit_guard = self.commit_lock.lock().await;

// Fetch the leader round.
let latest_leader_round = leader_certificate.round();

// Determine the list of all previous leader certificates since the last committed round.
// The order of the leader certificates is from **newest** to **oldest**.
let mut leader_certificates = vec![leader_certificate.clone()];
// Whether the consensus callback should be skipped (true when the round is already committed).
// When `latest_leader_round == last_committed_round` the round was already committed by a
// concurrent call that beat us to the lock, or by a prior session whose DAG state was
// reconstructed without populating `recently_committed`. In both cases we still re-run
// DFS + GC to ensure `recently_committed` and `gc_round` are populated, but we must NOT
// send a duplicate subdag to the consensus callback.
let skip_consensus;
{
// Retrieve the leader round and the latest round we committed.
let leader_round = leader_certificate.round();

// Read-lock the DAG.
// We need to hold the lock, so we do not later fail to re-acquire it.
let dag = self.dag.read();

// Re-check under the lock: another call may have committed this round while we were waiting.
if latest_leader_round < dag.last_committed_round() {
trace!("Skipping already-committed leader round {latest_leader_round}");
return Ok(());
}
skip_consensus = latest_leader_round == dag.last_committed_round();

#[cfg(debug_assertions)]
trace!("Attempting to commit leader certificate for round {}...", latest_leader_round);

let mut current_certificate = leader_certificate;
for round in (dag.last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2) {
for round in (dag.last_committed_round() + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) {
// Retrieve the previous committee for the leader round.
let previous_committee_lookback =
self.ledger().get_committee_lookback_for_round(round).with_context(|| {
@@ -721,25 +757,28 @@
"BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
);

// Trigger consensus.
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(_)) => (),
Ok(Err(err)) => {
let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
error!("{}", &flatten_error(err));
return Ok(());
}
Err(err) => {
let err: anyhow::Error = err.into();
let err = err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
error!("{}", flatten_error(err));
return Ok(());
// Trigger consensus (skipped if the round was already committed by a prior call).
if !skip_consensus {
if let Some(consensus_sender) = self.consensus_sender.get() {
// Initialize a callback sender and receiver.
let (callback_sender, callback_receiver) = oneshot::channel();
// Send the subdag and transmissions to consensus.
consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
// Await the callback to continue.
match callback_receiver.await {
Ok(Ok(_)) => (),
Ok(Err(err)) => {
let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
error!("{}", &flatten_error(err));
return Ok(());
}
Err(err) => {
let err: anyhow::Error = err.into();
let err =
err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
error!("{}", flatten_error(err));
return Ok(());
}
}
}
}
@@ -881,7 +920,7 @@
mod tests {
use crate::{
BFT,
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
MAX_LEADER_CERTIFICATE_DELAY,
PrimaryCallback,
helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
sync::SyncCallback,
@@ -1114,9 +1153,7 @@
assert!(!result);
}
// Wait for the timer to expire.
let leader_certificate_timeout =
std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
std::thread::sleep(leader_certificate_timeout);
std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY);
// Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
if bft_timer.is_timer_expired() {
6 changes: 3 additions & 3 deletions node/bft/src/gateway.rs
@@ -17,7 +17,7 @@
use crate::helpers::Telemetry;
use crate::{
CONTEXT,
MAX_BATCH_DELAY_IN_MS,
MAX_BATCH_DELAY,
MEMORY_POOL_PORT,
Worker,
events::{DisconnectReason, EventCodec, PrimaryPing},
@@ -99,9 +99,9 @@ use tokio_stream::StreamExt;
use tokio_util::codec::Framed;

/// The maximum interval of events to cache.
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds
/// The maximum interval of requests to cache.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds

/// The maximum number of connection attempts in an interval.
#[cfg(not(test))]