Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use std::{
sync::{Arc, atomic::AtomicBool},
};
use tokio::{
runtime::{self, Runtime},
runtime::{self, Handle, Runtime},
sync::mpsc,
task,
};
Expand Down Expand Up @@ -323,7 +323,9 @@ impl Start {
}

// Initialize the runtime.
Self::runtime().block_on(async move {
let runtime = Self::runtime();
let handle = runtime.handle().clone();
runtime.block_on(async move {
// Error messages.
let node_parse_error = || "Failed to start node";

Expand All @@ -332,9 +334,15 @@ impl Start {

// Parse the node arguments, start it, and block until shutdown.
match self_.network {
MainnetV0::ID => self_.parse_node::<MainnetV0>(log_receiver).await.with_context(node_parse_error)?,
TestnetV0::ID => self_.parse_node::<TestnetV0>(log_receiver).await.with_context(node_parse_error)?,
CanaryV0::ID => self_.parse_node::<CanaryV0>(log_receiver).await.with_context(node_parse_error)?,
MainnetV0::ID => {
self_.parse_node::<MainnetV0>(handle, log_receiver).await.with_context(node_parse_error)?
}
TestnetV0::ID => {
self_.parse_node::<TestnetV0>(handle, log_receiver).await.with_context(node_parse_error)?
}
CanaryV0::ID => {
self_.parse_node::<CanaryV0>(handle, log_receiver).await.with_context(node_parse_error)?
}
_ => panic!("Invalid network ID specified"),
};

Expand Down Expand Up @@ -665,7 +673,7 @@ impl Start {

/// Starts the node and blocks until it terminates.
#[rustfmt::skip]
async fn parse_node<N: Network>(&mut self, log_receiver: mpsc::Receiver<Vec<u8>>) -> Result<()> {
async fn parse_node<N: Network>(&mut self, handle: Handle, log_receiver: mpsc::Receiver<Vec<u8>>) -> Result<()> {
if !self.nobanner {
// Print the welcome banner.
println!("{}", crate::helpers::welcome_message());
Expand Down Expand Up @@ -842,7 +850,7 @@ impl Start {
}

// Register the signal handler.
let signal_handler = SignalHandler::new();
let signal_handler = SignalHandler::new(Some(handle));

// Initialize the node.
let node = match node_type {
Expand Down
4 changes: 4 additions & 0 deletions node/bft/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,4 +474,8 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
}
}
}

/// Reports whether this service has been stopped.
///
/// The answer is delegated entirely to `self.stoppable`.
fn is_stopped(&self) -> bool {
self.stoppable.is_stopped()
}
}
5 changes: 5 additions & 0 deletions node/bft/ledger-service/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,9 @@ impl<N: Network> LedgerService<N> for MockLedgerService<N> {
let height = N::CONSENSUS_HEIGHT(consensus_version).unwrap();
Ok(consensus_config_value!(N, TRANSACTION_SPEND_LIMIT, height).unwrap())
}

/// Reports whether this service has been stopped.
///
/// Always returns `false` for the mock service.
fn is_stopped(&self) -> bool {
// No ledger to check against.
false
}
}
5 changes: 5 additions & 0 deletions node/bft/ledger-service/src/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,9 @@ impl<N: Network> LedgerService<N> for ProverLedgerService<N> {
) -> Result<u64> {
bail!("Cannot compute spent_cost in prover")
}

/// Reports whether this service has been stopped.
///
/// Always returns `false` for the prover service.
fn is_stopped(&self) -> bool {
// No ledger to check against.
false
}
}
2 changes: 2 additions & 0 deletions node/bft/ledger-service/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,6 @@ pub trait LedgerService<N: Network>: std::fmt::Debug + Send + Sync {
transaction: &Transaction<N>,
consensus_version: ConsensusVersion,
) -> Result<u64>;

/// Returns `true` once the service considers itself stopped.
///
/// NOTE(review): implementations without a backing ledger appear to always return `false` — confirm.
fn is_stopped(&self) -> bool;
}
4 changes: 4 additions & 0 deletions node/bft/ledger-service/src/translucent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,8 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for TranslucentLedgerS
) -> Result<u64> {
self.inner.transaction_spend_in_microcredits(transaction, consensus_version)
}

/// Reports whether this service has been stopped.
///
/// Forwards the query unchanged to the wrapped `inner` service.
fn is_stopped(&self) -> bool {
self.inner.is_stopped()
}
}
2 changes: 2 additions & 0 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2015,6 +2015,8 @@ impl<N: Network> Primary<N> {
info!("Shutting down the primary...");
// Remove the callback.
self.primary_callback.clear();
// Shut down the sync service.
self.sync.shut_down().await;
// Shut down the workers.
self.workers().iter().for_each(|worker| worker.shut_down());
// Abort the tasks.
Expand Down
1 change: 1 addition & 0 deletions node/bft/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ mod tests {
fn check_block_subdag(&self, _block: Block<N>, _prefix: &[PendingBlock<N>]) -> Result<PendingBlock<N>, CheckBlockError<N>>;
fn begin_ledger_update<'a>(&'a self) -> Result<Box<dyn LedgerUpdateService<N> + 'a>, BeginLedgerUpdateError>;
fn transaction_spend_in_microcredits(&self, transaction: &Transaction<N>, consensus_version: ConsensusVersion) -> Result<u64>;
fn is_stopped(&self) -> bool;
}
}

