diff --git a/bindex-cli/Cargo.toml b/bindex-cli/Cargo.toml index 36efc11..d2c8fc3 100644 --- a/bindex-cli/Cargo.toml +++ b/bindex-cli/Cargo.toml @@ -13,7 +13,7 @@ readme = "../README.md" [dependencies] -bindex = { path = "../bindex-lib" } +bindex = { path = "../bindex-lib", features = ["cache"] } chrono = { version = "0.4", default-features = false } clap = { version = "4", features = ["derive"] } env_logger = "0.11" diff --git a/bindex-cli/src/bin/bindex-cli.rs b/bindex-cli/src/bin/bindex-cli.rs index f5dd117..1de3db4 100644 --- a/bindex-cli/src/bin/bindex-cli.rs +++ b/bindex-cli/src/bin/bindex-cli.rs @@ -1,5 +1,5 @@ use bindex::{ - bitcoin::{self, consensus::deserialize, hashes::Hash, BlockHash, Txid}, + bitcoin::{self, consensus::deserialize, hashes::Hash, Txid}, cache, IndexedChain, Network, }; use chrono::{TimeZone, Utc}; @@ -7,9 +7,8 @@ use clap::Parser; use log::*; use std::{ collections::HashSet, - io::{BufRead, BufReader, Read, Write}, + io::Read, path::{Path, PathBuf}, - process::{ChildStdin, ChildStdout, Command, Stdio}, str::FromStr, time::Instant, }; @@ -177,10 +176,6 @@ struct Args { /// Exit after one sync is over #[arg(short = '1', long = "sync-once", default_value_t = false)] sync_once: bool, - - /// Start Electrum server - #[arg(short = 'e', long = "electrum", default_value_t = false)] - electrum: bool, } fn collect_addresses(args: &Args) -> Result> { @@ -204,50 +199,6 @@ fn collect_addresses(args: &Args) -> Result> { Ok(addresses) } -struct Electrum { - stdin: ChildStdin, - stdout: BufReader, - line: String, -} - -impl Electrum { - fn start(cache_file: &Path, network: bitcoin::Network) -> Result { - let mut server = Command::new("python") - .arg("-m") - .arg("electrum.server") - .arg("--cache-db") - .arg(cache_file) - .arg("--network") - .arg(network.to_string()) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn()?; - info!("Launched server @ pid={}", server.id()); - - let stdin = server.stdin.take().unwrap(); - let stdout = BufReader::new(server.stdout.take().unwrap()); - - Ok(Self { - stdin, - stdout, - line: String::new(), - }) - } - - fn notify(&mut self, new_tip: BlockHash) -> Result<()> { - debug!("chain best block={}", new_tip); - writeln!(self.stdin, "{new_tip}")?; - self.stdin.flush()?; - Ok(()) - } - - fn wait(&mut self) -> Result<()> { - self.line.clear(); - self.stdout.read_line(&mut self.line)?; // wait for notification - Ok(()) - } -} - fn run() -> Result<()> { let args = Args::parse(); env_logger::builder().format_timestamp_micros().init(); @@ -259,37 +210,16 @@ fn run() -> Result<()> { let cache = cache::Cache::open(cache_db)?; cache.add(collect_addresses(&args)?)?; - let mut server = None; - if args.electrum { - let cache_file = args - .cache_file - .ok_or("Electrum requires setting a cache file")?; - server = Some(Electrum::start(&cache_file, args.network.into())?); - } - let mut index = IndexedChain::open_default(&args.db_path, args.network)?; + let mut index = IndexedChain::open(&args.db_path, args.network)?; loop { // index new blocks (also handle reorgs) - let tip = loop { - let stats = index.sync_chain(1000)?; - if stats.indexed_blocks == 0 { - break stats.tip; - } - }; + while index.sync_chain(1000)?.indexed_blocks > 0 {} // make sure to update new scripthashes (even if there are no new blocks) cache.sync(&index)?; - let entries = get_history(cache.db())?; - match server.as_mut() { - Some(s) => { - s.notify(tip)?; - s.wait()?; // Electrum should send an ACK for an index sync request - } - None => { - print_history(entries, args.history_limit); - std::thread::sleep(std::time::Duration::from_secs(1)); - if args.sync_once { - break; - } - } + print_history(get_history(cache.db())?, args.history_limit); + std::thread::sleep(std::time::Duration::from_secs(1)); + if args.sync_once { + break; } } Ok(()) diff --git a/bindex-lib/Cargo.toml b/bindex-lib/Cargo.toml index d98c11f..649dc3f 100644 --- a/bindex-lib/Cargo.toml +++ b/bindex-lib/Cargo.toml @@ -11,6 +11,8 @@ keywords = ["bitcoin", "index", "database"] documentation = "https://docs.rs/bindex/" readme = "../README.md" +[features] +cache = ["dep:rusqlite"] [dependencies] bitcoin = { version = "0.32", default-features = false } @@ -21,7 +23,7 @@ log = "0.4" rocksdb = { version = "0.23", default-features = false, features = ["zstd"]} thiserror = "2.0" ureq = { version = "3", default-features = false } -rusqlite = "0.37" +rusqlite = { version = "0.37", optional = true } [dev-dependencies] hex_lit = "0.1" diff --git a/bindex-lib/src/cache/mod.rs b/bindex-lib/src/cache/mod.rs index 314d0db..5aa118a 100644 --- a/bindex-lib/src/cache/mod.rs +++ b/bindex-lib/src/cache/mod.rs @@ -10,8 +10,8 @@ use log::*; use rusqlite::OptionalExtension; use crate::{ + chain::{self, IndexedChain}, index::{self, ScriptHash}, - store::{self, IndexedChain}, Location, }; @@ -23,8 +23,8 @@ pub enum Error { #[error("invalid address: {0}")] Address(#[from] bitcoin::address::ParseError), - #[error("store error: {0}")] - Index(#[from] store::Error), + #[error("chain error: {0}")] + Index(#[from] chain::Error), } pub struct Cache { @@ -116,9 +116,10 @@ impl Cache { })?; let mut delete_from = None; // Find the first non-stale block (scanning backwards from tip): + let headers = chain.headers(); for row in rows_iter { let (hash, height) = row?; - match chain.check_header(hash, height) { + match headers.get_header(hash, height) { Ok(_header) => break, Err(err) => { warn!("reorg detected: {}", err); @@ -155,14 +156,16 @@ impl Cache { SELECT script_hash, block_height, block_hash FROM max_heights LEFT JOIN headers USING (block_height)", )?; + let headers = chain.headers(); let rows_iter = stmt.query_map([], |row| { let script_hash = ScriptHash::from_byte_array(row.get(0)?); let block_height: Option = row.get(1)?; let latest_header = if let Some(height) = block_height { let block_hash = bitcoin::BlockHash::from_byte_array(row.get(2)?); - let header = chain - .check_header(block_hash, height) - .expect("unexpected reorg"); + // Stale blocks should be removed by `drop_stale_blocks` call above. + let header = headers + .get_header(block_hash, height) + .expect("unexpected stale block"); Some(header) } else { None diff --git a/bindex-lib/src/store.rs b/bindex-lib/src/chain.rs similarity index 88% rename from bindex-lib/src/store.rs rename to bindex-lib/src/chain.rs index 0acc2c7..cb6aa4c 100644 --- a/bindex-lib/src/store.rs +++ b/bindex-lib/src/chain.rs @@ -23,9 +23,6 @@ pub enum Error { #[error("Genesis block hash mismatch: {0} != {1}")] ChainMismatch(bitcoin::BlockHash, bitcoin::BlockHash), - #[error("rusqlite failed: {0}")] - Sqlite(#[from] rusqlite::Error), - #[error("invalid address: {0}")] Address(#[from] bitcoin::address::ParseError), @@ -33,6 +30,7 @@ pub enum Error { BlockNotFound(#[from] headers::Reorg), } +#[derive(Debug)] pub struct Stats { pub tip: bitcoin::BlockHash, pub indexed_blocks: usize, @@ -61,9 +59,10 @@ pub struct IndexedChain { impl IndexedChain { /// Open an existing DB, or create if missing. /// Use binary format REST API for fetching the data from bitcoind. - pub fn open(db_path: impl AsRef, url: impl Into) -> Result { - let db_path = db_path.as_ref(); - let url = url.into(); + pub fn open(db_path: impl AsRef, network: Network) -> Result { + let db_path = db_path.as_ref().to_path_buf().join(network.to_string()); + let url = format!("http://localhost:{}", network.default_rpc_port()); + info!("index DB: {:?}, node URL: {}", db_path, url); let agent = ureq::Agent::new_with_config( ureq::config::Config::builder() @@ -106,13 +105,6 @@ impl IndexedChain { }) } - pub fn open_default(db_path: &str, network: Network) -> Result { - let bitcoin_network: bitcoin::Network = network.into(); - let default_db_path = format!("{db_path}/{bitcoin_network}"); - let default_rest_url = format!("http://localhost:{}", network.default_rpc_port()); - Self::open(default_db_path, default_rest_url) - } - fn drop_tip(&mut self) -> Result { let stale = self .headers @@ -243,19 +235,7 @@ impl IndexedChain { .get_block_part(location.indexed_header.hash(), pos)?) } - /// Iterate over block headers. - pub fn iter_headers(&self) -> impl Iterator { - self.headers - .iter_headers() - .map(index::IndexedHeader::header) - } - - /// Make sure this header has been indexed. - pub fn check_header( - &self, - hash: bitcoin::BlockHash, - height: usize, - ) -> Result<&index::IndexedHeader, Error> { - Ok(self.headers.get_header(hash, height)?) + pub fn headers(&self) -> &headers::Headers { + &self.headers } } diff --git a/bindex-lib/src/lib.rs b/bindex-lib/src/lib.rs index 9c4cf63..ea4ff50 100644 --- a/bindex-lib/src/lib.rs +++ b/bindex-lib/src/lib.rs @@ -1,16 +1,19 @@ pub use bitcoin; +#[cfg(feature = "cache")] pub mod cache; +mod chain; mod client; mod db; mod headers; mod index; mod network; -mod store; +pub use chain::IndexedChain; +pub use headers::Headers; +pub use index::ScriptHash; pub use network::Network; -pub use store::IndexedChain; #[derive(PartialEq, Eq, PartialOrd, Clone, Copy, Debug)] pub struct Location<'a> { @@ -20,6 +23,16 @@ pub struct Location<'a> { indexed_header: &'a index::IndexedHeader, } +impl Location<'_> { + pub fn block_hash(&self) -> bitcoin::BlockHash { + self.indexed_header.hash() + } + + pub fn block_height(&self) -> usize { + self.block_height + } +} + impl Ord for Location<'_> { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.txnum.cmp(&other.txnum) diff --git a/bindex-lib/src/network.rs b/bindex-lib/src/network.rs index ed5476a..aaaa68e 100644 --- a/bindex-lib/src/network.rs +++ b/bindex-lib/src/network.rs @@ -21,6 +21,18 @@ impl From for bitcoin::Network { } } +impl From for Network { + fn from(value: bitcoin::Network) -> Self { + match value { + bitcoin::Network::Bitcoin => Network::Bitcoin, + bitcoin::Network::Testnet => Network::Testnet, + bitcoin::Network::Testnet4 => Network::Testnet4, + bitcoin::Network::Signet => Network::Signet, + bitcoin::Network::Regtest => Network::Regtest, + } + } +} + impl Network { pub fn default_rpc_port(&self) -> u16 { match self { @@ -32,3 +44,9 @@ impl Network { } } } + +impl std::fmt::Display for Network { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + bitcoin::Network::from(*self).fmt(f) + } +} diff --git a/electrum.sh b/electrum.sh deleted file mode 100755 index 4de5001..0000000 --- a/electrum.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -set -eux - -export ZMQ_ADDR=tcp://127.0.0.1:55555 -./bindex.sh -c `mktemp` -e $* diff --git a/electrum/LICENCE b/electrum/LICENCE deleted file mode 100644 index ee2d4de..0000000 --- a/electrum/LICENCE +++ /dev/null @@ -1,25 +0,0 @@ -Copyright (c) 2020, The Electrum developers -Copyright (c) 2016-2020, Neil Booth - -All rights reserved. - -The MIT License (MIT) - -Permission is hereby granted, free of charge, to any person obtaining -a copy of this software and associated documentation files (the -"Software"), to deal in the Software without restriction, including -without limitation the rights to use, copy, modify, merge, publish, -distribute, sublicense, and/or sell copies of the Software, and to -permit persons to whom the Software is furnished to do so, subject to -the following conditions: - -The above copyright notice and this permission notice shall be -included in all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/electrum/__init__.py b/electrum/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/electrum/events.py b/electrum/events.py deleted file mode 100644 index e7724ab..0000000 --- a/electrum/events.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python3 -import enum -import logging -import struct - -import zmq -import zmq.asyncio - -LOG = logging.getLogger(__name__) - - -class Event(enum.IntEnum): - TX_ADD = ord("A") - TX_REMOVE = ord("R") - BLOCK_CONNECT = ord("C") - BLOCK_DISCONNECT = ord("D") - - -PREFIX_FMT = "32s1s" -PREFIX_LEN = struct.calcsize(PREFIX_FMT) - - -async def subscribe_events(addr: str, emit_fn): - try: - ctx = zmq.asyncio.Context() - - sub = ctx.socket(zmq.SUB) - sub.setsockopt(zmq.RCVHWM, 0) - sub.setsockopt_string(zmq.SUBSCRIBE, "sequence") - sub.connect(addr) - - next_seq = None - while True: - topic, body, seq = await sub.recv_multipart() - seq = int.from_bytes(seq, "little") - if next_seq is not None and next_seq != seq: - LOG.warning("sequence # skipped: %d -> %d", next_seq, seq) - next_seq = seq + 1 - if topic == b"sequence": - hash, event = struct.unpack(PREFIX_FMT, body[:PREFIX_LEN]) - mempool_seq = body[PREFIX_LEN:] - emit_fn( - Event(event[0]), - hash.hex(), - int.from_bytes(mempool_seq, "little") if mempool_seq else None, - ) - except Exception: - LOG.exception("subscribe_events() failed") diff --git a/electrum/merkle.py b/electrum/merkle.py deleted file mode 100644 index 152b4f7..0000000 --- a/electrum/merkle.py +++ /dev/null @@ -1,101 +0,0 @@ -# Copyright (c) 2018, Neil Booth -# -# All rights reserved. -# -# The MIT License (MIT) -# -# Permission is hereby granted, free of charge, to any person obtaining -# a copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, -# distribute, sublicense, and/or sell copies of the Software, and to -# permit persons to whom the Software is furnished to do so, subject to -# the following conditions: -# -# The above copyright notice and this permission notice shall be -# included in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, -# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF -# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND -# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE -# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION -# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -# and warranty status of this software. - -"""Merkle trees, branches, proofs and roots.""" - -from math import ceil, log -from typing import Optional, Callable, Tuple, Iterable, List - -import hashlib - -_sha256 = hashlib.sha256 - - -def sha256(x): - """Simple wrapper of hashlib sha256.""" - return _sha256(x).digest() - - -def double_sha256(x): - """SHA-256 of SHA-256, as used extensively in bitcoin.""" - return sha256(sha256(x)) - - -class Merkle: - """Perform merkle tree calculations on binary hashes using a given hash - function. - - If the hash count is not even, the final hash is repeated when - calculating the next merkle layer up the tree. - """ - - def __init__(self, hash_func: Callable[[bytes], bytes] = double_sha256): - self.hash_func = hash_func - - def branch_length(self, hash_count: int) -> int: - """Return the length of a merkle branch given the number of hashes.""" - if not isinstance(hash_count, int): - raise TypeError("hash_count must be an integer") - if hash_count < 1: - raise ValueError("hash_count must be at least 1") - return ceil(log(hash_count, 2)) - - def branch_and_root( - self, - hashes: Iterable[bytes], - index: int, - length: Optional[int] = None, - ) -> Tuple[List[bytes], bytes]: - """Return a (merkle branch, merkle_root) pair given hashes, and the - index of one of those hashes. - """ - hashes = list(hashes) - if not isinstance(index, int): - raise TypeError("index must be an integer") - # This also asserts hashes is not empty - if not 0 <= index < len(hashes): - raise ValueError("index out of range") - natural_length = self.branch_length(len(hashes)) - if length is None: - length = natural_length - else: - if not isinstance(length, int): - raise TypeError("length must be an integer") - if length < natural_length: - raise ValueError("length out of range") - - hash_func = self.hash_func - branch = [] - for _ in range(length): - if len(hashes) & 1: - hashes.append(hashes[-1]) - branch.append(hashes[index ^ 1]) - index >>= 1 - hashes = [ - hash_func(hashes[n] + hashes[n + 1]) for n in range(0, len(hashes), 2) - ] - - return branch, hashes[0] diff --git a/electrum/server.py b/electrum/server.py deleted file mode 100644 index b2d6614..0000000 --- a/electrum/server.py +++ /dev/null @@ -1,963 +0,0 @@ -# Copyright (c) 2016-2018, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -"""Classes for local RPC server and remote client TCP/SSL servers.""" - -import aiohttp -import argparse -import asyncio -import base64 -import contextlib -import functools -import itertools -import json -import logging -import os -import sqlite3 -import sys -import time - -from collections import defaultdict -from dataclasses import dataclass -from hashlib import sha256 -from pathlib import Path - -from asyncio.exceptions import TimeoutError -from aiohttp.client_exceptions import ClientError -from aiorpcx import TaskGroup -from aiorpcx import ( - JSONRPCAutoDetect, - JSONRPCConnection, - Request, - RPCError, - RPCSession, - handler_invocation, - serve_rs, - NewlineFramer, -) - -from . import merkle -from .events import Event, subscribe_events - -import typing as t - -BAD_REQUEST = 1 -DAEMON_ERROR = 2 - -MAX_CHUNK_SIZE = 2016 - - -def parse_args(): - parser = argparse.ArgumentParser() - parser.add_argument("-c", "--cache-db") - parser.add_argument("-n", "--network", default="bitcoin") - return parser.parse_args() - - -ARGS = parse_args() - - -class Env: - def __init__(self, genesis_header): - self.genesis_hash = hash_to_hex_str(merkle.double_sha256(genesis_header)) - self.max_recv = 10**7 - self.max_send = 10**7 - self.donation_address = None - - -VERSION = os.environ.get("ELECTRUM_VERSION", "electrs/0.999") -HOST = os.environ.get("ELECTRUM_HOST", "localhost") -PORT = int(os.environ.get("ELECTRUM_PORT", 50001)) -ZMQ_ADDR = os.environ.get("ZMQ_ADDR") - -(DEFAULT_PORT, DEFAULT_DIR) = { - "bitcoin": (8332, "~/.bitcoin"), - "signet": (38332, "~/.bitcoin/signet"), - "testnet4": (48332, "~/.bitcoin/testnet4"), -}[ARGS.network] - -BITCOIND_URL = os.environ.get("BITCOIND_URL", f"http://localhost:{DEFAULT_PORT}") -BITCOIND_COOKIE_PATH = Path( - os.environ.get("BITCOIND_COOKIE_PATH", f"{DEFAULT_DIR}/.cookie") -).expanduser() - - -class DummyContext: - def __init__(self, session): - self.session = session - - async def __aenter__(self): - return self.session - - async def __aexit__(self, exc_type, exc, tb): - pass - - -class NotFound(Exception): - pass - - -class UnavailableDaemon(Exception): - pass - - -LOG = logging.getLogger() - - -class HttpClient: - def __init__(self, client_session: aiohttp.ClientSession): - self.session = client_session - - async def rest_get(self, path, f) -> dict: - for _ in range(60): - try: - async with self.session.get(f"{BITCOIND_URL}/rest/{path}") as response: - if response.status == 503: - LOG.warning("bitcoind is unavailable: %s", response) - time.sleep(1) - continue - if response.status == 404: - raise NotFound(response.url) - response.raise_for_status() - return await f(response) - except ClientError as e: - LOG.warning("%s", e) - time.sleep(1) - continue - raise UnavailableDaemon() - - async def json_rpc(self, method, *params) -> dict: - for _ in range(60): - if not BITCOIND_COOKIE_PATH.exists(): - LOG.warning("%s is missing", BITCOIND_COOKIE_PATH) - continue - - headers = { - "Authorization": f"Basic {base64.b64encode(BITCOIND_COOKIE_PATH.read_bytes()).decode()}", - } - data = json.dumps( - {"method": method, "params": params, "id": 0, "jsonrpc": "2.0"} - ) - async with self.session.post( - BITCOIND_URL, headers=headers, data=data - ) as response: - if response.status == 503: - LOG.warning("bitcoind is unavailable: %s", response) - time.sleep(1) - continue - response.raise_for_status() - json_obj = await response.json() - if err := json_obj.get("error"): - raise RPCError(DAEMON_ERROR, err) - return json_obj["result"] - - response.raise_for_status() - raise UnavailableDaemon() - - -class MissingPrevout(Exception): - pass - - -@dataclass -class MempoolEntry: - tx: dict - scripthashes: set[bytes] # input & output scripthashes - fee: int # in sats (may be inexact) - - -@dataclass -class MempoolUpdate: - scripthashes: set[bytes] - new_tip: bool - - -class Mempool: - def __init__(self, http: HttpClient): - self.zmq_messages = asyncio.Queue() # ZMQ message queue - self.tx_entries = {} # txid->MempoolEntry - self.by_scripthash: dict[bytes, set[str]] = {} # hashX -> set[txid_hex] - self.next_seq = None - self.http = http - - async def try_get_tx(self, txid_hex: str) -> dict | None: - path = f"tx/{txid_hex}.json" - try: - return await self.http.rest_get(path, lambda r: r.json()) - except NotFound: - return None - - async def try_get_utxo(self, txid_hex: str, vout: int) -> dict | None: - path = f"getutxos/{txid_hex}-{vout}.json" - try: - return await self.http.rest_get(path, lambda r: r.json()) - except NotFound: - return None - - async def _get_entry(self, tx: dict) -> MempoolEntry: - spks_hex = [txo["scriptPubKey"]["hex"] for txo in tx["vout"]] - fee = -sum(txo["value"] for txo in tx["vout"]) - - for txi in tx["vin"]: - prev_txid = txi["txid"] - prev_vout = txi["vout"] - entry = self.tx_entries.get(prev_txid) - if entry is not None: - txo = entry.tx["vout"][prev_vout] - else: - res = await self.try_get_utxo(prev_txid, prev_vout) - utxos = res and res["utxos"] - if not utxos: - # probably a new/stale block - raise MissingPrevout(prev_txid, prev_vout) - txo = utxos[0] - - spks_hex.append(txo["scriptPubKey"]["hex"]) - fee += txo["value"] - - scripthashes = set( - (sha256(bytes.fromhex(spk_hex)).digest() for spk_hex in spks_hex) - ) - fee = round(fee * 1e8) # good enough for fee estimation - assert fee >= 0 - return MempoolEntry(tx, scripthashes, fee) - - async def add_tx(self, txid_hex: str, tx, scripthashes: set[bytes]): - assert txid_hex == tx["txid"] - if txid_hex in self.tx_entries: - LOG.warning("add: tx %s already exists", txid_hex) - return - - entry = await self._get_entry(tx) - - # collect input and output scripthashes for the added tx - scripthashes.update(entry.scripthashes) - - for scripthash in entry.scripthashes: - self.by_scripthash.setdefault(scripthash, set()).add(txid_hex) - self.tx_entries[txid_hex] = entry - - def remove_tx(self, txid_hex: str, scripthashes: set[bytes]): - entry = self.tx_entries.pop(txid_hex, None) - if entry is None: - LOG.warning("remove: tx %s not found", txid_hex) - return - - # collect input and output scripthashes for the removed tx - scripthashes.update(entry.scripthashes) - - for scripthash in entry.scripthashes: - txids = self.by_scripthash.get(scripthash) - if txids is not None: - txids.discard(txid_hex) - if not txids: - self.by_scripthash.pop(scripthash) - - def enqueue_message(self, *args): - self.zmq_messages.put_nowait(args) - - async def resync(self, scripthashes: set[bytes]): - t = time.time() - resp = await self.http.rest_get( - "mempool/contents.json?mempool_sequence=true&verbose=false", - lambda r: r.json(), - ) - new_txids = resp["txids"] - new_txids_set = set(new_txids) - old_txids_set = set(self.tx_entries) - for txid_hex in old_txids_set - new_txids_set: - # transaction is removed from mempool - self.remove_tx(txid_hex, scripthashes=scripthashes) - - next_report = time.time() + 1 # at most once per second - # iterate over new txids in original order - total_size = 0 - for i, txid_hex in enumerate(new_txids): - if txid_hex in old_txids_set: - continue - if time.time() > next_report: - LOG.info( - "fetched %d mempool txs [%.1f%%] %.3f MB", - i, - 100.0 * i / len(new_txids), - total_size / 1e6, - ) - next_report += 1 - if tx := await self.try_get_tx(txid_hex): - # transaction is added to mempool - await self.add_tx(txid_hex, tx, scripthashes=scripthashes) - total_size += tx["size"] - - # marks that mempool resync is over - self.next_seq = resp["mempool_sequence"] - LOG.info( - "fetched %d mempool txs, next_seq=%s (%.3fs)", - len(self.tx_entries), - self.next_seq, - time.time() - t, - ) - - def get_zmq_message(self) -> tuple | None: - try: - return self.zmq_messages.get_nowait() - except asyncio.QueueEmpty: - return None - - async def update(self) -> MempoolUpdate: - t = time.time() - result = MempoolUpdate(scripthashes=set(), new_tip=False) - - # Handle a batch of ZMQ messages (without blocking) - # If a block is found, drop all previous mempool events (since we'll resync mempool anyway) - messages = [] - for event, hash_hex, mempool_seq in iter(self.get_zmq_message, None): - if event in (Event.BLOCK_CONNECT, Event.BLOCK_DISCONNECT): - LOG.info("block %s event [%s]", hash_hex, chr(event)) - # resync mempool after new/stale block - self.next_seq = None - # will trigger `bindex` update and lookup - result.new_tip = True - messages.clear() - else: - assert mempool_seq is not None - messages.append((event, hash_hex, mempool_seq)) - - if self.next_seq is None and ZMQ_ADDR: - # We are out of sync - resync mempool transactions (using REST API) - await self.resync(scripthashes=result.scripthashes) - - stats = defaultdict(int) - for event, hash_hex, mempool_seq in messages: - if mempool_seq < self.next_seq: - continue - if mempool_seq > self.next_seq: - LOG.warning("skipped seq: %d > %d", mempool_seq, self.next_seq) - - if event is Event.TX_ADD: - if tx := await self.try_get_tx(hash_hex): - await self.add_tx( - hash_hex, - tx, - scripthashes=result.scripthashes, - ) - elif event is Event.TX_REMOVE: - self.remove_tx(hash_hex, scripthashes=result.scripthashes) - else: - raise NotImplementedError(event) - - LOG.debug("%s @ %d (%s)", hash_hex, mempool_seq, event) - self.next_seq = mempool_seq + 1 - stats[event] += 1 - - LOG.info( - "handled %d events: %d txs, seq=%s (%.3fs) %s", - len(messages), - len(self.tx_entries), - self.next_seq, - time.time() - t, - ", ".join(f"{chr(k)}={v}" for k, v in stats.items()), - ) - - return result - - -class Manager: - def __init__(self, http: HttpClient): - self.db = sqlite3.connect(ARGS.cache_db) - self.merkle = merkle.Merkle() - self.subscription_queue = asyncio.Queue() - self.mempool = Mempool(http) - self.http = http - self.sessions: set[ElectrumSession] = set() - - async def notify_sessions(self): - for session in self.sessions: - try: - await session.notify() - except Exception: - LOG.exception("failed to notify session #%d", session.session_id) - - async def latest_header(self) -> dict: - j = await self.chaininfo() - height = j["blocks"] - raw = await self.raw_header(height) - return {"hex": raw.hex(), "height": height} - - async def chaininfo(self): - return await self.http.rest_get("chaininfo.json", lambda r: r.json()) - - async def get_history(self, hashx: bytes) -> list[dict]: - query = """ -SELECT DISTINCT - t.tx_id, - t.block_height -FROM - transactions t, - history h -WHERE - t.block_offset = h.block_offset AND - t.block_height = h.block_height AND - h.script_hash = ? -ORDER BY - t.block_height ASC, - t.block_offset ASC -""" - result = [] - - confirmed = ( - (hash_to_hex_str(txid), height) - for txid, height in self.db.execute(query, [hashx]).fetchall() - ) - for tx_hash, height in confirmed: - result.append({"tx_hash": tx_hash, "height": height}) - - unconfirmed = ( - (txid_hex, self.mempool.tx_entries[txid_hex].fee) - for txid_hex in self.mempool.by_scripthash.get(hashx, ()) - ) - for tx_hash, fee in sorted(unconfirmed): - result.append({"tx_hash": tx_hash, "height": 0, "fee": fee}) - - return result - - async def subscribe(self, hashX: bytes): - event = asyncio.Event() - await self.subscription_queue.put((hashX, event.set)) - await event.wait() - - async def _merkle_branch(self, height, tx_hashes, tx_pos): - branch, _root = self.merkle.branch_and_root(tx_hashes, tx_pos) - branch = [hash_to_hex_str(hash) for hash in branch] - return branch - - async def merkle_branch_for_tx_hash(self, height, tx_hash): - """Return a triple (branch, tx_pos).""" - tx_hashes = await self.tx_hashes_at_blockheight(height) - try: - tx_pos = tx_hashes.index(tx_hash) - except ValueError: - raise RPCError( - BAD_REQUEST, - f"tx {hash_to_hex_str(tx_hash)} not in block at height {height:,d}", - ) - branch = await self._merkle_branch(height, tx_hashes, tx_pos) - return branch, tx_pos - - async def tx_hashes_at_blockheight(self, height): - """Returns a pair (tx_hashes). - - tx_hashes is an ordered list of binary hashes. Raises RPCError. - """ - - h = await self.http.rest_get( - f"blockhashbyheight/{height}.hex", lambda r: r.text() - ) - j = await self.http.rest_get(f"block/notxdetails/{h}.json", lambda r: r.json()) - tx_hashes = [hex_str_to_hash(h) for h in j["tx"]] - return tx_hashes - - async def getrawtransaction(self, tx_hash, verbose) -> str: - assert verbose is False - rows = self.db.execute( - "SELECT tx_bytes FROM transactions WHERE tx_id = ?", [tx_hash] - ).fetchall() - if len(rows) == 1: - return rows[0][0].hex() - - if entry := self.mempool.tx_entries.get(hash_to_hex_str(tx_hash)): - return entry.tx["hex"] - - raise RPCError(BAD_REQUEST, f"{tx_hash.hex()} not found") - - async def raw_header(self, height: int): - raw, _ = await self.raw_headers(height, count=1) - return raw - - async def raw_headers(self, height: int, count: int): - chunks = [] - while count > 0: - h = await self.http.rest_get( - f"blockhashbyheight/{height}.hex", lambda r: r.text() - ) - chunk_size = min(count, MAX_CHUNK_SIZE // 2) - chunk = await self.http.rest_get( - f"headers/{chunk_size}/{h}.bin", lambda r: r.read() - ) - assert len(chunk) % 80 == 0 - chunk_size = len(chunk) // 80 - assert chunk_size <= count - chunks.append(chunk) - height += chunk_size - count -= chunk_size - - raw = b"".join(chunks) - return raw, len(raw) // 80 - - -HASHX_LEN = 32 - - -hex_to_bytes = bytes.fromhex - - -def hash_to_hex_str(x): - """Convert a big-endian binary hash to displayed hex string. - - Display form of a binary hash is reversed and converted to hex. - """ - return bytes(reversed(x)).hex() - - -def hex_str_to_hash(x: str) -> bytes: - """Convert a displayed hex string to a binary hash.""" - return bytes(reversed(hex_to_bytes(x))) - - -def scripthash_to_hashX(scripthash: str): - try: - bin_hash = hex_str_to_hash(scripthash) - if len(bin_hash) == 32: - return bin_hash[:HASHX_LEN] - except (ValueError, TypeError): - pass - raise RPCError(BAD_REQUEST, f"{scripthash} is not a valid script hash") - - -def non_negative_integer(value): - """Return param value it is or can be converted to a non-negative - integer, otherwise raise an RPCError.""" - try: - value = int(value) - if value >= 0: - return value - except (ValueError, TypeError): - pass - raise RPCError(BAD_REQUEST, f"{value} should be a non-negative integer") - - -def assert_boolean(value): - """Return param value it is boolean otherwise raise an RPCError.""" - if value in (False, True): - return value - raise RPCError(BAD_REQUEST, f"{value} should be a boolean value") - - -def assert_tx_hash(value): - """Raise an RPCError if the value is not a valid hexadecimal transaction hash. - - If it is valid, return it as 32-byte binary hash. - """ - try: - raw_hash = hex_str_to_hash(value) - if len(raw_hash) == 32: - return raw_hash - except (ValueError, TypeError): - pass - raise RPCError(BAD_REQUEST, f"{value} should be a transaction hash") - - -class SessionBase(RPCSession): - """Base class of ElectrumX JSON sessions. - - Each session runs its tasks in asynchronous parallelism with other - sessions. - """ - - log_me = False - initial_concurrent = 100 - - session_counter = itertools.count() - - def __init__( - self, - transport, - env, - ): - connection = JSONRPCConnection(JSONRPCAutoDetect) - super().__init__(transport, connection=connection) - self.env = env - self.txs_sent = 0 - self.session_id = None - self.session_id = next(self.session_counter) - self.logger = logging.getLogger() - self.request_handlers = {} - - def default_framer(self): - return NewlineFramer(max_size=self.env.max_recv) - - async def connection_lost(self): - """Handle client disconnection.""" - await super().connection_lost() - msg = "" - if self._incoming_concurrency.max_concurrent < self.initial_concurrent * 0.8: - msg += " whilst throttled" - if self.send_size >= 1_000_000: - msg += f". Sent {self.send_size:,d} bytes in {self.send_count:,d} messages" - if msg: - msg = "disconnected" + msg - self.logger.info(msg) - - async def handle_request(self, request): - """Handle an incoming request. ElectrumX doesn't receive - notifications from client sessions. - """ - if isinstance(request, Request): - handler = self.request_handlers.get(request.method) - else: - handler = None - - coro = handler_invocation(handler, request)() - return await coro - - -def version_string(ptuple): - """Convert a version tuple such as (1, 2) to "1.2". - There is always at least one dot, so (1, ) becomes "1.0".""" - while len(ptuple) < 2: - ptuple += (0,) - return ".".join(str(p) for p in ptuple) - - -class ElectrumSession(SessionBase): - """A TCP server that handles incoming Electrum connections.""" - - PROTOCOL_MIN = (1, 4) - PROTOCOL_MAX = (1, 4, 3) - - def __init__(self, *args, mgr: Manager, **kwargs): - super().__init__(*args, **kwargs) - self.manager = mgr - self.subscribe_headers: dict | None = None - self.connection.max_response_size = self.env.max_send - self.hashX_subs = {} - self.sv_seen = False - self.set_request_handlers() - self.is_peer = False - self.protocol_tuple = self.PROTOCOL_MIN - self.manager.sessions.add(self) - - async def connection_lost(self): - """Handle client disconnection.""" - await super().connection_lost() - self.manager.sessions.remove(self) - - @classmethod - def protocol_min_max_strings(cls): - return [version_string(ver) for ver in (cls.PROTOCOL_MIN, cls.PROTOCOL_MAX)] - - @classmethod - def server_features(cls, env): - """Return the server features dictionary.""" - min_str, max_str = cls.protocol_min_max_strings() - return { - "hosts": {}, - "pruning": None, - "server_version": VERSION, - "protocol_min": min_str, - "protocol_max": max_str, - "genesis_hash": env.genesis_hash, - "hash_function": "sha256", - "services": [], - } - - async def server_features_async(self): - return self.server_features(self.env) - - @classmethod - def server_version_args(cls): - return [VERSION, cls.protocol_min_max_strings()] - - def protocol_version_string(self): - return version_string(self.protocol_tuple) - - def unsubscribe_hashX(self, hashX): - return self.hashX_subs.pop(hashX, None) - - async def subscribe_headers_result(self) -> dict: - return await self.manager.latest_header() - - async def headers_subscribe(self): - self.subscribe_headers = await self.subscribe_headers_result() - return self.subscribe_headers - - async def add_peer(self, features): - pass - - async def peers_subscribe(self): - return [] - - async def address_status(self, hashX): - entries = await self.manager.get_history(hashX) - status = "".join(f"{e['tx_hash']}:{e['height']:d}:" for e in entries) - return merkle.sha256(status.encode()).hex() if status else None - - async def hashX_subscribe(self, hashX, alias): - await self.manager.subscribe(hashX) - - # Store the subscription only after address_status succeeds - result = await self.address_status(hashX) - self.hashX_subs[hashX] = result - return result - - async def notify(self): - if self.subscribe_headers is not None: - new_result = await self.subscribe_headers_result() - if self.subscribe_headers != new_result: - self.subscribe_headers = new_result - await self.send_notification( - "blockchain.headers.subscribe", (new_result,) - ) - - for hashX in list(self.hashX_subs): - new_status = await self.address_status(hashX) - status = self.hashX_subs[hashX] - if status != new_status: - self.hashX_subs[hashX] = new_status - await self.send_notification( - "blockchain.scripthash.subscribe", (hashX, new_status) - ) - - async def confirmed_history(self, hashX): - return await self.manager.get_history(hashX) - - async def scripthash_get_history(self, scripthash): - hashX = scripthash_to_hashX(scripthash) - return await self.confirmed_history(hashX) - - async def scripthash_subscribe(self, scripthash): - hashX = scripthash_to_hashX(scripthash) - return await self.hashX_subscribe(hashX, scripthash) - - async def scripthash_unsubscribe(self, scripthash): - hashX = scripthash_to_hashX(scripthash) - return self.unsubscribe_hashX(hashX) is not None - - async def block_header(self, height): - height = non_negative_integer(height) - raw_header_hex = (await self.manager.raw_header(height)).hex() - return raw_header_hex - - async def block_headers(self, start_height, count): - start_height = non_negative_integer(start_height) - count = non_negative_integer(count) - - max_size = MAX_CHUNK_SIZE - count = min(count, max_size) - headers, count = await self.manager.raw_headers(start_height, count) - result = {"hex": headers.hex(), "count": count, "max": max_size} - return result - - async def donation_address(self): - return self.env.donation_address - - async def banner(self): - return "" - - async def relayfee(self): - return 0 - - async def estimatefee(self, number, mode=None): - return 0 - - async def ping(self): - return None - - async def server_version(self, client_name="", protocol_version=None): - return VERSION, self.protocol_version_string() - - async def transaction_get(self, tx_hash, verbose=False): - tx_hash = assert_tx_hash(tx_hash) - if verbose not in (True, False): - raise RPCError(BAD_REQUEST, '"verbose" must be a boolean') - - return await self.manager.getrawtransaction(tx_hash, verbose) - - async def transaction_merkle(self, tx_hash, height): - tx_hash = assert_tx_hash(tx_hash) - height = non_negative_integer(height) - - branch, tx_pos = await self.manager.merkle_branch_for_tx_hash(height, tx_hash) - return {"block_height": height, "merkle": branch, "pos": tx_pos} - - async def transaction_broadcast(self, tx_hex): - assert hex_to_bytes(tx_hex) - txid = await self.manager.http.json_rpc("sendrawtransaction", tx_hex) - assert_tx_hash(txid) - return txid - - async def compact_fee_histogram(self): - return [] - - def set_request_handlers(self): - handlers = { - "blockchain.block.header": self.block_header, - "blockchain.block.headers": self.block_headers, - "blockchain.estimatefee": self.estimatefee, - "blockchain.headers.subscribe": self.headers_subscribe, - "blockchain.relayfee": self.relayfee, - "blockchain.scripthash.get_history": self.scripthash_get_history, - "blockchain.scripthash.subscribe": self.scripthash_subscribe, - "blockchain.transaction.get": self.transaction_get, - "blockchain.transaction.get_merkle": self.transaction_merkle, - "blockchain.transaction.broadcast": self.transaction_broadcast, - "mempool.get_fee_histogram": self.compact_fee_histogram, - "server.add_peer": self.add_peer, - "server.banner": self.banner, - "server.donation_address": self.donation_address, - "server.features": self.server_features_async, - "server.peers.subscribe": self.peers_subscribe, - "server.ping": self.ping, - "server.version": self.server_version, - } - handlers["blockchain.scripthash.unsubscribe"] = self.scripthash_unsubscribe - - self.request_handlers = handlers - - -async def get_items( - q: asyncio.Queue, timeout=1.0 -) -> list[tuple[bytes, t.Callable[[], None]]]: - items = [] - try: - while True: - item = await asyncio.wait_for(q.get(), timeout) - items.append(item) - while not q.empty(): - items.append(q.get_nowait()) - # use a shorter timeout for coalescing subsequent subscriptions - timeout = 0.01 - except TimeoutError: - pass - return items - - -@contextlib.contextmanager -def transaction(c: sqlite3.Cursor): - # update bindex DB with the new scripthashes - c.execute("BEGIN") - try: - yield - c.execute("COMMIT") - except Exception: - c.execute("ROLLBACK") - raise - - -def update_scripthashes(c: sqlite3.Cursor, scripthashes): - # update bindex DB with the new scripthashes - with transaction(c): - r = c.executemany( - "INSERT OR IGNORE INTO watch (script_hash) VALUES (?)", - [(s,) for s in scripthashes], - ) - if r.rowcount: - LOG.info("watching %d new addresses", r.rowcount) - - -def get_scripthashes(c: sqlite3.Cursor) -> set[bytes]: - c.execute("SELECT script_hash FROM watch") - return set(s for (s,) in c.fetchall()) - - -class Indexer: - """Handle `bindex` connection.""" - - def __init__(self): - self.tip = None - self._loop = asyncio.get_running_loop() - - async def _read_tip(self) -> str: - """Wait for `bindex` to finish current indexing iteration, and return current chain tip hash.""" - line = await self._loop.run_in_executor(None, sys.stdin.readline) - return line.strip() - - async def _notify_bindex(self): - """Notify `bindex` to run another indexing iteration.""" - - def write_fn(): - sys.stdout.write("\n") - sys.stdout.flush() - - await self._loop.run_in_executor(None, write_fn) - - @classmethod - async def start(cls) -> "Indexer": - i = Indexer() - i.tip = await i._read_tip() # wait for initial index sync - LOG.info("indexer at block=%r", i.tip) - return i - - async def sync(self) -> bool: - prev_tip = self.tip - # update `bindex` (start an indexing iteration) - await self._notify_bindex() - self.tip = await self._read_tip() # wait for the indexing iteration to finish - LOG.debug("indexer at block=%r", self.tip) - return prev_tip != self.tip - - -async def subscription_task(mgr: Manager, indexer: Indexer): - try: - # sending new scripthashes on subscription requests - while True: - reqs = await get_items(mgr.subscription_queue) - update_scripthashes(mgr.db.cursor(), scripthashes=[s for s, _ in reqs]) - - # update `bindex` - chain_updated = await indexer.sync() - - # update mempool via ZMQ notifications (or resync, if needed) - mempool_update = await mgr.mempool.update() - - if reqs or chain_updated: - LOG.info("indexer at block=%r: %d reqs", indexer.tip, len(reqs)) - - # mark subscription requests as done - for _, fn in reqs: - fn() - - # make sure all sessions are notified - watched = get_scripthashes(mgr.db.cursor()) - if chain_updated or mempool_update.scripthashes.intersection(watched): - await mgr.notify_sessions() - except Exception: - LOG.exception("sync_task() failed") - - -async def main(): - FMT = "[%(asctime)-27s %(levelname)-5s %(module)s] %(message)s" - logging.basicConfig(level="INFO", format=FMT) - - async with aiohttp.ClientSession() as session: - http = HttpClient(session) - - info = await http.rest_get("chaininfo.json", lambda r: r.json()) - chain = info["chain"] - blocks = info["blocks"] - logging.info("Electrum server '%s' @ %s:%s", VERSION, HOST, PORT) - logging.info("Bitcoin Core %s @ %s, %d blocks", chain, BITCOIND_URL, blocks) - - info = await http.json_rpc("getblockchaininfo") - assert info["chain"] == chain - - indexer = await Indexer.start() # wait for initial index sync - mgr = Manager(http) - env = Env(await mgr.raw_header(0)) - cls = functools.partial(ElectrumSession, env=env, mgr=mgr) - await serve_rs(cls, host=HOST, port=PORT) - async with TaskGroup() as g: - await g.spawn(subscription_task(mgr, indexer)) - if ZMQ_ADDR: - # calls Mempool.notify() when ZMQ message is received - await g.spawn(subscribe_events(ZMQ_ADDR, mgr.mempool.enqueue_message)) - else: - logging.warning("Set ZMQ_ADDR to sync mempool") - await g.join() - - -if __name__ == "__main__": - asyncio.run(main())