Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions crates/hashi/src/btc_monitor/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use kyoto::AddrV2;
use kyoto::FeeRate;
use kyoto::HeaderCheckpoint;
use kyoto::ServiceFlags;
use kyoto::Warning;
use sui_futures::service::Service;
use tokio::sync::oneshot;
Expand Down Expand Up @@ -42,6 +44,14 @@ const KYOTO_RESTART_DELAY: Duration = Duration::from_secs(5);
/// refreshed before it's dropped from the confirmation-metrics cache.
const STALE_OBSERVATION_BLOCKS: u32 = 10;

/// How often to poll kyoto for peer info and update peer-flag metrics.
///
/// This is the period of the metrics ticker in the monitor's event loop;
/// each tick spawns one `poll_kyoto_peer_metrics` worker.
const KYOTO_METRICS_POLL_INTERVAL: Duration = Duration::from_secs(30);

/// Timeout for individual `peer_info()` / `chain_tip()` calls into kyoto.
/// These should be near-instant in steady state — the timeout exists so that
/// a wedged kyoto node cannot stall a worker forever.
///
/// When the timeout fires, request handlers reply with a "timed out" error
/// and the metrics poller logs at debug level and leaves the gauges untouched.
const KYOTO_QUERY_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TxStatus {
Confirmed { confirmations: u32 },
Expand Down Expand Up @@ -266,8 +276,19 @@ impl Monitor {
let mut consecutive_failures: u32 = 0;
let mut required_peers: usize = 0;

let mut metrics_poll = tokio::time::interval(KYOTO_METRICS_POLL_INTERVAL);
// If we fall behind (long-running select arm), skip the missed
// ticks rather than firing a burst of polls.
metrics_poll.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
tokio::select! {
_ = metrics_poll.tick() => {
self.rpc_workers.spawn(Self::poll_kyoto_peer_metrics(
self.requester.clone(),
self.metrics.clone(),
));
}
Some(event) = kyoto_client.event_rx.recv() => {
self.process_kyoto_event(event);
}
Expand Down Expand Up @@ -449,6 +470,14 @@ impl Monitor {
result_tx,
));
}
MonitorMessage::GetChainTip(result_tx) => {
self.rpc_workers
.spawn(Self::get_chain_tip(self.requester.clone(), result_tx));
}
MonitorMessage::GetPeerInfo(result_tx) => {
self.rpc_workers
.spawn(Self::get_peer_info(self.requester.clone(), result_tx));
}
MonitorMessage::RecordDepositObservation {
outpoint,
observation,
Expand Down Expand Up @@ -604,6 +633,69 @@ impl Monitor {
}
}

/// Worker that asks kyoto for its chain tip and forwards the outcome to
/// `result_tx`.
///
/// The query is bounded by `KYOTO_QUERY_TIMEOUT`. Both a kyoto-side error
/// and an expired timeout are reported to the caller as an `Err`; nothing
/// is returned from this function itself.
async fn get_chain_tip(
    requester: kyoto::Requester,
    result_tx: oneshot::Sender<Result<HeaderCheckpoint>>,
) {
    let bounded_query = tokio::time::timeout(KYOTO_QUERY_TIMEOUT, requester.chain_tip());
    let outcome = match bounded_query.await {
        // The timeout elapsed before kyoto answered.
        Err(_) => Err(anyhow::anyhow!(
            "kyoto chain_tip() timed out after {:?}",
            KYOTO_QUERY_TIMEOUT
        )),
        // Kyoto answered in time; translate its error type if it failed.
        Ok(reply) => reply.map_err(|e| anyhow::anyhow!("kyoto chain_tip() failed: {e}")),
    };
    // The requester may have stopped waiting; a failed send is not an error.
    let _ = result_tx.send(outcome);
}

/// Worker that asks kyoto for its current peer list and forwards the
/// outcome to `result_tx`.
///
/// The query is bounded by `KYOTO_QUERY_TIMEOUT`. Both a kyoto-side error
/// and an expired timeout are reported to the caller as an `Err`; nothing
/// is returned from this function itself.
async fn get_peer_info(
    requester: kyoto::Requester,
    result_tx: oneshot::Sender<Result<Vec<(AddrV2, ServiceFlags)>>>,
) {
    let bounded_query = tokio::time::timeout(KYOTO_QUERY_TIMEOUT, requester.peer_info());
    let outcome = match bounded_query.await {
        // The timeout elapsed before kyoto answered.
        Err(_) => Err(anyhow::anyhow!(
            "kyoto peer_info() timed out after {:?}",
            KYOTO_QUERY_TIMEOUT
        )),
        // Kyoto answered in time; translate its error type if it failed.
        Ok(reply) => reply.map_err(|e| anyhow::anyhow!("kyoto peer_info() failed: {e}")),
    };
    // The requester may have stopped waiting; a failed send is not an error.
    let _ = result_tx.send(outcome);
}

/// Refreshes the `kyoto_peers_with_compact_filters` and `kyoto_peers_v2`
/// gauges from one snapshot of kyoto's peer list.
///
/// Spawned once per metrics tick. If the `peer_info()` query fails or
/// exceeds `KYOTO_QUERY_TIMEOUT`, the problem is logged at debug level and
/// both gauges keep whatever value they had, to be retried on the next tick.
async fn poll_kyoto_peer_metrics(requester: kyoto::Requester, metrics: Arc<Metrics>) {
    let peers = match tokio::time::timeout(KYOTO_QUERY_TIMEOUT, requester.peer_info()).await {
        Ok(Ok(snapshot)) => snapshot,
        Ok(Err(e)) => {
            debug!("kyoto peer_info() failed during metrics poll: {e}");
            return;
        }
        Err(_) => {
            debug!(
                "kyoto peer_info() timed out after {:?} during metrics poll",
                KYOTO_QUERY_TIMEOUT
            );
            return;
        }
    };

    // Tally both flags in a single pass over the snapshot.
    let (compact_filter_peers, v2_peers) =
        peers
            .iter()
            .fold((0i64, 0i64), |(compact, v2), (_, flags)| {
                (
                    compact + i64::from(flags.has(ServiceFlags::COMPACT_FILTERS)),
                    v2 + i64::from(flags.has(ServiceFlags::P2P_V2)),
                )
            });

    metrics
        .kyoto_peers_with_compact_filters
        .set(compact_filter_peers);
    metrics.kyoto_peers_v2.set(v2_peers);
}

async fn get_transaction_status(
bitcoind_rpc: Arc<corepc_client::client_sync::v29::Client>,
txid: bitcoin::Txid,
Expand Down Expand Up @@ -1048,6 +1140,31 @@ impl MonitorClient {
.map_err(|e| anyhow::anyhow!(e))?;
rx.await.map_err(|e| anyhow::anyhow!(e))?
}

/// Ask the monitor for kyoto's current chain tip.
///
/// The answer is the [`HeaderCheckpoint`] kyoto reports when queried, as
/// opposed to [`Self::subscribe_block_height`], which delivers the cached
/// tip updated from chain events.
///
/// # Errors
///
/// Fails if the request cannot be queued on the monitor channel, if the
/// reply channel is dropped before an answer arrives, or if the monitor
/// replies with an error.
pub async fn chain_tip(&self) -> Result<HeaderCheckpoint> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = MonitorMessage::GetChainTip(reply_tx);

    if let Err(send_err) = self.tx.send(request).await {
        return Err(anyhow::anyhow!(send_err));
    }

    match reply_rx.await {
        Ok(reply) => reply,
        Err(recv_err) => Err(anyhow::anyhow!(recv_err)),
    }
}

/// Ask the monitor for a snapshot of the P2P peers kyoto is currently
/// connected to.
///
/// Each entry pairs a peer's network address with the service flags kyoto
/// reports for that peer.
///
/// # Errors
///
/// Fails if the request cannot be queued on the monitor channel, if the
/// reply channel is dropped before an answer arrives, or if the monitor
/// replies with an error.
pub async fn peer_info(&self) -> Result<Vec<(AddrV2, ServiceFlags)>> {
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = MonitorMessage::GetPeerInfo(reply_tx);

    if let Err(send_err) = self.tx.send(request).await {
        return Err(anyhow::anyhow!(send_err));
    }

    match reply_rx.await {
        Ok(reply) => reply,
        Err(recv_err) => Err(anyhow::anyhow!(recv_err)),
    }
}
}

