diff --git a/crates/hashi/src/btc_monitor/monitor.rs b/crates/hashi/src/btc_monitor/monitor.rs
index 9f9f5eea2..446da4c11 100644
--- a/crates/hashi/src/btc_monitor/monitor.rs
+++ b/crates/hashi/src/btc_monitor/monitor.rs
@@ -6,8 +6,10 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use anyhow::Result;
+use kyoto::AddrV2;
 use kyoto::FeeRate;
 use kyoto::HeaderCheckpoint;
+use kyoto::ServiceFlags;
 use kyoto::Warning;
 use sui_futures::service::Service;
 use tokio::sync::oneshot;
@@ -42,6 +44,14 @@ const KYOTO_RESTART_DELAY: Duration = Duration::from_secs(5);
 /// refreshed before it's dropped from the confirmation-metrics cache.
 const STALE_OBSERVATION_BLOCKS: u32 = 10;
 
+/// How often to poll kyoto for peer info and update peer-flag metrics.
+const KYOTO_METRICS_POLL_INTERVAL: Duration = Duration::from_secs(30);
+
+/// Timeout for individual `peer_info()` / `chain_tip()` calls into kyoto.
+/// These should be near-instant in steady state — the timeout exists so that
+/// a wedged kyoto node cannot stall a worker forever.
+const KYOTO_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum TxStatus {
     Confirmed { confirmations: u32 },
@@ -266,8 +276,19 @@ impl Monitor {
         let mut consecutive_failures: u32 = 0;
         let mut required_peers: usize = 0;
 
+        let mut metrics_poll = tokio::time::interval(KYOTO_METRICS_POLL_INTERVAL);
+        // If we fall behind (long-running select arm), skip the missed
+        // ticks rather than firing a burst of polls.
+        metrics_poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
+
         loop {
             tokio::select!
{
+                _ = metrics_poll.tick() => {
+                    self.rpc_workers.spawn(Self::poll_kyoto_peer_metrics(
+                        self.requester.clone(),
+                        self.metrics.clone(),
+                    ));
+                }
                 Some(event) = kyoto_client.event_rx.recv() => {
                     self.process_kyoto_event(event);
                 }
@@ -449,6 +470,14 @@ impl Monitor {
                     result_tx,
                 ));
             }
+            MonitorMessage::GetChainTip(result_tx) => {
+                self.rpc_workers
+                    .spawn(Self::get_chain_tip(self.requester.clone(), result_tx));
+            }
+            MonitorMessage::GetPeerInfo(result_tx) => {
+                self.rpc_workers
+                    .spawn(Self::get_peer_info(self.requester.clone(), result_tx));
+            }
             MonitorMessage::RecordDepositObservation {
                 outpoint,
                 observation,
@@ -604,6 +633,69 @@ impl Monitor {
         }
     }
 
+    async fn get_chain_tip(
+        requester: kyoto::Requester,
+        result_tx: oneshot::Sender<Result<HeaderCheckpoint>>,
+    ) {
+        let result = match tokio::time::timeout(KYOTO_QUERY_TIMEOUT, requester.chain_tip()).await {
+            Ok(Ok(tip)) => Ok(tip),
+            Ok(Err(e)) => Err(anyhow::anyhow!("kyoto chain_tip() failed: {e}")),
+            Err(_) => Err(anyhow::anyhow!(
+                "kyoto chain_tip() timed out after {:?}",
+                KYOTO_QUERY_TIMEOUT
+            )),
+        };
+        let _ = result_tx.send(result);
+    }
+
+    async fn get_peer_info(
+        requester: kyoto::Requester,
+        result_tx: oneshot::Sender<Result<Vec<(AddrV2, ServiceFlags)>>>,
+    ) {
+        let result = match tokio::time::timeout(KYOTO_QUERY_TIMEOUT, requester.peer_info()).await {
+            Ok(Ok(peers)) => Ok(peers),
+            Ok(Err(e)) => Err(anyhow::anyhow!("kyoto peer_info() failed: {e}")),
+            Err(_) => Err(anyhow::anyhow!(
+                "kyoto peer_info() timed out after {:?}",
+                KYOTO_QUERY_TIMEOUT
+            )),
+        };
+        let _ = result_tx.send(result);
+    }
+
+    /// Background poller that updates `kyoto_peers_with_compact_filters`
+    /// and `kyoto_peers_v2` from a snapshot of the live peer set. Runs on
+    /// every metrics tick; failures (rebuild in progress, etc.) are
+    /// logged at debug level and the metrics are left at their previous
+    /// value until the next tick.
+    async fn poll_kyoto_peer_metrics(requester: kyoto::Requester, metrics: Arc<Metrics>) {
+        match tokio::time::timeout(KYOTO_QUERY_TIMEOUT, requester.peer_info()).await {
+            Ok(Ok(peers)) => {
+                let mut compact = 0i64;
+                let mut v2 = 0i64;
+                for (_, flags) in &peers {
+                    if flags.has(ServiceFlags::COMPACT_FILTERS) {
+                        compact += 1;
+                    }
+                    if flags.has(ServiceFlags::P2P_V2) {
+                        v2 += 1;
+                    }
+                }
+                metrics.kyoto_peers_with_compact_filters.set(compact);
+                metrics.kyoto_peers_v2.set(v2);
+            }
+            Ok(Err(e)) => {
+                debug!("kyoto peer_info() failed during metrics poll: {e}");
+            }
+            Err(_) => {
+                debug!(
+                    "kyoto peer_info() timed out after {:?} during metrics poll",
+                    KYOTO_QUERY_TIMEOUT
+                );
+            }
+        }
+    }
+
     async fn get_transaction_status(
         bitcoind_rpc: Arc,
         txid: bitcoin::Txid,
@@ -1048,6 +1140,39 @@ impl MonitorClient {
             .map_err(|e| anyhow::anyhow!(e))?;
         rx.await.map_err(|e| anyhow::anyhow!(e))?
     }
+
+    /// Query kyoto's current chain tip. Returns the best known
+    /// [`HeaderCheckpoint`] as kyoto sees it — distinct from
+    /// [`Self::subscribe_block_height`], which delivers the cached tip
+    /// updated from chain events.
+    ///
+    /// # Errors
+    /// Fails if the monitor task is gone (request channel closed or reply
+    /// dropped), or if kyoto's `chain_tip()` errors or exceeds `KYOTO_QUERY_TIMEOUT`.
+    pub async fn chain_tip(&self) -> Result<HeaderCheckpoint> {
+        let (tx, rx) = oneshot::channel();
+        self.tx
+            .send(MonitorMessage::GetChainTip(tx))
+            .await
+            .map_err(|e| anyhow::anyhow!(e))?;
+        rx.await.map_err(|e| anyhow::anyhow!(e))?
+    }
+
+    /// Snapshot the set of P2P peers kyoto is currently connected to.
+    /// Each entry is the peer's network address and the service flags it
+    /// advertised (e.g. `COMPACT_FILTERS` for BIP-157, `P2P_V2` for BIP-324).
+    ///
+    /// # Errors
+    /// Fails if the monitor task is gone (request channel closed or reply
+    /// dropped), or if kyoto's `peer_info()` errors or exceeds `KYOTO_QUERY_TIMEOUT`.
+    pub async fn peer_info(&self) -> Result<Vec<(AddrV2, ServiceFlags)>> {
+        let (tx, rx) = oneshot::channel();
+        self.tx
+            .send(MonitorMessage::GetPeerInfo(tx))
+            .await
+            .map_err(|e| anyhow::anyhow!(e))?;
+        rx.await.map_err(|e| anyhow::anyhow!(e))?
+    }
 }
 
 enum MonitorMessage {
@@ -1065,6 +1190,14 @@ enum MonitorMessage {
     // Query the status of a transaction (confirmed, in mempool, or not found).
GetTransactionStatus(bitcoin::Txid, oneshot::Sender>),
 
+    // Returns kyoto's current chain tip (live query into the kyoto node;
+    // useful for healthz endpoints that don't want to depend on the in-memory cache).
+    GetChainTip(oneshot::Sender<Result<HeaderCheckpoint>>),
+
+    // Returns the live set of peers kyoto is currently connected to and the
+    // service flags advertised by each.
+    GetPeerInfo(oneshot::Sender<Result<Vec<(AddrV2, ServiceFlags)>>>),
+
     // Updates `deposit_observation_cache`
     RecordDepositObservation {
         outpoint: bitcoin::OutPoint,
diff --git a/crates/hashi/src/metrics.rs b/crates/hashi/src/metrics.rs
index eeee4b537..96d931f44 100644
--- a/crates/hashi/src/metrics.rs
+++ b/crates/hashi/src/metrics.rs
@@ -43,6 +43,8 @@ pub struct Metrics {
     pub kyoto_reorgs: IntCounter,
     pub kyoto_consecutive_failures: IntGauge,
     pub kyoto_sync_percent: IntGauge,
+    pub kyoto_peers_with_compact_filters: IntGauge,
+    pub kyoto_peers_v2: IntGauge,
 
     // General Sui metrics
     sui_epoch: IntGauge,
@@ -293,6 +295,20 @@ impl Metrics {
                 registry,
             )
             .unwrap(),
+            kyoto_peers_with_compact_filters: register_int_gauge_with_registry!(
+                "hashi_kyoto_peers_with_compact_filters",
+                "Number of currently connected peers advertising NODE_COMPACT_FILTERS. \
+                 If this drops to 0, kyoto cannot make filter sync progress even when \
+                 connection counts look healthy.",
+                registry,
+            )
+            .unwrap(),
+            kyoto_peers_v2: register_int_gauge_with_registry!(
+                "hashi_kyoto_peers_v2",
+                "Number of currently connected peers advertising NODE_P2P_V2 (BIP-324 support).",
+                registry,
+            )
+            .unwrap(),
             epoch: register_int_gauge_with_registry!(
                 "hashi_epoch",