Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 42 additions & 17 deletions node/bft/events/src/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,34 @@

use super::*;

use std::io;
use tracing::warn;

/// The reason behind the node disconnecting from a peer.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
#[repr(u8)]
pub enum DisconnectReason {
/// The peer's challenge response is invalid.
InvalidChallengeResponse,
InvalidChallengeResponse = 0,
/// No reason given.
NoReasonGiven,
NoReasonGiven = 1,
/// The peer is not following the protocol.
ProtocolViolation,
ProtocolViolation = 2,
/// The peer's client is outdated, judging by its version.
OutdatedClientVersion,
OutdatedClientVersion = 3,
/// The disconnect reason is not known. This is used for when the peers sends a disconnect reason that is not known to us.
UnknownReason = u8::MAX,
}

impl std::fmt::Display for DisconnectReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::InvalidChallengeResponse => write!(f, "invalid challenge response"),
Self::NoReasonGiven => write!(f, "no reason given"),
Self::ProtocolViolation => write!(f, "protocol violation"),
Self::OutdatedClientVersion => write!(f, "outdated client version"),
Self::UnknownReason => write!(f, "unknown"),
}
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
Expand All @@ -52,19 +66,30 @@ impl EventTrait for Disconnect {

impl ToBytes for Disconnect {
fn write_le<W: Write>(&self, mut writer: W) -> IoResult<()> {
(self.reason as u8).write_le(&mut writer)?;
Ok(())
if self.reason == DisconnectReason::UnknownReason {
return Err(io_error("Cannot serialize unknown disconnect reason"));
}

(self.reason as u8).write_le(&mut writer)
}
}

impl FromBytes for Disconnect {
fn read_le<R: Read>(mut reader: R) -> IoResult<Self> {
let reason = match u8::read_le(&mut reader) {
Ok(0) => DisconnectReason::InvalidChallengeResponse,
Ok(1) => DisconnectReason::NoReasonGiven,
Ok(2) => DisconnectReason::ProtocolViolation,
Ok(3) => DisconnectReason::OutdatedClientVersion,
_ => return Err(io::Error::other("Invalid 'Disconnect' event")),
let index = match u8::read_le(&mut reader) {
Ok(index) => index,
Err(err) => return Err(io_error(format!("Failed to deserialize disconnect reason: {err}"))),
};

let reason = match index {
0 => DisconnectReason::InvalidChallengeResponse,
1 => DisconnectReason::NoReasonGiven,
2 => DisconnectReason::ProtocolViolation,
3 => DisconnectReason::OutdatedClientVersion,
val => {
warn!("received unknown disconnect reason (id={val})");
DisconnectReason::UnknownReason
}
};

Ok(Self { reason })
Expand Down Expand Up @@ -99,10 +124,10 @@ mod tests {
}

#[test]
#[should_panic(expected = "Invalid 'Disconnect' event")]
fn deserializing_invalid_data_panics() {
fn deserialize_unknown_reason() {
let mut buf = BytesMut::default().writer();
"not a DisconnectReason-value".as_bytes().write_le(&mut buf).unwrap();
let _disconnect = Disconnect::read_le(buf.into_inner().reader()).unwrap();
51u8.to_le_bytes().write_le(&mut buf).unwrap();
let disconnect = Disconnect::read_le(buf.into_inner().reader()).unwrap();
assert_eq!(disconnect.reason, DisconnectReason::UnknownReason);
}
}
17 changes: 8 additions & 9 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
MAX_BATCH_DELAY_IN_MS,
MEMORY_POOL_PORT,
Worker,
events::{EventCodec, PrimaryPing},
events::{Disconnect as DisconnectEvent, DisconnectReason, EventCodec, PrimaryPing},
helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
spawn_blocking,
};
Expand All @@ -34,7 +34,6 @@ use snarkos_node_bft_events::{
ChallengeRequest,
ChallengeResponse,
DataBlocks,
DisconnectReason,
Event,
EventTrait,
TransmissionRequest,
Expand Down Expand Up @@ -684,7 +683,7 @@ impl<N: Network> Gateway<N> {
}
Event::Disconnect(message) => {
// The peer informs us that they had disconnected. Disconnect from them too.
debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
self.disconnect(peer_ip);
Ok(false)
}
Expand Down Expand Up @@ -1356,8 +1355,8 @@ macro_rules! expect_event {
data
}
// Received a disconnect event, abort.
Some(Event::Disconnect(reason)) => {
return Err(error(format!("{CONTEXT} '{}' disconnected: {reason:?}", $peer_addr)));
Some(Event::Disconnect(DisconnectEvent { reason })) => {
return Err(error(format!("{CONTEXT} '{}' disconnected: {reason}", $peer_addr)));
}
// Received an unexpected event, abort.
Some(ty) => {
Expand Down Expand Up @@ -1429,7 +1428,7 @@ impl<N: Network> Gateway<N> {
.await
{
send_event(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}
// Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
Expand All @@ -1438,7 +1437,7 @@ impl<N: Network> Gateway<N> {
// The Aleo address is already connected; no reason to return an error.
return Ok(None);
} else {
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}
}

Expand Down Expand Up @@ -1500,7 +1499,7 @@ impl<N: Network> Gateway<N> {
// The Aleo address is already connected; no reason to return an error.
return Ok(None);
} else {
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}
}

Expand Down Expand Up @@ -1536,7 +1535,7 @@ impl<N: Network> Gateway<N> {
.await
{
send_event(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}

Ok(Some(peer_request))
Expand Down
8 changes: 0 additions & 8 deletions node/router/messages/src/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,4 @@ mod tests {
assert_eq!(reason, &disconnect.reason);
}
}

#[test]
#[should_panic]
fn disconnect_invalid_data_panics() {
let mut buf = BytesMut::default().writer();
"not a DisconnectReason-value".as_bytes().write_le(&mut buf).unwrap();
let _disconnect = Disconnect::read_le(buf.into_inner().reader()).unwrap();
}
}
72 changes: 54 additions & 18 deletions node/router/messages/src/helpers/disconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use snarkvm::prelude::{FromBytes, ToBytes, error};
use snarkvm::prelude::{FromBytes, ToBytes, io_error};

use std::io;

Expand Down Expand Up @@ -50,6 +50,8 @@ pub enum DisconnectReason {
YouNeedToSyncFirst,
/// The peer's listening port is closed.
YourPortIsClosed(u16),
/// The disconnect reason is not known. This is used for when the peers sends a disconnect reason that is not known to us.
UnknownReason,
}

impl ToBytes for DisconnectReason {
Expand All @@ -73,32 +75,66 @@ impl ToBytes for DisconnectReason {
14u8.write_le(&mut writer)?;
port.write_le(writer)
}
Self::UnknownReason => Err(io_error("Cannot serialize unknown disconnect reason")),
}
}
}

impl FromBytes for DisconnectReason {
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self> {
match u8::read_le(&mut reader)? {
0 => Ok(Self::ExceededForkRange),
1 => Ok(Self::InvalidChallengeResponse),
2 => Ok(Self::InvalidForkDepth),
3 => Ok(Self::INeedToSyncFirst),
4 => Ok(Self::NoReasonGiven),
5 => Ok(Self::ProtocolViolation),
6 => Ok(Self::OutdatedClientVersion),
7 => Ok(Self::PeerHasDisconnected),
8 => Ok(Self::PeerRefresh),
9 => Ok(Self::ShuttingDown),
10 => Ok(Self::SyncComplete),
11 => Ok(Self::TooManyFailures),
12 => Ok(Self::TooManyPeers),
13 => Ok(Self::YouNeedToSyncFirst),
let index = match u8::read_le(&mut reader) {
Ok(index) => index,
Err(err) => return Err(io_error(format!("Failed to deserialize disconnect reason: {err}"))),
};

let reason = match index {
0 => Self::ExceededForkRange,
1 => Self::InvalidChallengeResponse,
2 => Self::InvalidForkDepth,
3 => Self::INeedToSyncFirst,
4 => Self::NoReasonGiven,
5 => Self::ProtocolViolation,
6 => Self::OutdatedClientVersion,
7 => Self::PeerHasDisconnected,
8 => Self::PeerRefresh,
9 => Self::ShuttingDown,
10 => Self::SyncComplete,
11 => Self::TooManyFailures,
12 => Self::TooManyPeers,
13 => Self::YouNeedToSyncFirst,
14 => {
let port = u16::read_le(reader)?;
Ok(Self::YourPortIsClosed(port))
Self::YourPortIsClosed(port)
}
val => {
warn!("Received unknown disconnect reason (id={val})");
Self::UnknownReason
}
_ => Err(error("Invalid disconnect reason")),
};

Ok(reason)
}
}

impl std::fmt::Display for DisconnectReason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ExceededForkRange => write!(f, "exceeded fork range"),
Self::InvalidChallengeResponse => write!(f, "invalid challenge response"),
Self::InvalidForkDepth => write!(f, "invalid fork depth"),
Self::INeedToSyncFirst => write!(f, "I need to sync first"),
Self::NoReasonGiven => write!(f, "no reason given"),
Self::ProtocolViolation => write!(f, "protocol violation"),
Self::OutdatedClientVersion => write!(f, "outdated client version"),
Self::PeerHasDisconnected => write!(f, "peer has disconnected"),
Self::PeerRefresh => write!(f, "periodic peer refresh"),
Self::ShuttingDown => write!(f, "shutting down"),
Self::SyncComplete => write!(f, "block sync complete"),
Self::TooManyFailures => write!(f, "too many failures"),
Self::TooManyPeers => write!(f, "too many peers"),
Self::YouNeedToSyncFirst => write!(f, "you need to sync first"),
Self::YourPortIsClosed(port) => write!(f, "your port is closed ({port})"),
Self::UnknownReason => write!(f, "unknown reason"),
}
}
}
28 changes: 15 additions & 13 deletions node/router/src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use snarkos_node_network::log_repo_sha_comparison;
use snarkos_node_tcp::{ConnectionSide, P2P, Tcp};
use snarkvm::{
ledger::narwhal::Data,
prelude::{Address, Field, Network, block::Header, error},
prelude::{Address, Field, Network, block::Header, error, io_error},
};

use anyhow::{Result, bail};
Expand All @@ -45,35 +45,37 @@ impl<N: Network> P2P for Router<N> {
/// A macro unwrapping the expected handshake message or returning an error for unexpected messages.
#[macro_export]
macro_rules! expect_message {
($msg_ty:path, $framed:expr, $peer_addr:expr) => {
($msg_ty:path, $framed:expr, $peer_addr:expr) => {{
use snarkvm::utilities::io_error;

match $framed.try_next().await? {
// Received the expected message, proceed.
Some($msg_ty(data)) => {
trace!("Received '{}' from '{}'", data.name(), $peer_addr);
data
}
// Received a disconnect message, abort.
Some(Message::Disconnect(reason)) => {
return Err(error(format!("'{}' disconnected: {reason:?}", $peer_addr)))
Some(Message::Disconnect($crate::messages::Disconnect { reason })) => {
return Err(io_error(format!("'{}' disconnected: {reason}", $peer_addr)));
}
// Received an unexpected message, abort.
Some(ty) => {
return Err(error(format!(
return Err(io_error(format!(
"'{}' did not follow the handshake protocol: received {:?} instead of {}",
$peer_addr,
ty.name(),
stringify!($msg_ty),
)))
)));
}
// Received nothing.
None => {
return Err(error(format!(
return Err(io_error(format!(
"the peer disconnected before sending {:?}, likely due to peer saturation or shutdown",
stringify!($msg_ty),
)))
)));
}
}
};
}};
}

/// Send the given message to the peer.
Expand Down Expand Up @@ -217,12 +219,12 @@ impl<N: Network> Router<N> {
.await
{
send(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}
// Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}

/* Step 3: Send the challenge response. */
Expand Down Expand Up @@ -280,7 +282,7 @@ impl<N: Network> Router<N> {
// Verify the challenge request. If a disconnect reason was returned, send the disconnect message and abort.
if let Some(reason) = self.verify_challenge_request(peer_addr, &peer_request) {
send(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}

/* Step 2: Send the challenge response followed by own challenge request. */
Expand Down Expand Up @@ -327,7 +329,7 @@ impl<N: Network> Router<N> {
.await
{
send(&mut framed, peer_addr, reason.into()).await?;
return Err(error(format!("Dropped '{peer_addr}' for reason: {reason:?}")));
return Err(io_error(format!("Dropped '{peer_addr}' for reason: {reason}")));
}

Ok(Some(peer_request))
Expand Down
2 changes: 1 addition & 1 deletion node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
}
Message::Disconnect(message) => {
// The peer informs us that they had disconnected. Disconnect from them too.
debug!("Peer '{peer_ip}' decided to disconnect due to '{:?}'", message.reason);
debug!("Peer '{peer_ip}' decided to disconnect due to '{}'", message.reason);
self.router().disconnect(peer_ip);
Ok(false)
}
Expand Down
2 changes: 1 addition & 1 deletion node/src/client/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Client<N, C> {

/// Saves the latest epoch hash and latest block header in the node.
fn puzzle_response(&self, peer_ip: SocketAddr, _epoch_hash: N::BlockHash, _header: Header<N>) -> bool {
debug!("Disconnecting '{peer_ip}' for the following reason - {:?}", DisconnectReason::ProtocolViolation);
debug!("Disconnecting '{peer_ip}' for the following reason - {}", DisconnectReason::ProtocolViolation);
false
}

Expand Down
Loading