diff --git a/cli/src/commands/start.rs b/cli/src/commands/start.rs index 8e2e510b58..7dc7b12c01 100644 --- a/cli/src/commands/start.rs +++ b/cli/src/commands/start.rs @@ -64,7 +64,7 @@ use std::{ sync::{Arc, atomic::AtomicBool}, }; use tokio::{ - runtime::{self, Runtime}, + runtime::{self, Handle, Runtime}, sync::mpsc, task, }; @@ -323,7 +323,9 @@ impl Start { } // Initialize the runtime. - Self::runtime().block_on(async move { + let runtime = Self::runtime(); + let handle = runtime.handle().clone(); + runtime.block_on(async move { // Error messages. let node_parse_error = || "Failed to start node"; @@ -332,9 +334,15 @@ impl Start { // Parse the node arguments, start it, and block until shutdown. match self_.network { - MainnetV0::ID => self_.parse_node::(log_receiver).await.with_context(node_parse_error)?, - TestnetV0::ID => self_.parse_node::(log_receiver).await.with_context(node_parse_error)?, - CanaryV0::ID => self_.parse_node::(log_receiver).await.with_context(node_parse_error)?, + MainnetV0::ID => { + self_.parse_node::(handle, log_receiver).await.with_context(node_parse_error)? + } + TestnetV0::ID => { + self_.parse_node::(handle, log_receiver).await.with_context(node_parse_error)? + } + CanaryV0::ID => { + self_.parse_node::(handle, log_receiver).await.with_context(node_parse_error)? + } _ => panic!("Invalid network ID specified"), }; @@ -665,7 +673,7 @@ impl Start { /// Start the node and blocks until it terminates. #[rustfmt::skip] - async fn parse_node(&mut self, log_receiver: mpsc::Receiver>) -> Result<()> { + async fn parse_node(&mut self, handle: Handle, log_receiver: mpsc::Receiver>) -> Result<()> { if !self.nobanner { // Print the welcome banner. println!("{}", crate::helpers::welcome_message()); @@ -842,7 +850,7 @@ impl Start { } // Register the signal handler. - let signal_handler = SignalHandler::new(); + let signal_handler = SignalHandler::new(Some(handle)); // Initialize the node. let node = match node_type { diff --git a/node/bft/ledger-service/src/ledger.rs b/node/bft/ledger-service/src/ledger.rs index 405cafabc1..a2baa63931 100644 --- a/node/bft/ledger-service/src/ledger.rs +++ b/node/bft/ledger-service/src/ledger.rs @@ -474,4 +474,8 @@ impl> LedgerService for CoreLedgerService< } } } + + fn is_stopped(&self) -> bool { + self.stoppable.is_stopped() + } } diff --git a/node/bft/ledger-service/src/mock.rs b/node/bft/ledger-service/src/mock.rs index 70408704bd..c448de6205 100644 --- a/node/bft/ledger-service/src/mock.rs +++ b/node/bft/ledger-service/src/mock.rs @@ -235,4 +235,9 @@ impl LedgerService for MockLedgerService { let height = N::CONSENSUS_HEIGHT(consensus_version).unwrap(); Ok(consensus_config_value!(N, TRANSACTION_SPEND_LIMIT, height).unwrap()) } + + fn is_stopped(&self) -> bool { + // No ledger to check against. + false + } } diff --git a/node/bft/ledger-service/src/prover.rs b/node/bft/ledger-service/src/prover.rs index 4493ac365d..e579caccaa 100644 --- a/node/bft/ledger-service/src/prover.rs +++ b/node/bft/ledger-service/src/prover.rs @@ -190,4 +190,9 @@ impl LedgerService for ProverLedgerService { ) -> Result { bail!("Cannot compute spent_cost in prover") } + + fn is_stopped(&self) -> bool { + // No ledger to check against. + false + } } diff --git a/node/bft/ledger-service/src/traits.rs b/node/bft/ledger-service/src/traits.rs index 7ab4a238eb..6215cb25ca 100644 --- a/node/bft/ledger-service/src/traits.rs +++ b/node/bft/ledger-service/src/traits.rs @@ -160,4 +160,6 @@ pub trait LedgerService: std::fmt::Debug + Send + Sync { transaction: &Transaction, consensus_version: ConsensusVersion, ) -> Result; + + fn is_stopped(&self) -> bool; } diff --git a/node/bft/ledger-service/src/translucent.rs b/node/bft/ledger-service/src/translucent.rs index e2c5249730..c9c6f02fd0 100644 --- a/node/bft/ledger-service/src/translucent.rs +++ b/node/bft/ledger-service/src/translucent.rs @@ -200,4 +200,8 @@ impl> LedgerService for TranslucentLedgerS ) -> Result { self.inner.transaction_spend_in_microcredits(transaction, consensus_version) } + + fn is_stopped(&self) -> bool { + self.inner.is_stopped() + } } diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index eaa428802f..fea1522b57 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -2015,6 +2015,8 @@ impl Primary { info!("Shutting down the primary..."); // Remove the callback. self.primary_callback.clear(); + // Shut down the sync service. + self.sync.shut_down().await; // Shut down the workers. self.workers().iter().for_each(|worker| worker.shut_down()); // Abort the tasks. diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index e20d7d9eeb..599c16d0b7 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -676,6 +676,7 @@ mod tests { fn check_block_subdag(&self, _block: Block, _prefix: &[PendingBlock]) -> Result, CheckBlockError>; fn begin_ledger_update<'a>(&'a self) -> Result + 'a>, BeginLedgerUpdateError>; fn transaction_spend_in_microcredits(&self, transaction: &Transaction, consensus_version: ConsensusVersion) -> Result; + fn is_stopped(&self) -> bool; } } diff --git a/node/rest/src/lib.rs b/node/rest/src/lib.rs index fb96cb492d..fa4bd1e2cd 100644 --- a/node/rest/src/lib.rs +++ b/node/rest/src/lib.rs @@ -131,6 +131,11 @@ impl, R: Routing> Rest { pub const fn handles(&self) -> &Arc>>> { &self.handles } + + /// Shuts down the REST instance. + pub fn shut_down(&self) { + self.handles.lock().iter().for_each(|handle| handle.abort()); + } } impl, R: Routing> Rest { diff --git a/node/router/src/lib.rs b/node/router/src/lib.rs index d527322c6a..98c9942cc8 100644 --- a/node/router/src/lib.rs +++ b/node/router/src/lib.rs @@ -238,6 +238,11 @@ impl Router { &self.cache } + /// Returns a reference to the ledger. + pub fn ledger(&self) -> &Arc> { + &self.ledger + } + /// Returns `true` if the node is only engaging with trusted peers. pub fn trusted_peers_only(&self) -> bool { self.trusted_peers_only diff --git a/node/src/client/mod.rs b/node/src/client/mod.rs index 0ee4b42350..26c312a5e0 100644 --- a/node/src/client/mod.rs +++ b/node/src/client/mod.rs @@ -544,6 +544,12 @@ impl> NodeInterface for Client { // Shut down the node. trace!("Shutting down the node..."); + // Shut down the REST instance. + if let Some(rest) = &self.rest { + trace!("Shutting down the REST server..."); + rest.shut_down(); + } + // Abort the tasks. trace!("Shutting down the client..."); self.handles.lock().iter().for_each(|handle| handle.abort()); diff --git a/node/src/traits.rs b/node/src/traits.rs index dd043b7f44..cbde673213 100644 --- a/node/src/traits.rs +++ b/node/src/traits.rs @@ -15,11 +15,12 @@ use snarkos_node_network::{NodeType, PeerPoolHandling}; use snarkos_node_router::Routing; - use snarkos_utilities::SignalHandler; - use snarkvm::prelude::{Address, Network, PrivateKey, ViewKey}; +use std::time::Duration; +use tokio::time::sleep; + #[async_trait] pub trait NodeInterface: Routing { /// Returns the node type. @@ -53,6 +54,17 @@ pub trait NodeInterface: Routing { // If the node is already initialized, then shut it down. self.shut_down().await; + + // Allow a bit of time for the tasks to wind down. + sleep(Duration::from_secs(1)).await; + + // Check if there are any stragglers left. + if let Some(handle) = &handler.handle { + let live_tasks = handle.metrics().num_alive_tasks(); + if live_tasks != 0 { + error!("There are still {live_tasks} live tasks"); + } + } } /// Shuts down the node. diff --git a/node/src/validator/mod.rs b/node/src/validator/mod.rs index fbbf78b13d..ff10bd5f65 100644 --- a/node/src/validator/mod.rs +++ b/node/src/validator/mod.rs @@ -469,6 +469,12 @@ impl> NodeInterface for Validator { // Shut down the node. trace!("Shutting down the node..."); + // Shut down the REST instance. + if let Some(rest) = &self.rest { + trace!("Shutting down the REST server..."); + rest.shut_down(); + } + // Abort the tasks. trace!("Shutting down the validator..."); self.handles.lock().iter().for_each(|handle| handle.abort()); @@ -539,7 +545,7 @@ mod tests { false, dev_txs, None, - SignalHandler::new(), + SignalHandler::new(None), ) .await .unwrap(); diff --git a/node/sync/src/ping.rs b/node/sync/src/ping.rs index 5049ea69b6..db78957869 100644 --- a/node/sync/src/ping.rs +++ b/node/sync/src/ping.rs @@ -73,11 +73,11 @@ impl Ping { { let inner = inner.clone(); - let router = router.clone(); + let router_ = router.clone(); let notify = notify.clone(); - tokio::spawn(async move { - Self::ping_task(&inner, &router, ¬ify).await; + router.spawn(async move { + Self::ping_task(&inner, &router_, ¬ify).await; }); } @@ -92,11 +92,11 @@ impl Ping { { let inner = inner.clone(); - let router = router.clone(); + let router_ = router.clone(); let notify = notify.clone(); - tokio::spawn(async move { - Self::ping_task(&inner, &router, ¬ify).await; + router.spawn(async move { + Self::ping_task(&inner, &router_, ¬ify).await; }); } @@ -135,6 +135,10 @@ impl Ping { let mut new_block = false; loop { + if router.ledger().is_stopped() { + break; + } + // Do not hold the lock while waiting. let sleep_time = { let mut inner = inner.lock(); diff --git a/node/tests/common/node.rs b/node/tests/common/node.rs index 44485fa03b..9a0fe3afc4 100644 --- a/node/tests/common/node.rs +++ b/node/tests/common/node.rs @@ -37,7 +37,7 @@ pub async fn client() -> Client> NodeDataDir::new_test(None), false, // Connect to untrusted peers. None, - SignalHandler::new(), + SignalHandler::new(None), ) .await .expect("couldn't create client instance") @@ -52,7 +52,7 @@ pub async fn prover() -> Prover> NodeDataDir::new_test(None), false, None, - SignalHandler::new(), + SignalHandler::new(None), ) .await .expect("couldn't create prover instance") @@ -74,7 +74,7 @@ pub async fn validator() -> Validator>>, + + /// An optional tokio runtime handle. + pub handle: Option, } impl SignalHandler { /// Spawns a background tasks that listens for Ctrl+C and returns `Self`. - pub fn new() -> Arc { + pub fn new(handle: Option) -> Arc { let (stopped_sender, stopped_receiver) = oneshot::channel(); let obj = Arc::new(Self { stopped_sender: RwLock::new(Some(stopped_sender)), stopped_receiver: Mutex::new(Some(stopped_receiver)), + handle, }); {