Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 79 additions & 1 deletion beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,38 @@ use warp_utils::{query::multi_key_query, uor::UnifyingOrFilter};

const API_PREFIX: &str = "eth";

/// Converts a Lighthouse disconnect tag into the simplified
/// `PeerDisconnectReason` vocabulary of the beacon-API peer-scoring proposal.
///
/// `reason` is the `reason` field of `LastDisconnect`, i.e. the `'static`
/// variant name of a `GoodbyeReason`. Any tag without a dedicated entry
/// (including `"Unknown"`) is reported as `"unknown"`, so the returned value
/// is always `Some`.
fn map_disconnect_reason(reason: &str) -> Option<&'static str> {
    // (internal tag, API reason) pairs. Lookup is an exact, case-sensitive match.
    const API_REASONS: &[(&str, &str)] = &[
        ("BadScore", "bad_score"),
        ("Banned", "bad_score"),
        ("BannedIP", "bad_score"),
        ("ClientShutdown", "client_shutdown"),
        ("IrrelevantNetwork", "irrelevant_network"),
        ("UnableToVerifyNetwork", "unviable_fork"),
        ("TooManyPeers", "too_many_peers"),
        ("Fault", "io_error"),
    ];

    let api_reason = API_REASONS
        .iter()
        .find(|(internal, _)| *internal == reason)
        .map_or("unknown", |(_, api)| *api);
    Some(api_reason)
}

/// Converts a Lighthouse downscore tag into the simplified `PeerScoreReason`
/// vocabulary of the beacon-API peer-scoring proposal.
///
/// `reason` is the `reason` field of `LastAction` (e.g. a tag emitted by
/// `rpc_error_msg` or passed at a report-peer call site). Both timeout tags
/// collapse to `"rpc_timeout"`, both decode/data tags collapse to
/// `"rpc_invalid_response"`, three tags are passed through unchanged, and
/// everything else becomes `"unknown"`.
fn map_downscore_reason(reason: &str) -> &'static str {
    // Tags whose API name is identical to the internal name.
    const PASS_THROUGH: [&str; 3] = ["rpc_invalid_request", "rpc_rate_limited", "rpc_io_error"];

    if matches!(reason, "rpc_stream_timeout" | "rpc_negotiation_timeout") {
        return "rpc_timeout";
    }
    if matches!(reason, "rpc_invalid_data" | "rpc_ssz_decode_error") {
        return "rpc_invalid_response";
    }

    PASS_THROUGH
        .iter()
        .copied()
        .find(|tag| *tag == reason)
        .unwrap_or("unknown")
}

/// A custom type which allows for both unsecured and TLS-enabled HTTP servers.
type HttpServer = (SocketAddr, Pin<Box<dyn Future<Output = ()> + Send>>);

Expand Down Expand Up @@ -2402,12 +2434,36 @@ pub fn serve<T: BeaconChainTypes>(

// the eth2 API spec implies only peers we have been connected to at some point should be included.
if let Some(&dir) = peer_info.connection_direction() {
let agent_version = peer_info.client().agent_string.clone();
let score = Some(peer_info.score().score());
let state: api_types::PeerState =
peer_info.connection_status().clone().into();
// Per beacon-API spec, `disconnect_reason` MUST only be populated
// when `state` is `disconnected` or `disconnecting`.
let disconnect_reason = if matches!(
state,
api_types::PeerState::Disconnected
| api_types::PeerState::Disconnecting
) {
peer_info.last_disconnect().and_then(|d| {
map_disconnect_reason(d.reason).map(|s| s.to_string())
})
} else {
None
};
let downscore_reasons = peer_info
.last_action()
.map(|a| vec![map_downscore_reason(a.reason).to_string()]);
return Ok(api_types::GenericResponse::from(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr().map(|enr| enr.to_base64()),
last_seen_p2p_address: address,
direction: dir.into(),
state: peer_info.connection_status().clone().into(),
state,
agent_version,
score,
disconnect_reason,
downscore_reasons,
}));
}
}
Expand Down Expand Up @@ -2463,12 +2519,34 @@ pub fn serve<T: BeaconChainTypes>(
.is_none_or(|directions| directions.contains(&direction));

if state_matches && direction_matches {
let agent_version = peer_info.client().agent_string.clone();
let score = Some(peer_info.score().score());
// Per beacon-API spec, `disconnect_reason` MUST only be
// populated when `state` is `disconnected` or `disconnecting`.
let disconnect_reason = if matches!(
state,
api_types::PeerState::Disconnected
| api_types::PeerState::Disconnecting
) {
peer_info.last_disconnect().and_then(|d| {
map_disconnect_reason(d.reason).map(|s| s.to_string())
})
} else {
None
};
let downscore_reasons = peer_info
.last_action()
.map(|a| vec![map_downscore_reason(a.reason).to_string()]);
peers.push(api_types::PeerData {
peer_id: peer_id.to_string(),
enr: peer_info.enr().map(|enr| enr.to_base64()),
last_seen_p2p_address: address,
direction,
state,
agent_version,
score,
disconnect_reason,
downscore_reasons,
});
}
}
Expand Down
19 changes: 19 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3220,6 +3220,10 @@ impl ApiTester {
last_seen_p2p_address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected,
direction: PeerDirection::Inbound,
agent_version: result.agent_version.clone(),
score: result.score,
disconnect_reason: result.disconnect_reason.clone(),
downscore_reasons: result.downscore_reasons.clone(),
};

