Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 229 additions & 13 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ serde = "1.0.184"
serde_derive = "1.0.184"
serde_json = "1.0"
tiny_http = { version = "0.12", optional = true }
zmq = "0.10"

[target.'cfg(windows)'.dependencies]
ctrlc = "=3.5.2"
Expand Down
5 changes: 5 additions & 0 deletions internal/config_specification.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ name = "daemon_p2p_addr"
type = "crate::config::ResolvAddr"
doc = "Bitcoin daemon p2p 'addr:port' to connect (default: 127.0.0.1:8333 for mainnet, 127.0.0.1:18333 for testnet, 127.0.0.1:18444 for regtest and 127.0.0.1:38333 for signet)"

[[param]]
name = "daemon_zmq_addr"
type = "String"
doc = "ZMQ endpoint for new block notifications (e.g. 'tcp://127.0.0.1:28332'). When set, uses REST+ZMQ instead of p2p for block fetching."

[[param]]
name = "monitoring_addr"
type = "crate::config::ResolvAddr"
Expand Down
2 changes: 2 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ pub struct Config {
pub daemon_auth: SensitiveAuth,
pub daemon_rpc_addr: SocketAddr,
pub daemon_p2p_addr: SocketAddr,
pub daemon_zmq_addr: Option<String>,
pub electrum_rpc_addr: SocketAddr,
pub monitoring_addr: SocketAddr,
pub wait_duration: Duration,
Expand Down Expand Up @@ -345,6 +346,7 @@ impl Config {
daemon_auth,
daemon_rpc_addr,
daemon_p2p_addr,
daemon_zmq_addr: config.daemon_zmq_addr,
electrum_rpc_addr,
monitoring_addr,
wait_duration: Duration::from_secs(config.wait_duration_secs),
Expand Down
42 changes: 42 additions & 0 deletions src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::{
chain::{Chain, NewHeader},
connection::rpc_zmq::RestZmqBlockSource,
metrics::Metrics,
types::SerBlock,
};
use bitcoin::{p2p::Magic, BlockHash};
use crossbeam_channel::Receiver;
use std::net::SocketAddr;

mod p2p;
mod rpc_zmq;

pub trait BlockSource: Send + Sync {
fn get_new_headers(&mut self, chain: &Chain) -> anyhow::Result<Vec<NewHeader>>;
fn for_blocks<'a>(
&'a mut self,
blockhashes: Vec<BlockHash>,
func: Box<dyn FnMut(BlockHash, SerBlock) + 'a>,
) -> anyhow::Result<()>;
fn new_block_notification(&self) -> Receiver<()>;
}

pub fn make_p2p_connection(
address: SocketAddr,
metrics: &Metrics,
magic: Magic,
) -> anyhow::Result<Box<dyn BlockSource>> {
Ok(Box::new(p2p::Connection::connect(address, metrics, magic)?))
}

pub fn make_rpc_zmq_connection(
rest_addr: SocketAddr,
zmq_endpoint: &str,
metrics: &Metrics,
) -> anyhow::Result<Box<dyn BlockSource>> {
Ok(Box::new(RestZmqBlockSource::connect(
rest_addr,
zmq_endpoint,
metrics,
)?))
}
29 changes: 24 additions & 5 deletions src/p2p.rs → src/connection/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::io::Write;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use crate::connection::BlockSource;
use crate::types::SerBlock;
use crate::{
chain::{Chain, NewHeader},
Expand Down Expand Up @@ -54,7 +55,7 @@ impl Request {
}
}

pub(crate) struct Connection {
pub struct Connection {
req_send: Sender<Request>,
blocks_recv: Receiver<SerBlock>,
headers_recv: Receiver<Vec<BlockHeader>>,
Expand All @@ -67,7 +68,7 @@ impl Connection {
/// Get new block headers (supporting reorgs).
/// https://en.bitcoin.it/wiki/Protocol_documentation#getheaders
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
pub(crate) fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewHeader>> {
fn get_new_headers(&mut self, chain: &Chain) -> Result<Vec<NewHeader>> {
self.req_send.send(Request::get_new_headers(chain))?;
let headers = self
.headers_recv
Expand All @@ -93,7 +94,7 @@ impl Connection {
/// Request and process the specified blocks (in the specified order).
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
where
B: IntoIterator<Item = BlockHash>,
F: FnMut(BlockHash, SerBlock),
Expand Down Expand Up @@ -131,11 +132,11 @@ impl Connection {
}

/// Note: only a single receiver will get the notification (https://github.com/romanz/electrs/pull/526#issuecomment-934687415).
pub(crate) fn new_block_notification(&self) -> Receiver<()> {
fn new_block_notification(&self) -> Receiver<()> {
self.new_block_recv.clone()
}

pub(crate) fn connect(address: SocketAddr, metrics: &Metrics, magic: Magic) -> Result<Self> {
pub fn connect(address: SocketAddr, metrics: &Metrics, magic: Magic) -> Result<Self> {
let recv_conn = TcpStream::connect(address)
.with_context(|| format!("p2p failed to connect: {:?}", address))?;
let mut send_conn = recv_conn
Expand Down Expand Up @@ -404,3 +405,21 @@ pub fn duration_to_seconds(d: Duration) -> f64 {
let nanos = f64::from(d.subsec_nanos()) / 1e9;
d.as_secs() as f64 + nanos
}

impl BlockSource for Connection {
fn get_new_headers(&mut self, chain: &Chain) -> anyhow::Result<Vec<NewHeader>> {
self.get_new_headers(chain)
}

fn for_blocks<'a>(
&mut self,
blockhashes: Vec<BlockHash>,
func: Box<dyn FnMut(BlockHash, SerBlock) + 'a>,
) -> anyhow::Result<()> {
self.for_blocks(blockhashes, func)
}

fn new_block_notification(&self) -> Receiver<()> {
self.new_block_notification()
}
}
Loading