Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ sui-http = "0.2.0"
bitcoin = { version = "0.32.8", features =["serde"] }
corepc-client = { version = "0.10.0", features = ["client-sync"] }
jsonrpc = "0.18.0"
kyoto = { package = "bip157", version = "0.5.0" }

# Dependencies for grpc and protobuf
tonic = { version = "0.14", features = ["zstd", "transport", "tls-webpki-roots"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/hashi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ hashi-types = { path = "../hashi-types" }
bitcoin.workspace = true
corepc-client.workspace = true
jsonrpc.workspace = true
kyoto = { package = "bip157", git = "https://github.com/0xsiddharthks/kyoto", rev = "f2ba601c" }
kyoto.workspace = true

# Dependencies for the protobuf definitions
tonic.workspace = true
Expand Down
21 changes: 12 additions & 9 deletions crates/hashi/examples/testnet_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use bitcoin::Network;
use clap::Parser;
use hashi::btc_monitor::config::MonitorConfig;
use hashi::btc_monitor::monitor::Monitor;
use kyoto::DnsPeer;
use kyoto::TrustedPeer;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -72,11 +72,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

info!("Starting BTC testnet4 monitor");

// DNS peers are re-resolved by kyoto on each connection attempt.
let dns_peers = vec![
DnsPeer::new("seed.testnet4.bitcoin.sprovoost.nl", 48333),
DnsPeer::new("seed.testnet4.wiz.biz", 48333),
let peer_specs: &[(&str, u16)] = &[
("seed.testnet4.bitcoin.sprovoost.nl", 48333),
("seed.testnet4.wiz.biz", 48333),
];
let trusted_peers: Vec<TrustedPeer> = peer_specs
.iter()
.map(|&(host, port)| TrustedPeer::from_hostname(host, port))
.collect();

let bitcoind_auth = match (args.bitcoind_user, args.bitcoind_password) {
(Some(user), Some(pass)) => corepc_client::client_sync::Auth::UserPass(user, pass),
Expand All @@ -91,7 +94,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let config = MonitorConfig::builder()
.network(Network::Testnet4)
.dns_peers(dns_peers)
.trusted_peers(trusted_peers)
.start_height(args.start_height)
.bitcoind_rpc_config(args.bitcoind_url.clone(), bitcoind_auth)
.build();
Expand All @@ -101,10 +104,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!(" Confirmations required: {}", args.confirmations);
info!(" Starting height: {}", config.start_height);
info!(" bitcoind RPC URL: {}", config.bitcoind_rpc_url);
info!(" Initial peers: {}", config.dns_peers.len());
info!(" Initial peers: {}", config.trusted_peers.len());
info!(" Peer addresses:");
for peer in &config.dns_peers {
info!(" - {}:{}", peer.hostname, peer.port);
for (host, port) in peer_specs {
info!(" - {host}:{port}");
}

// Create and start the monitor
Expand Down
19 changes: 10 additions & 9 deletions crates/hashi/src/btc_monitor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ pub struct MonitorConfig {
pub network: Network,

/// Peers for P2P connections, identified by hostname (or IP) and port.
/// Re-resolved via DNS on each connection attempt, so IP changes
/// (e.g., Kubernetes pod rotation) are followed automatically.
pub dns_peers: Vec<kyoto::DnsPeer>,
/// Hostnames are resolved at connection time; the monitor's supervisor
/// rebuilds the kyoto node on disconnect, which re-resolves them so IP
/// changes (e.g. Kubernetes pod rotation) are followed.
pub trusted_peers: Vec<kyoto::TrustedPeer>,

/// Starting block height for synchronization
pub start_height: u32,
Expand All @@ -38,7 +39,7 @@ impl Default for MonitorConfig {
fn default() -> Self {
Self {
network: Network::Bitcoin,
dns_peers: Vec::new(),
trusted_peers: Vec::new(),
start_height: 800_000,
bitcoind_rpc_url: "http://localhost:8332".to_string(),
bitcoind_rpc_auth: corepc_client::client_sync::Auth::None,
Expand All @@ -58,7 +59,7 @@ impl MonitorConfig {
#[derive(Debug, Default)]
pub struct MonitorConfigBuilder {
network: Option<Network>,
dns_peers: Vec<kyoto::DnsPeer>,
trusted_peers: Vec<kyoto::TrustedPeer>,
start_height: u32,
bitcoind_rpc_url: Option<String>,
bitcoind_rpc_auth: Option<corepc_client::client_sync::Auth>,
Expand All @@ -73,9 +74,9 @@ impl MonitorConfigBuilder {
}

/// Set peers for P2P connections. Accepts hostnames or IPs with port.
/// Hostnames are re-resolved via DNS on each connection attempt.
pub fn dns_peers(mut self, peers: Vec<kyoto::DnsPeer>) -> Self {
self.dns_peers = peers;
/// Hostnames are resolved at connection time.
pub fn trusted_peers(mut self, peers: Vec<kyoto::TrustedPeer>) -> Self {
self.trusted_peers = peers;
self
}

Expand Down Expand Up @@ -107,7 +108,7 @@ impl MonitorConfigBuilder {

MonitorConfig {
network: self.network.unwrap_or(default.network),
dns_peers: self.dns_peers,
trusted_peers: self.trusted_peers,
start_height: self.start_height,
bitcoind_rpc_url: self.bitcoind_rpc_url.unwrap_or(default.bitcoind_rpc_url),
bitcoind_rpc_auth: self.bitcoind_rpc_auth.unwrap_or(default.bitcoind_rpc_auth),
Expand Down
2 changes: 1 addition & 1 deletion crates/hashi/src/btc_monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
pub mod config;
pub mod monitor;

pub use kyoto::DnsPeer;
pub use kyoto::TrustedPeer;
70 changes: 42 additions & 28 deletions crates/hashi/src/btc_monitor/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::metrics::Metrics;
const FALLBACK_FEE_RATE_SAT_PER_KWU: u64 = 250;

/// Number of consecutive connection failures before restarting Kyoto.
const KYOTO_MAX_CONSECUTIVE_FAILURES: u32 = 30;
const KYOTO_MAX_CONSECUTIVE_FAILURES: u32 = 15;

/// Delay before restarting Kyoto after connectivity loss.
const KYOTO_RESTART_DELAY: Duration = Duration::from_secs(5);
Expand Down Expand Up @@ -63,6 +63,7 @@ pub enum DepositConfirmError {

enum KyotoEventLoopExit {
ConnectivityLost,
KyotoNodeExited,
Shutdown,
}

Expand Down Expand Up @@ -109,7 +110,7 @@ impl Monitor {
};

let mut builder = kyoto::Builder::new(config.network)
.add_dns_peers(config.dns_peers.iter().cloned())
.add_peers(config.trusted_peers.iter().cloned())
// Only connect to the configured trusted peers. Prevents Kyoto from
// discovering additional peers via DNS seeding or addr gossip.
// If all peers disconnect, the node exits with NoReachablePeers
Expand Down Expand Up @@ -188,13 +189,28 @@ impl Monitor {
self.config.network
);

// Spawn the Kyoto node as a background task. Node::run() takes
// ownership, so we move it in and get a JoinHandle back.
let kyoto_handle = tokio::spawn(async move { current_node.run().await });

let result = self.run_event_loop(&mut current_client, client_rx).await;
let mut kyoto_handle = tokio::spawn(async move { current_node.run().await });

// Race the event loop against the node task. In bip157 ≥ 0.5.0
// hostname peers are popped on use, so a single peer drop ends
// `Node::run()`; without this, the event loop would wait on
// silent channels forever.
let result = tokio::select! {
event_loop_exit = self.run_event_loop(&mut current_client, client_rx) => event_loop_exit,
join_result = &mut kyoto_handle => {
match join_result {
Ok(Ok(())) => warn!("Kyoto node exited cleanly; restarting"),
Ok(Err(e)) => warn!("Kyoto node exited with error: {e}; restarting"),
Err(e) if e.is_cancelled() => {
info!("Bitcoin monitor stopped");
return Ok(());
}
Err(e) => error!("Kyoto node task panicked: {e}; restarting"),
}
KyotoEventLoopExit::KyotoNodeExited
}
};

// Abort the Kyoto node task regardless of exit reason.
kyoto_handle.abort();

match result {
Expand All @@ -203,29 +219,27 @@ impl Monitor {
"Lost connectivity to Bitcoin peers after {KYOTO_MAX_CONSECUTIVE_FAILURES} \
consecutive failures. Restarting Kyoto node..."
);

self.metrics.kyoto_restarts.inc();
self.metrics.kyoto_connected_peers.set(0);
self.metrics.kyoto_synced.set(0);
self.metrics.kyoto_consecutive_failures.set(0);

// Wait before restarting to avoid tight restart loops.
tokio::time::sleep(KYOTO_RESTART_DELAY).await;

// Build a fresh Kyoto node with the trusted peers re-added
// to the whitelist.
let (new_node, new_client) = Self::build_kyoto_node(&self.config);
current_node = new_node;
current_client = new_client;
self.requester = current_client.requester.clone();

info!("Kyoto node rebuilt, resuming monitor");
}
KyotoEventLoopExit::KyotoNodeExited => {}
KyotoEventLoopExit::Shutdown => {
info!("Bitcoin monitor stopped");
return Ok(());
}
}

self.metrics.kyoto_restarts.inc();
self.metrics.kyoto_connected_peers.set(0);
self.metrics.kyoto_synced.set(0);
self.metrics.kyoto_consecutive_failures.set(0);

tokio::time::sleep(KYOTO_RESTART_DELAY).await;

let (new_node, new_client) = Self::build_kyoto_node(&self.config);
current_node = new_node;
current_client = new_client;
self.requester = current_client.requester.clone();

info!("Kyoto node rebuilt, resuming monitor");
}
}

Expand Down Expand Up @@ -590,7 +604,7 @@ impl Monitor {
}
}

match requester.broadcast_tx(tx).await {
match requester.submit_package(tx).await {
Ok(wtxid) => {
info!("Transaction {txid} broadcast acknowledged (wtxid: {wtxid})");
let _ = result_tx.send(Ok(()));
Expand Down Expand Up @@ -735,8 +749,8 @@ impl Monitor {
Ok(Some(height)) => height,
Ok(None) => {
warn!(
"Block hash {block_hash} not found in kyoto's chain of most work. \
Possibly malicious behavior by the Bitcoin Core node."
"Block hash {block_hash} from bitcoind not yet on kyoto's chain \
(sync lag, recent reorg, or malicious bitcoind); will retry."
);
return;
}
Expand Down
11 changes: 6 additions & 5 deletions crates/hashi/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,11 @@ impl Config {
.to_corepc_auth()
}

/// Parse configured Bitcoin peer strings as DNS peers. The hostnames are
/// NOT resolved here — kyoto re-resolves them on each connection attempt,
/// so IP changes (e.g., Kubernetes pod rotation) are followed automatically.
pub fn bitcoin_dns_peers(&self) -> anyhow::Result<Vec<kyoto::DnsPeer>> {
/// Parse configured Bitcoin peer strings into kyoto trusted peers.
/// Hostnames are not resolved here — kyoto resolves them at connection
/// time, and the monitor's supervisor rebuilds the node on disconnect
/// to re-resolve and follow IP changes (e.g., Kubernetes pod rotation).
pub fn bitcoin_trusted_peers(&self) -> anyhow::Result<Vec<kyoto::TrustedPeer>> {
let Some(peer_strs) = self.bitcoin_trusted_peers.as_ref() else {
return Ok(Vec::new());
};
Expand All @@ -327,7 +328,7 @@ impl Config {
let port = port_str
.parse::<u16>()
.map_err(|e| anyhow::anyhow!("Invalid port in bitcoin peer '{s}': {e}"))?;
peers.push(kyoto::DnsPeer::new(host, port));
peers.push(kyoto::TrustedPeer::from_hostname(host, port));
}
Ok(peers)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/hashi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl Hashi {
self.config.bitcoin_rpc().to_string(),
self.config.bitcoin_rpc_auth(),
)
.dns_peers(self.config.bitcoin_dns_peers()?)
.trusted_peers(self.config.bitcoin_trusted_peers()?)
.data_dir(
self.config
.db
Expand Down
Loading