Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 41 additions & 63 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
MEMORY_POOL_PORT,
Worker,
events::{EventCodec, PrimaryPing},
helpers::{Cache, PrimarySender, Resolver, Storage, SyncSender, WorkerSender, assign_to_worker},
helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
spawn_blocking,
};
use aleo_std::StorageMode;
Expand All @@ -43,7 +43,7 @@ use snarkos_node_bft_events::{
ValidatorsResponse,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_router::{NodeType, Peer, PeerPoolHandling};
use snarkos_node_router::{NodeType, Peer, PeerPoolHandling, Resolver};
use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
use snarkos_node_tcp::{
Config,
Expand Down Expand Up @@ -161,11 +161,21 @@ pub struct InnerGateway<N: Network> {
}

impl<N: Network> PeerPoolHandling<N> for Gateway<N> {
const MAXIMUM_POOL_SIZE: usize = 200;
const OWNER: &str = CONTEXT;
const PEER_SLASHING_COUNT: usize = 40;
Comment thread
ljedrz marked this conversation as resolved.
Outdated

fn peer_pool(&self) -> &RwLock<HashMap<SocketAddr, Peer<N>>> {
&self.peer_pool
}

fn resolver(&self) -> &RwLock<Resolver<N>> {
&self.resolver
}

fn is_dev(&self) -> bool {
self.dev.is_some()
}
}

impl<N: Network> Gateway<N> {
Expand Down Expand Up @@ -452,27 +462,6 @@ impl<N: Network> Gateway<N> {
}
}

/// Removes the connected peer and adds them to the candidate peers.
fn remove_connected_peer(&self, peer_ip: SocketAddr) {
// Remove the peer from the sync module. Except for some tests, there is always a sync sender.
if let Some(sync_sender) = self.sync_sender.get() {
let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
tokio::spawn(async move {
if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
}
});
}
if let Some(peer) = self.peer_pool.write().get_mut(&peer_ip) {
if let Peer::Connected(connected_peer) = peer {
self.resolver.write().remove_peer(peer_ip, connected_peer.aleo_addr);
}
peer.downgrade_to_candidate(peer_ip);
}
#[cfg(feature = "metrics")]
self.update_metrics();
}

/// Sends the given event to specified peer.
///
/// This function returns as soon as the event is queued to be sent,
Expand Down Expand Up @@ -752,45 +741,24 @@ impl<N: Network> Gateway<N> {
// Decrement the number of validators requests for this peer.
self.cache.decrement_outbound_validators_requests(peer_ip);

// If the number of connected validators is less than the minimum, connect to more validators.
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
// Attempt to connect to any validators that are not already connected.
let self_ = self.clone();
tokio::spawn(async move {
for (validator_ip, validator_address) in validators {
if self_.dev.is_some() {
// Ensure the validator IP is not this node.
if self_.is_local_ip(validator_ip) {
continue;
}
} else {
// Ensure the validator IP is not this node and is well-formed.
if !self_.is_valid_peer_ip(validator_ip) {
continue;
}
}

// Ensure the validator address is not this node.
if self_.account.address() == validator_address {
continue;
}
// Ensure the validator IP is not already connected or connecting.
if self_.is_connected(validator_ip) || self_.is_connecting(validator_ip) {
continue;
}
// Ensure the validator address is not already connected.
if self_.is_connected_address(validator_address) {
continue;
}
// Ensure the validator address is an authorized validator.
if !self_.is_authorized_validator_address(validator_address) {
continue;
}
// Attempt to connect to the validator.
self_.connect(validator_ip);
}
});
// Add valid validators as candidates to the peer pool; only validator-related
// filters need to be applied, the rest is handled by `PeerPoolHandling`.
let valid_addrs = validators
.into_iter()
.filter_map(|(listener_addr, aleo_addr)| {
(self.account.address() != aleo_addr
&& !self.is_connected_address(aleo_addr)
&& self.is_authorized_validator_address(aleo_addr))
.then_some(listener_addr)
})
.collect::<Vec<_>>();
if !valid_addrs.is_empty() {
self.insert_candidate_peers(valid_addrs);
Comment thread
ljedrz marked this conversation as resolved.
}

#[cfg(feature = "metrics")]
self.update_metrics();

Ok(true)
}
Event::WorkerPing(ping) => {
Expand Down Expand Up @@ -1150,13 +1118,23 @@ impl<N: Network> Disconnect for Gateway<N> {
/// Any extra operations to be performed during a disconnect.
async fn handle_disconnect(&self, peer_addr: SocketAddr) {
if let Some(peer_ip) = self.resolve_to_listener(&peer_addr) {
self.remove_connected_peer(peer_ip);

self.downgrade_peer_to_candidate(peer_ip);
// Remove the peer from the sync module. Except for some tests, there is always a sync sender.
if let Some(sync_sender) = self.sync_sender.get() {
let tx_block_sync_remove_peer_ = sync_sender.tx_block_sync_remove_peer.clone();
tokio::spawn(async move {
if let Err(e) = tx_block_sync_remove_peer_.send(peer_ip).await {
Comment thread
kaimast marked this conversation as resolved.
warn!("Unable to remove '{peer_ip}' from the sync module - {e}");
Comment thread
kaimast marked this conversation as resolved.
}
});
}
// We don't clear this map based on time but only on peer disconnect.
// This is sufficient to avoid infinite growth as the committee has a fixed number
// of members.
self.cache.clear_outbound_validators_requests(peer_ip);
self.cache.clear_outbound_block_requests(peer_ip);
#[cfg(feature = "metrics")]
self.update_metrics();
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions node/bft/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ pub use proposal_cache::*;
pub mod ready;
pub use ready::*;

pub mod resolver;
pub use resolver::*;

pub mod signed_proposals;
pub use signed_proposals::*;

Expand Down
99 changes: 0 additions & 99 deletions node/bft/src/helpers/resolver.rs

This file was deleted.

2 changes: 1 addition & 1 deletion node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl<N: Network> Router<N> {
if let Some(addr) = listener_addr {
if let Ok(ref cr) = handshake_result {
if let Some(peer) = self.peer_pool.write().get_mut(&addr) {
self.resolver.write().insert_peer(peer.listener_addr(), peer_addr);
self.resolver.write().insert_peer(peer.listener_addr(), peer_addr, cr.address);
peer.upgrade_to_connected(peer_addr, cr.listener_port, cr.address, cr.node_type, cr.version);
}
Comment thread
kaimast marked this conversation as resolved.
#[cfg(feature = "metrics")]
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ mod peer;
pub use peer::*;

mod resolver;
pub(crate) use resolver::*;
pub use resolver::*;
89 changes: 73 additions & 16 deletions node/router/src/helpers/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,87 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use snarkvm::prelude::{Address, Network};

use std::{collections::HashMap, net::SocketAddr};

/// The `Resolver` provides the means to map the connected address (used in the lower-level
/// `tcp` module internals, and provided by the OS) to the listener address (used as the
/// unique peer identifier in the higher-level functions).
#[derive(Debug, Default)]
pub(crate) struct Resolver {
/// The map of the connected peer address to the correponding listener address.
/// The resolver contains some reverse maps for peers which are not available
Comment thread
ljedrz marked this conversation as resolved.
Outdated
/// by default to the implementors of PeerPoolHandling (who already contain
Comment thread
ljedrz marked this conversation as resolved.
Outdated
/// maps from the peer's listening address to their various components).
#[derive(Debug)]
pub struct Resolver<N: Network> {
/// The map of the (ambiguous) peer address to listener address.
to_listener: HashMap<SocketAddr, SocketAddr>,
/// A map of `address` to `peer IP`.
Comment thread
ljedrz marked this conversation as resolved.
Outdated
address_peers: HashMap<Address<N>, SocketAddr>,
}

impl<N: Network> Default for Resolver<N> {
/// Initializes a new instance of the resolver.
fn default() -> Self {
Self::new()
}
}

impl<N: Network> Resolver<N> {
/// Initializes a new instance of the resolver.
pub fn new() -> Self {
Self { to_listener: Default::default(), address_peers: Default::default() }
}
}

impl Resolver {
/// Returns the listener address for the given connected address, if it exists.
pub fn get_listener(&self, connected_addr: &SocketAddr) -> Option<SocketAddr> {
self.to_listener.get(connected_addr).copied()
impl<N: Network> Resolver<N> {
/// Returns the listener address for the given (ambiguous) peer address, if it exists.
pub fn get_listener(&self, peer_addr: SocketAddr) -> Option<SocketAddr> {
self.to_listener.get(&peer_addr).copied()
}

/// Inserts a new mapping of a connected address to the corresponding listener address.
pub fn insert_peer(&mut self, listener_addr: SocketAddr, connected_addr: SocketAddr) {
self.to_listener.insert(connected_addr, listener_addr);
/// Returns the peer IP for the given address.
Comment thread
ljedrz marked this conversation as resolved.
Outdated
pub fn get_peer_ip_for_address(&self, address: Address<N>) -> Option<SocketAddr> {
self.address_peers.get(&address).copied()
}

/// Removes the given mapping.
pub fn remove_peer(&mut self, connected_addr: &SocketAddr) {
self.to_listener.remove(connected_addr);
/// Inserts a mapping of a peer's connected address to its listener address,
/// alongside a mapping of the Aleo address to the listener address.
pub fn insert_peer(&mut self, listener_ip: SocketAddr, peer_addr: SocketAddr, address: Address<N>) {
self.to_listener.insert(peer_addr, listener_ip);
self.address_peers.insert(address, listener_ip);
}

/// Removes the mapping of a peer's connected address to its listener address,
/// alongside the mapping of the Aleo address to the listener address.
pub fn remove_peer(&mut self, connected_addr: SocketAddr, aleo_addr: Address<N>) {
self.to_listener.remove(&connected_addr);
self.address_peers.remove(&aleo_addr);
}
}

#[cfg(test)]
mod tests {
use super::*;
use snarkvm::{prelude::Rng, utilities::TestRng};

type CurrentNetwork = snarkvm::prelude::MainnetV0;

#[test]
fn test_resolver() {
Comment thread
ljedrz marked this conversation as resolved.
let mut resolver = Resolver::<CurrentNetwork>::new();
let listener_ip = SocketAddr::from(([127, 0, 0, 1], 1234));
let peer_addr = SocketAddr::from(([127, 0, 0, 1], 4321));
let mut rng = TestRng::default();
let address = Address::<CurrentNetwork>::new(rng.r#gen());

assert!(resolver.get_listener(peer_addr).is_none());
assert!(resolver.get_peer_ip_for_address(address).is_none());

resolver.insert_peer(listener_ip, peer_addr, address);

assert_eq!(resolver.get_listener(peer_addr).unwrap(), listener_ip);
assert_eq!(resolver.get_peer_ip_for_address(address).unwrap(), listener_ip);

resolver.remove_peer(peer_addr, address);

assert!(resolver.get_listener(peer_addr).is_none());
assert!(resolver.get_peer_ip_for_address(address).is_none());
}
}
Loading