enum MonitorMessage {
Expand All @@ -1065,6 +1182,14 @@ enum MonitorMessage {
// Query the status of a transaction (confirmed, in mempool, or not found).
GetTransactionStatus(bitcoin::Txid, oneshot::Sender<Result<TxStatus>>),

// Returns kyoto's current chain tip (synchronous query into the kyoto node;
// useful for healthz endpoints that don't want to depend on the in-memory cache).
GetChainTip(oneshot::Sender<Result<HeaderCheckpoint>>),

// Returns the live set of peers kyoto is currently connected to and the
// service flags negotiated for each.
GetPeerInfo(oneshot::Sender<Result<Vec<(AddrV2, ServiceFlags)>>>),

// Updates `deposit_observation_cache`
RecordDepositObservation {
outpoint: bitcoin::OutPoint,
Expand Down
16 changes: 16 additions & 0 deletions crates/hashi/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub struct Metrics {
pub kyoto_reorgs: IntCounter,
pub kyoto_consecutive_failures: IntGauge,
pub kyoto_sync_percent: IntGauge,
pub kyoto_peers_with_compact_filters: IntGauge,
pub kyoto_peers_v2: IntGauge,

// General Sui metrics
sui_epoch: IntGauge,
Expand Down Expand Up @@ -293,6 +295,20 @@ impl Metrics {
registry,
)
.unwrap(),
kyoto_peers_with_compact_filters: register_int_gauge_with_registry!(
"hashi_kyoto_peers_with_compact_filters",
"Number of currently connected peers advertising NODE_COMPACT_FILTERS. \
If this drops to 0, kyoto cannot make filter sync progress even when \
connection counts look healthy.",
registry,
)
.unwrap(),
kyoto_peers_v2: register_int_gauge_with_registry!(
"hashi_kyoto_peers_v2",
"Number of currently connected peers negotiated to BIP-324 (P2P V2) transport.",
registry,
)
.unwrap(),

epoch: register_int_gauge_with_registry!(
"hashi_epoch",
Expand Down
Loading