Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion integration-tests/tests/json_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,8 +659,10 @@ async fn z_get_address_utxos_inner() {
mod zcashd {
use super::*;

#[allow(deprecated)]
pub(crate) mod zcash_indexer {
use zaino_state::LightWalletIndexer;
use zaino_fetch::jsonrpsee::response::block_header::GetBlockHeaderError;
use zaino_state::{FetchServiceError, LightWalletIndexer};
use zebra_rpc::methods::GetBlock;

use super::*;
Expand Down Expand Up @@ -865,6 +867,23 @@ mod zcashd {
.unwrap();
assert_eq!(zcashd_get_block_header, zainod_block_header_response);
}

// Try to fetch a non-existent block header
match zcashd_subscriber
.get_block_header(
"00000000008b4fdc4bae2868208f752526abfa441121cc0ac6526ddcb827befe".into(),
false,
)
.await
{
Err(FetchServiceError::RpcError(rpc_error)) => {
match GetBlockHeaderError::try_from(rpc_error) {
Ok(GetBlockHeaderError::MissingBlock { .. }) => (),
other => panic!("unexpected method error mapping: {:?}", other),
}
}
other => panic!("unexpected top-level error: {other:?}"),
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
Expand Down
88 changes: 64 additions & 24 deletions zaino-fetch/src/jsonrpsee/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use base64::{engine::general_purpose, Engine};
use http::Uri;
use reqwest::{Client, ClientBuilder, Url};
use serde::de::Error;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::{
Expand Down Expand Up @@ -101,6 +102,12 @@ impl RpcError {
}
}

impl From<Infallible> for RpcError {
fn from(x: Infallible) -> Self {
match x {} // Unreachable
}
}

impl fmt::Display for RpcError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "RPC Error (code: {}): {}", self.code, self.message)
Expand Down Expand Up @@ -287,7 +294,7 @@ impl JsonRpSeeConnector {
params: T,
) -> Result<R, RpcRequestError<R::RpcError>>
where
R::RpcError: Send + Sync + 'static,
R::RpcError: Send + Sync + 'static + std::fmt::Display,
{
let id = self.id_counter.fetch_add(1, Ordering::SeqCst);

Expand Down Expand Up @@ -322,37 +329,70 @@ impl JsonRpSeeConnector {
continue;
}

// Try to parse a JSON-RPC envelope no matter what the HTTP status is.
Comment thread
dorianvp marked this conversation as resolved.
Outdated
if let Ok(rpc) = serde_json::from_slice::<RpcResponse<R>>(&body_bytes) {
// If the server sent a JSON-RPC error, surface it as a method error
if let Some(error) = rpc.error {
let raw_code = error.code;
let raw_msg = error.message.clone();

let mapped = R::RpcError::try_from(error)
.map_err(|e| RpcRequestError::UnexpectedErrorResponse(Box::new(e)))?;

// Log method errors
error!(
target: "zaino_fetch::jsonrpc",
method = %method,
id = id,
code = raw_code,
message = %raw_msg,
mapped_type = %type_name::<R::RpcError>(),
mapped = ?mapped,
params = tracing::field::debug(&params),
"JSON-RPC method error"
);

return Err(RpcRequestError::Method(mapped));
}

// If the server sent a JSON-RPC result, treat it as success
if let Some(result) = rpc.result {
match result.to_error() {
Ok(r) => return Ok(r),
Err(e) => {
error!(
target: "zaino_fetch::jsonrpc",
method = %method,
id = id,
mapped_type = %type_name::<R::RpcError>(),
mapped = ?e,
params = tracing::field::debug(&params),
"Domain-specific method error from result"
);
return Err(RpcRequestError::Method(e));
}
}
}

// Bad node response
return Err(RpcRequestError::Transport(
TransportError::EmptyResponseBody,
));
}

// Fall back to HTTP semantics at this point
Comment thread
dorianvp marked this conversation as resolved.
Outdated
let code = status.as_u16();
return match code {
// Invalid
..100 | 600.. => Err(RpcRequestError::Transport(
TransportError::InvalidStatusCode(code),
)),
// Informational | Redirection
100..200 | 300..400 => Err(RpcRequestError::Transport(
TransportError::UnexpectedStatusCode(code),
)),
// Success
200..300 => {
let response: RpcResponse<R> = serde_json::from_slice(&body_bytes)
.map_err(|e| TransportError::BadNodeData(Box::new(e), type_name::<R>()))?;

match (response.error, response.result) {
(Some(error), _) => Err(RpcRequestError::Method(
R::RpcError::try_from(error).map_err(|e| {
RpcRequestError::UnexpectedErrorResponse(Box::new(e))
})?,
)),
(None, Some(result)) => match result.to_error() {
Ok(r) => Ok(r),
Err(e) => Err(RpcRequestError::Method(e)),
},
(None, None) => Err(RpcRequestError::Transport(
TransportError::EmptyResponseBody,
)),
}
// Error
}
200..300 => Err(RpcRequestError::Transport(TransportError::BadNodeData(
Box::new(serde_json::Error::custom("non-JSON response with 2xx")),
type_name::<R>(),
))),
400..600 => Err(RpcRequestError::Transport(TransportError::ErrorStatusCode(
code,
))),
Expand Down
36 changes: 28 additions & 8 deletions zaino-fetch/src/jsonrpsee/response/block_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@
InvalidVerbosity(i8),

/// The requested block hash or height could not be found
#[error("Block not found: {0}")]
MissingBlock(String),
#[error("Block not found: {message}")]
MissingBlock {
code: i64, // TODO: -5 (hash) or -8 (height)

Check failure on line 34 in zaino-fetch/src/jsonrpsee/response/block_header.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct field

error: missing documentation for a struct field --> zaino-fetch/src/jsonrpsee/response/block_header.rs:34:9 | 34 | code: i64, // TODO: -5 (hash) or -8 (height) | ^^^^^^^^^ | = note: `-D missing-docs` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(missing_docs)]`

Check failure on line 34 in zaino-fetch/src/jsonrpsee/response/block_header.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct field

error: missing documentation for a struct field --> zaino-fetch/src/jsonrpsee/response/block_header.rs:34:9 | 34 | code: i64, // TODO: -5 (hash) or -8 (height) | ^^^^^^^^^ | = note: `-D missing-docs` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(missing_docs)]`
message: String,

Check failure on line 35 in zaino-fetch/src/jsonrpsee/response/block_header.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct field

error: missing documentation for a struct field --> zaino-fetch/src/jsonrpsee/response/block_header.rs:35:9 | 35 | message: String, | ^^^^^^^^^^^^^^^

Check failure on line 35 in zaino-fetch/src/jsonrpsee/response/block_header.rs

View workflow job for this annotation

GitHub Actions / clippy

missing documentation for a struct field

error: missing documentation for a struct field --> zaino-fetch/src/jsonrpsee/response/block_header.rs:35:9 | 35 | message: String, | ^^^^^^^^^^^^^^^
},
}

/// Verbose response to a `getblockheader` RPC request.
Expand Down Expand Up @@ -121,12 +124,29 @@
type Error = RpcError;

fn try_from(value: RpcError) -> Result<Self, Self::Error> {
// If the block is not in Zebra's state, returns
// [error code `-8`.](https://github.com/zcash/zcash/issues/5758)
if value.code == -8 {
Ok(Self::MissingBlock(value.message))
} else {
Err(value)
match value.code {
-5 | -8 => Ok(Self::MissingBlock {
code: value.code,
message: value.message,
}),
_ => Err(value),
}
}
}

impl From<GetBlockHeaderError> for RpcError {
fn from(e: GetBlockHeaderError) -> Self {
match e {
GetBlockHeaderError::MissingBlock { code, message } => RpcError {
code,
message,
data: None,
},
GetBlockHeaderError::InvalidVerbosity(v) => RpcError {
code: -32602, // TODO: Abstract JSON-RPC codes away
message: format!("Invalid verbosity: {v}"),
data: None,
},
}
}
}
Expand Down
25 changes: 16 additions & 9 deletions zaino-state/src/backends/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
&self,
params: GetAddressDeltasParams,
) -> Result<GetAddressDeltasResponse, Self::Error> {
Ok(self.fetcher.get_address_deltas(params).await?)
Ok(self.fetcher.get_address_deltas(params).await.unwrap())
}

/// Returns software information from the RPC server, as a [`GetInfo`] JSON struct.
Expand Down Expand Up @@ -352,7 +352,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
data: None,
})
})?)
.await?
.await
.unwrap()
.into())
}

