Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ and this library adheres to Rust's notion of

## Unreleased
- [943] Zallet regtest fixes
- [1065] Move functionality to BlockChainSource: t-address rpcs

### Added
### Added
- `zaino-state::chain_index::source::BlockchainSource` and
`zaino-state::chain_index::ChainIndex` now expose transparent-address query
methods for deltas, balances, txids, and UTXOs.
### Changed
- `JsonRpSeeConnector::get_tree_state` now returns a `GetTreestateResponse`
whose `sapling` and `orchard` fields are optional. In regtest mode, these
Expand Down
24 changes: 24 additions & 0 deletions packages/zaino-fetch/src/jsonrpsee/response/address_deltas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,30 @@ pub struct AddressDelta {
}

impl AddressDelta {
/// Creates a transparent address delta from already-indexed address-history data.
///
/// This is used by backing stores and test sources that already know the
/// address, value, transaction location, and input/output index. It avoids
/// forcing those implementations to build partial `TransactionObject`
/// values solely to immediately decompose them again.
pub fn new(
satoshis: i64,
txid: String,
index: u32,
height: u32,
address: String,
block_index: Option<u32>,
) -> Self {
Self {
satoshis,
txid,
index,
height,
address,
block_index,
}
}

/// Create a delta from a transaction input (spend - negative value)
pub fn from_input(
input: &Input,
Expand Down
16 changes: 16 additions & 0 deletions packages/zaino-state/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ and this library adheres to Rust's notion of
- `BlockIndex::try_from_wire(proto::BlockId) -> Result<Self, WireBlockIdError>`
- new error enum `WireBlockIdError` (`HashWrongLength`, `HeightOverflow`)
- `local_cache::compact_block_with_pool_types`
- `source::BlockchainSource` and implementors now expose transparent-address
methods:
- `get_address_deltas`
- `get_address_balance`
- `get_address_txids`
- `get_address_utxos`
- `ChainIndex` and `NodeBackedChainIndexSubscriber` now expose transparent-address
query methods:
- `get_address_deltas`
- `get_address_balance`
- `get_address_txids`
- `get_address_utxos`
### Changed
- `get_mempool_tx` now takes `GetMempoolTxRequest` as parameter
- `chain_index::finalised_state`
Expand All @@ -54,6 +66,10 @@ and this library adheres to Rust's notion of
`chain_index::types::BlockIndex` (was briefly `non_finalized_state::BlockIdent`
earlier in the same unreleased cycle); its inner field is now named `hash`
(previously `blockhash`), and it gains `Eq`/`Hash` derives.
- `FetchService` and `StateService` now serve the get_raw_transaction RPC through
`ChainIndex`.
- `FetchService` and `StateService` now serve the transparent-address RPCs through
`ChainIndex`.

### Deprecated
- `GetTaddressTxids` is replaced by `GetTaddressTransactions`
Expand Down
158 changes: 99 additions & 59 deletions packages/zaino-state/src/backends/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ use zebra_chain::{
use zebra_rpc::{
client::{
GetAddressBalanceRequest, GetSubtreesByIndexResponse, GetTreestateResponse,
ValidateAddressResponse,
TransactionObject, ValidateAddressResponse,
},
methods::{
AddressBalance, GetAddressTxIdsRequest, GetAddressUtxos, GetBlock, GetBlockHashResponse,
GetBlockchainInfoResponse, GetInfo, GetRawTransaction, SentTransactionHash,
ValidateAddresses as _,
},
};

Expand Down Expand Up @@ -267,7 +266,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
&self,
params: GetAddressDeltasParams,
) -> Result<GetAddressDeltasResponse, Self::Error> {
Ok(self.fetcher.get_address_deltas(params).await?)
Ok(self.indexer.get_address_deltas(params).await?)
}

/// Returns software information from the RPC server, as a [`GetInfo`] JSON struct.
Expand Down Expand Up @@ -367,25 +366,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
&self,
address_strings: GetAddressBalanceRequest,
) -> Result<AddressBalance, Self::Error> {
Ok(self
.fetcher
.get_address_balance(
address_strings
.valid_addresses()
.map_err(|error| {
#[allow(deprecated)]
FetchServiceError::RpcError(RpcError {
code: error.code() as i64,
message: "Invalid address provided".to_string(),
data: None,
})
})?
.into_iter()
.map(|address| address.to_string())
.collect(),
)
.await?
.into())
Ok(self.indexer.get_address_balance(address_strings).await?)
}

/// Sends the raw bytes of a signed transaction to the local node's mempool, if the transaction is valid.
Expand Down Expand Up @@ -695,11 +676,78 @@ impl ZcashIndexer for FetchServiceSubscriber {
txid_hex: String,
verbose: Option<u8>,
) -> Result<GetRawTransaction, Self::Error> {
Ok(self
.fetcher
.get_raw_transaction(txid_hex, verbose)
.await?
.into())
#[allow(deprecated)]
let txid = types::TransactionHash::from_hex(&txid_hex).map_err(|error| {
FetchServiceError::RpcError(RpcError::new_from_legacycode(
zebra_rpc::server::error::LegacyCode::InvalidAddressOrKey,
error.to_string(),
))
})?;

#[allow(deprecated)]
let not_found_error = || {
FetchServiceError::RpcError(RpcError::new_from_legacycode(
zebra_rpc::server::error::LegacyCode::InvalidAddressOrKey,
"No such mempool or main chain transaction",
))
};

let snapshot = self.indexer.snapshot_nonfinalized_state().await?;

let Some((serialized_transaction, _consensus_branch_id)) =
self.indexer.get_raw_transaction(&snapshot, &txid).await?
else {
return Err(not_found_error());
};

if verbose.is_none() {
return Ok(GetRawTransaction::Raw(
zebra_chain::transaction::SerializedTransaction::from(serialized_transaction),
));
}

let transaction = zebra_chain::transaction::Transaction::zcash_deserialize(
serialized_transaction.as_slice(),
)
.map_err(|_| not_found_error())?;

let (best_chain_location, _non_best_chain_locations) = self
.indexer
.get_transaction_status(&snapshot, &txid)
.await?;

let (height, confirmations, block_hash, in_best_chain) = match best_chain_location {
Some(types::BestChainLocation::Block(block_hash, height)) => {
let confirmations = snapshot
.max_serviceable_height()
.0
.saturating_sub(height.0)
.saturating_add(1);

(
Some(zebra_chain::block::Height::from(height)),
Some(confirmations),
Some(zebra_chain::block::Hash::from(block_hash)),
Some(true),
)
}
Some(types::BestChainLocation::Mempool(_height)) => (None, Some(0), None, Some(false)),
None => (None, None, None, Some(false)),
};

Ok(GetRawTransaction::Object(Box::new(
TransactionObject::from_transaction(
transaction.into(),
height,
confirmations,
#[allow(deprecated)]
&self.config.common.network.to_zebra_network(),
None,
block_hash,
in_best_chain,
zebra_chain::transaction::Hash::from(txid),
),
)))
}

async fn chain_height(&self) -> Result<Height, Self::Error> {
Expand Down Expand Up @@ -732,12 +780,13 @@ impl ZcashIndexer for FetchServiceSubscriber {
&self,
request: GetAddressTxIdsRequest,
) -> Result<Vec<String>, Self::Error> {
let (addresses, start, end) = request.into_parts();
Ok(self
.fetcher
.get_address_txids(addresses, start, end)
.indexer
.get_address_txids(request)
.await?
.transactions)
.into_iter()
.map(|transaction_hash| transaction_hash.to_rpc_hex())
.collect())
}

/// Returns all unspent outputs for a list of addresses.
Expand All @@ -758,27 +807,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
&self,
addresses: GetAddressBalanceRequest,
) -> Result<Vec<GetAddressUtxos>, Self::Error> {
Ok(self
.fetcher
.get_address_utxos(
addresses
.valid_addresses()
.map_err(|error| {
#[allow(deprecated)]
FetchServiceError::RpcError(RpcError {
code: error.code() as i64,
message: "Invalid address provided".to_string(),
data: None,
})
})?
.into_iter()
.map(|address| address.to_string())
.collect(),
)
.await?
.into_iter()
.map(|utxos| utxos.into())
.collect())
Ok(self.indexer.get_address_utxos(addresses).await?)
}

