diff --git a/Cargo.lock b/Cargo.lock index e4b3bfff5..03732fe8d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,7 +63,7 @@ version = "0.69.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cexpr", "clang-sys", "itertools", @@ -172,6 +172,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.9.1" @@ -213,7 +219,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2bfbc36312494041e2cdd5f06697b7e89d4b76f42773a0b5556ac290ff22acc2" dependencies = [ "serde", - "toml", + "toml 0.5.11", ] [[package]] @@ -236,6 +242,16 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-expr" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d067ad48b8650848b989a59a86c6c36a995d02d2bf778d45c3c5d57bc2718f02" +dependencies = [ + "smallvec", + "target-lexicon", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -273,7 +289,7 @@ dependencies = [ "parse_arg 0.1.6", "serde", "serde_derive", - "toml", + "toml 0.5.11", ] [[package]] @@ -286,7 +302,7 @@ dependencies = [ "fmt2io", "serde", "serde_derive", - "toml", + "toml 0.5.11", "unicode-segmentation", "void", ] @@ -300,6 +316,19 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1137cd7e7fc0fb5d3c5a8678be38ec56e819125d8d7907411fe24ccb943faca8" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -328,6 +357,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -365,6 +403,17 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "dircpy" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a88521b0517f5f9d51d11925d8ab4523497dcf947073fa3231a311b63941131c" +dependencies = [ + "jwalk", + "log", + "walkdir", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -392,7 +441,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" dependencies = [ - "bitflags", + "bitflags 2.9.1", "block2", "libc", "objc2", @@ -431,6 +480,7 @@ dependencies = [ "signal-hook", "tempfile", "tiny_http", + "zmq", ] [[package]] @@ -446,6 +496,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.12" @@ -513,6 +569,18 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.5.1" @@ -552,6 +620,16 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" +[[package]] +name = "indexmap" +version = "2.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "is-terminal" version = "0.4.16" @@ -600,6 +678,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "jwalk" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2735847566356cd2179a2a38264839308f7079fa96e6bd5a42d740460e003c56" +dependencies = [ + "crossbeam", + "rayon", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -624,7 +712,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags", + "bitflags 2.9.1", "libc", ] @@ -696,7 +784,7 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "225e7cfe711e0ba79a68baeddb2982723e4235247aefce1482f2f16c27865b66" dependencies = [ - "bitflags", + "bitflags 2.9.1", "cfg-if", "cfg_aliases", "libc", @@ -801,7 +889,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc5b72d8145275d844d4b5f6d4e1eef00c8cd889edb6035c21675d1bb1f45c9f" dependencies = [ - "bitflags", + "bitflags 2.9.1", "hex", "procfs-core", "rustix 0.38.44", @@ -813,7 +901,7 @@ version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "239df02d8349b06fc07398a3a1697b06418223b1c7725085e801e7c0fc6a12ec" dependencies = [ - "bitflags", + "bitflags 2.9.1", "hex", ] @@ -925,7 +1013,7 @@ version = "0.5.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "928fca9cf2aa042393a8325b9ead81d2f0df4cb12e1e24cef072922ccd99c5af" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] @@ -1005,7 +1093,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.4.15", @@ -1018,13 +1106,22 @@ version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" dependencies = [ - "bitflags", + "bitflags 2.9.1", "errno", "libc", "linux-raw-sys 0.11.0", "windows-sys 0.61.0", ] +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1095,6 +1192,15 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_spanned" +version = "0.6.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3" +dependencies = [ + "serde", +] + [[package]] name = "sha2" version = "0.10.9" @@ -1148,6 +1254,25 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "system-deps" +version = "6.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" +dependencies = [ + "cfg-expr", + "heck", + "pkg-config", + "toml 0.8.23", + "version-compare", +] + +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.24.0" @@ -1231,6 +1356,40 @@ dependencies = [ "serde", ] +[[package]] +name = "toml" +version = "0.8.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362" +dependencies = [ + "serde", + "serde_spanned", + "toml_datetime", + "toml_edit", +] + +[[package]] +name = "toml_datetime" +version = "0.6.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" +dependencies = [ + "serde", +] + +[[package]] +name = "toml_edit" +version = "0.22.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" +dependencies = [ + "indexmap", + "serde", + "serde_spanned", + "toml_datetime", + "winnow", +] + [[package]] name = "typenum" version = "1.18.0" @@ -1255,6 +1414,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version-compare" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03c2856837ef78f57382f06b2b8563a2f512f7185d732608fd9176cb3b8edf0e" + [[package]] name = "version_check" version = "0.9.5" @@ -1267,6 +1432,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -1401,13 +1576,22 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" +dependencies = [ + "memchr", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags", + "bitflags 2.9.1", ] [[package]] @@ -1430,12 +1614,44 @@ dependencies = [ "syn", ] +[[package]] +name = "zeromq-src" +version = "0.2.6+4.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc120b771270365d5ed0dfb4baf1005f2243ae1ae83703265cb3504070f4160b" +dependencies = [ + "cc", + "dircpy", +] + [[package]] name = "zmij" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0f4a4e8e9dc5c62d159f04fcdbe07f4c3fb710415aab4754bf11505501e3251d" +[[package]] +name = "zmq" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd3091dd571fb84a9b3e5e5c6a807d186c411c812c8618786c3c30e5349234e7" +dependencies = [ + "bitflags 1.3.2", + "libc", + "zmq-sys", +] + +[[package]] +name = "zmq-sys" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e8351dc72494b4d7f5652a681c33634063bbad58046c1689e75270908fdc864" +dependencies = [ + "libc", + "system-deps", + "zeromq-src", +] + [[package]] name = "zstd-sys" version = "2.0.13+zstd.1.5.6" diff --git a/Cargo.toml b/Cargo.toml index 75d41a273..0cb079ac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ serde = "1.0.184" serde_derive = "1.0.184" serde_json = "1.0" tiny_http = { version = "0.12", optional = true } +zmq = "0.10" [target.'cfg(windows)'.dependencies] ctrlc = "=3.5.2" diff --git a/internal/config_specification.toml b/internal/config_specification.toml index f4ae65e36..76cb972e7 100644 --- a/internal/config_specification.toml +++ b/internal/config_specification.toml @@ -81,6 +81,11 @@ name = "daemon_p2p_addr" type = "crate::config::ResolvAddr" doc = "Bitcoin daemon p2p 'addr:port' to connect (default: 127.0.0.1:8333 for mainnet, 127.0.0.1:18333 for testnet, 127.0.0.1:18444 for regtest and 127.0.0.1:38333 for signet)" +[[param]] +name = "daemon_zmq_addr" +type = "String" +doc = "ZMQ endpoint for new block notifications (e.g. 'tcp://127.0.0.1:28332'). When set, uses REST+ZMQ instead of p2p for block fetching." + [[param]] name = "monitoring_addr" type = "crate::config::ResolvAddr" diff --git a/src/config.rs b/src/config.rs index b844dda06..c75a02d8e 100644 --- a/src/config.rs +++ b/src/config.rs @@ -132,6 +132,7 @@ pub struct Config { pub daemon_auth: SensitiveAuth, pub daemon_rpc_addr: SocketAddr, pub daemon_p2p_addr: SocketAddr, + pub daemon_zmq_addr: Option, pub electrum_rpc_addr: SocketAddr, pub monitoring_addr: SocketAddr, pub wait_duration: Duration, @@ -345,6 +346,7 @@ impl Config { daemon_auth, daemon_rpc_addr, daemon_p2p_addr, + daemon_zmq_addr: config.daemon_zmq_addr, electrum_rpc_addr, monitoring_addr, wait_duration: Duration::from_secs(config.wait_duration_secs), diff --git a/src/connection/mod.rs b/src/connection/mod.rs new file mode 100644 index 000000000..6dbb610d8 --- /dev/null +++ b/src/connection/mod.rs @@ -0,0 +1,42 @@ +use crate::{ + chain::{Chain, NewHeader}, + connection::rpc_zmq::RestZmqBlockSource, + metrics::Metrics, + types::SerBlock, +}; +use bitcoin::{p2p::Magic, BlockHash}; +use crossbeam_channel::Receiver; +use std::net::SocketAddr; + +mod p2p; +mod rpc_zmq; + +pub trait BlockSource: Send + Sync { + fn get_new_headers(&mut self, chain: &Chain) -> anyhow::Result>; + fn for_blocks<'a>( + &'a mut self, + blockhashes: Vec, + func: Box, + ) -> anyhow::Result<()>; + fn new_block_notification(&self) -> Receiver<()>; +} + +pub fn make_p2p_connection( + address: SocketAddr, + metrics: &Metrics, + magic: Magic, +) -> anyhow::Result> { + Ok(Box::new(p2p::Connection::connect(address, metrics, magic)?)) +} + +pub fn make_rpc_zmq_connection( + rest_addr: SocketAddr, + zmq_endpoint: &str, + metrics: &Metrics, +) -> anyhow::Result> { + Ok(Box::new(RestZmqBlockSource::connect( + rest_addr, + zmq_endpoint, + metrics, + )?)) +} diff --git a/src/p2p.rs b/src/connection/p2p.rs similarity index 94% rename from src/p2p.rs rename to src/connection/p2p.rs index 0787f36b4..ccae04fd0 100644 --- a/src/p2p.rs +++ b/src/connection/p2p.rs @@ -24,6 +24,7 @@ use std::io::Write; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use crate::connection::BlockSource; use crate::types::SerBlock; use crate::{ chain::{Chain, NewHeader}, @@ -54,7 +55,7 @@ impl Request { } } -pub(crate) struct Connection { +pub struct Connection { req_send: Sender, blocks_recv: Receiver, headers_recv: Receiver>, @@ -67,7 +68,7 @@ impl Connection { /// Get new block headers (supporting reorgs). /// https://en.bitcoin.it/wiki/Protocol_documentation#getheaders /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515). - pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result> { + fn get_new_headers(&mut self, chain: &Chain) -> Result> { self.req_send.send(Request::get_new_headers(chain))?; let headers = self .headers_recv @@ -93,7 +94,7 @@ impl Connection { /// Request and process the specified blocks (in the specified order). /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details. /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515). - pub(crate) fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> + fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> where B: IntoIterator, F: FnMut(BlockHash, SerBlock), @@ -131,11 +132,11 @@ impl Connection { } /// Note: only a single receiver will get the notification (https://github.com/romanz/electrs/pull/526#issuecomment-934687415). - pub(crate) fn new_block_notification(&self) -> Receiver<()> { + fn new_block_notification(&self) -> Receiver<()> { self.new_block_recv.clone() } - pub(crate) fn connect(address: SocketAddr, metrics: &Metrics, magic: Magic) -> Result { + pub fn connect(address: SocketAddr, metrics: &Metrics, magic: Magic) -> Result { let recv_conn = TcpStream::connect(address) .with_context(|| format!("p2p failed to connect: {:?}", address))?; let mut send_conn = recv_conn @@ -404,3 +405,21 @@ pub fn duration_to_seconds(d: Duration) -> f64 { let nanos = f64::from(d.subsec_nanos()) / 1e9; d.as_secs() as f64 + nanos } + +impl BlockSource for Connection { + fn get_new_headers(&mut self, chain: &Chain) -> anyhow::Result> { + self.get_new_headers(chain) + } + + fn for_blocks<'a>( + &mut self, + blockhashes: Vec, + func: Box, + ) -> anyhow::Result<()> { + self.for_blocks(blockhashes, func) + } + + fn new_block_notification(&self) -> Receiver<()> { + self.new_block_notification() + } +} diff --git a/src/connection/rpc_zmq.rs b/src/connection/rpc_zmq.rs new file mode 100644 index 000000000..1c420c21c --- /dev/null +++ b/src/connection/rpc_zmq.rs @@ -0,0 +1,519 @@ +use anyhow::{Context, Result}; +use bitcoin::blockdata::block::Header as BlockHeader; +use bitcoin::consensus::deserialize; +use bitcoin::BlockHash; +use crossbeam_channel::{bounded, Receiver}; + +use std::io::{BufRead, BufReader, Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::time::Duration; + +use crate::chain::{Chain, NewHeader}; +use crate::connection::BlockSource; +use crate::metrics::{default_duration_buckets, Histogram, Metrics}; +use crate::types::SerBlock; + +const MAX_REORG_DEPTH: usize = 1000; + +/// Max headers returned per REST call — matches p2p `getheaders` cap. +const HEADER_BATCH_SIZE: usize = 2000; + +/// Number of parallel REST connections for block fetching. +const FETCH_CONNECTIONS: usize = 4; + +/// Bounded channel depth between fetcher and consumer in `for_blocks`. +const BLOCK_CHANNEL_DEPTH: usize = 10; + +/// Size of a serialized block header in bytes. +const HEADER_SIZE: usize = 80; + +/// Block source backed entirely by the Bitcoin Core REST interface (for +/// headers and blocks) and ZMQ (for new-block notifications). +/// +/// No RPC, no authentication. Requires `rest=1` and `zmqpubhashblock=tcp://:` in bitcoin.conf. +pub struct RestZmqBlockSource { + rest_conn: RestConn, + block_fetchers: Vec, + new_block_recv: Receiver<()>, + blocks_duration: Histogram, +} + +impl RestZmqBlockSource { + pub fn connect(rest_addr: SocketAddr, zmq_endpoint: &str, metrics: &Metrics) -> Result { + // Verify REST is enabled. + let mut conn = RestConn::connect(rest_addr)?; + conn.get("/rest/chaininfo.json") + .context("REST not reachable — ensure bitcoind is started with rest=1")?; + + let new_block_recv = spawn_zmq_listener(zmq_endpoint)?; + + let blocks_duration = metrics.histogram_vec( + "rest_zmq_blocks_duration", + "Time spent getting blocks via REST (in seconds)", + "step", + default_duration_buckets(), + ); + + let mut block_fetchers = Vec::with_capacity(FETCH_CONNECTIONS); + for _ in 0..FETCH_CONNECTIONS { + block_fetchers.push(RestConn::connect(rest_addr)?); + } + + info!( + "REST+ZMQ block source ready (rest={}, zmq={})", + rest_addr, zmq_endpoint + ); + + Ok(Self { + rest_conn: conn, + block_fetchers, + new_block_recv, + blocks_duration, + }) + } + + /// `/rest/chaininfo.json` → (best block hash, height). + fn chain_info(&mut self) -> Result<(BlockHash, usize)> { + let body = self.rest_conn.get("/rest/chaininfo.json")?; + let v: serde_json::Value = + serde_json::from_slice(&body).context("invalid chaininfo JSON")?; + + let tip: BlockHash = v["bestblockhash"] + .as_str() + .context("missing bestblockhash")? + .parse() + .context("invalid bestblockhash")?; + let height = v["blocks"].as_u64().context("missing blocks")? as usize; + Ok((tip, height)) + } + + /// `/rest/blockhashbyheight/.json` → block hash at height. + fn block_hash_at_height(&mut self, height: usize) -> Result { + let path = format!("/rest/blockhashbyheight/{}.json", height); + let body = self.rest_conn.get(&path)?; + let v: serde_json::Value = + serde_json::from_slice(&body).context("invalid blockhashbyheight JSON")?; + + v["blockhash"] + .as_str() + .context("missing blockhash")? + .parse() + .context("invalid blockhash") + } + + /// `/rest/headers//.bin` → raw 80-byte headers. + /// + /// Returns up to `count` headers starting from **and including** `hash`. + fn raw_headers(&mut self, hash: &BlockHash, count: usize) -> Result> { + let path = format!("/rest/headers/{}/{}.bin", count, hash); + let body = self.rest_conn.get(&path)?; + + if body.len() % HEADER_SIZE != 0 { + bail!( + "REST headers: unexpected length {} (not a multiple of {})", + body.len(), + HEADER_SIZE + ); + } + + body.chunks_exact(HEADER_SIZE) + .map(|chunk| deserialize(chunk).context("invalid header from REST")) + .collect() + } + + /// Fetch new headers after our tip in one REST call. + /// + /// Gets the hash at local_height+1 via blockhashbyheight, then requests + /// headers starting from that hash. All returned headers are new. + /// Validates prev_blockhash continuity to detect races with reorgs. + fn fetch_headers_after_tip(&mut self, chain: &Chain) -> Result> { + let local_height = chain.height(); + + // Get the hash of the first block we don't have. + let next_hash = self.block_hash_at_height(local_height + 1)?; + + // REST headers include the starting hash, so all returned are new. + let headers = self.raw_headers(&next_hash, HEADER_BATCH_SIZE)?; + + if headers.is_empty() { + return Ok(vec![]); + } + + // Verify the first header connects to our tip. + if headers[0].prev_blockhash != chain.tip() { + bail!( + "REST header discontinuity: header at height {} has prev_blockhash {}, expected tip {}", + local_height + 1, + headers[0].prev_blockhash, + chain.tip(), + ); + } + + // Verify internal continuity. + for i in 1..headers.len() { + let expected = headers[i - 1].block_hash(); + if headers[i].prev_blockhash != expected { + bail!( + "REST header chain broken at index {}: prev_blockhash {} != expected {}", + i, + headers[i].prev_blockhash, + expected, + ); + } + } + + debug!( + "got {} new headers via REST (heights {}..={})", + headers.len(), + local_height + 1, + local_height + headers.len(), + ); + + Ok(headers + .into_iter() + .zip((local_height + 1)..) + .map(NewHeader::from) + .collect()) + } + + /// Reorg slow path: walk backwards from the remote tip until we find a + /// hash present in our local chain (the fork point). + fn walk_backwards_for_headers( + &mut self, + chain: &Chain, + remote_tip: BlockHash, + ) -> Result> { + let mut headers: Vec = Vec::new(); + let mut current = remote_tip; + + loop { + if headers.len() >= MAX_REORG_DEPTH { + bail!( + "reorg deeper than {} blocks — aborting backwards walk", + MAX_REORG_DEPTH + ); + } + + let fetched = self.raw_headers(¤t, 1)?; + let header = fetched + .into_iter() + .next() + .with_context(|| format!("REST: no header for {}", current))?; + + let prev = header.prev_blockhash; + headers.push(header); + + if let Some(fork_height) = chain.get_block_height(&prev) { + headers.reverse(); + debug!( + "got {} new headers via REST backwards walk (fork at height {})", + headers.len(), + fork_height, + ); + return Ok(headers + .into_iter() + .zip((fork_height + 1)..) + .map(NewHeader::from) + .collect()); + } + current = prev; + } + } +} + +impl BlockSource for RestZmqBlockSource { + /// Fetch new headers, capped at HEADER_BATCH_SIZE per call. + /// + /// Fast path: verify our tip is on the main chain, fetch headers in one + /// REST call. Slow path (reorg): walk backwards to find fork point. + fn get_new_headers(&mut self, chain: &Chain) -> Result> { + let (remote_tip, remote_height) = self.chain_info()?; + + // Already at tip. + if chain.get_block_height(&remote_tip).is_some() { + return Ok(vec![]); + } + + let local_height = chain.height(); + + // Fast path: verify our tip is still on the active chain. + if local_height <= remote_height { + let hash_at_ours = self.block_hash_at_height(local_height)?; + if hash_at_ours == chain.tip() { + return self.fetch_headers_after_tip(chain); + } + } + + // Reorg (or remote behind us) — walk backwards. + self.walk_backwards_for_headers(chain, remote_tip) + } + + /// Fetch blocks via `/rest/block/.bin` using multiple parallel + /// keep-alive connections to eliminate per-block dead time. + /// + /// Block hashes are striped across N fetcher threads (each with its own + /// TCP connection). A reorder buffer ensures blocks arrive at the consumer + /// in the original requested order. + fn for_blocks<'a>( + &'a mut self, + blockhashes: Vec, + mut func: Box, + ) -> Result<()> { + if blockhashes.is_empty() { + return Ok(()); + } + + debug!( + "REST: fetching {} blocks ({} connections)", + blockhashes.len(), + FETCH_CONNECTIONS + ); + + let blocks_duration = &self.blocks_duration; + let block_fetchers = &mut self.block_fetchers; + + blocks_duration.observe_duration("total", || { + // Each fetcher gets (index, hash) pairs so we can reorder. + let mut work: Vec> = + (0..FETCH_CONNECTIONS).map(|_| Vec::new()).collect(); + for (i, hash) in blockhashes.iter().enumerate() { + work[i % FETCH_CONNECTIONS].push((i, *hash)); + } + + let (tx, rx) = bounded::<(usize, BlockHash, SerBlock)>(BLOCK_CHANNEL_DEPTH); + + std::thread::scope(|s| { + // Spawn fetcher threads, each borrowing a persistent connection. + let fetchers: Vec<_> = work + .into_iter() + .zip(block_fetchers.iter_mut()) + .enumerate() + .map(|(conn_id, (assignments, conn))| { + let tx = tx.clone(); + s.spawn(move || { + let mut err = None; + for (idx, hash) in assignments { + let path = format!("/rest/block/{}.bin", hash); + match conn.get(&path) { + Ok(block) => { + if tx.send((idx, hash, block)).is_err() { + break; + } + } + Err(e) => { + err = Some(e.context(format!( + "REST conn {}: block {}", + conn_id, hash + ))); + break; + } + } + } + err + }) + }) + .collect(); + + // Drop our copy so channel closes when all fetchers finish. + drop(tx); + + // Reorder buffer: blocks arrive out of order, consumer needs + // them in sequence. + let mut next_idx = 0; + let mut pending: std::collections::HashMap = + std::collections::HashMap::new(); + + for (idx, hash, block) in rx { + pending.insert(idx, (hash, block)); + + // Flush all consecutive ready blocks. + while let Some((h, b)) = pending.remove(&next_idx) { + blocks_duration.observe_duration("process", || func(h, b)); + next_idx += 1; + } + } + + // Check for fetcher errors. + let mut first_err = None; + for f in fetchers { + let err = f.join().expect("fetcher panicked"); + if first_err.is_none() { + first_err = err; + } + } + + if let Some(e) = first_err { + return Err(e); + } + + assert_eq!(next_idx, blockhashes.len(), "not all blocks were processed"); + Ok(()) + }) + }) + } + + fn new_block_notification(&self) -> Receiver<()> { + self.new_block_recv.clone() + } +} + +/// Persistent HTTP/1.1 connection to bitcoind REST. +/// Reuses a single TCP stream across requests to avoid per-request overhead. +struct RestConn { + addr: SocketAddr, + reader: BufReader, +} + +impl RestConn { + fn connect(addr: SocketAddr) -> Result { + let stream = TcpStream::connect(addr) + .with_context(|| format!("REST: connect to {} failed", addr))?; + stream.set_read_timeout(Some(Duration::from_secs(120))).ok(); + stream.set_write_timeout(Some(Duration::from_secs(30))).ok(); + let reader = BufReader::new(stream); + Ok(Self { addr, reader }) + } + + /// GET a path; auto-reconnects once on failure. + fn get(&mut self, path: &str) -> Result> { + match self.do_get(path) { + Ok(body) => Ok(body), + Err(first_err) => { + debug!("REST: retrying {} after error: {:#}", path, first_err); + *self = Self::connect(self.addr)?; + self.do_get(path) + } + } + } + + fn do_get(&mut self, path: &str) -> Result> { + let req = format!( + "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: keep-alive\r\nAccept-Encoding: identity\r\n\r\n", + path, self.addr + ); + self.reader + .get_mut() + .write_all(req.as_bytes()) + .context("REST: write failed")?; + + let mut status = String::new(); + let n = self + .reader + .read_line(&mut status) + .context("REST: read status")?; + if n == 0 { + bail!("REST: connection closed (EOF reading status)"); + } + let status_code = status + .split_whitespace() + .nth(1) + .and_then(|s| s.parse::().ok()) + .with_context(|| format!("REST: malformed status line: {}", status.trim()))?; + let is_ok = status_code == 200; + + let mut content_length: Option = None; + let mut chunked = false; + loop { + let mut line = String::new(); + let n = self + .reader + .read_line(&mut line) + .context("REST: read header")?; + if n == 0 { + bail!("REST: connection closed (EOF reading headers)"); + } + let t = line.trim(); + if t.is_empty() { + break; + } + let lower = t.to_ascii_lowercase(); + if let Some(v) = lower.strip_prefix("content-length:") { + content_length = v.trim().parse().ok(); + } + if lower.starts_with("transfer-encoding:") && lower.contains("chunked") { + chunked = true; + } + } + + let body = if let Some(len) = content_length { + let mut body = vec![0u8; len]; + self.reader + .read_exact(&mut body) + .context("REST: read body")?; + body + } else if chunked { + self.read_chunked()? + } else if !is_ok { + // Unknown body framing on error response — connection is + // potentially desynced. Force reconnect on next request. + *self = Self::connect(self.addr)?; + Vec::new() + } else { + bail!("REST: no Content-Length and not chunked"); + }; + + if !is_ok { + bail!("REST: {} → {} {}", path, status_code, status.trim()); + } + + Ok(body) + } + + fn read_chunked(&mut self) -> Result> { + let mut body = Vec::new(); + loop { + let mut line = String::new(); + self.reader.read_line(&mut line)?; + // Strip chunk extensions (e.g. "1a;foo=bar" → "1a"). + let hex = line.trim().split(';').next().unwrap_or("0"); + let size = usize::from_str_radix(hex, 16).context("bad chunk size")?; + if size == 0 { + // Drain trailers: read until empty line. + loop { + let mut trailer = String::new(); + self.reader.read_line(&mut trailer)?; + if trailer.trim().is_empty() { + break; + } + } + break; + } + let mut chunk = vec![0u8; size]; + self.reader.read_exact(&mut chunk)?; + body.extend_from_slice(&chunk); + let mut crlf = [0u8; 2]; + self.reader.read_exact(&mut crlf)?; + } + Ok(body) + } +} + +fn spawn_zmq_listener(endpoint: &str) -> Result> { + let ctx = zmq::Context::new(); + let sub = ctx.socket(zmq::SUB).context("ZMQ: create socket")?; + sub.connect(endpoint) + .with_context(|| format!("ZMQ: connect to {}", endpoint))?; + sub.set_subscribe(b"hashblock").context("ZMQ: subscribe")?; + + info!("subscribed to ZMQ hashblock at {}", endpoint); + + let (tx, rx) = bounded::<()>(0); + let ep = endpoint.to_owned(); + + crate::thread::spawn("zmq_listener", move || loop { + match sub.recv_bytes(0) { + Ok(_) => {} + Err(zmq::Error::ETERM) => { + debug!("ZMQ terminated"); + return Ok(()); + } + Err(e) => bail!("ZMQ recv on {}: {}", ep, e), + } + while sub.get_rcvmore().unwrap_or(false) { + let _ = sub.recv_bytes(0); + } + let _ = tx.try_send(()); + }); + + Ok(rx) +} diff --git a/src/daemon.rs b/src/daemon.rs index 40df169ba..6e9c6ebba 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -13,11 +13,11 @@ use std::fs::File; use std::io::Read; use std::path::Path; +use crate::connection::{make_p2p_connection, make_rpc_zmq_connection, BlockSource}; use crate::{ chain::{Chain, NewHeader}, config::Config, metrics::Metrics, - p2p::Connection, signals::ExitFlag, types::SerBlock, }; @@ -100,7 +100,7 @@ fn rpc_connect(config: &Config) -> Result { } pub struct Daemon { - p2p: Mutex, + block_source: Mutex>, rpc: Client, } @@ -139,12 +139,23 @@ impl Daemon { bail!("electrs requires non-pruned bitcoind node"); } - let p2p = Mutex::new(Connection::connect( - config.daemon_p2p_addr, - metrics, - config.magic, - )?); - Ok(Self { p2p, rpc }) + let source: Box = if let Some(ref zmq_ep) = config.daemon_zmq_addr { + info!("using RPC+ZMQ block source (zmq_endpoint={})", zmq_ep); + make_rpc_zmq_connection(config.daemon_rpc_addr, zmq_ep, metrics)? + } else { + if !network_info.network_active { + bail!("electrs requires active bitcoind p2p network (or use --zmq-endpoint)"); + } + info!("using p2p block source"); + make_p2p_connection(config.daemon_p2p_addr, metrics, config.magic)? + }; + + let source = Mutex::new(source); + + Ok(Self { + block_source: source, + rpc, + }) } pub(crate) fn estimate_fee(&self, nblocks: u16) -> Result> { @@ -289,19 +300,21 @@ impl Daemon { } pub(crate) fn get_new_headers(&self, chain: &Chain) -> Result> { - self.p2p.lock().get_new_headers(chain) + self.block_source.lock().get_new_headers(chain) } - pub(crate) fn for_blocks(&self, blockhashes: B, func: F) -> Result<()> + pub(crate) fn for_blocks<'a, B, F>(&'a self, blockhashes: B, func: F) -> Result<()> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) + 'a, { - self.p2p.lock().for_blocks(blockhashes, func) + self.block_source + .lock() + .for_blocks(blockhashes.into_iter().collect(), Box::new(func)) } pub(crate) fn new_block_notification(&self) -> Receiver<()> { - self.p2p.lock().new_block_notification() + self.block_source.lock().new_block_notification() } } diff --git a/src/lib.rs b/src/lib.rs index 1458f06ff..983a0c07c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ extern crate serde_derive; mod cache; mod chain; mod config; +mod connection; mod daemon; mod db; mod electrum; @@ -17,7 +18,6 @@ mod index; mod mempool; mod merkle; mod metrics; -mod p2p; mod server; mod signals; mod status;