Expand All @@ -378,7 +379,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
Ok(self
.fetcher
.send_raw_transaction(raw_transaction_hex)
.await?
.await
.unwrap()
.into())
}

Expand Down Expand Up @@ -414,7 +416,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
Ok(self
.fetcher
.get_block(hash_or_height, verbosity)
.await?
.await
.unwrap()
.try_into()?)
}

Expand All @@ -426,7 +429,7 @@ impl ZcashIndexer for FetchServiceSubscriber {
///
/// Note: This method has only been implemented in `zcashd`. Zebra has no intention of supporting it.
async fn get_block_deltas(&self, hash: String) -> Result<BlockDeltas, Self::Error> {
Ok(self.fetcher.get_block_deltas(hash).await?)
Ok(self.fetcher.get_block_deltas(hash).await.unwrap())
}

async fn get_block_header(
Expand Down Expand Up @@ -527,7 +530,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
Ok(self
.fetcher
.get_treestate(hash_or_height)
.await?
.await
.unwrap()
.try_into()?)
}

Expand Down Expand Up @@ -558,7 +562,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
Ok(self
.fetcher
.get_subtrees_by_index(pool, start_index.0, limit.map(|limit_index| limit_index.0))
.await?
.await
.unwrap()
.into())
}

Expand Down Expand Up @@ -621,7 +626,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
Ok(self
.fetcher
.get_address_txids(addresses, start, end)
.await?
.await
.unwrap()
.transactions)
}

