diff --git a/node/bft/src/gateway.rs b/node/bft/src/gateway.rs index 6c72524b59..a8e65f86bc 100644 --- a/node/bft/src/gateway.rs +++ b/node/bft/src/gateway.rs @@ -21,7 +21,7 @@ use crate::{ MEMORY_POOL_PORT, Worker, events::{EventCodec, PrimaryPing}, - helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker}, + helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker}, spawn_blocking, }; use aleo_std::StorageMode; @@ -43,7 +43,7 @@ use snarkos_node_bft_events::{ ValidatorsResponse, }; use snarkos_node_bft_ledger_service::LedgerService; -use snarkos_node_router::{NodeType, Peer, PeerPoolHandling}; +use snarkos_node_router::{NodeType, Peer, PeerPoolHandling, Resolver}; use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService}; use snarkos_node_tcp::{ Config, @@ -161,11 +161,21 @@ pub struct InnerGateway { } impl PeerPoolHandling for Gateway { + const MAXIMUM_POOL_SIZE: usize = 200; const OWNER: &str = CONTEXT; + const PEER_SLASHING_COUNT: usize = 20; fn peer_pool(&self) -> &RwLock>> { &self.peer_pool } + + fn resolver(&self) -> &RwLock> { + &self.resolver + } + + fn is_dev(&self) -> bool { + self.dev.is_some() + } } impl Gateway { @@ -443,7 +453,7 @@ impl Gateway { #[cfg(test)] pub fn insert_connected_peer(&self, peer_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { // Adds a bidirectional map between the listener address and (ambiguous) peer address. - self.resolver.write().insert_peer(peer_ip, peer_addr, address); + self.resolver.write().insert_peer(peer_ip, peer_addr, Some(address)); // Add a transmission for this peer in the connected peers. self.peer_pool.write().insert(peer_ip, Peer::new_connecting(peer_ip, false)); if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { @@ -451,27 +461,6 @@ impl Gateway { } } - /// Removes the connected peer and adds them to the candidate peers. 
- fn remove_connected_peer(&self, peer_ip: SocketAddr) { - // Remove the peer from the sync module. Except for some tests, there is always a sync sender. - if let Some(sync_sender) = self.sync_sender.get() { - let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone(); - tokio::spawn(async move { - if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await { - warn!("Unable to remove '{peer_ip}' from the sync module - {e}"); - } - }); - } - if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { - if let Peer::Connected(connected_peer) = peer { - self.resolver.write().remove_peer(peer_ip, connected_peer.aleo_addr); - } - peer.downgrade_to_candidate(peer_ip); - } - #[cfg(feature = "metrics")] - self.update_metrics(); - } - /// Sends the given event to specified peer. /// /// This function returns as soon as the event is queued to be sent, @@ -751,45 +740,24 @@ impl Gateway { // Decrement the number of validators requests for this peer. self.cache.decrement_outbound_validators_requests(peer_ip); - // If the number of connected validators is less than the minimum, connect to more validators. - if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize { - // Attempt to connect to any validators that are not already connected. - let self_ = self.clone(); - tokio::spawn(async move { - for (validator_ip, validator_address) in validators { - if self_.dev.is_some() { - // Ensure the validator IP is not this node. - if self_.is_local_ip(validator_ip) { - continue; - } - } else { - // Ensure the validator IP is not this node and is well-formed. - if !self_.is_valid_peer_ip(validator_ip) { - continue; - } - } - - // Ensure the validator address is not this node. - if self_.account.address() == validator_address { - continue; - } - // Ensure the validator IP is not already connected or connecting. 
- if self_.is_connected(validator_ip) || self_.is_connecting(validator_ip) { - continue; - } - // Ensure the validator address is not already connected. - if self_.is_connected_address(validator_address) { - continue; - } - // Ensure the validator address is an authorized validator. - if !self_.is_authorized_validator_address(validator_address) { - continue; - } - // Attempt to connect to the validator. - self_.connect(validator_ip); - } - }); + // Add valid validators as candidates to the peer pool; only validator-related + // filters need to be applied, the rest is handled by `PeerPoolHandling`. + let valid_addrs = validators + .into_iter() + .filter_map(|(listener_addr, aleo_addr)| { + (self.account.address() != aleo_addr + && !self.is_connected_address(aleo_addr) + && self.is_authorized_validator_address(aleo_addr)) + .then_some(listener_addr) + }) + .collect::>(); + if !valid_addrs.is_empty() { + self.insert_candidate_peers(valid_addrs); } + + #[cfg(feature = "metrics")] + self.update_metrics(); + Ok(true) } Event::WorkerPing(ping) => { @@ -1147,13 +1115,23 @@ impl Disconnect for Gateway { /// Any extra operations to be performed during a disconnect. async fn handle_disconnect(&self, peer_addr: SocketAddr) { if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) { - self.remove_connected_peer(peer_ip); - + self.downgrade_peer_to_candidate(peer_ip); + // Remove the peer from the sync module. Except for some tests, there is always a sync sender. + if let Some(sync_sender) = self.sync_sender.get() { + let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone(); + tokio::spawn(async move { + if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await { + warn!("Unable to remove '{peer_ip}' from the sync module - {e}"); + } + }); + } // We don't clear this map based on time but only on peer disconnect. // This is sufficient to avoid infinite growth as the committee has a fixed number // of members. 
self.cache.clear_outbound_validators_requests(peer_ip); self.cache.clear_outbound_block_requests(peer_ip); + #[cfg(feature = "metrics")] + self.update_metrics(); } } } @@ -1218,7 +1196,7 @@ impl Handshake for Gateway { match handshake_result { Ok(Some(ref cr)) => { if let Some(peer) = self.peer_pool.write().get_mut(&addr) { - self.resolver.write().insert_peer(addr, peer_addr, cr.address); + self.resolver.write().insert_peer(addr, peer_addr, Some(cr.address)); peer.upgrade_to_connected( peer_addr, cr.listener_port, diff --git a/node/bft/src/helpers/mod.rs b/node/bft/src/helpers/mod.rs index 7d9dd7f531..28f27da3cb 100644 --- a/node/bft/src/helpers/mod.rs +++ b/node/bft/src/helpers/mod.rs @@ -37,9 +37,6 @@ pub use proposal_cache::*; pub mod ready; pub use ready::*; -pub mod resolver; -pub use resolver::*; - pub mod signed_proposals; pub use signed_proposals::*; diff --git a/node/bft/src/helpers/resolver.rs b/node/bft/src/helpers/resolver.rs deleted file mode 100644 index c9ed3ee8db..0000000000 --- a/node/bft/src/helpers/resolver.rs +++ /dev/null @@ -1,99 +0,0 @@ -// Copyright (c) 2019-2025 Provable Inc. -// This file is part of the snarkOS library. - -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at: - -// http://www.apache.org/licenses/LICENSE-2.0 - -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use snarkvm::prelude::{Address, Network}; - -use std::{collections::HashMap, net::SocketAddr}; - -/// The resolver contains some reverse maps for peers which are not available -/// by default to the implementors of PeerPoolHandling (who already contain -/// maps from the peer's listening address to their various components). -#[derive(Debug)] -pub struct Resolver { - /// The map of the (ambiguous) peer address to listener address. - to_listener: HashMap, - /// A map of `address` to `peer IP`. - address_peers: HashMap, SocketAddr>, -} - -impl Default for Resolver { - /// Initializes a new instance of the resolver. - fn default() -> Self { - Self::new() - } -} - -impl Resolver { - /// Initializes a new instance of the resolver. - pub fn new() -> Self { - Self { to_listener: Default::default(), address_peers: Default::default() } - } -} - -impl Resolver { - /// Returns the listener address for the given (ambiguous) peer address, if it exists. - pub fn get_listener(&self, peer_addr: SocketAddr) -> Option { - self.to_listener.get(&peer_addr).copied() - } - - /// Returns the peer IP for the given address. - pub fn get_peer_ip_for_address(&self, address: Address) -> Option { - self.address_peers.get(&address).copied() - } - - /// Inserts a mapping of a peer's connected address to its listener address, - /// alongside a mapping of the Aleo address to the listener address. - pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr, address: Address) { - self.to_listener.insert(peer_addr, listener_ip); - self.address_peers.insert(address, listener_ip); - } - - /// Removes the mapping of a peer's connected address to its listener address, - /// alongside the mapping of the Aleo address to the listener address. 
- pub fn remove_peer(&mut self, connected_addr: SocketAddr, aleo_addr: Address) { - self.to_listener.remove(&connected_addr); - self.address_peers.remove(&aleo_addr); - } -} - -#[cfg(test)] -mod tests { - use super::*; - use snarkvm::{prelude::Rng, utilities::TestRng}; - - type CurrentNetwork = snarkvm::prelude::MainnetV0; - - #[test] - fn test_resolver() { - let mut resolver = Resolver::::new(); - let listener_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); - let peer_addr = SocketAddr::from(([127, 0, 0, 1], 4321)); - let mut rng = TestRng::default(); - let address = Address::::new(rng.r#gen()); - - assert!(resolver.get_listener(peer_addr).is_none()); - assert!(resolver.get_peer_ip_for_address(address).is_none()); - - resolver.insert_peer(listener_ip, peer_addr, address); - - assert_eq!(resolver.get_listener(peer_addr).unwrap(), listener_ip); - assert_eq!(resolver.get_peer_ip_for_address(address).unwrap(), listener_ip); - - resolver.remove_peer(peer_addr, address); - - assert!(resolver.get_listener(peer_addr).is_none()); - assert!(resolver.get_peer_ip_for_address(address).is_none()); - } -} diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 07bfee158f..4063688c3e 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -2231,7 +2231,7 @@ mod tests { fn map_account_addresses(primary: &Primary, accounts: &[(SocketAddr, Account)]) { // First account is primary, which doesn't need to resolve. for (addr, acct) in accounts.iter().skip(1) { - primary.gateway.resolver().write().insert_peer(*addr, *addr, acct.address()); + primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address())); } } @@ -2428,7 +2428,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. 
- primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // The primary will only consider itself synced if we received // block locators from a peer. @@ -2467,7 +2467,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // Add a high block locator to indicate we are not synced. primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(20)).unwrap(); @@ -2507,7 +2507,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // The primary will only consider itself synced if we received // block locators from a peer. @@ -2544,7 +2544,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // The primary must be considered synced. primary.sync.try_block_sync().await; @@ -2589,7 +2589,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // The primary must be considered synced. primary.sync.try_block_sync().await; @@ -2645,7 +2645,7 @@ mod tests { } // The author must be known to resolver to pass propose checks. 
- primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // The primary must be considered synced. primary.sync.try_block_sync().await; @@ -2692,8 +2692,8 @@ mod tests { } // The author must be known to resolver to pass propose checks. - primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); - primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, peer_account.1.address()); + primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); + primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address())); // primary v4 must be considered synced. primary_v4.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap(); diff --git a/node/router/src/handshake.rs b/node/router/src/handshake.rs index 36fb64dc9c..40c20e731a 100644 --- a/node/router/src/handshake.rs +++ b/node/router/src/handshake.rs @@ -135,7 +135,7 @@ impl Router { match handshake_result { Ok(Some(ref cr)) => { if let Some(peer) = self.peer_pool.write().get_mut(&addr) { - self.resolver.write().insert_peer(peer.listener_addr(), peer_addr); + self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, None); peer.upgrade_to_connected(peer_addr, cr.listener_port, cr.address, cr.node_type, cr.version); } #[cfg(feature = "metrics")] diff --git a/node/router/src/helpers/mod.rs b/node/router/src/helpers/mod.rs index 915d3212de..607f9a0519 100644 --- a/node/router/src/helpers/mod.rs +++ b/node/router/src/helpers/mod.rs @@ -20,4 +20,4 @@ mod peer; pub use peer::*; mod resolver; -pub(crate) use resolver::*; +pub use resolver::*; diff --git a/node/router/src/helpers/resolver.rs b/node/router/src/helpers/resolver.rs index 5eed2a0242..b712041380 100644 --- a/node/router/src/helpers/resolver.rs +++ b/node/router/src/helpers/resolver.rs @@ 
-13,30 +13,98 @@ // See the License for the specific language governing permissions and // limitations under the License. +use snarkvm::prelude::{Address, Network}; + use std::{collections::HashMap, net::SocketAddr}; -/// The `Resolver` provides the means to map the connected address (used in the lower-level -/// `tcp` module internals, and provided by the OS) to the listener address (used as the -/// unique peer identifier in the higher-level functions). -#[derive(Debug, Default)] -pub(crate) struct Resolver { - /// The map of the connected peer address to the correponding listener address. +/// The resolver contains additional reverse maps for peers which are not available +/// by default to the implementors of PeerPoolHandling (which already contains +/// maps from the peer's listening address to their various components). +#[derive(Debug)] +pub struct Resolver { + /// The map of peers' connected addresses to the corresponding listener addresses. to_listener: HashMap, + /// A map of peers' Aleo addresses to the corresponding listener addresses. + /// It is currently only used for the validators. + address_peers: HashMap, SocketAddr>, +} + +impl Default for Resolver { + /// Initializes a new instance of the resolver. + fn default() -> Self { + Self::new() + } +} + +impl Resolver { + /// Initializes a new instance of the resolver. + pub fn new() -> Self { + Self { to_listener: Default::default(), address_peers: Default::default() } + } } -impl Resolver { - /// Returns the listener address for the given connected address, if it exists. - pub fn get_listener(&self, connected_addr: &SocketAddr) -> Option { - self.to_listener.get(connected_addr).copied() +impl Resolver { + /// Returns the listener address for the given connected peer address, if it exists. + pub fn get_listener(&self, connected_addr: SocketAddr) -> Option { + self.to_listener.get(&connected_addr).copied() } - /// Inserts a new mapping of a connected address to the corresponding listener address. 
- pub fn insert_peer(&mut self, listener_addr: SocketAddr, connected_addr: SocketAddr) { + /// Returns the listener address for the peer with the given Aleo address. + pub fn get_peer_ip_for_address(&self, aleo_addr: Address) -> Option { + self.address_peers.get(&aleo_addr).copied() + } + + /// Inserts a mapping of a peer's connected address to its listener address, + /// alongside an optional mapping of the Aleo address to the listener address. + pub fn insert_peer( + &mut self, + listener_addr: SocketAddr, + connected_addr: SocketAddr, + aleo_addr: Option>, + ) { self.to_listener.insert(connected_addr, listener_addr); + if let Some(addr) = aleo_addr { + self.address_peers.insert(addr, listener_addr); + } } - /// Removes the given mapping. - pub fn remove_peer(&mut self, connected_addr: &SocketAddr) { - self.to_listener.remove(connected_addr); + /// Removes the mapping of a peer's connected address to its listener address, + /// alongside the optional mapping of the Aleo address to the listener address. + pub fn remove_peer(&mut self, connected_addr: SocketAddr, aleo_addr: Option>) { + self.to_listener.remove(&connected_addr); + if let Some(addr) = aleo_addr { + self.address_peers.remove(&addr); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use snarkvm::{prelude::Rng, utilities::TestRng}; + + type CurrentNetwork = snarkvm::prelude::MainnetV0; + + // Test the basic functionalities of the resolver. 
+ #[test] + fn test_resolver() { + let mut resolver = Resolver::::new(); + let listener_ip = SocketAddr::from(([127, 0, 0, 1], 1234)); + let peer_addr = SocketAddr::from(([127, 0, 0, 1], 4321)); + let mut rng = TestRng::default(); + let address = Address::::new(rng.r#gen()); + + assert!(resolver.get_listener(peer_addr).is_none()); + assert!(resolver.get_peer_ip_for_address(address).is_none()); + + resolver.insert_peer(listener_ip, peer_addr, Some(address)); + + assert_eq!(resolver.get_listener(peer_addr).unwrap(), listener_ip); + assert_eq!(resolver.get_peer_ip_for_address(address).unwrap(), listener_ip); + + resolver.remove_peer(peer_addr, Some(address)); + + assert!(resolver.get_listener(peer_addr).is_none()); + assert!(resolver.get_peer_ip_for_address(address).is_none()); } } diff --git a/node/router/src/inbound.rs b/node/router/src/inbound.rs index 4722d0ee54..8d7e8ccc46 100644 --- a/node/router/src/inbound.rs +++ b/node/router/src/inbound.rs @@ -78,7 +78,7 @@ pub trait Inbound: Reading + Outbound { /// propagated to the caller. async fn inbound(&self, peer_addr: SocketAddr, message: Message) -> Result { // Retrieve the listener IP for the peer. - let peer_ip = match self.router().resolve_to_listener(&peer_addr) { + let peer_ip = match self.router().resolve_to_listener(peer_addr) { Some(peer_ip) => peer_ip, None => { // No longer connected to the peer. @@ -179,7 +179,7 @@ pub trait Inbound: Reading + Outbound { bail!("Not accepting peer response from '{peer_ip}' (validator gossip is disabled)"); } - match self.peer_response(peer_ip, &message.peers) { + match self.peer_response(peer_ip, message.peers) { true => Ok(true), false => bail!("Peer '{peer_ip}' sent an invalid peer response"), } @@ -349,20 +349,19 @@ pub trait Inbound: Reading + Outbound { } /// Handles a `PeerResponse` message. 
- fn peer_response(&self, _peer_ip: SocketAddr, peers: &[SocketAddr]) -> bool { + fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec) -> bool { // Check if the number of peers received is less than MAX_PEERS_TO_SEND. if peers.len() > MAX_PEERS_TO_SEND { return false; } - // Filter out invalid addresses. - let peers = match self.router().is_dev() { - // In development mode, relax the validity requirements to make operating devnets more flexible. - true => peers.iter().copied().filter(|ip| !is_bogon_ip(ip.ip())).collect::>(), - // In production mode, ensure the peer IPs are valid. - false => peers.iter().copied().filter(|ip| self.router().is_valid_peer_ip(*ip)).collect(), - }; // Adds the given peer IPs to the list of candidate peers. - self.router().insert_candidate_peers(&peers); + if !peers.is_empty() { + self.router().insert_candidate_peers(peers); + } + + #[cfg(feature = "metrics")] + self.router().update_metrics(); + true } diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index 33a9fcab2d..a7cdcd9e05 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -70,7 +70,7 @@ use std::{ ops::Deref, str::FromStr, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use tokio::task::JoinHandle; @@ -83,8 +83,20 @@ const PEER_CACHE_FILENAME: &str = "cached_router_peers"; pub trait PeerPoolHandling: P2P { const OWNER: &str; + /// The maximum number of peers permitted to be stored in the peer pool. + const MAXIMUM_POOL_SIZE: usize; + + /// The number of candidate peers to be removed from the pool once `MAXIMUM_POOL_SIZE` is reached. + /// It must be lower than `MAXIMUM_POOL_SIZE`. + const PEER_SLASHING_COUNT: usize; + fn peer_pool(&self) -> &RwLock>>; + fn resolver(&self) -> &RwLock>; + + /// Returns `true` if the owning node is in development mode. + fn is_dev(&self) -> bool; + /// Returns the listener address of this node. 
fn local_ip(&self) -> SocketAddr { self.tcp().listening_addr().expect("The TCP listener is not enabled") }
@@ -131,6 +143,10 @@ pub trait PeerPoolHandling: P2P {
             debug!("{{Self::OWNER}} Dropping connection attempt to '{listener_addr}' (already connecting)");
             return Ok(true);
         }
+        // If the IP is already banned, reject the attempt.
+        if self.is_ip_banned(listener_addr.ip()) {
+            bail!("{} Rejected a connection attempt to a banned IP '{}'", Self::OWNER, listener_addr.ip());
+        }
         Ok(false)
     }
 
@@ -183,6 +199,84 @@ pub trait PeerPoolHandling: P2P {
         }
     }
 
+    /// Downgrades a connected peer to candidate status.
+    fn downgrade_peer_to_candidate(&self, listener_addr: SocketAddr) {
+        if let Some(peer) = self.peer_pool().write().get_mut(&listener_addr) {
+            if let Peer::Connected(peer) = peer {
+                // Only validators get their aleo address registered with the resolver.
+                let aleo_addr = if peer.node_type == NodeType::Validator { Some(peer.aleo_addr) } else { None };
+                self.resolver().write().remove_peer(peer.connected_addr, aleo_addr);
+            }
+            peer.downgrade_to_candidate(listener_addr);
+        }
+    }
+
+    /// Adds new candidate peers to the peer pool, ensuring their validity and following the
+    /// limit on the number of peers in the pool.
+    fn insert_candidate_peers(&self, mut listener_addrs: Vec<SocketAddr>) {
+        // Hold a write guard from now on, so that concurrent batches can neither slash twice nor overwrite
+        // entries. The pool must only be accessed through this guard below; re-locking it would deadlock.
+        let mut peer_pool = self.peer_pool().write();
+
+        // Perform filtering to ensure candidate validity.
+        listener_addrs.retain(|&addr| {
+            !peer_pool.contains_key(&addr)
+                && !self.is_ip_banned(addr.ip())
+                && if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
+        });
+
+        // If we've managed to filter out every entry, there's nothing to do.
+        if listener_addrs.is_empty() {
+            return;
+        }
+
+        // If we're about to exceed the pool size limit, slash candidates (size is read via the held guard).
+        if peer_pool.len() + listener_addrs.len() >= Self::MAXIMUM_POOL_SIZE && Self::PEER_SLASHING_COUNT != 0 {
+            // Collect the addresses of candidate peers.
+            let mut peers_to_slash = peer_pool
+                .iter()
+                .filter_map(|(addr, peer)| (matches!(peer, Peer::Candidate(_))).then_some(*addr))
+                .collect::<Vec<_>>();
+
+            // Get the low-level peer stats.
+            let known_peers = self.tcp().known_peers().snapshot();
+
+            // Sort the list of candidate peers by failure count (descending) and timestamp (ascending).
+            let default_value = (0, Instant::now());
+            peers_to_slash.sort_unstable_by_key(|addr| {
+                let (num_failures, last_seen) = known_peers
+                    .get(&addr.ip())
+                    .map(|stats| (stats.failures(), stats.timestamp()))
+                    .unwrap_or(default_value);
+                (cmp::Reverse(num_failures), last_seen)
+            });
+
+            // Retain the candidate peers with the most failures and oldest timestamps.
+            peers_to_slash.truncate(Self::PEER_SLASHING_COUNT);
+
+            // Remove the peers to slash from the pool.
+            peer_pool.retain(|addr, _| !peers_to_slash.contains(addr));
+        }
+
+        // Cap the batch to the remaining capacity (read via the held guard) in case slashing didn't suffice.
+        listener_addrs.truncate(Self::MAXIMUM_POOL_SIZE.saturating_sub(peer_pool.len()));
+
+        // If we've managed to truncate to 0, exit.
+        if listener_addrs.is_empty() {
+            return;
+        }
+
+        // Insert new candidate peers.
+        for addr in listener_addrs {
+            peer_pool.insert(addr, Peer::new_candidate(addr, false));
+        }
+    }
+
+    /// Completely removes an entry from the peer pool.
+    fn remove_peer(&self, listener_addr: SocketAddr) {
+        self.peer_pool().write().remove(&listener_addr);
+    }
+
     /// Returns the connected peer address from the listener IP address.
fn resolve_to_ambiguous(&self, listener_addr: SocketAddr) -> Option { if let Some(Peer::Connected(peer)) = self.peer_pool().read().get(&listener_addr) { @@ -216,6 +310,11 @@ pub trait PeerPoolHandling: P2P { self.peer_pool().read().get(&listener_addr).is_some_and(|peer| peer.is_trusted()) } + /// Returns the number of all peers. + fn number_of_peers(&self) -> usize { + self.peer_pool().read().len() + } + /// Returns the number of connected peers. fn number_of_connected_peers(&self) -> usize { self.peer_pool().read().iter().filter(|(_, peer)| peer.is_connected()).count() @@ -294,7 +393,7 @@ pub trait PeerPoolHandling: P2P { } /// Returns the list of candidate peers. - fn candidate_peers(&self) -> HashSet { + fn candidate_peers(&self) -> Vec { let banned_ips = self.tcp().banned_peers().get_banned_ips(); self.peer_pool() .read() @@ -399,15 +498,18 @@ pub trait PeerPoolHandling: P2P { } /// Temporarily IP-ban and disconnect from the peer with the given listener address and an - /// optional reason for the ban. + /// optional reason for the ban. This also removes the peer from the candidate pool. fn ip_ban_peer(&self, listener_addr: SocketAddr, reason: Option<&str>) { let ip = listener_addr.ip(); debug!("IP-banning {ip}{}", reason.map(|r| format!(" reason: {r}")).unwrap_or_default()); - let tcp = self.tcp().clone(); - tcp.banned_peers().update_ip_ban(ip); + // Insert/update the low-level IP ban list. + self.tcp().banned_peers().update_ip_ban(ip); + // Disconnect from the peer. self.disconnect(listener_addr); + // Remove the peer from the pool. + self.remove_peer(listener_addr); } /// Check whether the given IP address is currently banned. 
@@ -436,11 +538,21 @@ impl Deref for Router { } impl PeerPoolHandling for Router { + const MAXIMUM_POOL_SIZE: usize = 10_000; const OWNER: &str = "[Router]"; + const PEER_SLASHING_COUNT: usize = 200; fn peer_pool(&self) -> &RwLock>> { &self.peer_pool } + + fn resolver(&self) -> &RwLock> { + &self.resolver + } + + fn is_dev(&self) -> bool { + self.is_dev + } } pub struct InnerRouter { @@ -455,7 +567,7 @@ pub struct InnerRouter { /// The cache. cache: Cache, /// The resolver. - resolver: RwLock, + resolver: RwLock>, /// The collection of both candidate and connected peers. peer_pool: RwLock>>, /// The spawned handles. @@ -474,8 +586,6 @@ impl Router { /// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious. #[cfg(not(feature = "test"))] const CONNECTION_ATTEMPTS_SINCE_SECS: i64 = 10; - /// The maximum number of candidate peers permitted to be stored in the node. - const MAXIMUM_CANDIDATE_PEERS: usize = 10_000; /// The maximum amount of connection attempts within a 10 second threshold #[cfg(not(feature = "test"))] const MAX_CONNECTION_ATTEMPTS: usize = 10; @@ -572,9 +682,9 @@ impl Router { self.account.address() } - /// Returns `true` if the node is in development mode. - pub fn is_dev(&self) -> bool { - self.is_dev + /// Returns a reference to the cache. + pub fn cache(&self) -> &Cache { + &self.cache } /// Returns `true` if the node is periodically evicting more external peers. @@ -588,7 +698,7 @@ impl Router { } /// Returns the listener IP address from the (ambiguous) peer address. 
- pub fn resolve_to_listener(&self, connected_addr: &SocketAddr) -> Option { + pub fn resolve_to_listener(&self, connected_addr: SocketAddr) -> Option { self.resolver.read().get_listener(connected_addr) } @@ -598,58 +708,17 @@ impl Router { } #[cfg(feature = "metrics")] - fn update_metrics(&self) { + pub fn update_metrics(&self) { metrics::gauge(metrics::router::CONNECTED, self.number_of_connected_peers() as f64); metrics::gauge(metrics::router::CANDIDATE, self.number_of_candidate_peers() as f64); } - /// Inserts the given peer IPs to the set of candidate peers. - /// - /// This method skips adding any given peers if the combined size exceeds the threshold, - /// as the peer providing this list could be subverting the protocol. - pub fn insert_candidate_peers(&self, peers: &[SocketAddr]) { - // Compute the maximum number of candidate peers. - let max_candidate_peers = Self::MAXIMUM_CANDIDATE_PEERS.saturating_sub(self.number_of_candidate_peers()); - { - let mut peer_pool = self.peer_pool.write(); - // Ensure the combined number of peers does not surpass the threshold. - let eligible_peers = peers - .iter() - .filter(|&peer_ip| { - // Ensure the peer is not itself, and is not already known. - !self.is_local_ip(*peer_ip) && !peer_pool.contains_key(peer_ip) - }) - .take(max_candidate_peers) - .map(|addr| (*addr, Peer::new_candidate(*addr, false))) - .collect::>(); - - // Proceed to insert the eligible candidate peer IPs. - peer_pool.extend(eligible_peers); - } - #[cfg(feature = "metrics")] - self.update_metrics(); - } - pub fn update_last_seen_for_connected_peer(&self, peer_ip: SocketAddr) { if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { peer.update_last_seen(); } } - /// Removes the connected peer and adds them to the candidate peers. 
- pub fn remove_connected_peer(&self, peer_ip: SocketAddr) { - if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) { - if let Peer::Connected(peer) = peer { - self.resolver.write().remove_peer(&peer.connected_addr); - } - peer.downgrade_to_candidate(peer_ip); - } - // Clear cached entries applicable to the peer. - self.cache.clear_peer_entries(peer_ip); - #[cfg(feature = "metrics")] - self.update_metrics(); - } - /// Spawns a task with the given future; it should only be used for long-running tasks. pub fn spawn + Send + 'static>(&self, future: T) { self.handles.lock().push(tokio::spawn(future)); diff --git a/node/router/tests/common/router.rs b/node/router/tests/common/router.rs index 821ccb43cf..746571a87d 100644 --- a/node/router/tests/common/router.rs +++ b/node/router/tests/common/router.rs @@ -20,6 +20,7 @@ use snarkos_node_router::{ Outbound, Peer, PeerPoolHandling, + Resolver, Router, Routing, messages::{ @@ -80,11 +81,21 @@ impl P2P for TestRouter { } impl PeerPoolHandling for TestRouter { + const MAXIMUM_POOL_SIZE: usize = 1_000; const OWNER: &str = "[TestRouter]"; + const PEER_SLASHING_COUNT: usize = 0; fn peer_pool(&self) -> &RwLock>> { self.router().peer_pool() } + + fn resolver(&self) -> &RwLock> { + self.router().resolver() + } + + fn is_dev(&self) -> bool { + true + } } #[async_trait] @@ -114,8 +125,8 @@ impl OnConnect for TestRouter { impl Disconnect for TestRouter { /// Any extra operations to be performed during a disconnect. 
async fn handle_disconnect(&self, peer_addr: SocketAddr) { - if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) { - self.router().remove_connected_peer(peer_ip); + if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { + self.router().downgrade_peer_to_candidate(peer_ip); } } } diff --git a/node/src/client/router.rs b/node/src/client/router.rs index 577f3f1286..7a6be98f44 100644 --- a/node/src/client/router.rs +++ b/node/src/client/router.rs @@ -66,7 +66,7 @@ impl> Handshake for Client { impl> OnConnect for Client { async fn on_connect(&self, peer_addr: SocketAddr) { // Resolve the peer address to the listener address. - let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return }; + let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) else { return }; // If it's a bootstrap peer, first request its peers. if bootstrap_peers::(self.router.is_dev()).contains(&peer_ip) { self.router().send(peer_ip, Message::PeerRequest(PeerRequest)); @@ -80,9 +80,13 @@ impl> OnConnect for Client { impl> Disconnect for Client { /// Any extra operations to be performed during a disconnect. async fn handle_disconnect(&self, peer_addr: SocketAddr) { - if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) { + if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { self.sync.remove_peer(&peer_ip); - self.router.remove_connected_peer(peer_ip); + self.router.downgrade_peer_to_candidate(peer_ip); + // Clear cached entries applicable to the peer. + self.router.cache().clear_peer_entries(peer_ip); + #[cfg(feature = "metrics")] + self.router.update_metrics(); } } } @@ -123,7 +127,7 @@ impl> Client { // Process the message. Disconnect if the peer violated the protocol. 
if let Err(error) = self.inbound(peer_addr, message).await { warn!("Failed to process inbound message from '{peer_addr}' - {error}"); - if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) { + if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { warn!("Disconnecting from '{peer_ip}' for protocol violation"); self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); // Disconnect from this peer. diff --git a/node/src/prover/router.rs b/node/src/prover/router.rs index 1d7aef68e0..ff54719757 100644 --- a/node/src/prover/router.rs +++ b/node/src/prover/router.rs @@ -60,7 +60,7 @@ where { async fn on_connect(&self, peer_addr: SocketAddr) { // Resolve the peer address to the listener address. - let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return }; + let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) else { return }; // Send the first `Ping` message to the peer. self.ping.on_peer_connected(peer_ip); } @@ -70,9 +70,13 @@ where impl> Disconnect for Prover { /// Any extra operations to be performed during a disconnect. async fn handle_disconnect(&self, peer_addr: SocketAddr) { - if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) { + if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { self.sync.remove_peer(&peer_ip); - self.router.remove_connected_peer(peer_ip); + self.router.downgrade_peer_to_candidate(peer_ip); + // Clear cached entries applicable to the peer. + self.router.cache().clear_peer_entries(peer_ip); + #[cfg(feature = "metrics")] + self.router.update_metrics(); } } } @@ -92,7 +96,7 @@ impl> Reading for Prover { async fn process_message(&self, peer_addr: SocketAddr, message: Self::Message) -> io::Result<()> { // Process the message. Disconnect if the peer violated the protocol. 
if let Err(error) = self.inbound(peer_addr, message).await { - if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) { + if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { warn!("Disconnecting from '{peer_addr}' - {error}"); self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); // Disconnect from this peer. diff --git a/node/src/traits.rs b/node/src/traits.rs index 0c031c0ec9..b84786707d 100644 --- a/node/src/traits.rs +++ b/node/src/traits.rs @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use snarkos_node_router::{Routing, messages::NodeType}; +use snarkos_node_router::{PeerPoolHandling, Routing, messages::NodeType}; use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; use once_cell::sync::OnceCell; diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index 622b9aad44..05918b9410 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -66,7 +66,7 @@ where { async fn on_connect(&self, peer_addr: SocketAddr) { // Resolve the peer address to the listener address. - let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) else { return }; + let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) else { return }; // Send the first `Ping` message to the peer. self.ping.on_peer_connected(peer_ip); } @@ -76,9 +76,13 @@ where impl> Disconnect for Validator { /// Any extra operations to be performed during a disconnect. async fn handle_disconnect(&self, peer_addr: SocketAddr) { - if let Some(peer_ip) = self.router.resolve_to_listener(&peer_addr) { + if let Some(peer_ip) = self.router.resolve_to_listener(peer_addr) { self.sync.remove_peer(&peer_ip); - self.router.remove_connected_peer(peer_ip); + self.router.downgrade_peer_to_candidate(peer_ip); + // Clear cached entries applicable to the peer. 
+ self.router.cache().clear_peer_entries(peer_ip); + #[cfg(feature = "metrics")] + self.router.update_metrics(); } } } @@ -119,7 +123,7 @@ impl> Validator { // Process the message. Disconnect if the peer violated the protocol. if let Err(error) = self.inbound(peer_addr, message).await { warn!("Failed to process inbound message from '{peer_addr}' - {error}"); - if let Some(peer_ip) = self.router().resolve_to_listener(&peer_addr) { + if let Some(peer_ip) = self.router().resolve_to_listener(peer_addr) { warn!("Disconnecting from '{peer_ip}' for protocol violation"); self.router().send(peer_ip, Message::Disconnect(DisconnectReason::ProtocolViolation.into())); // Disconnect from this peer. diff --git a/node/sync/src/block_sync.rs b/node/sync/src/block_sync.rs index b2d320ad2d..899801c93a 100644 --- a/node/sync/src/block_sync.rs +++ b/node/sync/src/block_sync.rs @@ -1290,7 +1290,7 @@ mod tests { }; use snarkos_node_bft_ledger_service::MockLedgerService; - use snarkos_node_router::Peer; + use snarkos_node_router::{Peer, Resolver}; use snarkos_node_tcp::{P2P, Tcp}; use snarkvm::{ ledger::committee::Committee, @@ -1319,12 +1319,22 @@ mod tests { } impl PeerPoolHandling for DummyPeerPoolHandler { + const MAXIMUM_POOL_SIZE: usize = 10; const OWNER: &str = "[DummyPeerPoolHandler]"; + const PEER_SLASHING_COUNT: usize = 0; fn peer_pool(&self) -> &RwLock>> { unreachable!(); } + fn resolver(&self) -> &RwLock> { + unreachable!(); + } + + fn is_dev(&self) -> bool { + true + } + fn ip_ban_peer(&self, listener_addr: SocketAddr, _reason: Option<&str>) { self.peers_to_ban.write().push(listener_addr); }