/// Returns the estimated network solutions per second based on the last n blocks.
Expand Down Expand Up @@ -1012,7 +1041,8 @@ impl LightWalletIndexer for FetchServiceSubscriber {

let fetch_service_clone = self.clone();
let service_timeout = self.config.common.service.timeout;
let (channel_tx, channel_rx) = mpsc::channel(self.config.common.service.channel_size as usize);
let (channel_tx, channel_rx) =
mpsc::channel(self.config.common.service.channel_size as usize);
let snapshot = fetch_service_clone
.indexer
.snapshot_nonfinalized_state()
Expand Down Expand Up @@ -1144,7 +1174,8 @@ impl LightWalletIndexer for FetchServiceSubscriber {

let fetch_service_clone = self.clone();
let service_timeout = self.config.common.service.timeout;
let (channel_tx, channel_rx) = mpsc::channel(self.config.common.service.channel_size as usize);
let (channel_tx, channel_rx) =
mpsc::channel(self.config.common.service.channel_size as usize);
let snapshot = fetch_service_clone
.indexer
.snapshot_nonfinalized_state()
Expand Down Expand Up @@ -1332,7 +1363,8 @@ impl LightWalletIndexer for FetchServiceSubscriber {
let txids = self.get_taddress_txids_helper(request).await?;
let fetch_service_clone = self.clone();
let service_timeout = self.config.common.service.timeout;
let (transmitter, receiver) = mpsc::channel(self.config.common.service.channel_size as usize);
let (transmitter, receiver) =
mpsc::channel(self.config.common.service.channel_size as usize);
tokio::spawn(async move {
let timeout = timeout(
time::Duration::from_secs((service_timeout * 4) as u64),
Expand Down Expand Up @@ -1538,7 +1570,8 @@ impl LightWalletIndexer for FetchServiceSubscriber {

let mempool = self.indexer.clone();
let service_timeout = self.config.common.service.timeout;
let (channel_tx, channel_rx) = mpsc::channel(self.config.common.service.channel_size as usize);
let (channel_tx, channel_rx) =
mpsc::channel(self.config.common.service.channel_size as usize);

tokio::spawn(async move {
let timeout = timeout(
Expand Down Expand Up @@ -1638,7 +1671,8 @@ impl LightWalletIndexer for FetchServiceSubscriber {
async fn get_mempool_stream(&self) -> Result<RawTransactionStream, Self::Error> {
let indexer = self.indexer.clone();
let service_timeout = self.config.common.service.timeout;
let (channel_tx, channel_rx) = mpsc::channel(self.config.common.service.channel_size as usize);
let (channel_tx, channel_rx) =
mpsc::channel(self.config.common.service.channel_size as usize);
let snapshot = indexer.snapshot_nonfinalized_state().await?;
tokio::spawn(async move {
let timeout = timeout(
Expand Down Expand Up @@ -1733,7 +1767,12 @@ impl LightWalletIndexer for FetchServiceSubscriber {
.await?
.into_parts();
Ok(TreeState {
network: self.config.common.network.to_zebra_network().bip70_network_name(),
network: self
.config
.common
.network
.to_zebra_network()
.bip70_network_name(),
height: height.0 as u64,
hash: hash.to_string(),
time,
Expand Down Expand Up @@ -1826,7 +1865,8 @@ impl LightWalletIndexer for FetchServiceSubscriber {
let taddrs = GetAddressBalanceRequest::new(request.addresses);
let utxos = self.z_get_address_utxos(taddrs).await?;
let service_timeout = self.config.common.service.timeout;
let (channel_tx, channel_rx) = mpsc::channel(self.config.common.service.channel_size as usize);
let (channel_tx, channel_rx) =
mpsc::channel(self.config.common.service.channel_size as usize);
tokio::spawn(async move {
let timeout = timeout(
time::Duration::from_secs((service_timeout * 4) as u64),
Expand Down
Loading
Loading