Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ pub struct Start {
#[clap(long, requires = "validator")]
pub bft: Option<SocketAddr>,

/// Set the IP address and port used for providing block synchronization.
#[clap(long = "sync-listener")]
pub sync_listener: Option<SocketAddr>,

/// Specify the host:port address pairs of the peer(s) to connect to (as a comma-separated list).
///
/// These peers will be set as "trusted", which means the node will not disconnect from them when performing peer rotation.
Expand Down Expand Up @@ -714,6 +718,9 @@ impl Start {
// Parse the node IP or use the default IP/port.
let node_ip = self.node.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_NODE_PORT)));

// Parse the sync listener.
let sync_listener = self.sync_listener.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 6130)));

// Parse the REST IP.
let rest_ip = match self.norest {
true => None,
Expand Down Expand Up @@ -846,7 +853,7 @@ impl Start {

// Initialize the node.
let node = match node_type {
NodeType::Validator => Node::new_validator(node_ip, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, signal_handler.clone()).await,
NodeType::Validator => Node::new_validator(node_ip, sync_listener, self.bft, rest_ip, self.rest_rps, account, &trusted_peers, &trusted_validators, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), dev_txs, self.dev, signal_handler.clone()).await,
NodeType::Prover => Node::new_prover(node_ip, account, &trusted_peers, genesis, node_data_dir, self.trusted_peers_only, self.dev, signal_handler.clone()).await,
NodeType::Client => Node::new_client(node_ip, rest_ip, self.rest_rps, account, &trusted_peers, genesis, cdn, storage_mode, node_data_dir, self.trusted_peers_only, self.auto_db_checkpoints.clone(), self.dev, signal_handler.clone()).await,
NodeType::BootstrapClient => Node::new_bootstrap_client(node_ip, account, *genesis.header(), self.dev).await,
Expand Down
3 changes: 3 additions & 0 deletions node/bft/events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ features = [ "serde", "rayon" ]
[dependencies.serde]
workspace = true

[dependencies.snarkos-node-network]
workspace = true

[dependencies.snarkos-node-sync-locators]
workspace = true

Expand Down
85 changes: 67 additions & 18 deletions node/bft/events/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,6 @@ pub use batch_propose::BatchPropose;
mod batch_signature;
pub use batch_signature::BatchSignature;

mod block_request;
pub use block_request::BlockRequest;

mod block_response;
pub use block_response::{BlockResponse, DataBlocks};

mod certificate_request;
pub use certificate_request::CertificateRequest;

mod certificate_response;
pub use certificate_response::CertificateResponse;

mod challenge_request;
pub use challenge_request::ChallengeRequest;

Expand Down Expand Up @@ -69,17 +57,22 @@ pub use worker_ping::WorkerPing;
#[cfg(any(test, feature = "test-helpers"))]
pub mod committee_prop_tests;

pub use snarkos_node_network::{
BlockRequest,
BlockResponse,
CertificateRequest,
CertificateResponse,
SyncResponse,
SyncToken,
};

use snarkos_node_sync_locators::BlockLocators;
use snarkvm::{
console::prelude::{FromBytes, Network, Read, ToBytes, Write, error, io_error},
ledger::{
block::Block,
narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
},
ledger::narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
prelude::{Address, Field, Signature},
};

use anyhow::{Result, bail, ensure};
use indexmap::{IndexMap, IndexSet};
use serde::{Deserialize, Serialize};
pub use std::io::{self, Result as IoResult};
Expand All @@ -90,6 +83,52 @@ pub trait EventTrait: ToBytes + FromBytes {
fn name(&self) -> Cow<'static, str>;
}

// TODO: remove once the compatibility layer for Gateway-based sync is gone
impl EventTrait for BlockRequest {
    /// Returns the event name, annotated with the requested block height range.
    #[inline]
    fn name(&self) -> Cow<'static, str> {
        let (start, end) = (self.start_height, self.end_height);
        // A single-block request is rendered without the range suffix.
        if start + 1 == end {
            Cow::from(format!("BlockRequest {start}"))
        } else {
            Cow::from(format!("BlockRequest {start}..{end}"))
        }
    }
}

// TODO: remove once the compatibility layer for Gateway-based sync is gone
impl<N: Network> EventTrait for BlockResponse<N> {
    /// Returns the event name, annotated with the height range of the originating request.
    #[inline]
    fn name(&self) -> Cow<'static, str> {
        let (start, end) = (self.request.start_height, self.request.end_height);
        // A single-block response is rendered without the range suffix.
        if start + 1 == end {
            Cow::from(format!("BlockResponse {start}"))
        } else {
            Cow::from(format!("BlockResponse {start}..{end}"))
        }
    }
}

impl<N: Network> EventTrait for CertificateRequest<N> {
    /// Returns the event name (a fixed, borrowed string — no allocation).
    #[inline]
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("CertificateRequest")
    }
}