Expand Down
5 changes: 5 additions & 0 deletions node/rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
pub const fn handles(&self) -> &Arc<Mutex<Vec<JoinHandle<()>>>> {
&self.handles
}

/// Shuts down the REST instance.
pub fn shut_down(&self) {
self.handles.lock().iter().for_each(|handle| handle.abort());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter as much but using drain(..) would also drop the handles when calling shut down.

}
}

impl<N: Network, C: ConsensusStorage<N>, R: Routing<N>> Rest<N, C, R> {
Expand Down
5 changes: 5 additions & 0 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ impl<N: Network> Router<N> {
&self.cache
}

/// Returns a reference to the ledger.
///
/// This hands out a borrow of the `Arc` stored in `self.ledger`; no clone is made.
pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
&self.ledger
}

/// Returns `true` if the node is only engaging with trusted peers.
pub fn trusted_peers_only(&self) -> bool {
self.trusted_peers_only
Expand Down
9 changes: 9 additions & 0 deletions node/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,13 +544,22 @@ impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Client<N, C> {
// Shut down the node.
trace!("Shutting down the node...");

// Shut down the REST instance.
if let Some(rest) = &self.rest {
trace!("Shutting down the REST server...");
rest.shut_down();
}

// Abort the tasks.
trace!("Shutting down the client...");
self.handles.lock().iter().for_each(|handle| handle.abort());

// Shut down the router.
self.router.shut_down().await;

// Notify the Ping.
self.ping.stop();

info!("Node has shut down.");
}
}
3 changes: 3 additions & 0 deletions node/src/prover/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Prover<N, C> {
// Shut down the router.
self.router.shut_down().await;

// Notify the Ping.
self.ping.stop();

info!("Node has shut down.");
}
}
Expand Down
16 changes: 14 additions & 2 deletions node/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@

use snarkos_node_network::{NodeType, PeerPoolHandling};
use snarkos_node_router::Routing;

use snarkos_utilities::SignalHandler;

use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey};

use std::time::Duration;
use tokio::time::sleep;

