12 changes: 12 additions & 0 deletions node/bft/README.md
@@ -31,6 +31,18 @@ f + 1 vertices that vote for the anchor, or 2f + 1 vertices that do not, or a ti
```
Note that in this quote `2f + 1` should really be `n - f`.

#### Batch Proposal

Batch proposals are driven by a dedicated **batch proposal task** that runs in a loop and is the only place that calls `propose_batch()`. This keeps proposal on a single execution path and avoids concurrent proposal attempts.

Each iteration of the inner loop waits for the first of these to fire before calling `propose_batch()`:

1. **Ready notification** (`is_ready_notify`): When the primary advances to a new round (e.g. after a certificate is committed, or in the Narwhal case when storage increments the round), it signals readiness via `is_ready_notify`. The task wakes up and, if the node is synced, calls `propose_batch()`.
2. **Delay timeout**: If not enough time has elapsed yet, the task sets a timer for the remaining `MAX_BATCH_DELAY - elapsed`.
3. **Sync completion**: If the node is currently syncing, the task waits for the state to change to `Synced`. This lets it wake up as soon as sync finishes, without polling.
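
The delay-timeout case can be sketched with plain `std::time::Duration` arithmetic. This is a minimal illustration, not the code from this change: the helper name `remaining_batch_delay` is hypothetical, and the constant is copied from `node/bft/src/lib.rs`.

```rust
use std::time::Duration;

/// Copied from `node/bft/src/lib.rs` for illustration.
const MAX_BATCH_DELAY: Duration = Duration::from_millis(2500);

/// Hypothetical helper (not part of the crate): returns how long the proposal
/// task should still wait, or `None` if `MAX_BATCH_DELAY` has already elapsed.
fn remaining_batch_delay(elapsed: Duration) -> Option<Duration> {
    // `checked_sub` returns `None` when `elapsed` exceeds the maximum delay,
    // which avoids the panic that a plain subtraction would cause.
    match MAX_BATCH_DELAY.checked_sub(elapsed) {
        Some(remaining) if !remaining.is_zero() => Some(remaining),
        _ => None,
    }
}
```

With this helper, a task that has already waited one second would arm its timer for the remaining 1.5 seconds, and a task that has waited longer than `MAX_BATCH_DELAY` would not need a timer at all.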

The primary tracks the latest proposed **(round, timestamp)** in `latest_proposed_batch`. This state is used to: avoid proposing the same round twice; rate-limit the primary's own proposals (via a dedicated check against the previous proposal timestamp); and decide whether to advance when a certificate is received. Peer proposal timestamps are validated separately so that the primary does not accept batches proposed too soon after a peer's previous proposal.
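
The first two uses of this state might look roughly like the sketch below. Everything here is an assumption made for illustration: the struct, the enum, and `check_proposal` are hypothetical names, and using `MIN_BATCH_DELAY` as the rate limit is inferred from that constant's doc comment rather than taken from the primary's code.

```rust
use std::time::Duration;

/// Copied from `node/bft/src/lib.rs` for illustration.
const MIN_BATCH_DELAY: Duration = Duration::from_secs(1);

/// Hypothetical stand-in for the state kept in `latest_proposed_batch`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LatestProposedBatch {
    round: u64,
    /// Unix timestamp in seconds.
    timestamp: i64,
}

/// Hypothetical outcome of the pre-proposal check.
#[derive(Debug, PartialEq, Eq)]
enum ProposalCheck {
    Allowed,
    AlreadyProposed,
    TooSoon,
}

/// Decides whether a proposal for `round` at time `now` may go ahead.
fn check_proposal(latest: Option<LatestProposedBatch>, round: u64, now: i64) -> ProposalCheck {
    let latest = match latest {
        Some(latest) => latest,
        // Nothing has been proposed yet, so there is nothing to compare against.
        None => return ProposalCheck::Allowed,
    };
    // Assumption: a round at or below the latest proposed round counts as already proposed.
    if round <= latest.round {
        return ProposalCheck::AlreadyProposed;
    }
    // Assumption: the primary's own proposals are rate-limited by `MIN_BATCH_DELAY`.
    if now.saturating_sub(latest.timestamp) < MIN_BATCH_DELAY.as_secs() as i64 {
        return ProposalCheck::TooSoon;
    }
    ProposalCheck::Allowed
}
```

Keeping the round and the timestamp in one value lets both checks read a consistent snapshot, which is presumably why the README describes them as a single **(round, timestamp)** pair.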

### Ledger Advancement

The BFT module also advances the ledger as new certificates are added to the DAG. There are two different ways the ledger can advance.
18 changes: 11 additions & 7 deletions node/bft/src/bft.rs
@@ -14,7 +14,7 @@
// limitations under the License.

use crate::{
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
MAX_LEADER_CERTIFICATE_DELAY,
helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
primary::{Primary, PrimaryCallback},
sync::SyncCallback,
@@ -207,7 +207,13 @@ impl<N: Network> BFT<N> {
#[async_trait::async_trait]
impl<N: Network> PrimaryCallback<N> for BFT<N> {
/// Notification that a new round has started.
fn update_to_next_round(&self, current_round: u64) -> bool {
///
/// # Arguments
/// * `current_round` - the round the caller is in (to avoid race conditions)
///
/// # Returns
/// `true` if the BFT moved to the next round.
fn try_advance_to_next_round(&self, current_round: u64) -> bool {
// Ensure the current round is at least the storage round (this is a sanity check).
let storage_round = self.storage().current_round();
if current_round < storage_round {
@@ -482,7 +488,7 @@ impl<N: Network> BFT<N> {
///
/// This is always true for a new BFT instance.
fn is_timer_expired(&self) -> bool {
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now()
}

/// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
@@ -877,7 +883,7 @@ impl<N: Network> BFT<N> {
mod tests {
use crate::{
BFT,
MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
MAX_LEADER_CERTIFICATE_DELAY,
PrimaryCallback,
helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
sync::SyncCallback,
@@ -1109,9 +1115,7 @@ mod tests {
assert!(!result);
}
// Wait for the timer to expire.
let leader_certificate_timeout =
std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
std::thread::sleep(leader_certificate_timeout);
std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY);
// Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
if bft_timer.is_timer_expired() {
6 changes: 3 additions & 3 deletions node/bft/src/gateway.rs
@@ -17,7 +17,7 @@
use crate::helpers::Telemetry;
use crate::{
CONTEXT,
MAX_BATCH_DELAY_IN_MS,
MAX_BATCH_DELAY,
MEMORY_POOL_PORT,
Worker,
events::{DisconnectReason, EventCodec, PrimaryPing},
@@ -102,9 +102,9 @@ use tokio_stream::StreamExt;
use tokio_util::codec::Framed;

/// The maximum interval of events to cache.
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
const CACHE_EVENTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds
/// The maximum interval of requests to cache.
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY_IN_MS / 1000) as i64; // seconds
const CACHE_REQUESTS_INTERVAL: i64 = (MAX_BATCH_DELAY.as_secs()) as i64; // seconds

/// The maximum number of connection attempts in an interval.
#[cfg(not(test))]
6 changes: 3 additions & 3 deletions node/bft/src/helpers/pending.rs
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MAX_FETCH_TIMEOUT_IN_MS;
use crate::MAX_FETCH_TIMEOUT;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkvm::{
console::network::{Network, consensus_config_value},
@@ -35,8 +35,8 @@ use time::OffsetDateTime;
use tokio::sync::oneshot;

/// The maximum number of seconds to wait before expiring a callback.
/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT_IN_MS` when converting to seconds.
pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT_IN_MS.div_ceil(1000) as i64;
/// We ensure that we don't truncate `MAX_FETCH_TIMEOUT` when converting to seconds.
pub(crate) const CALLBACK_EXPIRATION_IN_SECS: i64 = MAX_FETCH_TIMEOUT.as_secs() as i64;

/// Returns the maximum number of redundant requests for the number of validators in the specified round.
pub fn max_redundant_requests<N: Network>(ledger: Arc<dyn LedgerService<N>>, round: u64) -> Result<usize> {
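
The hunk above swaps `div_ceil(1000)` on a millisecond count for `Duration::as_secs()`. The two round differently: `as_secs()` drops any sub-second part, while `div_ceil` rounds up. The sketch below shows a rounding-up conversion on a `Duration`. The helper `secs_ceil` is hypothetical and is not part of this change.

```rust
use std::time::Duration;

/// Hypothetical helper (not part of the crate): converts a duration to whole
/// seconds, rounding up whenever there is a sub-second remainder.
const fn secs_ceil(duration: Duration) -> i64 {
    let secs = duration.as_secs() as i64;
    if duration.subsec_nanos() > 0 { secs + 1 } else { secs }
}
```

For a 7500 ms duration, `as_secs()` returns 7 and `secs_ceil` returns 8. For a duration built from whole seconds, the two agree.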
10 changes: 5 additions & 5 deletions node/bft/src/helpers/timestamp.rs
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
use crate::MAX_TIMESTAMP_DELTA;
use snarkvm::prelude::{Result, bail};

use time::OffsetDateTime;
@@ -36,7 +36,7 @@ pub fn to_utc_datetime(timestamp: i64) -> OffsetDateTime {
/// Sanity checks the timestamp for liveness.
pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
// Ensure the timestamp is within range.
if timestamp > (now() + MAX_TIMESTAMP_DELTA_IN_SECS) {
if timestamp > (now() + MAX_TIMESTAMP_DELTA.as_secs() as i64) {
bail!("Timestamp {timestamp} is too far in the future")
}
Ok(())
@@ -45,17 +45,17 @@ pub fn check_timestamp_for_liveness(timestamp: i64) -> Result<()> {
#[cfg(test)]
mod prop_tests {
use super::*;
use crate::MAX_TIMESTAMP_DELTA_IN_SECS;
use crate::MAX_TIMESTAMP_DELTA;

use proptest::prelude::*;
use test_strategy::proptest;

fn any_valid_timestamp() -> BoxedStrategy<i64> {
(Just(now()), 0..MAX_TIMESTAMP_DELTA_IN_SECS).prop_map(|(now, delta)| now + delta).boxed()
(Just(now()), 0..(MAX_TIMESTAMP_DELTA.as_secs() as i64)).prop_map(|(now, delta)| now + delta).boxed()
}

fn any_invalid_timestamp() -> BoxedStrategy<i64> {
(Just(now()), MAX_TIMESTAMP_DELTA_IN_SECS..).prop_map(|(now, delta)| now + delta).boxed()
(Just(now()), (MAX_TIMESTAMP_DELTA.as_secs() as i64)..).prop_map(|(now, delta)| now + delta).boxed()
}

#[proptest]
36 changes: 24 additions & 12 deletions node/bft/src/lib.rs
@@ -29,6 +29,8 @@ pub use snarkos_node_bft_events as events;
pub use snarkos_node_bft_ledger_service as ledger_service;
pub use snarkos_node_bft_storage_service as storage_service;

use std::time::Duration;

pub mod helpers;

mod bft;
@@ -51,24 +53,34 @@ pub const CONTEXT: &str = "[MemoryPool]";
/// The port on which the memory pool listens for incoming connections.
pub const MEMORY_POOL_PORT: u16 = 5000; // port

/// The maximum number of milliseconds to wait before proposing a batch.
pub const MAX_BATCH_DELAY_IN_MS: u64 = 2500; // ms
/// The minimum number of seconds to wait before proposing a batch.
pub const MIN_BATCH_DELAY_IN_SECS: u64 = 1; // seconds
/// The maximum number of milliseconds to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT_IN_MS: u64 = 3 * MAX_BATCH_DELAY_IN_MS; // ms
/// The maximum number of seconds allowed for the leader to send their certificate.
pub const MAX_LEADER_CERTIFICATE_DELAY_IN_SECS: i64 = 2 * MAX_BATCH_DELAY_IN_MS as i64 / 1000; // seconds
/// The maximum number of seconds before the timestamp is considered expired.
pub const MAX_TIMESTAMP_DELTA_IN_SECS: i64 = 10; // seconds
/// The maximum time to wait before proposing a batch.
pub const MAX_BATCH_DELAY: Duration = Duration::from_millis(2500);

/// The minimum time that needs to elapse between two consecutive batch proposals.
/// This creates a lower bound on the block interval, and ensures the network will not be overwhelmed with too many blocks/certificates.
pub const MIN_BATCH_DELAY: Duration = Duration::from_secs(1);

/// The time a primary waits between attempts to create a new batch (only relevant after `MIN_BATCH_DELAY` has passed).
/// This only serves as a failsafe in case the task does not get woken up through other means.
/// Lowering it too much will most likely waste resources.
pub const CREATE_BATCH_INTERVAL: Duration = Duration::from_millis(250);

/// The maximum time to wait before timing out on a fetch.
pub const MAX_FETCH_TIMEOUT: Duration = Duration::from_secs(3 * MAX_BATCH_DELAY.as_secs());
/// The maximum time allowed for the leader to send their certificate.
/// After this time, the node will consider the leader as failed and try to advance the round without it.
pub const MAX_LEADER_CERTIFICATE_DELAY: Duration = Duration::from_secs(2 * MAX_BATCH_DELAY.as_secs());
/// The maximum difference allowed between our local time and a certificate's timestamp, for the node to sign the certificate.
/// This prevents malicious actors from proposing certificates with timestamps that are too old or too far in the future.
pub const MAX_TIMESTAMP_DELTA: Duration = Duration::from_secs(10);
/// The maximum number of workers that can be spawned.
pub const MAX_WORKERS: u8 = 1; // worker(s)

/// The interval at which each primary broadcasts a ping to every other node.
/// Note: If this is updated, be sure to update `MAX_BLOCKS_BEHIND` to correspond properly.
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
pub const PRIMARY_PING_INTERVAL: Duration = Duration::from_secs(2 * MAX_BATCH_DELAY.as_secs());
/// The interval at which each worker broadcasts a ping to every other node.
pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms
pub const WORKER_PING_INTERVAL: Duration = Duration::from_secs(4 * MAX_BATCH_DELAY.as_secs());

/// A helper macro to spawn a blocking task.
#[macro_export]
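
One detail of the derived constants above is worth checking by hand. `MAX_BATCH_DELAY` is 2500 ms, so `MAX_BATCH_DELAY.as_secs()` evaluates to 2 and any multiple built from it loses the half second. The removed `MAX_FETCH_TIMEOUT_IN_MS` was `3 * 2500 = 7500` ms, whereas `Duration::from_secs(3 * MAX_BATCH_DELAY.as_secs())` is 6 s. The source does not say whether that change in value is intended. The sketch below compares the two ways of scaling. The constant names `FETCH_TIMEOUT_VIA_SECS` and `FETCH_TIMEOUT_VIA_MILLIS` are hypothetical.

```rust
use std::time::Duration;

/// Copied from `node/bft/src/lib.rs` for illustration.
const MAX_BATCH_DELAY: Duration = Duration::from_millis(2500);

/// Scaling via whole seconds: `as_secs()` yields 2, so this is 6 s.
const FETCH_TIMEOUT_VIA_SECS: Duration = Duration::from_secs(3 * MAX_BATCH_DELAY.as_secs());

/// Scaling via milliseconds keeps the sub-second part: 3 * 2500 ms = 7500 ms.
const FETCH_TIMEOUT_VIA_MILLIS: Duration = Duration::from_millis(3 * MAX_BATCH_DELAY.as_millis() as u64);
```

Both expressions are valid in a `const` context, so the millisecond form is a drop-in alternative if the previous values should be preserved.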