impl<N: Network> EventTrait for CertificateResponse<N> {
    /// Returns the event name (a fixed, borrowed string — no allocation).
    #[inline]
    fn name(&self) -> Cow<'static, str> {
        Cow::Borrowed("CertificateResponse")
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
// TODO (howardwu): For mainnet - Remove this clippy lint. The CertificateResponse should not
// be a large enum variant, after removing the versioning.
Expand All @@ -111,6 +150,8 @@ pub enum Event<N: Network> {
ValidatorsRequest(ValidatorsRequest),
ValidatorsResponse(ValidatorsResponse<N>),
WorkerPing(WorkerPing<N>),
SyncRequest(BlockRequest),
SyncResponse(SyncResponse),
}

impl<N: Network> From<DisconnectReason> for Event<N> {
Expand Down Expand Up @@ -143,6 +184,8 @@ impl<N: Network> Event<N> {
Self::ValidatorsRequest(event) => event.name(),
Self::ValidatorsResponse(event) => event.name(),
Self::WorkerPing(event) => event.name(),
Self::SyncRequest(event) => event.name(),
Self::SyncResponse(event) => event.to_string().into(),
}
}

Expand All @@ -166,6 +209,8 @@ impl<N: Network> Event<N> {
Self::ValidatorsRequest(..) => 13,
Self::ValidatorsResponse(..) => 14,
Self::WorkerPing(..) => 15,
Self::SyncRequest(..) => 16,
Self::SyncResponse(..) => 17,
}
}
}
Expand All @@ -191,6 +236,8 @@ impl<N: Network> ToBytes for Event<N> {
Self::ValidatorsRequest(event) => event.write_le(writer),
Self::ValidatorsResponse(event) => event.write_le(writer),
Self::WorkerPing(event) => event.write_le(writer),
Self::SyncRequest(event) => event.write_le(writer),
Self::SyncResponse(event) => event.write_le(writer),
}
}
}
Expand Down Expand Up @@ -218,7 +265,9 @@ impl<N: Network> FromBytes for Event<N> {
13 => Self::ValidatorsRequest(ValidatorsRequest::read_le(&mut reader)?),
14 => Self::ValidatorsResponse(ValidatorsResponse::read_le(&mut reader)?),
15 => Self::WorkerPing(WorkerPing::read_le(&mut reader)?),
16.. => return Err(error(format!("Unknown event ID {id}"))),
16 => Self::SyncRequest(BlockRequest::read_le(&mut reader)?),
17 => Self::SyncResponse(SyncResponse::read_le(&mut reader)?),
18.. => return Err(error(format!("Unknown event ID {id}"))),
};

// Ensure that there are no "dangling" bytes.
Expand Down
2 changes: 1 addition & 1 deletion node/bft/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl<'a, N: Network, C: ConsensusStorage<N>> LedgerUpdateService<N> for LedgerUp
metrics::update_block_metrics(block);
}