#[async_trait]
pub trait NodeInterface<N: Network>: Routing<N> {
/// Returns the node type.
Expand Down Expand Up @@ -53,6 +54,17 @@ pub trait NodeInterface<N: Network>: Routing<N> {

// If the node is already initialized, then shut it down.
self.shut_down().await;

// Allow a bit of time for the tasks to wind down.
sleep(Duration::from_secs(1)).await;

// Check if there are any stragglers left.
if let Some(handle) = &handler.handle {
let live_tasks = handle.metrics().num_alive_tasks();
if live_tasks != 0 {
error!("There are still {live_tasks} live tasks");
}
}
}

/// Shuts down the node.
Expand Down
11 changes: 10 additions & 1 deletion node/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,12 @@ impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
// Shut down the node.
trace!("Shutting down the node...");

// Shut down the REST instance.
if let Some(rest) = &self.rest {
trace!("Shutting down the REST server...");
rest.shut_down();
}

// Abort the tasks.
trace!("Shutting down the validator...");
self.handles.lock().iter().for_each(|handle| handle.abort());
Expand All @@ -480,6 +486,9 @@ impl<N: Network, C: ConsensusStorage<N>> NodeInterface<N> for Validator<N, C> {
trace!("Shutting down consensus...");
self.consensus.shut_down().await;

// Notify the Ping.
self.ping.stop();

info!("Node has shut down.");
}
}
Expand Down Expand Up @@ -539,7 +548,7 @@ mod tests {
false,
dev_txs,
None,
SignalHandler::new(),
SignalHandler::new(None),
)
.await
.unwrap();
Expand Down
10 changes: 10 additions & 0 deletions node/sync/src/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,21 @@ impl<N: Network> Ping<N> {
self.notify.notify_one();
}

/// Wake up the ping task so that its inner loop iterates and breaks.
/// It should only be called after the ledger has already been stopped.
pub fn stop(&self) {
self.notify.notify_one();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just abort the ping task here?

}

/// Background task that periodically sends out new ping messages.
async fn ping_task(inner: &Mutex<PingInner<N>>, router: &Router<N>, notify: &Notify) {
let mut new_block = false;

loop {
if router.ledger().is_stopped() {
break;
Comment thread
vicsn marked this conversation as resolved.
}

// Do not hold the lock while waiting.
let sleep_time = {
let mut inner = inner.lock();
Expand Down
6 changes: 3 additions & 3 deletions node/tests/common/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn client() -> Client<CurrentNetwork, ConsensusMemory<CurrentNetwork>>
NodeDataDir::new_test(None),
false, // Connect to untrusted peers.
None,
SignalHandler::new(),
SignalHandler::new(None),
)
.await
.expect("couldn't create client instance")
Expand All @@ -52,7 +52,7 @@ pub async fn prover() -> Prover<CurrentNetwork, ConsensusMemory<CurrentNetwork>>
NodeDataDir::new_test(None),
false,
None,
SignalHandler::new(),
SignalHandler::new(None),
)
.await
.expect("couldn't create prover instance")
Expand All @@ -74,7 +74,7 @@ pub async fn validator() -> Validator<CurrentNetwork, ConsensusMemory<CurrentNet
false, // This test requires validators to connect to peers.
false, // No dev traffic in production mode.
None,
SignalHandler::new(),
SignalHandler::new(None),
)
.await
.expect("couldn't create validator instance")
Expand Down
8 changes: 6 additions & 2 deletions utilities/src/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
};

use tokio::sync::oneshot;
use tokio::{runtime::Handle, sync::oneshot};

use tracing::{debug, error, trace};

Expand Down Expand Up @@ -71,15 +71,19 @@ pub struct SignalHandler {

/// This receiver is used to wait for the node to be stopped.
stopped_receiver: Mutex<Option<oneshot::Receiver<()>>>,

/// An optional tokio runtime handle.
pub handle: Option<Handle>,
}

impl SignalHandler {
/// Spawns a background task that listens for Ctrl+C and returns `Self`.
pub fn new() -> Arc<Self> {
pub fn new(handle: Option<Handle>) -> Arc<Self> {
let (stopped_sender, stopped_receiver) = oneshot::channel();
let obj = Arc::new(Self {
stopped_sender: RwLock::new(Some(stopped_sender)),
stopped_receiver: Mutex::new(Some(stopped_receiver)),
handle,
});

{
Expand Down