diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs
index 82ba0d2b11..15c3b5bda1 100644
--- a/node/bft/src/gateway.rs
+++ b/node/bft/src/gateway.rs
@@ -64,7 +64,14 @@ use futures::SinkExt;
 use indexmap::{IndexMap, IndexSet};
 use parking_lot::{Mutex, RwLock};
 use rand::seq::{IteratorRandom, SliceRandom};
-use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
+use std::{
+    collections::{HashMap, HashSet},
+    future::Future,
+    io,
+    net::SocketAddr,
+    sync::Arc,
+    time::Duration,
+};
 use tokio::{
     net::TcpStream,
     sync::{oneshot, OnceCell},
@@ -119,6 +126,8 @@ pub struct Gateway<N: Network> {
     /// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously
     /// attempt to connect to each other). This set is used to prevent this from happening.
     connecting_peers: Arc<Mutex<HashSet<SocketAddr>>>,
+    /// The cached view of connected validators.
+    cached_validator_view: Arc<Mutex<HashMap<SocketAddr, Address<N>>>>,
     /// The primary sender.
     primary_sender: Arc<OnceCell<PrimarySender<N>>>,
     /// The worker senders.
@@ -160,6 +169,7 @@
             trusted_validators: trusted_validators.iter().copied().collect(),
             connected_peers: Default::default(),
             connecting_peers: Default::default(),
+            cached_validator_view: Default::default(),
             primary_sender: Default::default(),
             worker_senders: Default::default(),
             sync_sender: Default::default(),
@@ -872,6 +882,57 @@
         self.handle_min_connected_validators();
     }
 
+    /// Logs the changes in the connected validator set. Only executed if debug-log is enabled.
+    fn log_connected_validator_changes(&self, resolved_peer_ips: IndexSet<SocketAddr>) {
+        use tracing::Level;
+
+        // Skip the computations if we're not logging debug events
+        if !enabled!(Level::DEBUG) {
+            return;
+        }
+
+        // We reserve the write lock for the duration of validator set intersection/difference computations,
+        // as this function should be called by a single `Gateway::heartbeat`-task only.
+        let mut cached_view = self.cached_validator_view.lock();
+        let mut new_view = HashMap::with_capacity(resolved_peer_ips.len());
+
+        // `resolver.get_address()` should always return address for SocketAddrs in `resolved_peer_ips`,
+        // so we do `filter_map` to reduce noise inside a loop.
+        for (peer_ip, new_address) in resolved_peer_ips
+            .iter()
+            .filter_map(|peer_ip| self.resolver.get_address(*peer_ip).map(|address| (peer_ip, address)))
+        {
+            match cached_view.get(peer_ip) {
+                Some(previous_address) if *previous_address == new_address => {
+                    // If previous and new address for `peer_ip` match, then validator is the same since the last check.
+                    // Purposefully using `trace!` to reduce the redundant debug logging here.
+                    let unchanged = format!("{peer_ip} - {new_address} (unchanged)").dimmed();
+                    trace!(" = {}", unchanged);
+                }
+                Some(previous_address) => {
+                    // If previous and new address don't match, then peer has switched the validator address.
+                    // TODO: is this actually something we should `warn!` about?
+                    let identity_changed =
+                        format!("{peer_ip} - {new_address} (previous identity {previous_address})").dimmed();
+                    debug!(" + {}", identity_changed);
+                }
+                None => {
+                    // Peer not in cached view, so it's a newly joined validator.
+                    let new_validator = format!("{peer_ip} - {new_address}").dimmed();
+                    debug!(" + {}", new_validator);
+                }
+            };
+            new_view.insert(*peer_ip, new_address);
+        }
+
+        // Then go through remaining peers in `cached_view` to resolve the peers not in new view (= removed ones).
+        for (peer_ip, address) in cached_view.iter().filter(|(peer_ip, _)| !new_view.contains_key(peer_ip)) {
+            let removed_validator = format!("{peer_ip} - {address}").dimmed();
+            debug!(" - {}", removed_validator);
+        }
+        *cached_view = new_view;
+    }
+
     /// Logs the connected validators.
     fn log_connected_validators(&self) {
         // Log the connected validators.
@@ -887,10 +948,8 @@
         };
         // Log the connected validators.
         info!("{connections_msg}");
-        for peer_ip in validators {
-            let address = self.resolver.get_address(peer_ip).map_or("Unknown".to_string(), |a| a.to_string());
-            debug!("{}", format!(" {peer_ip} - {address}").dimmed());
-        }
+        // Log the validator change set.
+        self.log_connected_validator_changes(validators);
     }
 
     /// This function attempts to connect to any disconnected trusted validators.
diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs
index f91cd52bf4..f892cce10f 100644
--- a/node/bft/src/primary.rs
+++ b/node/bft/src/primary.rs
@@ -290,25 +290,28 @@ impl<N: Network> Primary<N> {
         // Construct the event.
         // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
         let event = Event::BatchPropose(proposal.batch_header().clone().into());
-        // Iterate through the non-signers.
-        for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
-            // Resolve the address to the peer IP.
-            match self.gateway.resolver().get_peer_ip_for_address(address) {
-                // Resend the batch proposal to the validator for signing.
-                Some(peer_ip) => {
-                    let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
-                    tokio::spawn(async move {
-                        debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
-                        // Resend the batch proposal to the peer.
-                        if gateway.send(peer_ip, event_).await.is_none() {
-                            warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
-                        }
-                    });
+
+        // Collect all non_signers into `Vec` to avoid logging and spawning tasks in a loop.
+        let non_signers: Vec<SocketAddr> = proposal
+            .nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?)
+            .iter()
+            .filter_map(|address| self.gateway.resolver().get_peer_ip_for_address(*address))
+            .collect();
+
+        debug!(
+            "Proposed batch for round {} is still valid, resending to {} peers: '{non_signers:?}'",
+            proposal.round(),
+            non_signers.len()
+        );
+        for non_signer in non_signers {
+            let (gateway, event, round) = (self.gateway.clone(), event.clone(), proposal.round());
+            tokio::spawn(async move {
+                // Resend the batch proposal to the peer.
+                if gateway.send(non_signer, event).await.is_none() {
+                    warn!("Failed to resend batch proposal for round {round} to peer '{non_signer}'");
                 }
-                None => continue,
-            }
+            });
         }
-        debug!("Proposed batch for round {} is still valid", proposal.round());
         return Ok(());
     }
 