tracing::info!("Advanced to block {} at round {} - {}\n", block.height(), block.round(), block.hash());
tracing::info!("Advanced to block {} at round {} - {}", block.height(), block.round(), block.hash());
Ok(())
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/bft/src/bft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl<N: Network> BFT<N> {
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
block_sync: Arc<BlockSync<N>>,
sync_listener: SocketAddr,
ip: Option<SocketAddr>,
trusted_validators: &[SocketAddr],
trusted_peers_only: bool,
Expand All @@ -88,6 +89,7 @@ impl<N: Network> BFT<N> {
storage,
ledger,
block_sync,
sync_listener,
ip,
trusted_validators,
trusted_peers_only,
Expand Down
73 changes: 65 additions & 8 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@ use crate::{
MEMORY_POOL_PORT,
Worker,
events::{DisconnectReason, EventCodec, PrimaryPing},
helpers::{Cache, PrimarySender, Storage, SyncSender, WorkerSender, assign_to_worker},
helpers::{Cache, PrimarySender, Storage, WorkerSender, assign_to_worker},
spawn_blocking,
};
use smol_str::SmolStr;
use snarkos_account::Account;
use snarkos_node_bft_events::{
BlockRequest,
BlockResponse,
CertificateRequest,
CertificateResponse,
ChallengeRequest,
ChallengeResponse,
DataBlocks,
Event,
EventTrait,
TransmissionRequest,
Expand All @@ -43,17 +40,27 @@ use snarkos_node_bft_events::{
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::{
BlockRequest,
BlockResponse,
ConnectionMode,
DataBlocks,
NodeType,
Peer,
PeerPoolHandling,
Resolver,
SyncResponse,
bootstrap_peers,
get_repo_commit_hash,
log_repo_sha_comparison,
shorten_snarkos_sha,
};
use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService};
use snarkos_node_sync::{
MAX_BLOCKS_BEHIND,
SYNC_STREAM_TOKEN_LIFETIME,
SyncSender,
SyncStreams,
communication_service::CommunicationService,
};
use snarkos_node_tcp::{
Config,
ConnectError,
Expand Down Expand Up @@ -161,6 +168,8 @@ pub struct InnerGateway<N: Network> {
worker_senders: OnceCell<IndexMap<u8, WorkerSender<N>>>,
/// The sync sender.
sync_sender: OnceCell<SyncSender<N>>,
/// The handler for sync streams.
sync_streams: SyncStreams<N>,
/// The spawned handles.
handles: Mutex<Vec<JoinHandle<()>>>,
/// The storage mode.
Expand Down Expand Up @@ -205,6 +214,7 @@ impl<N: Network> Gateway<N> {
storage: Storage<N>,
ledger: Arc<dyn LedgerService<N>>,
ip: Option<SocketAddr>,
sync_listener: SocketAddr,
trusted_validators: &[SocketAddr],
trusted_peers_only: bool,
node_data_dir: NodeDataDir,
Expand Down Expand Up @@ -240,6 +250,9 @@ impl<N: Network> Gateway<N> {
// some of the cached validators to trusted ones.
initial_peers.extend(trusted_validators.iter().copied().map(|addr| (addr, Peer::new_candidate(addr, true))));

// TODO: pick addr from CLI
let sync_streams = SyncStreams::new(sync_listener, ledger.clone());

// Return the gateway.
Ok(Self(Arc::new(InnerGateway {
account,
Expand All @@ -254,6 +267,7 @@ impl<N: Network> Gateway<N> {
primary_sender: Default::default(),
worker_senders: Default::default(),
sync_sender: Default::default(),
sync_streams,
handles: Default::default(),
node_data_dir,
trusted_peers_only,
Expand All @@ -278,7 +292,8 @@ impl<N: Network> Gateway<N> {

// If the sync sender was provided, set the sync sender.
if let Some(sync_sender) = sync_sender {
self.sync_sender.set(sync_sender).expect("Sync sender already set in gateway");
self.sync_sender.set(sync_sender.clone()).expect("Sync sender already set in gateway");
self.sync_streams.set_sync_sender(sync_sender);
}

// Enable the TCP protocols.
Expand All @@ -288,6 +303,8 @@ impl<N: Network> Gateway<N> {
self.enable_disconnect().await;
self.enable_on_connect().await;

self.sync_streams.enable().await;

// Spawn a loop for periodic metrics.
#[cfg(feature = "metrics")]
{
Expand Down Expand Up @@ -349,7 +366,7 @@ impl<N: Network> CommunicationService for Gateway<N> {
/// Prepares a block request to be sent.
fn prepare_block_request(start_height: u32, end_height: u32) -> Self::Message {
debug_assert!(start_height < end_height, "Invalid block request format");
Event::BlockRequest(BlockRequest { start_height, end_height })
Event::SyncRequest(BlockRequest { start_height, end_height })
}

/// Sends the given message to specified peer.
Expand Down Expand Up @@ -578,7 +595,7 @@ impl<N: Network> Gateway<N> {
return Ok(true);
}
}
Event::BlockRequest(_) => {
Event::BlockRequest(_) | Event::SyncRequest(_) => {
let num_events = self.cache.insert_inbound_block_request(peer_ip, CACHE_REQUESTS_INTERVAL);
if num_events >= self.max_cache_duplicates() {
return Ok(true);
Expand Down Expand Up @@ -852,6 +869,46 @@ impl<N: Network> Gateway<N> {
}
Ok(true)
}
Event::SyncRequest(request) => {
let self_ = self.clone();
tokio::spawn(async move {
// Prepare a response with the syncing stream address and the access token.
let sync_addr = self_.sync_streams.listener_addr();
let response = SyncResponse::new(sync_addr);

// Register the access token.
let token = response.token.clone();
self_.sync_streams.register_token_for_peer(token.clone(), request);
debug!("[SyncStreams] Activated a sync token for {peer_ip}");

// Send the response to the peer.
Transport::send(&self_, peer_ip, Event::SyncResponse(response)).await;

// Remove the access token after a short while.
tokio::time::sleep(SYNC_STREAM_TOKEN_LIFETIME).await;
self_.sync_streams.remove_token_for_peer(token);
debug!("[SyncStreams] Deactivated a sync token for {peer_ip}");
});

Ok(true)
}
Event::SyncResponse(SyncResponse { addr, token }) => {
let self_ = self.clone();
let _: JoinHandle<anyhow::Result<()>> = tokio::spawn(async move {
debug!("[SyncStreams] Got a sync token from {peer_ip}");

// Register the access token and the sync stream address.
self_.sync_streams.register_token_from_peer(addr, token);
// Connect to the dedicated sync stream.
self_.sync_streams.tcp().connect(addr).await?;

// The syncing continues in the dedicated stream.

Ok(())
});

Ok(true)
}
}
}

Expand Down
Loading