Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ impl<N: Network> Gateway<N> {
(self.account.address() != aleo_addr
&& !self.is_connected_address(aleo_addr)
&& self.is_authorized_validator_address(aleo_addr))
.then_some(listener_addr)
.then_some((listener_addr, None))
})
.collect::<Vec<_>>();
if !valid_addrs.is_empty() {
Expand Down Expand Up @@ -931,10 +931,10 @@ impl<N: Network> Gateway<N> {
// The trusted ones are already handled by `handle_trusted_validators`.
let trusted_validators = self.trusted_peers();
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
for candidate_addr in self.candidate_peers() {
if !trusted_validators.contains(&candidate_addr) {
for peer in self.get_candidate_peers() {
if !trusted_validators.contains(&peer.listener_addr) {
// Attempt to connect to unconnected validators.
self.connect(candidate_addr);
self.connect(peer.listener_addr);
}
}

Expand Down
72 changes: 63 additions & 9 deletions node/router/messages/src/peer_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerResponse {
pub peers: Vec<SocketAddr>,
pub peers: Vec<(SocketAddr, Option<u32>)>,
}

impl MessageTrait for PeerResponse {
Expand All @@ -39,20 +39,59 @@ impl ToBytes for PeerResponse {
return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Too many peers: {}", self.peers.len())));
}

// A version indicator; we don't expect empty peer responses, so a zero value can serve
// as an indicator that this message is to be processed differently. The version value
// can be changed to a 2 in the future, once everyone expects it there.
0u8.write_le(&mut writer)?;
Comment thread
vicsn marked this conversation as resolved.

(self.peers.len() as u8).write_le(&mut writer)?;
for peer in self.peers.iter() {
peer.write_le(&mut writer)?;
for (addr, height) in self.peers.iter() {
addr.write_le(&mut writer)?;
if let Some(h) = height {
1u8.write_le(&mut writer)?;
h.write_le(&mut writer)?;
} else {
0u8.write_le(&mut writer)?;
}
}
Ok(())
}
}

impl FromBytes for PeerResponse {
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self> {
let count = u8::read_le(&mut reader)?;
// Read the peer count if their heights aren't present; otherwise, interpret this value
// as the message version. It is a workaround for a currently missing version value.
Comment thread
vicsn marked this conversation as resolved.
// The worst-case scenario is if a node hasn't updated, and it gets a `PeerRequest` from
// its only peer who has; this would cause it to return a message that appears as if it
// contains heights (due to a leading `0`), but it would end up failing to deserialize.
// TODO: after a release or two, we should always be expecting the version to be present,
// simplifying the deserialization; also, remove the `empty_old_peerlist_handling` test.
let mut contains_heights = false;
let count_or_version = u8::read_le(&mut reader)?;
let count = if count_or_version == 0 {
// Version indicator found; this message will contain optional heights.
contains_heights = true;
// If the first value is a zero, the next u8 is the peer count.
u8::read_le(&mut reader)?
} else {
// A non-zero value indicates that this is the "old" PeerResponse without heights.
count_or_version
};

let mut peers = Vec::with_capacity(count as usize);
for _ in 0..count {
peers.push(SocketAddr::read_le(&mut reader)?);
let addr = SocketAddr::read_le(&mut reader)?;
let height = if contains_heights {
match u8::read_le(&mut reader)? {
1 => Some(u32::read_le(&mut reader)?),
0 => None,
_ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid peer height".to_string())),
}
} else {
None
};
peers.push((addr, height));
}

Ok(Self { peers })
Expand All @@ -69,14 +108,19 @@ pub mod prop_tests {
collection::vec,
prelude::{BoxedStrategy, Strategy, any},
};
use std::net::{IpAddr, SocketAddr};
use std::{
io,
net::{IpAddr, SocketAddr},
};
use test_strategy::proptest;

pub fn any_valid_socket_addr() -> BoxedStrategy<SocketAddr> {
any::<(IpAddr, u16)>().prop_map(|(ip_addr, port)| SocketAddr::new(ip_addr, port)).boxed()
pub fn any_valid_socket_addr() -> BoxedStrategy<(SocketAddr, Option<u32>)> {
any::<(IpAddr, u16, Option<u32>)>()
.prop_map(|(ip_addr, port, height)| (SocketAddr::new(ip_addr, port), height))
.boxed()
}

pub fn any_vec() -> BoxedStrategy<Vec<SocketAddr>> {
pub fn any_vec() -> BoxedStrategy<Vec<(SocketAddr, Option<u32>)>> {
vec(any_valid_socket_addr(), 0..50).prop_map(|v| v).boxed()
}

Expand All @@ -91,4 +135,14 @@ pub mod prop_tests {
let decoded = PeerResponse::read_le(&mut bytes.into_inner().reader()).unwrap();
assert_eq!(decoded, peer_response);
}

// The following test will be obsolete once all the nodes handle heights in the `PeerResponse`.
#[test]
fn empty_old_peerlist_handling() {
// An empty `PeerResponse` without heights contains a single 0u8.
let serialized = &[0u8];
let deserialized = PeerResponse::read_le(&serialized[..]).unwrap_err();
// Check for the expected error.
assert_eq!(deserialized.kind(), io::ErrorKind::UnexpectedEof);
}
}
18 changes: 15 additions & 3 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,21 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Initialize an RNG.
let rng = &mut OsRng;

// Attempt to connect to more peers.
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
self.router().connect(peer_ip);
// Attempt to connect to more peers, separately choosing from those at a greater block
// height, and those whose height is lower or unknown to us.
let own_height = self.router().ledger.latest_block_height();
let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
.router()
.get_candidate_peers()
.into_iter()
.partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
// We may not know of half of `num_deficient` candidates; account for it using `min`.
let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
self.router().connect(peer.listener_addr);
}
for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
self.router().connect(peer.listener_addr);
}

if self.router().allow_external_peers() {
Expand Down
10 changes: 8 additions & 2 deletions node/router/src/helpers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub struct CandidatePeer {
pub listener_addr: SocketAddr,
/// Indicates whether the peer is considered trusted.
pub trusted: bool,
/// The latest block height known to be associated with the peer.
pub last_height_seen: Option<u32>,
}

/// A fully connected peer.
Expand Down Expand Up @@ -73,7 +75,7 @@ pub struct ConnectedPeer<N: Network> {
impl<N: Network> Peer<N> {
/// Create a candidate peer.
pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self {
Self::Candidate(CandidatePeer { listener_addr, trusted })
Self::Candidate(CandidatePeer { listener_addr, trusted, last_height_seen: None })
}

/// Create a connecting peer.
Expand Down Expand Up @@ -114,7 +116,11 @@ impl<N: Network> Peer<N> {

/// Demote a peer to candidate status, marking it as disconnected.
pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) {
*self = Self::new_candidate(listener_addr, self.is_trusted());
*self = Self::Candidate(CandidatePeer {
listener_addr,
trusted: self.is_trusted(),
last_height_seen: self.last_height_seen(),
});
}

/// Returns the type of the node (only applicable to connected peers).
Expand Down
4 changes: 2 additions & 2 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,15 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {

// Truncate and convert to socket addrs.
peers.truncate(MAX_PEERS_TO_SEND);
let peers = peers.into_iter().map(|peer| peer.listener_addr).collect();
let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();

// Send a `PeerResponse` message to the peer.
self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
true
}

/// Handles a `PeerResponse` message.
fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<SocketAddr>) -> bool {
fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option<u32>)>) -> bool {
// Check if the number of peers received is less than MAX_PEERS_TO_SEND.
if peers.len() > MAX_PEERS_TO_SEND {
return false;
Expand Down
36 changes: 27 additions & 9 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,16 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

/// Adds new candidate peers to the peer pool, ensuring their validity and following the
/// limit on the number of peers in the pool.
fn insert_candidate_peers(&self, mut listener_addrs: Vec<SocketAddr>) {
/// limit on the number of peers in the pool. The listener addresses may be paired with
/// the last known block height of the associated peer.
fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
// Hold a write guard from now on, so as not to accidentally slash multiple times
// based on multiple batches of candidate peers, and to not overwrite any entries.
let mut peer_pool = self.peer_pool().write();

// Perform filtering to ensure candidate validity.
listener_addrs.retain(|&addr| {
!peer_pool.contains_key(&addr)
listener_addrs.retain(|&(addr, _)| {
peer_pool.get(&addr).map(|peer| peer.is_candidate()).unwrap_or(true)
Comment thread
ljedrz marked this conversation as resolved.
Outdated
&& !self.is_ip_banned(addr.ip())
&& if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
});
Expand Down Expand Up @@ -266,9 +267,20 @@ pub trait PeerPoolHandling<N: Network>: P2P {
return;
}

// Insert new candidate peers.
for addr in listener_addrs {
peer_pool.insert(addr, Peer::new_candidate(addr, false));
// Insert or update the applicable candidate peers.
for (addr, height) in listener_addrs {
match peer_pool.entry(addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::new_candidate(addr, false));
}
Entry::Occupied(mut entry) => {
if height.is_some() {
if let Peer::Candidate(peer) = entry.get_mut() {
peer.last_height_seen = height;
}
}
}
}
}
}

Expand Down Expand Up @@ -393,13 +405,19 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

/// Returns the list of candidate peers.
fn candidate_peers(&self) -> Vec<SocketAddr> {
fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
let banned_ips = self.tcp().banned_peers().get_banned_ips();
self.peer_pool()
.read()
.iter()
.filter_map(|(addr, peer)| {
(matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr)
if let Peer::Candidate(peer) = peer
&& !banned_ips.contains(&addr.ip())
Comment thread
ljedrz marked this conversation as resolved.
Outdated
{
Some(peer.clone())
} else {
None
}
})
.collect()
}
Expand Down
10 changes: 7 additions & 3 deletions node/tests/peering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ macro_rules! test_reject_unsolicited_peer_response {
// Check the candidate peers.
assert_eq!(node.router().number_of_candidate_peers(), 0);

let peers = vec!["1.1.1.1:1111".parse().unwrap(), "2.2.2.2:2222".parse().unwrap()];
let peers = vec![
("1.1.1.1:1111".parse().unwrap(), None),
("2.2.2.2:2222".parse().unwrap(), None),
];

// Send a `PeerResponse` to the node.
assert!(
Expand All @@ -71,8 +74,9 @@ macro_rules! test_reject_unsolicited_peer_response {
deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 0);

// Make sure the sent addresses weren't inserted in the candidate peers.
for peer in peers {
assert!(!node.router().candidate_peers().contains(&peer));
let candidate_peer_addrs = node.router().get_candidate_peers().into_iter().map(|peer| peer.listener_addr).collect::<Vec<_>>();
for (peer, _) in peers {
assert!(!candidate_peer_addrs.contains(&peer));
}
}
}
Expand Down