Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 51 additions & 37 deletions node/sync/src/block_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,9 +689,11 @@ impl<N: Network> BlockSync<N> {
}
}

// Update `is_synced`.
// Update sync state, because the greatest peer height may have decreased.
if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
} else {
error!("Got new block locators but greatest peer height is zero.");
}

// Notify the sync loop that something changed.
Expand All @@ -713,6 +715,14 @@ impl<N: Network> BlockSync<N> {
// Remove all block requests to the peer.
self.remove_block_requests_to_peer(peer_ip);

// Update sync state, because the greatest peer height may have decreased.
if let Some(greatest_peer_height) = self.locators.read().values().map(|l| l.latest_locator_height()).max() {
self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
} else {
// There are no more peers left.
self.sync_state.write().clear_greatest_peer_height();
}

// Notify the sync loop that something changed.
self.peer_notify.notify_one();
}
Expand All @@ -730,8 +740,10 @@ impl<N: Network> BlockSync<N> {
/// This should be called by at most one task at a time.
///
/// # Usage
/// - For validators, the primary spawns one task that periodically calls `bft::Sync::try_block_sync`. There is no possibility of multiple calls to it at a time.
/// - For clients, `Client::initialize_sync` also spawns exactly one task that periodically calls this function.
/// - For validators, the primary spawns exactly one task that periodically calls
/// `bft::Sync::try_issuing_block_requests`. There is no possibility of concurrent calls to it.
/// - For clients, `Client::initialize_sync` spawns exactly one task that periodically calls
/// `Client::try_issuing_block_requests` which calls this function.
/// - Provers do not call this function.
pub fn prepare_block_requests(&self) -> BlockRequestBatch<N> {
// Used to print more information when we max out on requests.
Expand All @@ -750,57 +762,56 @@ impl<N: Network> BlockSync<N> {
// Ensure to not exceed the maximum number of outstanding block requests.
let max_outstanding_block_requests =
(MAX_BLOCK_REQUESTS as u32) * (DataBlocks::<N>::MAXIMUM_NUMBER_OF_BLOCKS as u32);

// Ensure there is a finite bound on the number of block responses we receive that have not been processed yet.
let max_total_requests = 4 * max_outstanding_block_requests;

let max_new_blocks_to_request =
max_outstanding_block_requests.saturating_sub(self.num_outstanding_block_requests() as u32);

// Prepare the block requests.
// Prepare the block requests and sync peers, or return an empty result if there is nothing to request.
if self.num_total_block_requests() >= max_total_requests as usize {
trace!(
"We are already requested at least {max_total_requests} blocks that have not been fully processed yet. Will not issue more."
);

print_requests();

// Return an empty list of block requests.
(Default::default(), Default::default())
Default::default()
} else if max_new_blocks_to_request == 0 {
trace!(
"Already reached the maximum number of outstanding blocks ({max_outstanding_block_requests}). Will not issue more."
);
print_requests();

// Return an empty list of block requests.
(Default::default(), Default::default())
print_requests();
Default::default()
} else if let Some((sync_peers, min_common_ancestor)) = self.find_sync_peers_inner(current_height) {
// Retrieve the highest block height.
// Retrieve the greatest block height of any connected peer.
// We do not need to update the sync state here, as that already happens when the block locators are received.
let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
// Update the state of `is_block_synced` for the sync module.
self.sync_state.write().set_greatest_peer_height(greatest_peer_height);
// Return the list of block requests.
(
self.construct_requests(
&sync_peers,
current_height,
min_common_ancestor,
max_new_blocks_to_request,
greatest_peer_height,
),
sync_peers,
)

// Construct the list of block requests.
let requests = self.construct_requests(
&sync_peers,
current_height,
min_common_ancestor,
max_new_blocks_to_request,
greatest_peer_height,
);

(requests, sync_peers)
} else if self.requests.read().is_empty() {
// This can happen during a race condition where the node just finished syncing.
// It does not make sense to log or change the sync status here.
// Checking the sync status here also does not make sense, as the node might as well have switched back
// from `synced` to `syncing` between calling `find_sync_peers_inner` and this line.

Default::default()
} else {
// Update `is_block_synced` if there are no pending requests or responses.
if self.requests.read().is_empty() {
trace!("All requests have been processed. Will set block synced to true.");
// Update the state of `is_block_synced` for the sync module.
// TODO(kaimast): remove this workaround
self.sync_state.write().set_greatest_peer_height(0);
} else {
trace!("No new blocks can be requests, but there are still outstanding requests.");
}
// This happens if we already requested all advertised blocks.
trace!("No new blocks can be requested, but there are still outstanding requests.");

// Return an empty list of block requests.
(Default::default(), Default::default())
print_requests();
Default::default()
}
}

Expand Down Expand Up @@ -1043,8 +1054,11 @@ impl<N: Network> BlockSync<N> {
return None;
};

// Retrieve the highest block height.
let greatest_peer_height = sync_peers.values().map(|l| l.latest_locator_height()).max().unwrap_or(0);
// Retrieve the greatest block height of any connected peer.
let Some(greatest_peer_height) = sync_peers.values().map(|l| l.latest_locator_height()).max() else {
warn!("Cannot re-request blocks because no peers are connected");
Comment thread
vicsn marked this conversation as resolved.
return None;
};

debug!("Re-requesting blocks starting at height {start}");

Expand Down
91 changes: 61 additions & 30 deletions node/sync/src/block_sync/sync_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,33 @@ use super::MAX_BLOCKS_BEHIND;

use std::{cmp::Ordering, time::Instant};

/// The block-sync status of this node relative to its peers.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SyncStatus {
    /// Never synced, or no peers are connected.
    Unsynced,
    /// Sync is in progress.
    Syncing,
    /// Fully synced with peers.
    Synced,
}

#[derive(Clone)]
pub(super) struct SyncState {
/// The height we synced to already
/// Note: This can be greater than the current ledger height,
/// if blocks are not fully committed yet
sync_height: u32,
/// The largest height of a peer's block locator.
/// Is `None` if we never received a peer loator
/// Is `None` if we never received a peer locator.
greatest_peer_height: Option<u32>,
/// Are we synced?
/// Allows keeping track of when the sync state changes.
is_synced: bool,
status: SyncStatus,
/// Last time the sync state changed
last_change: Instant,
}

impl Default for SyncState {
fn default() -> Self {
Self {
sync_height: 0,
greatest_peer_height: None,
// Start as "synced" by default. Otherwise, validators will never propose certificates.
is_synced: true,
last_change: Instant::now(),
}
// `status` is set to `Synced` by default to ensure validators of a newly created chain generate blocks.
Self { sync_height: 0, greatest_peer_height: None, status: SyncStatus::Synced, last_change: Instant::now() }
}
}

Expand All @@ -55,7 +57,7 @@ impl SyncState {
/// Did we catch up with the greatest known peer height?
/// This will return false if we never synced from a peer.
pub fn is_block_synced(&self) -> bool {
self.is_synced
self.status == SyncStatus::Synced
}

/// Returns `true` if there are blocks to sync from other nodes.
Expand Down Expand Up @@ -113,47 +115,76 @@ impl SyncState {
self.update_is_block_synced();
}

/// Remove the greatest peer height (used when all peers disconnect).
pub fn clear_greatest_peer_height(&mut self) {
    // Take the stored height; if one was present, the state changed and
    // the sync status must be recomputed. If it was already `None`, this
    // is a no-op.
    if self.greatest_peer_height.take().is_some() {
        self.update_is_block_synced();
    }
}

/// Updates the state of `is_block_synced` for the sync module.
fn update_is_block_synced(&mut self) {
trace!(
"Updating is_block_synced: greatest_peer_height={greatest_peer:?}, current_height={current}, is_synced={is_synced}",
"Updating is_block_synced: greatest_peer_height={greatest_peer:?}, current_height={current}, status={status:?}",
greatest_peer = self.greatest_peer_height,
current = self.sync_height,
is_synced = self.is_synced,
status = self.status,
);

let num_blocks_behind = self.num_blocks_behind();
let old_sync_val = self.is_synced;
let old_status = self.status;

// If there are no block locators, we consider ourselves synced.
// Otherwise, validators will never propose certificates.
let new_sync_val = num_blocks_behind.is_none_or(|num| num <= MAX_BLOCKS_BEHIND);

// Print a message if the state changed
if new_sync_val != old_sync_val {
// Measure how long sync took.
let now = Instant::now();
let elapsed = now.saturating_duration_since(self.last_change).as_secs();
self.last_change = now;

if new_sync_val {
let elapsed =
if elapsed < 60 { format!("{elapsed} seconds") } else { format!("{} minutes", elapsed / 60) };
let new_status = match num_blocks_behind {
Some(num) if num <= MAX_BLOCKS_BEHIND => SyncStatus::Synced,
Some(_) => SyncStatus::Syncing,
None => SyncStatus::Unsynced,
};

// Return early if the state is unchanged
if new_status == old_status {
return;
}

debug!("Block sync state changed to \"synced\". It took {elapsed} to catch up with the network.");
} else {
// Measure how long sync took.
let now = Instant::now();
let elapsed = now.saturating_duration_since(self.last_change).as_secs();

self.status = new_status;
self.last_change = now;

match self.status {
SyncStatus::Synced => {
if old_status == SyncStatus::Syncing {
let elapsed =
if elapsed < 60 { format!("{elapsed} seconds") } else { format!("{} minutes", elapsed / 60) };

debug!("Block sync state changed to \"synced\". It took {elapsed} to catch up with the network.");
} else {
// If we move directly from unsynced to synced, it means we connected to a peer with a lower height.
// In this case it does not make sense to print how long sync took.
debug!("Block sync state changed to \"synced\".");
}
}
SyncStatus::Syncing => {
// num_blocks_behind should never be None at this point,
// but we still use `unwrap_or` just in case.
let behind_msg = num_blocks_behind.map(|n| n.to_string()).unwrap_or("unknown".to_string());

debug!("Block sync state changed to \"syncing\". We are {behind_msg} blocks behind.");
}
SyncStatus::Unsynced => {
debug!("Block sync state changed to \"unsynced\". Connect more peers to resume block sync.");
}
}

self.is_synced = new_sync_val;

// Update the `IS_SYNCED` metric.
#[cfg(feature = "metrics")]
metrics::gauge(metrics::bft::IS_SYNCED, new_sync_val);
metrics::gauge(metrics::bft::IS_SYNCED, self.status == SyncStatus::Synced);
}
}