Expand Down Expand Up @@ -652,7 +658,8 @@ impl ZcashIndexer for FetchServiceSubscriber {
data: None,
})
})?)
.await?
.await
.unwrap()
.into_iter()
.map(|utxos| utxos.into())
.collect())
Expand Down
30 changes: 13 additions & 17 deletions zaino-state/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::BlockHash;

use std::{any::type_name, fmt::Display};

use zaino_fetch::jsonrpsee::connector::RpcRequestError;
use zaino_fetch::jsonrpsee::connector::{RpcError, RpcRequestError};

impl<T: ToString> From<RpcRequestError<T>> for StateServiceError {
fn from(value: RpcRequestError<T>) -> Self {
Expand Down Expand Up @@ -134,32 +134,27 @@ impl From<StateServiceError> for tonic::Status {
}
}

impl<T: ToString> From<RpcRequestError<T>> for FetchServiceError {
fn from(value: RpcRequestError<T>) -> Self {
impl<E> From<RpcRequestError<E>> for FetchServiceError
where
E: Into<RpcError>,
{
fn from(value: RpcRequestError<E>) -> Self {
match value {
RpcRequestError::Method(e) => FetchServiceError::RpcError(e.into()),
RpcRequestError::Transport(transport_error) => {
FetchServiceError::JsonRpcConnectorError(transport_error)
}
RpcRequestError::JsonRpc(error) => {
FetchServiceError::Critical(format!("argument failed to serialze: {error}"))
}
RpcRequestError::InternalUnrecoverable(e) => {
FetchServiceError::Critical(format!("Internal unrecoverable error: {e}"))
FetchServiceError::Critical(format!("argument failed to serialize: {error}"))
}
RpcRequestError::ServerWorkQueueFull => FetchServiceError::Critical(
"Server queue full. Handling for this not yet implemented".to_string(),
),
RpcRequestError::Method(e) => FetchServiceError::Critical(format!(
"unhandled rpc-specific {} error: {}",
type_name::<T>(),
e.to_string()
)),
RpcRequestError::UnexpectedErrorResponse(error) => {
FetchServiceError::Critical(format!(
"unhandled rpc-specific {} error: {}",
type_name::<T>(),
error
))
FetchServiceError::Critical(format!("unexpected rpc error response: {error}"))
}
RpcRequestError::InternalUnrecoverable(e) => {
FetchServiceError::Critical(format!("Internal unrecoverable error: {e}"))
}
}
}
Expand Down Expand Up @@ -221,6 +216,7 @@ impl From<FetchServiceError> for tonic::Status {
}
}
}

/// These aren't the best conversions, but the MempoolError should go away
/// in favor of a new type with the new chain cache is complete
impl<T: ToString> From<RpcRequestError<T>> for MempoolError {
Expand Down
Loading