Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 107 additions & 93 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ default-features = false
[workspace.dependencies.snarkvm]
#path = "../snarkVM"
git = "https://github.com/ProvableHQ/snarkVM.git"
rev = "b176b1409"
#rev = "b176b1409"
#version = "=4.4.0"
branch = "feat/tracing-instrument"
#version = "=4.2.1"
default-features = false

[workspace.dependencies.anyhow]
Expand Down Expand Up @@ -295,6 +297,13 @@ test_targets = [ "snarkos-cli/test_targets" ]
test_consensus_heights = [ "snarkos-cli/test_consensus_heights" ]
test_network = [ "snarkos-cli/test_network" ]
tokio_console = [ "snarkos-cli/tokio_console" ]
instrumentation = [
"snarkos-node/instrumentation",
"snarkos-node-router/instrumentation",
"snarkos-node-sync/instrumentation",
"snarkos-node-cdn/instrumentation"
]
flamegraph = [ "instrumentation", "snarkos-cli/flamegraph" ]

[dependencies.clap]
workspace = true
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,8 @@ By default, the metrics feature is turned on for some internal crates.
This feature turns on code for detecting deadlocks.
* **test_targets** -
This feature allows the lowering of coinbase and proof targets for testing.
* **flamegraph** -
Enables generation of flamegraphs (using the [tracing-flame crate](https://docs.rs/tracing-flame/latest/tracing_flame/index.html)). This feature should only be used for profiling.

## 6.5 Local backups

Expand Down
15 changes: 14 additions & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ test_network = [
"test_consensus_heights",
"snarkvm/dev-print"
]
serial = [ ]
flamegraph = [ "dep:tracing-flame" ]
serial = [
"snarkvm/serial",
"snarkos-node-metrics/serial",
"snarkos-node/serial"
]
tokio_console = [ "dep:console-subscriber", "tokio/tracing" ]

[dependencies.aleo-std]
Expand All @@ -55,6 +60,9 @@ workspace = true
[dependencies.base64]
workspace = true

[dependencies.cfg-if]
version = "1"

[dependencies.clap]
workspace = true
features = [ "derive", "color", "unstable-styles", "help", "cargo", "usage", "suggestions" ]
Expand All @@ -73,6 +81,10 @@ workspace = true
workspace = true
features = [ "serde", "rayon" ]

[dependencies.tracing-flame]
version = "0.2"
optional = true

[dependencies.locktick]
workspace = true
features = [ "parking_lot" ]
Expand Down Expand Up @@ -138,6 +150,7 @@ version = "3"

[dependencies.time]
workspace = true
features = [ "std", "formatting" ]

[dependencies.thiserror]
workspace = true
Expand Down
9 changes: 7 additions & 2 deletions cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,10 @@ pub struct Start {
#[clap(long)]
pub nocdn: bool,

#[clap(long, hide = true)]
/// Generate a flamegraph using `tracing`. Only works if the `flamegraph` feature is enabled.
pub enable_flamegraph: bool,

/// Enables development mode used to set up test networks.
///
/// The purpose of this flag is to run multiple nodes on the same machine and in the same working directory.
Expand Down Expand Up @@ -273,11 +277,12 @@ impl Start {
let shutdown: Arc<AtomicBool> = Default::default();

// Initialize the logger.
let log_receiver = crate::helpers::initialize_logger(
let (log_receiver, _log_guard) = crate::helpers::initialize_logger(
self.verbosity,
&self.log_filter,
self.log_filter.clone(),
self.nodisplay,
self.logfile.clone(),
self.enable_flamegraph,
shutdown.clone(),
)
.with_context(|| "Failed to set up logger")?;
Expand Down
69 changes: 55 additions & 14 deletions cli/src/helpers/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::helpers::{DynamicFormatter, LogWriter};

use anyhow::{Result, bail};

use anyhow::Context;
use crossterm::tty::IsTty;
use std::{
fs::File,
Expand All @@ -25,13 +26,17 @@ use std::{
str::FromStr,
sync::{Arc, atomic::AtomicBool},
};
use time::{UtcDateTime, macros::format_description};
use tokio::sync::mpsc;
use tracing_subscriber::{
EnvFilter,
layer::{Layer, SubscriberExt},
util::SubscriberInitExt,
};

#[cfg(feature = "flamegraph")]
use tracing_flame::FlameLayer;

fn parse_log_verbosity(verbosity: u8) -> Result<EnvFilter> {
// First, set default log verbosity.
// Note, that this must not be prefixed with `RUST_LOG=`.
Expand Down Expand Up @@ -105,7 +110,30 @@ fn parse_log_verbosity(verbosity: u8) -> Result<EnvFilter> {
}

fn parse_log_filter(filter_str: &str) -> Result<EnvFilter> {
EnvFilter::from_str(filter_str).map_err(|err| err.into())
EnvFilter::from_str(filter_str).with_context(|| "Failed to set up log filter")
}

/// Holds any logging state that needs to be kept alive during the nodes execution.
/// Dropping this guard will flush the logs and close the flamegraph file (if any).
#[derive(Default)]
pub struct LogGuard {
#[cfg(feature = "flamegraph")]
flame_guard: Option<(String, tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>)>,
}

impl Drop for LogGuard {
fn drop(&mut self) {
#[cfg(feature = "flamegraph")]
if let Some((filename, flame_guard)) = &mut self.flame_guard {
// Flush the flamegraph file and a message on what to do with it.
match flame_guard.flush() {
Ok(()) => println!(
"Written folded tracing data to disk. Run `inferno-flamegraph {filename} > flamegraph.svg` to generate a graph from it."
),
Err(err) => eprintln!("Failed to flush flamegraph file at {filename}: {err}"),
}
}
}
}

/// Sets the log filter based on the given verbosity level.
Expand All @@ -120,15 +148,18 @@ fn parse_log_filter(filter_str: &str) -> Result<EnvFilter> {
/// 5 => info, debug, trace, snarkos_node_router=trace
/// 6 => info, debug, trace, snarkos_node_tcp=trace
/// ```
///
/// Do not drop the returned log guard until shutdown.
pub fn initialize_logger<P: AsRef<Path>>(
verbosity: u8,
log_filter: &Option<String>,
log_filter: Option<String>,
nodisplay: bool,
logfile: P,
enable_flamegraph: bool,
shutdown: Arc<AtomicBool>,
) -> Result<mpsc::Receiver<Vec<u8>>> {
) -> Result<(mpsc::Receiver<Vec<u8>>, LogGuard)> {
let [stdout_filter, logfile_filter] = std::array::from_fn(|_| {
if let Some(filter) = log_filter { parse_log_filter(filter) } else { parse_log_verbosity(verbosity) }
if let Some(filter) = &log_filter { parse_log_filter(filter) } else { parse_log_verbosity(verbosity) }
});

// Create the directories tree for a logfile if it doesn't exist.
Expand Down Expand Up @@ -159,8 +190,8 @@ pub fn initialize_logger<P: AsRef<Path>>(
// of the log event, i.e., the file/module where the log message was created.
let show_target = verbosity > 2 || log_filter.is_some();

// Attach tracing-subscriber.
let layered = tracing_subscriber::registry()
// Initialize tracing.
let registry = tracing_subscriber::registry()
.with(
// Add layer using LogWriter for stdout / terminal
tracing_subscriber::fmt::Layer::default()
Expand All @@ -179,14 +210,24 @@ pub fn initialize_logger<P: AsRef<Path>>(
.with_filter(logfile_filter?),
);

// Attach console-subscriber, if enabled.
#[cfg(feature = "tokio_console")]
let layered = layered.with(console_subscriber::spawn());

// Initialize tracing.
let _ = layered.try_init();

Ok(log_receiver)
if enable_flamegraph {
cfg_if::cfg_if! {
if #[cfg(feature = "flamegraph")] {
// Add a timestamp to the trace file name, so we do not overwrite an existing one.
let timestamp = UtcDateTime::now().format(format_description!("[year][month][day]-[hour][minute][second]")).with_context(|| "Failed to format timestamp")?;
let trace_file_name = format!("snarkos-trace-{timestamp}.folded");
let (flame_layer, log_guard) = FlameLayer::with_file(trace_file_name.clone()).with_context(|| format!("Failed to create flamegraph file at {trace_file_name}"))?;
let _ = registry.with(flame_layer).try_init();

Ok((log_receiver, LogGuard { flame_guard: Some((trace_file_name, log_guard)) }))
} else {
bail!("Cannot enable flamegraph. Disabled at compile time.");
}
}
} else {
let _ = registry.try_init();
Ok((log_receiver, LogGuard::default()))
}
}

/// Set up only terminal logging
Expand Down
5 changes: 5 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ cuda = [
"snarkos-node-router/cuda",
"snarkos-node-sync/cuda"
]
instrumentation = [
"snarkvm/instrumentation",
"snarkos-node-router/instrumentation",
"snarkos-node-bft/instrumentation"
]
serial = [ "snarkos-node-bft/serial" ]
test = []

Expand Down
1 change: 1 addition & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ test = [
"snarkos-node-bft-ledger-service/test",
"snarkos-node-bft-storage-service/test"
]
instrumentation = [ ]
serial = [ "snarkos-node-bft-ledger-service/serial" ]

[dependencies.aleo-std]
Expand Down
16 changes: 12 additions & 4 deletions node/bft/events/src/challenge_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,18 @@ impl<N: Network> FromBytes for ChallengeRequest<N> {
let listener_port = u16::read_le(&mut reader)?;
let address = Address::<N>::read_le(&mut reader)?;
let nonce = u64::read_le(&mut reader)?;
let snarkos_sha = {
let bytes =
<[u8; 40]>::read_le(&mut reader).map_err(|err| io_error(format!("Invalid snarkOS SHA - {err}")))?;
if bytes == Self::UNKNOWN_COMMIT_HASH { None } else { Some(bytes) }
let snarkos_sha = match <[u8; 40]>::read_le(&mut reader) {
Ok(bytes) => {
if bytes == Self::UNKNOWN_COMMIT_HASH {
None
} else {
Some(bytes)
}
}
Err(err) => {
tracing::warn!("Invalid snarkOS SHA - {err}");
None
}
};

Ok(Self { version, listener_port, address, nonce, snarkos_sha })
Expand Down
2 changes: 2 additions & 0 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,7 @@ impl<N: Network> Primary<N> {
/// 3. Store the signature.
/// 4. Certify the batch if enough signatures have been received.
/// 5. Broadcast the batch certificate to all validators.
#[cfg_attr(feature = "instrumentation", tracing::instrument(skip(self, batch_signature), fields(batch_id = %batch_signature.batch_id)))]
async fn process_batch_signature_from_peer(
&self,
peer_ip: SocketAddr,
Expand Down Expand Up @@ -1664,6 +1665,7 @@ impl<N: Network> Primary<N> {
}

/// Stores the certified batch and broadcasts it to all validators, returning the certificate.
#[cfg_attr(feature = "instrumentation", tracing::instrument(skip(self, proposal, committee), fields(round = proposal.round())))]
async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
// Create the batch certificate and transmissions.
let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
Expand Down
1 change: 1 addition & 0 deletions node/cdn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ locktick = [ "dep:locktick", "snarkvm/locktick" ]
parallel = [ "rayon" ]
cuda = [ "snarkvm/cuda" ]
metrics = [ "dep:snarkos-node-metrics" ]
instrumentation = [ ]

[dependencies.anyhow]
workspace = true
Expand Down
1 change: 1 addition & 0 deletions node/router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ locktick = [ "dep:locktick", "snarkos-node-tcp/locktick", "snarkvm/locktick" ]
metrics = [ "dep:snarkos-node-metrics" ]
cuda = [ "snarkvm/cuda", "snarkos-account/cuda" ]
serial = [ "snarkos-node-bft-ledger-service/serial" ]
instrumentation = [ ]

[dependencies.aleo-std]
workspace = true
Expand Down
18 changes: 13 additions & 5 deletions node/router/messages/src/challenge_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use super::*;

use snarkos_node_network::NodeType;
use snarkvm::prelude::{FromBytes, ToBytes, io_error};
use snarkvm::prelude::{FromBytes, ToBytes};

use std::borrow::Cow;

Expand Down Expand Up @@ -59,10 +59,18 @@ impl<N: Network> FromBytes for ChallengeRequest<N> {
let address = Address::<N>::read_le(&mut reader)?;
let nonce = u64::read_le(&mut reader)?;

let snarkos_sha = {
let bytes =
<[u8; 40]>::read_le(&mut reader).map_err(|err| io_error(format!("Invalid snarkOS SHA - {err}")))?;
if bytes == Self::UNKNOWN_COMMIT_HASH { None } else { Some(bytes) }
let snarkos_sha = match <[u8; 40]>::read_le(&mut reader) {
Ok(bytes) => {
if bytes == Self::UNKNOWN_COMMIT_HASH {
None
} else {
Some(bytes)
}
}
Err(err) => {
tracing::warn!("Invalid snarkOS SHA - {err}");
None
}
};

Ok(Self { version, listener_port, node_type, address, nonce, snarkos_sha })
Expand Down
1 change: 1 addition & 0 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {
/// Handles the inbound message from the peer. The returned value indicates whether
/// the connection is still active, and errors cause a disconnect once they are
/// propagated to the caller.
#[cfg_attr(feature = "instrumentation", tracing::instrument(skip(self, message)))]
async fn inbound(&self, peer_addr: SocketAddr, message: Message<N>) -> Result<bool> {
// Retrieve the listener IP for the peer.
let peer_ip = match self.router().resolve_to_listener(peer_addr) {
Expand Down
Loading