Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 1 addition & 132 deletions crates/monitor/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ use std::{
time::Duration,
};

use anyhow::anyhow;
use indexer_query::escrow_account::{self, EscrowAccountQuery};
use thegraph_core::alloy::primitives::{Address, U256};
use thiserror::Error;
use tokio::sync::watch::Receiver;
Expand Down Expand Up @@ -111,18 +109,6 @@ pub fn empty_escrow_accounts_watcher() -> EscrowAccountsWatcher {
receiver
}

pub async fn escrow_accounts_v1(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
interval: Duration,
reject_thawing_signers: bool,
) -> Result<EscrowAccountsWatcher, anyhow::Error> {
indexer_watcher::new_watcher(interval, move || {
get_escrow_accounts_v1(escrow_subgraph, indexer_address, reject_thawing_signers)
})
.await
}

pub async fn escrow_accounts_v2(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
Expand Down Expand Up @@ -366,76 +352,6 @@ async fn get_escrow_accounts_v2(
Ok(escrow_accounts)
}

async fn get_escrow_accounts_v1(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
reject_thawing_signers: bool,
) -> anyhow::Result<EscrowAccounts> {
tracing::debug!(?indexer_address, "Loading V1 escrow accounts for indexer");

// thawEndTimestamp == 0 means that the signer is not thawing. This also means
// that we don't wait for the thawing period to end before stopping serving
// queries for this signer.
// isAuthorized == true means that the signer is still authorized to sign
// payments in the name of the sender.
let response = escrow_subgraph
.query::<EscrowAccountQuery, _>(escrow_account::Variables {
indexer: format!("{indexer_address:x?}"),
thaw_end_timestamp: if reject_thawing_signers {
U256::ZERO.to_string()
} else {
U256::MAX.to_string()
},
})
.await?;

let senders_balances: HashMap<Address, U256> = response
.escrow_accounts
.iter()
.map(|account| {
let balance = U256::checked_sub(
U256::from_str(&account.balance)?,
U256::from_str(&account.total_amount_thawing)?,
)
.unwrap_or_else(|| {
tracing::warn!(
sender = ?account.sender.id,
"Balance minus total amount thawing underflowed for account; setting balance to 0, no queries will be served for this sender."
);
U256::from(0)
});

Ok((Address::from_str(&account.sender.id)?, balance))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()?;

let senders_to_signers = response
.escrow_accounts
.into_iter()
.map(|account| {
let sender = Address::from_str(&account.sender.id)?;
let signers = account
.sender
.signers
.ok_or(anyhow!("Could not find any signers for sender {sender}"))?
.iter()
.map(|signer| Address::from_str(&signer.id))
.collect::<Result<Vec<_>, _>>()?;
Ok((sender, signers))
})
.collect::<Result<HashMap<_, _>, anyhow::Error>>()?;

let escrow_accounts = EscrowAccounts::new(senders_balances.clone(), senders_to_signers.clone());

tracing::debug!(
senders = senders_balances.len(),
mappings = escrow_accounts.signers_to_senders.len(),
"V1 escrow accounts loaded"
);

Ok(escrow_accounts)
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down Expand Up @@ -466,53 +382,6 @@ mod tests {
)
}

#[test(tokio::test)]
async fn test_current_accounts() {
// Set up a mock escrow subgraph
let mock_server = MockServer::start().await;
let escrow_subgraph = Box::leak(Box::new(
SubgraphClient::new(
reqwest::Client::new(),
None,
DeploymentDetails::for_query_url(&format!(
"{}/subgraphs/id/{}",
&mock_server.uri(),
test_assets::ESCROW_SUBGRAPH_DEPLOYMENT
))
.unwrap(),
)
.await,
));

let mock = Mock::given(method("POST"))
.and(path(format!(
"/subgraphs/id/{}",
test_assets::ESCROW_SUBGRAPH_DEPLOYMENT
)))
.respond_with(
ResponseTemplate::new(200)
.set_body_raw(test_assets::ESCROW_QUERY_RESPONSE, "application/json"),
);
mock_server.register(mock).await;

let mut accounts = escrow_accounts_v1(
escrow_subgraph,
test_assets::INDEXER_ADDRESS,
Duration::from_secs(60),
true,
)
.await
.unwrap();
accounts.changed().await.unwrap();
assert_eq!(
accounts.borrow().clone(),
EscrowAccounts::new(
ESCROW_ACCOUNTS_BALANCES.to_owned(),
ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
)
);
}

#[test(tokio::test)]
async fn test_current_accounts_v2() {
// Set up a mock escrow subgraph for V2 with payer fields
Expand Down Expand Up @@ -556,7 +425,7 @@ mod tests {
.unwrap();
accounts.changed().await.unwrap();

// V2 should produce identical results to V1 since they query the same data
// Verify V2 escrow accounts are loaded correctly
assert_eq!(
accounts.borrow().clone(),
EscrowAccounts::new(
Expand Down
52 changes: 0 additions & 52 deletions crates/monitor/src/horizon_detection.rs

This file was deleted.

6 changes: 2 additions & 4 deletions crates/monitor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ mod deployment_to_allocation;
mod dispute_manager;
mod escrow_accounts;
mod freshness;
mod horizon_detection;

pub use crate::{
allocations::{indexer_allocations, AllocationWatcher},
Expand All @@ -17,8 +16,7 @@ pub use crate::{
deployment_to_allocation::{deployment_to_allocation, DeploymentToAllocationWatcher},
dispute_manager::{dispute_manager, DisputeManagerWatcher},
escrow_accounts::{
empty_escrow_accounts_watcher, escrow_accounts_v1, escrow_accounts_v2, EscrowAccounts,
EscrowAccountsError, EscrowAccountsWatcher,
empty_escrow_accounts_watcher, escrow_accounts_v2, EscrowAccounts, EscrowAccountsError,
EscrowAccountsWatcher,
},
horizon_detection::is_horizon_active,
};
24 changes: 0 additions & 24 deletions crates/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,29 +104,6 @@ pub async fn run() -> anyhow::Result<()> {
// V2 escrow accounts (used by DIPS) are in the network subgraph
let escrow_v2_query_url_for_dips = config.subgraphs.network.config.query_url.clone();

// Verify Horizon contracts are active in the network subgraph
tracing::info!("Checking network subgraph readiness for Horizon mode");
match indexer_monitor::is_horizon_active(network_subgraph).await {
Ok(true) => {
tracing::info!("Horizon contracts detected in network subgraph");
}
Ok(false) => {
anyhow::bail!(
"Horizon mode is required, but the Network Subgraph indicates Horizon is not active (no PaymentsEscrow accounts found). \
Ensure Horizon contracts are deployed and the Network Subgraph is updated before starting the indexer service."
);
}
Err(e) => {
anyhow::bail!(
"Failed to detect Horizon contracts due to network/subgraph error: {}. \
Cannot start with Horizon mode enabled when network status is unknown.",
e
);
}
}

tracing::info!("Horizon contracts detected - using Horizon (V2) mode");

let collector_address = config.blockchain.receipts_verifier_address_v2;
let escrow_min_balance_grt_wei = config.subgraphs.network.escrow_min_balance_grt_wei.clone();
let max_signers_per_payer = config.subgraphs.network.max_signers_per_payer;
Expand Down Expand Up @@ -198,7 +175,6 @@ pub async fn run() -> anyhow::Result<()> {
);

// TODO: Try to re-use the same watcher for both DIPS and TAP
// DIPS requires Horizon/V2, so always use V2 escrow from network subgraph
let dips_http_client = create_http_client(DIPS_HTTP_CLIENT_TIMEOUT, false)
.context("Failed to create DIPS HTTP client")?;

Expand Down
5 changes: 2 additions & 3 deletions crates/service/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
//!
//! ## Receipt Types
//!
//! The system supports two receipt versions:
//! - **V1 (Legacy)**: Allocation-based receipts tied to specific allocations
//! - **V2 (Horizon)**: Collection-based receipts using the Horizon payment contracts
//! The system uses Horizon (V2) receipts, which are collection-based receipts
//! using the Horizon payment contracts.
//!
//! ## Validation Checks
//!
Expand Down
2 changes: 1 addition & 1 deletion crates/service/src/tap/receipt_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ mod tests {
#[case(ProcessedReceipt::None, async { vec![] })]
#[case(ProcessedReceipt::V2, async { vec![create_v2().await] })]
#[tokio::test]
async fn v1_and_v2_are_processed_successfully(
async fn receipts_are_processed_successfully(
#[case] expected: ProcessedReceipt,
#[future(awt)]
#[case]
Expand Down
2 changes: 1 addition & 1 deletion crates/service/tests/router_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async fn full_integration_test() {
));
mock_server.register(mock).await;

// Mock escrow subgraph (v1) and network subgraph (v2) redemption queries.
// Mock network subgraph redemption queries.
mock_server
.register(Mock::given(method("POST")).and(path("/")).respond_with(
ResponseTemplate::new(200).set_body_raw(
Expand Down
23 changes: 0 additions & 23 deletions crates/tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,29 +144,6 @@ pub async fn start_agent(
.await
.with_context(|| "Failed to initialize indexer_allocations watcher")?;

// Verify the network subgraph is ready for Horizon mode
tracing::info!("Checking network subgraph readiness for Horizon mode");
match indexer_monitor::is_horizon_active(network_subgraph).await {
Ok(true) => {
tracing::info!("Horizon schema available in network subgraph - enabling Horizon mode");
tracing::info!(
"V2 watcher will automatically detect new PaymentsEscrow accounts as they appear"
);
}
Ok(false) => {
anyhow::bail!(
"Horizon mode is required, but the Network Subgraph indicates Horizon is not active (no PaymentsEscrow accounts found). \
Ensure Horizon contracts are deployed and the Network Subgraph is updated before starting the TAP agent."
);
}
Err(e) => {
anyhow::bail!(
"Failed to detect Horizon contracts due to network/subgraph error: {}. Cannot start with Horizon enabled when network status is unknown.",
e
);
}
}

// Create V2 escrow accounts watcher
// V2 escrow accounts are in the network subgraph, not a separate TAP v2 subgraph
tracing::info!(
Expand Down
Loading