From d09dc97525e5fc77a6858a089050a78f91cbbd8f Mon Sep 17 00:00:00 2001 From: neriumpete Date: Fri, 13 Mar 2026 12:17:18 +0200 Subject: [PATCH] fix: add grace period for attestation signers and allocation eligibility checks --- crates/monitor/src/attestation.rs | 145 ++++++++++++++- .../src/middleware/attestation_signer.rs | 3 + crates/service/src/service/router.rs | 9 + crates/service/src/tap.rs | 9 +- .../src/tap/checks/allocation_eligible.rs | 176 +++++++++++++++++- 5 files changed, 329 insertions(+), 13 deletions(-) diff --git a/crates/monitor/src/attestation.rs b/crates/monitor/src/attestation.rs index 6fe704b6d..f2be04e42 100644 --- a/crates/monitor/src/attestation.rs +++ b/crates/monitor/src/attestation.rs @@ -4,6 +4,7 @@ use std::{ collections::HashMap, sync::{Arc, Mutex}, + time::{Duration, Instant}, }; use bip39::Mnemonic; @@ -22,14 +23,24 @@ pub type AttestationWatcher = Receiver>; /// /// This function accepts multiple mnemonics to support allocations created with different /// operator keys (e.g., after key rotation or migration). +/// +/// The `grace_period` parameter controls how long signers are retained after their allocation +/// disappears from the watch channel. This prevents premature eviction during network subgraph +/// polling gaps, ensuring queries with recently-closed allocation IDs can still be signed. 
pub fn attestation_signers( indexer_allocations_rx: AllocationWatcher, indexer_mnemonics: Vec, chain_id: ChainId, dispute_manager_rx: DisputeManagerWatcher, + grace_period: Duration, ) -> AttestationWatcher { let attestation_signers_map: &'static Mutex> = Box::leak(Box::new(Mutex::new(HashMap::new()))); + + // Tombstone map: records when each signer's allocation was first seen missing + let evicted_at: &'static Mutex> = + Box::leak(Box::new(Mutex::new(HashMap::new()))); + let indexer_mnemonics: Arc<[String]> = indexer_mnemonics .iter() .map(|m| m.to_string()) @@ -45,6 +56,8 @@ pub fn attestation_signers( &indexer_mnemonics, chain_id, attestation_signers_map, + evicted_at, + grace_period, &allocation, &dispute, ) @@ -56,12 +69,34 @@ fn modify_signers( indexer_mnemonics: &[String], chain_id: ChainId, attestation_signers_map: &'static Mutex>, + evicted_at: &'static Mutex>, + grace_period: Duration, allocations: &HashMap, dispute_manager: &Address, ) -> HashMap { let mut signers = attestation_signers_map.lock().unwrap(); - // Remove signers for allocations that are no longer active or recently closed - signers.retain(|id, _| allocations.contains_key(id)); + let mut evicted = evicted_at.lock().unwrap(); + + // Retain signers that are active OR still within the grace period after going missing. + // This prevents premature eviction during network subgraph polling gaps. 
+ signers.retain(|id, _| { + if allocations.contains_key(id) { + // Re-activated: clear tombstone + evicted.remove(id); + true + } else { + // Record eviction time on first absence; keep until grace period expires + let first_evicted = evicted.entry(*id).or_insert_with(Instant::now); + let keep = first_evicted.elapsed() < grace_period; + if !keep { + evicted.remove(id); // Clean up tombstone after expiry + } + keep + } + }); + + // Opportunistic tombstone cleanup to prevent unbounded growth + evicted.retain(|_, t| t.elapsed() < grace_period * 2); // Create signers for new allocations for (id, allocation) in allocations.iter() { @@ -123,11 +158,13 @@ mod tests { async fn test_attestation_signers_update_with_allocations() { let (allocations_tx, allocations_rx) = watch::channel(HashMap::new()); let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS); + let grace_period = Duration::from_secs(3600); let mut signers = attestation_signers( allocations_rx, vec![INDEXER_MNEMONIC.clone()], 1, dispute_manager_rx, + grace_period, ); // Test that an empty set of allocations leads to an empty set of signers @@ -148,4 +185,108 @@ mod tests { .any(|allocation_id| signer_allocation_id == allocation_id)); } } + + #[tokio::test] + async fn test_signer_retained_within_grace_period() { + let (allocations_tx, allocations_rx) = watch::channel(HashMap::new()); + let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS); + // Use a long grace period to ensure signers are retained + let grace_period = Duration::from_secs(3600); + let mut signers = attestation_signers( + allocations_rx, + vec![INDEXER_MNEMONIC.clone()], + 1, + dispute_manager_rx, + grace_period, + ); + + // Add allocations + allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap(); + signers.changed().await.unwrap(); + let initial_count = signers.borrow().len(); + assert_eq!(initial_count, INDEXER_ALLOCATIONS.len()); + + // Remove all allocations (simulating network subgraph polling gap) + 
allocations_tx.send(HashMap::new()).unwrap(); + signers.changed().await.unwrap(); + + // Signers should be retained within grace period + let retained_signers = signers.borrow().clone(); + assert_eq!( + retained_signers.len(), + initial_count, + "Signers should be retained within grace period" + ); + } + + #[tokio::test] + async fn test_signer_evicted_after_grace_period() { + let (allocations_tx, allocations_rx) = watch::channel(HashMap::new()); + let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS); + // Use a very short grace period for testing + let grace_period = Duration::from_millis(10); + let mut signers = attestation_signers( + allocations_rx, + vec![INDEXER_MNEMONIC.clone()], + 1, + dispute_manager_rx, + grace_period, + ); + + // Add allocations + allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap(); + signers.changed().await.unwrap(); + assert!(!signers.borrow().is_empty()); + + // Remove all allocations + allocations_tx.send(HashMap::new()).unwrap(); + signers.changed().await.unwrap(); + + // Wait for grace period to expire + tokio::time::sleep(Duration::from_millis(50)).await; + + // Trigger another update to run the retain logic + allocations_tx.send(HashMap::new()).unwrap(); + signers.changed().await.unwrap(); + + // Signers should be evicted after grace period + assert!( + signers.borrow().is_empty(), + "Signers should be evicted after grace period expires" + ); + } + + #[tokio::test] + async fn test_reactivated_allocation_clears_tombstone() { + let (allocations_tx, allocations_rx) = watch::channel(HashMap::new()); + let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS); + let grace_period = Duration::from_secs(3600); + let mut signers = attestation_signers( + allocations_rx, + vec![INDEXER_MNEMONIC.clone()], + 1, + dispute_manager_rx, + grace_period, + ); + + // Add allocations + allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap(); + signers.changed().await.unwrap(); + let initial_count = 
signers.borrow().len(); + + // Remove allocations (starts grace period) + allocations_tx.send(HashMap::new()).unwrap(); + signers.changed().await.unwrap(); + + // Re-add allocations (should clear tombstone) + allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap(); + signers.changed().await.unwrap(); + + // Signers should still be present + assert_eq!( + signers.borrow().len(), + initial_count, + "Signers should be retained after re-activation" + ); + } } diff --git a/crates/service/src/middleware/attestation_signer.rs b/crates/service/src/middleware/attestation_signer.rs index 13e724004..cd4130de5 100644 --- a/crates/service/src/middleware/attestation_signer.rs +++ b/crates/service/src/middleware/attestation_signer.rs @@ -41,6 +41,8 @@ pub async fn signer_middleware( #[cfg(test)] mod tests { + use std::time::Duration; + use axum::{body::Body, http::Request, middleware::from_fn_with_state, routing::get, Router}; use indexer_attestation::AttestationSigner; use indexer_monitor::attestation_signers; @@ -65,6 +67,7 @@ mod tests { vec![INDEXER_MNEMONIC.clone()], 1, dispute_manager_rx, + Duration::from_secs(3600), ); let expected_signer = attestation_signers diff --git a/crates/service/src/service/router.rs b/crates/service/src/service/router.rs index daa2ffbfd..5fd783293 100644 --- a/crates/service/src/service/router.rs +++ b/crates/service/src/service/router.rs @@ -169,11 +169,19 @@ impl ServiceRouter { // Maintain an up-to-date set of attestation signers, one for each // allocation. Multiple mnemonics are tried to support allocations // created with different operator keys. + // The grace period prevents premature signer eviction during network + // subgraph polling gaps. 
+ let attestation_signer_grace_period = self + .network_subgraph + .as_ref() + .map(|(_, config)| config.recently_closed_allocation_buffer_secs) + .unwrap_or(std::time::Duration::from_secs(3600)); let attestation_signers = attestation_signers( allocations.clone(), operator_mnemonics.clone(), self.blockchain.chain_id as u64, dispute_manager, + attestation_signer_grace_period, ); // Rate limits by allowing bursts of 10 requests and requiring 100ms of @@ -273,6 +281,7 @@ impl ServiceRouter { receipt_max_value, allowed_data_services, service_provider: self.indexer.indexer_address, + allocation_grace_period: attestation_signer_grace_period, }) .await; diff --git a/crates/service/src/tap.rs b/crates/service/src/tap.rs index fc9cd89a1..ed1c6a69d 100644 --- a/crates/service/src/tap.rs +++ b/crates/service/src/tap.rs @@ -95,6 +95,10 @@ pub struct TapChecksConfig { pub receipt_max_value: u128, pub allowed_data_services: Option>, pub service_provider: Address, + /// Grace period for allocation eligibility checks. + /// Receipts for allocations that were recently active will be accepted + /// within this period, bridging network subgraph polling gaps. 
+ pub allocation_grace_period: Duration, } #[derive(Clone)] @@ -125,7 +129,10 @@ pub enum AdapterError { impl IndexerTapContext { pub async fn get_checks(config: TapChecksConfig) -> Vec> { let mut checks: Vec> = vec![ - Arc::new(AllocationEligible::new(config.indexer_allocations)), + Arc::new(AllocationEligible::with_grace_period( + config.indexer_allocations, + config.allocation_grace_period, + )), Arc::new(AllocationRedeemedCheck::new( config.indexer_address, config.network_subgraph, diff --git a/crates/service/src/tap/checks/allocation_eligible.rs b/crates/service/src/tap/checks/allocation_eligible.rs index 475ae57ac..0816ee9d9 100644 --- a/crates/service/src/tap/checks/allocation_eligible.rs +++ b/crates/service/src/tap/checks/allocation_eligible.rs @@ -1,27 +1,56 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::collections::HashMap; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; use anyhow::anyhow; use indexer_allocation::Allocation; use tap_core::receipt::checks::{Check, CheckError, CheckResult}; use thegraph_core::{alloy::primitives::Address, AllocationId, CollectionId}; -use tokio::sync::watch::Receiver; +use tokio::sync::{watch::Receiver, Mutex}; use crate::tap::{CheckingReceipt, TapReceipt}; +/// TAP receipt check that validates allocation eligibility. +/// +/// This check verifies that the allocation ID in a receipt corresponds to an +/// active allocation for this indexer, or to one that a previous receipt +/// check saw active within the grace period. This avoids rejecting receipts +/// for allocations that transiently drop out of the watch channel. pub struct AllocationEligible { indexer_allocations: Receiver>, + recently_seen: Arc>>, + grace_period: Duration, } impl AllocationEligible { + /// Creates a new `AllocationEligible` check with the default grace period (3600 seconds). 
pub fn new(indexer_allocations: Receiver>) -> Self { + // Backward-compatible default: matches recently_closed_allocation_buffer_secs default + Self::with_grace_period(indexer_allocations, Duration::from_secs(3600)) + } + + /// Creates a new `AllocationEligible` check with a custom grace period. + /// + /// The grace period determines how long an allocation remains eligible after + /// the last receipt check that found it in the active set. This bridges the gap + /// between the gateway's allocation discovery and the indexer's network subgraph polling. + pub fn with_grace_period( + indexer_allocations: Receiver>, + grace_period: Duration, + ) -> Self { Self { indexer_allocations, + recently_seen: Arc::new(Mutex::new(HashMap::new())), + grace_period, } } } + #[async_trait::async_trait] impl Check for AllocationEligible { async fn check( @@ -33,16 +62,143 @@ impl Check for AllocationEligible { receipt.signed_receipt().as_ref().message.collection_id, )); let allocation_address = allocation_id.into_inner(); - if !self + + let in_active_set = self .indexer_allocations .borrow() - .contains_key(&allocation_address) - { - return Err(CheckError::Failed(anyhow!( - "Receipt allocation ID `{}` is not eligible for this indexer", - allocation_id - ))); + .contains_key(&allocation_address); + + if in_active_set { + // Fast path: active. Refresh last-seen timestamp. + let mut recently_seen = self.recently_seen.lock().await; + recently_seen.insert(allocation_address, Instant::now()); + return Ok(()); } - Ok(()) + + // Slow path: not in active set. Check local grace period cache. + let mut recently_seen = self.recently_seen.lock().await; + + // Opportunistic eviction of stale entries to keep the map bounded. 
+ recently_seen.retain(|_, last_seen| last_seen.elapsed() < self.grace_period * 2); + + if let Some(last_seen) = recently_seen.get(&allocation_address) { + if last_seen.elapsed() < self.grace_period { + tracing::debug!( + allocation_id = %allocation_id, + elapsed_secs = last_seen.elapsed().as_secs(), + grace_period_secs = self.grace_period.as_secs(), + "Accepting receipt for recently-seen allocation (within grace period)" + ); + return Ok(()); + } + } + + Err(CheckError::Failed(anyhow!( + "Receipt allocation ID `{}` is not eligible for this indexer", + allocation_id + ))) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use indexer_allocation::Allocation; + use tap_core::receipt::{checks::Check, Context}; + use test_assets::{create_signed_receipt_v2, INDEXER_ALLOCATIONS}; + use thegraph_core::{alloy::primitives::Address, CollectionId}; + use tokio::sync::watch; + + use super::*; + use crate::tap::TapReceipt; + + async fn create_test_receipt(allocation_id: Address) -> CheckingReceipt { + let receipt = create_signed_receipt_v2() + .collection_id(CollectionId::from(allocation_id)) + .call() + .await; + CheckingReceipt::new(TapReceipt(receipt)) + } + + #[tokio::test] + async fn test_active_allocation_accepted() { + let (tx, rx) = watch::channel((*INDEXER_ALLOCATIONS).clone()); + let check = AllocationEligible::new(rx); + + let allocation_id = *INDEXER_ALLOCATIONS.keys().next().unwrap(); + let receipt = create_test_receipt(allocation_id).await; + + let result = check.check(&Context::default(), &receipt).await; + assert!(result.is_ok(), "Active allocation should be accepted"); + + drop(tx); + } + + #[tokio::test] + async fn test_unknown_allocation_rejected() { + let (tx, rx) = watch::channel((*INDEXER_ALLOCATIONS).clone()); + let check = AllocationEligible::new(rx); + + // Use an allocation ID that's not in the active set + let unknown_allocation = Address::repeat_byte(0x42); + let receipt = create_test_receipt(unknown_allocation).await; + + let 
result = check.check(&Context::default(), &receipt).await; + assert!(result.is_err(), "Unknown allocation should be rejected"); + + drop(tx); + } + + #[tokio::test] + async fn test_recently_seen_allocation_accepted_within_grace_period() { + let allocations: HashMap = (*INDEXER_ALLOCATIONS).clone(); + let allocation_id = *allocations.keys().next().unwrap(); + + let (tx, rx) = watch::channel(allocations.clone()); + let check = AllocationEligible::with_grace_period(rx, Duration::from_secs(3600)); + + // First, verify the allocation while it's active (this populates recently_seen) + let receipt = create_test_receipt(allocation_id).await; + let result = check.check(&Context::default(), &receipt).await; + assert!(result.is_ok(), "Active allocation should be accepted"); + + // Remove the allocation from the active set + tx.send(HashMap::new()).unwrap(); + + // The allocation should still be accepted within the grace period + let result = check.check(&Context::default(), &receipt).await; + assert!( + result.is_ok(), + "Recently-seen allocation should be accepted within grace period" + ); + } + + #[tokio::test] + async fn test_allocation_rejected_after_grace_period() { + let allocations: HashMap = (*INDEXER_ALLOCATIONS).clone(); + let allocation_id = *allocations.keys().next().unwrap(); + + let (tx, rx) = watch::channel(allocations.clone()); + // Use a very short grace period for testing + let check = AllocationEligible::with_grace_period(rx, Duration::from_millis(10)); + + // Verify the allocation while active (populates recently_seen) + let receipt = create_test_receipt(allocation_id).await; + let result = check.check(&Context::default(), &receipt).await; + assert!(result.is_ok()); + + // Remove from active set + tx.send(HashMap::new()).unwrap(); + + // Wait for grace period to expire + tokio::time::sleep(Duration::from_millis(50)).await; + + // Should now be rejected + let result = check.check(&Context::default(), &receipt).await; + assert!( + result.is_err(), + 
"Allocation should be rejected after grace period expires" + ); } }