69 changes: 64 additions & 5 deletions node/bft/src/gateway.rs
@@ -64,7 +64,14 @@ use futures::SinkExt;
use indexmap::{IndexMap, IndexSet};
use parking_lot::{Mutex, RwLock};
use rand::seq::{IteratorRandom, SliceRandom};
use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
future::Future,
io,
net::SocketAddr,
sync::Arc,
time::Duration,
};
use tokio::{
net::TcpStream,
sync::{oneshot, OnceCell},
@@ -119,6 +126,8 @@ pub struct Gateway<N: Network> {
/// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously
/// attempt to connect to each other). This set is used to prevent this from happening.
connecting_peers: Arc<Mutex<IndexSet<SocketAddr>>>,
/// The cached view of connected validators.
cached_validator_view: Arc<Mutex<HashMap<SocketAddr, Address<N>>>>,
/// The primary sender.
primary_sender: Arc<OnceCell<PrimarySender<N>>>,
/// The worker senders.
@@ -160,6 +169,7 @@ impl<N: Network> Gateway<N> {
trusted_validators: trusted_validators.iter().copied().collect(),
connected_peers: Default::default(),
connecting_peers: Default::default(),
cached_validator_view: Default::default(),
primary_sender: Default::default(),
worker_senders: Default::default(),
sync_sender: Default::default(),
@@ -872,6 +882,57 @@ impl<N: Network> Gateway<N> {
self.handle_min_connected_validators();
}

/// Logs the changes in the connected validator set. Only executed if debug logging is enabled.
fn log_connected_validator_changes(&self, resolved_peer_ips: IndexSet<SocketAddr>) {
use tracing::Level;

// Skip the computations if we're not logging debug events
if !enabled!(Level::DEBUG) {
return;
}

// We hold the lock for the duration of the validator set intersection/difference computations,
// as this function should only be called by a single `Gateway::heartbeat` task.
let mut cached_view = self.cached_validator_view.lock();
let mut new_view = HashMap::with_capacity(resolved_peer_ips.len());

// `resolver.get_address()` should always return an address for the SocketAddrs in `resolved_peer_ips`,
// so we use `filter_map` to reduce noise inside the loop.
for (peer_ip, new_address) in resolved_peer_ips
.iter()
.filter_map(|peer_ip| self.resolver.get_address(*peer_ip).map(|address| (peer_ip, address)))
{
match cached_view.get(peer_ip) {
Some(previous_address) if *previous_address == new_address => {
// If the previous and new addresses for `peer_ip` match, the validator is unchanged since the last check.
// We purposefully use `trace!` here to reduce redundant debug logging.
let unchanged = format!("{peer_ip} - {new_address} (unchanged)").dimmed();
trace!(" = {}", unchanged);
}
Some(previous_address) => {
// If the previous and new addresses don't match, the peer has switched validator addresses.
// TODO: is this actually something we should `warn!` about?
let identity_changed =
format!("{peer_ip} - {new_address} (previous identity {previous_address})").dimmed();
debug!(" + {}", identity_changed);
}
None => {
// The peer is not in the cached view, so it is a newly joined validator.
let new_validator = format!("{peer_ip} - {new_address}").dimmed();
debug!(" + {}", new_validator);
}
};
new_view.insert(*peer_ip, new_address);
}

// Then go through the remaining peers in `cached_view` to find the peers absent from the new view (i.e. the removed ones).
for (peer_ip, address) in cached_view.iter().filter(|(peer_ip, _)| !new_view.contains_key(peer_ip)) {
let removed_validator = format!("{peer_ip} - {address}").dimmed();
debug!(" - {}", removed_validator);
}
*cached_view = new_view;
}

/// Logs the connected validators.
fn log_connected_validators(&self) {
// Log the connected validators.
@@ -887,10 +948,8 @@
};
// Log the connected validators.
info!("{connections_msg}");
- for peer_ip in validators {
-     let address = self.resolver.get_address(peer_ip).map_or("Unknown".to_string(), |a| a.to_string());
-     debug!("{}", format!(" {peer_ip} - {address}").dimmed());
- }
+ // Log the validator change set.
+ self.log_connected_validator_changes(validators);
}

/// This function attempts to connect to any disconnected trusted validators.
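The heart of the gateway change is a cache-against-current diff: the previous `peer_ip -> address` view lives in a `HashMap`, each heartbeat resolves the current view, and every entry is classified as unchanged, identity-changed, newly joined, or removed. The std-only sketch below mirrors that classification; the `diff_validator_view` name and the `String` stand-ins for `SocketAddr` and `Address<N>` are illustrative only, and the real function additionally guards the whole computation behind `tracing::enabled!(Level::DEBUG)` and logs via `trace!`/`debug!` instead of returning buckets.

use std::collections::HashMap;

// A minimal, hypothetical sketch of the classification performed by
// `log_connected_validator_changes`. `String` stands in for both
// `SocketAddr` and `Address<N>`; the real code logs rather than returns.
fn diff_validator_view(
    cached: &HashMap<String, String>,
    resolved: &[(String, String)],
) -> (Vec<String>, Vec<String>, Vec<String>) {
    let mut joined = Vec::new();
    let mut changed = Vec::new();
    let mut new_view: HashMap<String, String> = HashMap::with_capacity(resolved.len());

    for (peer_ip, new_address) in resolved {
        match cached.get(peer_ip) {
            // Previous and new addresses match: the validator is unchanged.
            Some(prev) if prev == new_address => {}
            // The peer now resolves to a different validator identity.
            Some(prev) => changed.push(format!("{peer_ip} - {new_address} (previously {prev})")),
            // The peer is absent from the cached view: a newly joined validator.
            None => joined.push(format!("{peer_ip} - {new_address}")),
        }
        new_view.insert(peer_ip.clone(), new_address.clone());
    }

    // Anything left in the cached view but missing from the new one has disconnected.
    let removed = cached
        .iter()
        .filter(|(peer_ip, _)| !new_view.contains_key(*peer_ip))
        .map(|(peer_ip, address)| format!("{peer_ip} - {address}"))
        .collect();

    (joined, changed, removed)
}

fn main() {
    let cached = HashMap::from([
        ("1.2.3.4:5000".to_string(), "aleo1aaa".to_string()),
        ("5.6.7.8:5000".to_string(), "aleo1bbb".to_string()),
    ]);
    let resolved = [
        ("1.2.3.4:5000".to_string(), "aleo1aaa".to_string()), // unchanged
        ("9.9.9.9:5000".to_string(), "aleo1ccc".to_string()), // newly joined
    ];
    let (joined, changed, removed) = diff_validator_view(&cached, &resolved);
    println!("+ {joined:?}");
    println!("~ {changed:?}");
    println!("- {removed:?}");
}

Swapping the state after the comparison (`*cached_view = new_view;` in the PR) keeps the cache in sync in a single pass, with no separate bookkeeping for disconnects.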
37 changes: 20 additions & 17 deletions node/bft/src/primary.rs
@@ -290,25 +290,28 @@ impl<N: Network> Primary<N> {
// Construct the event.
// TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
let event = Event::BatchPropose(proposal.batch_header().clone().into());
- // Iterate through the non-signers.
- for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
-     // Resolve the address to the peer IP.
-     match self.gateway.resolver().get_peer_ip_for_address(address) {
-         // Resend the batch proposal to the validator for signing.
-         Some(peer_ip) => {
-             let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
-             tokio::spawn(async move {
-                 debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
-                 // Resend the batch proposal to the peer.
-                 if gateway.send(peer_ip, event_).await.is_none() {
-                     warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
-                 }
-             });
-         }
-         None => continue,
-     }
- }
- debug!("Proposed batch for round {} is still valid", proposal.round());
+ // Collect all `non_signers` into a `Vec` to avoid logging and spawning tasks inside a loop.
+ let non_signers: Vec<SocketAddr> = proposal
+     .nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?)
+     .iter()
+     .filter_map(|address| self.gateway.resolver().get_peer_ip_for_address(*address))
+     .collect();
+ debug!(
+     "Proposed batch for round {} is still valid, resending to {} peers: '{non_signers:?}'",
+     proposal.round(),
+     non_signers.len()
+ );
+ for non_signer in non_signers {
+     let (gateway, event, round) = (self.gateway.clone(), event.clone(), proposal.round());
+     tokio::spawn(async move {
+         // Resend the batch proposal to the peer.
+         if gateway.send(non_signer, event).await.is_none() {
+             warn!("Failed to resend batch proposal for round {round} to peer '{non_signer}'");
+         }
+     });
+ }
return Ok(());
}

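The primary.rs change swaps a per-peer match-and-spawn loop for a collect-first pattern: resolve every non-signer up front, log one summary line, then spawn the resend tasks. A minimal tokio sketch of that pattern follows; `send` is a hypothetical stand-in for `Gateway::send` (which returns `None` when the peer is disconnected), and nothing here is a real snarkOS API.

use tokio::task::JoinSet;

// Hypothetical stand-in for `Gateway::send`: `None` means the peer is gone.
async fn send(peer: &str) -> Option<()> {
    if peer.starts_with("bad") { None } else { Some(()) }
}

#[tokio::main]
async fn main() {
    // Resolve all targets up front, as in the PR, so a single summary line
    // can be logged instead of one line per spawned task.
    let non_signers: Vec<String> =
        vec!["1.2.3.4:5000".into(), "bad.peer:5000".into(), "5.6.7.8:5000".into()];
    println!("resending to {} peers: {non_signers:?}", non_signers.len());

    let mut tasks = JoinSet::new();
    for peer in non_signers {
        // Each resend runs concurrently; a failed send only warrants a warning.
        tasks.spawn(async move {
            if send(&peer).await.is_none() {
                eprintln!("failed to resend to '{peer}'");
            }
        });
    }
    // The PR uses fire-and-forget `tokio::spawn`; a `JoinSet` is used here only
    // so the sketch waits for the resends to finish before exiting.
    while tasks.join_next().await.is_some() {}
}

Besides cutting log noise, collecting first means a resolver failure for one address silently drops that peer from the resend set (via `filter_map`) instead of special-casing a `None` arm inside the loop.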