assert_eq!(result, expected);
Expand All @@ -3246,12 +3250,27 @@ impl ApiTester {
for states in peer_states {
for dirs in peer_dirs.clone() {
let result = self.client.get_node_peers(states, dirs).await.unwrap();
let (agent_version, score, disconnect_reason, downscore_reasons) =
if let Some(peer) = result.data.first() {
(
peer.agent_version.clone(),
peer.score,
peer.disconnect_reason.clone(),
peer.downscore_reasons.clone(),
)
} else {
(None, None, None, None)
};
let expected_peer = PeerData {
peer_id: self.external_peer_id.to_string(),
enr: None,
last_seen_p2p_address: EXTERNAL_ADDR.to_string(),
state: PeerState::Connected,
direction: PeerDirection::Inbound,
agent_version,
score,
disconnect_reason,
downscore_reasons,
};

let state_match =
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ unsigned-varint = { version = "0.8", features = ["codec"] }
async-channel = { workspace = true }
logging = { workspace = true }
proptest = { workspace = true }
serde_json = { workspace = true }
tempfile = { workspace = true }

[[test]]
Expand Down
66 changes: 58 additions & 8 deletions beacon_node/lighthouse_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use libp2p::multiaddr;
use network_utils::discovery_metrics;
use network_utils::enr_ext::{EnrExt, peer_id_to_node_id};
pub use peerdb::peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use peerdb::score::{PeerAction, ReportSource};
use peerdb::score::{DisconnectDirection, LastDisconnect, PeerAction, ReportSource};
pub use peerdb::sync_status::{SyncInfo, SyncStatus};
use std::collections::{HashMap, HashSet, hash_map::Entry};
use std::net::IpAddr;
Expand Down Expand Up @@ -151,6 +151,49 @@ pub enum PeerManagerEvent {
DiscoverSubnetPeers(Vec<SubnetDiscovery>),
}

/// Produces the `'static` tag that identifies an [`RPCError`] when it is
/// passed as the `msg` argument of `report_peer`.
///
/// Each error variant gets its own tag, and an `ErrorResponse` is further
/// split by its status code, so that external consumers can tell apart e.g. a
/// rate limit, an SSZ decode failure and an unsupported protocol.
fn rpc_error_msg(err: &RPCError) -> &'static str {
    match err {
        // Error responses: one tag per status code.
        RPCError::ErrorResponse(RpcErrorResponse::Unknown, _) => "rpc_unknown_status",
        RPCError::ErrorResponse(RpcErrorResponse::ResourceUnavailable, _) => {
            "rpc_resource_unavailable"
        }
        RPCError::ErrorResponse(RpcErrorResponse::ServerError, _) => "rpc_server_error",
        RPCError::ErrorResponse(RpcErrorResponse::InvalidRequest, _) => "rpc_invalid_request",
        RPCError::ErrorResponse(RpcErrorResponse::RateLimited, _) => "rpc_rate_limited",
        RPCError::ErrorResponse(RpcErrorResponse::BlobsNotFoundForBlock, _) => {
            "rpc_blobs_not_found"
        }

        // All remaining variants: one tag per variant.
        RPCError::Disconnected => "rpc_disconnected",
        RPCError::HandlerRejected => "rpc_handler_rejected",
        RPCError::IncompleteStream => "rpc_incomplete_stream",
        RPCError::InternalError(_) => "rpc_internal_error",
        RPCError::InvalidData(_) => "rpc_invalid_data",
        RPCError::IoError(_) => "rpc_io_error",
        RPCError::NegotiationTimeout => "rpc_negotiation_timeout",
        RPCError::SSZDecodeError(_) => "rpc_ssz_decode_error",
        RPCError::StreamTimeout => "rpc_stream_timeout",
        RPCError::UnsupportedProtocol => "rpc_unsupported_protocol",
    }
}

/// Gives the `'static` name of a [`GoodbyeReason`] variant.
///
/// The returned string is spelled exactly like the variant and is intended
/// for JSON serialization on the `/lighthouse/peers` HTTP endpoint.
pub(crate) fn goodbye_reason_name(reason: &GoodbyeReason) -> &'static str {
    // Arms are listed alphabetically; the match is exhaustive on purpose so
    // that adding a variant forces a name to be chosen here.
    match *reason {
        GoodbyeReason::BadScore => "BadScore",
        GoodbyeReason::Banned => "Banned",
        GoodbyeReason::BannedIP => "BannedIP",
        GoodbyeReason::ClientShutdown => "ClientShutdown",
        GoodbyeReason::Fault => "Fault",
        GoodbyeReason::IrrelevantNetwork => "IrrelevantNetwork",
        GoodbyeReason::TooManyPeers => "TooManyPeers",
        GoodbyeReason::UnableToVerifyNetwork => "UnableToVerifyNetwork",
        GoodbyeReason::Unknown => "Unknown",
    }
}

impl<E: EthSpec> PeerManager<E> {
// NOTE: Must be run inside a tokio executor.
pub fn new(
Expand Down Expand Up @@ -214,12 +257,24 @@ impl<E: EthSpec> PeerManager<E> {
///
/// This will send a goodbye and disconnect the peer if it is connected or dialing.
pub fn goodbye_peer(&mut self, peer_id: &PeerId, reason: GoodbyeReason, source: ReportSource) {
// Capture the goodbye details for the `/lighthouse/peers` API before
// `reason` is consumed by `report_peer` below.
let reason_name = goodbye_reason_name(&reason);
let reason_code: u64 = reason.clone().into();
let last_disconnect = LastDisconnect {
reason: reason_name,
code: reason_code,
direction: DisconnectDirection::Sent,
at: Instant::now(),
};

// Update the sync status if required
if let Some(info) = self.network_globals.peers.write().peer_info_mut(peer_id) {
debug!(%peer_id, %reason, score = %info.score(), "Sending goodbye to peer");
if matches!(reason, GoodbyeReason::IrrelevantNetwork) {
info.update_sync_status(SyncStatus::IrrelevantPeer);
}
info.set_last_disconnect(last_disconnect);
}

self.report_peer(
Expand Down Expand Up @@ -664,13 +719,8 @@ impl<E: EthSpec> PeerManager<E> {
RPCError::Disconnected => return, // No penalty for a graceful disconnection
};

self.report_peer(
peer_id,
peer_action,
ReportSource::RPC,
None,
"handle_rpc_error",
);
let msg = rpc_error_msg(err);
self.report_peer(peer_id, peer_action, ReportSource::RPC, None, msg);
}

/// A ping request has been received.
Expand Down
31 changes: 30 additions & 1 deletion beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use itertools::Itertools;
use logging::crit;
use network_utils::enr_ext::{EnrExt, peer_id_to_node_id};
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use score::{PeerAction, ReportSource, Score, ScoreState};
use score::{LastAction, PeerAction, ReportSource, Score, ScoreState};
use std::net::IpAddr;
use std::time::Instant;
use std::{cmp::Ordering, fmt::Display};
Expand Down Expand Up @@ -122,6 +122,21 @@ impl<E: EthSpec> PeerDB<E> {
self.peers.get_mut(peer_id)
}

/// Stores `last_disconnect` as the latest disconnect event of `peer_id`.
///
/// Crate-visible so that code outside the peer DB (e.g. the RPC service layer
/// handling an incoming `Goodbye` request) can record *why* a peer
/// disconnected for the `/lighthouse/peers` HTTP endpoint. Peers that are not
/// in the DB are silently ignored.
pub(crate) fn record_last_disconnect(
    &mut self,
    peer_id: &PeerId,
    last_disconnect: score::LastDisconnect,
) {
    let Some(peer) = self.peers.get_mut(peer_id) else {
        // Unknown peer: nothing to update.
        return;
    };
    peer.set_last_disconnect(last_disconnect);
}

/// Returns if the peer is already connected.
pub fn is_connected(&self, peer_id: &PeerId) -> bool {
matches!(
Expand Down Expand Up @@ -643,7 +658,21 @@ impl<E: EthSpec> PeerDB<E> {
match self.peers.get_mut(peer_id) {
Some(info) => {
let previous_state = info.score_state();
let pre_score = info.score().score();
info.apply_peer_action_to_score(action);
let post_score = info.score().score();
// Record the most recent score-affecting event so the HTTP
// API can expose *why* a peer's score moved. Trusted peers
// are skipped because their score is never mutated.
if !info.is_trusted {
info.set_last_action(LastAction {
reason: msg,
source,
action,
delta: post_score - pre_score,
at: Instant::now(),
});
}
metrics::inc_counter_vec(
&metrics::PEER_ACTION_EVENTS_PER_CLIENT,
&[info.client().kind.as_ref(), action.as_ref(), source.into()],
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::client::Client;
use super::score::{PeerAction, Score, ScoreState};
use super::score::{LastAction, LastDisconnect, PeerAction, Score, ScoreState};
use super::sync_status::SyncStatus;
use crate::discovery::Eth2Enr;
use crate::{rpc::MetaData, types::Subnet};
Expand Down Expand Up @@ -57,6 +57,19 @@ pub struct PeerInfo<E: EthSpec> {
connection_direction: Option<ConnectionDirection>,
/// The enr of the peer, if known.
enr: Option<Enr>,
/// The most recent score-affecting event for this peer (if any).
///
/// Exposed via the `/lighthouse/peers` HTTP endpoint so external tooling
/// can correlate score changes with their cause. Skipped from JSON when
/// the peer has never been scored.
#[serde(skip_serializing_if = "Option::is_none")]
last_action: Option<LastAction>,
/// The most recent disconnect event for this peer (if any).
///
/// Records the goodbye reason and direction (sent/received). Skipped from
/// JSON when the peer has never disconnected.
#[serde(skip_serializing_if = "Option::is_none")]
last_disconnect: Option<LastDisconnect>,
}

impl<E: EthSpec> Default for PeerInfo<E> {
Expand All @@ -75,6 +88,8 @@ impl<E: EthSpec> Default for PeerInfo<E> {
is_trusted: false,
connection_direction: None,
enr: None,
last_action: None,
last_disconnect: None,
}
}
}
Expand Down Expand Up @@ -457,6 +472,29 @@ impl<E: EthSpec> PeerInfo<E> {
}
}

/// Borrows the latest score-affecting event stored for this peer, or `None`
/// if no such event has been recorded.
pub fn last_action(&self) -> Option<&LastAction> {
    Option::as_ref(&self.last_action)
}

/// Borrows the latest disconnect event stored for this peer, or `None` if no
/// disconnect has been recorded.
pub fn last_disconnect(&self) -> Option<&LastDisconnect> {
    Option::as_ref(&self.last_disconnect)
}

/// Stores `last_action` as this peer's latest score-affecting event,
/// replacing any previously stored one.
// VISIBILITY: Restricted to the peer manager, which populates this from
// `report_peer`.
pub(in crate::peer_manager) fn set_last_action(&mut self, last_action: LastAction) {
    self.last_action = Option::from(last_action);
}

/// Stores `last_disconnect` as this peer's latest disconnect event, replacing
/// any previously stored one.
// VISIBILITY: Crate-wide, because it is populated from the goodbye
// send/receive paths in both the peer manager and the service layer.
pub(crate) fn set_last_disconnect(&mut self, last_disconnect: LastDisconnect) {
    self.last_disconnect = Option::from(last_disconnect);
}

/// Updates the gossipsub score with a new score. Optionally ignore the gossipsub score.
pub(super) fn update_gossipsub_score(&mut self, new_score: f64, ignore: bool) {
self.score.update_gossipsub_score(new_score, ignore);
Expand Down
Loading