Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ impl<N: Network> Gateway<N> {
#[cfg(feature = "telemetry")]
self.log_participation_scores();
// Keep the trusted validators connected.
self.handle_trusted_validators();
self.handle_trusted_validators().await;
// Keep the bootstrap peers within the allowed range.
self.handle_bootstrap_peers().await;
// Removes any validators that are not in the current committee.
Expand Down Expand Up @@ -1050,16 +1050,36 @@ impl<N: Network> Gateway<N> {
}

/// This function attempts to connect to any disconnected trusted validators.
fn handle_trusted_validators(&self) {
let trusted_peers = self.trusted_peers();
async fn handle_trusted_validators(&self) {
// Collect trusted peer addresses that genuinely need a reconnection attempt.
// Skip peers whose last known Aleo address is already connected via a different IP,
// which prevents spurious reconnection attempts when a validator's IP changes.
let trusted_peers: Vec<_> = {
let pool = self.peer_pool().read();
pool.iter()
.filter_map(|(addr, peer)| {
if !peer.is_trusted() {
return None;
}
if let Peer::Candidate(c) = peer {
if let Some(aleo_addr) = c.last_known_aleo_addr {
if self.is_connected_address(aleo_addr) {
return None;
}
}
Comment on lines +1065 to +1069
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more robust (and simpler) way to check whether there's already a peer connected with the given Aleo address is to use Resolver::get_peer_ip_for_address - the Resolver only ever holds the addresses of connected peers.

}
Some(*addr)
})
.collect()
};

// Attempt to re-establish connections with any trusted peer that is not connected already.
let handles: Vec<JoinHandle<_>> = trusted_peers
let (addrs, handles): (Vec<_>, Vec<JoinHandle<_>>) = trusted_peers
.iter()
.filter_map(|validator_ip| {
// Attempt to connect to the trusted validator.
match self.connect(*validator_ip) {
Ok(hdl) => Some(hdl),
Ok(hdl) => Some((*validator_ip, hdl)),
Err(ConnectError::SelfConnect { .. })
| Err(ConnectError::AlreadyConnected { .. })
| Err(ConnectError::AlreadyConnecting { .. }) => None,
Expand All @@ -1069,10 +1089,15 @@ impl<N: Network> Gateway<N> {
}
}
})
.collect();
.unzip();

if !handles.is_empty() {
info!("Reconnnecting to {} out of {} trusted validators", handles.len(), trusted_peers.len());
info!("Reconnecting to {} out of {} trusted validators", handles.len(), trusted_peers.len());
for (addr, result) in addrs.into_iter().zip(join_all(handles).await) {
if let Err(err) = result {
warn!("{CONTEXT} Failed to connect to trusted validator at '{addr}' - {err}");
}
}
}
}

Expand Down Expand Up @@ -1498,6 +1523,17 @@ impl<N: Network> Handshake for Gateway<N> {
ConnectionMode::Gateway,
);
}
// Warn if this validator was previously known under a different trusted address.
for (candidate_addr, peer) in self.peer_pool.read().iter() {
if let Peer::Candidate(c) = peer {
if c.trusted && c.last_known_aleo_addr == Some(cr.address) && *candidate_addr != addr {
warn!(
"{CONTEXT} Validator '{addr}' ({}) is connected but is also configured as trusted peer '{candidate_addr}' - the addresses differ",
cr.address
);
}
}
}
info!("{CONTEXT} Connected to '{addr}'");
}
Err(error) => {
Expand Down
15 changes: 12 additions & 3 deletions node/network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{fmt, net::SocketAddr, time::Instant};
#[derive(Clone, Debug)]
pub enum Peer<N: Network> {
/// A candidate peer that's currently not connected to.
Candidate(CandidatePeer),
Candidate(CandidatePeer<N>),
/// A peer that's currently being connected to (the handshake is in progress).
Connecting(ConnectingPeer),
/// A fully connected (post-handshake) peer.
Expand All @@ -41,7 +41,7 @@ pub struct ConnectingPeer {

/// A candidate peer.
#[derive(Clone, Debug)]
pub struct CandidatePeer {
pub struct CandidatePeer<N: Network> {
/// The listening address of a candidate peer.
pub listener_addr: SocketAddr,
/// Indicates whether the peer is considered trusted.
Expand All @@ -53,6 +53,9 @@ pub struct CandidatePeer {
pub last_connection_attempt: Option<Instant>,
/// The total number of connection attempts, since the peer was last connected.
pub total_connection_attempts: u32,
/// The last known Aleo address of this peer, carried over from a prior connection.
/// Used to detect when a validator reconnects from a different IP address.
pub last_known_aleo_addr: Option<Address<N>>,
}

/// A fully connected peer.
Expand Down Expand Up @@ -100,13 +103,14 @@ impl fmt::Display for ConnectionMode {

impl<N: Network> Peer<N> {
/// Create a candidate peer.
pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self {
pub fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self {
Self::Candidate(CandidatePeer {
listener_addr,
trusted,
last_height_seen: None,
last_connection_attempt: None,
total_connection_attempts: 0,
last_known_aleo_addr: None,
})
}

Expand Down Expand Up @@ -153,12 +157,17 @@ impl<N: Network> Peer<N> {

/// Demote a peer to candidate status, marking it as disconnected.
///
/// Replaces `self` in place with a fresh [`Peer::Candidate`] registered under `listener_addr`.
/// The trusted flag and the last seen height are carried over from the current state, the
/// connection-attempt bookkeeping is reset, and - if the peer was `Connected` - its Aleo
/// address is kept as `last_known_aleo_addr`.
pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) {
// Capture the Aleo address before `*self` is overwritten below; only the `Connected`
// variant is matched, every other variant yields `None`.
// NOTE(review): as a consequence, demoting a peer that is not `Connected` (e.g. a `Candidate`
// that already carries a `last_known_aleo_addr`) clears the field - confirm this is intended.
let last_known_aleo_addr = match self {
Self::Connected(p) => Some(p.aleo_addr),
_ => None,
};
*self = Self::Candidate(CandidatePeer {
listener_addr,
// Both values are read from the current (pre-downgrade) state of `self`.
trusted: self.is_trusted(),
last_height_seen: self.last_height_seen(),
// Start over with clean connection-attempt bookkeeping.
last_connection_attempt: None,
total_connection_attempts: 0,
last_known_aleo_addr,
});
}

Expand Down
4 changes: 2 additions & 2 deletions node/network/src/peering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

/// Returns the list of candidate peers.
fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
fn get_candidate_peers(&self) -> Vec<CandidatePeer<N>> {
self.peer_pool()
.read()
.values()
Expand All @@ -627,7 +627,7 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

/// Returns the list of trusted candidate peers.
fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer> {
fn get_trusted_candidate_peers(&self) -> Vec<CandidatePeer<N>> {
self.peer_pool()
.read()
.values()
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
/// Helper function that attempts to connect the given peers.
///
/// Used by [`Self::handle_trusted_peers`] and [`Self::handle_connected_peers`].
async fn try_connect_to_peers(&self, peers: impl Iterator<Item = CandidatePeer> + Send + 'static) {
async fn try_connect_to_peers(&self, peers: impl Iterator<Item = CandidatePeer<N>> + Send + 'static) {
let (peer_info, hdls): (Vec<_>, Vec<_>) = peers
.filter_map(|peer| {
let peer_type = if peer.trusted { "trusted peer" } else { "peer" };
Expand Down