Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 143 additions & 2 deletions crates/monitor/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::{Duration, Instant},
};

use bip39::Mnemonic;
Expand All @@ -22,14 +23,24 @@ pub type AttestationWatcher = Receiver<HashMap<Address, AttestationSigner>>;
///
/// This function accepts multiple mnemonics to support allocations created with different
/// operator keys (e.g., after key rotation or migration).
///
/// The `grace_period` parameter controls how long signers are retained after their allocation
/// disappears from the watch channel. This prevents premature eviction during network subgraph
/// polling gaps, ensuring queries with recently-closed allocation IDs can still be signed.
pub fn attestation_signers(
indexer_allocations_rx: AllocationWatcher,
indexer_mnemonics: Vec<Mnemonic>,
chain_id: ChainId,
dispute_manager_rx: DisputeManagerWatcher,
grace_period: Duration,
) -> AttestationWatcher {
let attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>> =
Box::leak(Box::new(Mutex::new(HashMap::new())));

// Tombstone map: records when each signer was first evicted
let evicted_at: &'static Mutex<HashMap<Address, Instant>> =
Box::leak(Box::new(Mutex::new(HashMap::new())));

let indexer_mnemonics: Arc<[String]> = indexer_mnemonics
.iter()
.map(|m| m.to_string())
Expand All @@ -45,6 +56,8 @@ pub fn attestation_signers(
&indexer_mnemonics,
chain_id,
attestation_signers_map,
evicted_at,
grace_period,
&allocation,
&dispute,
)
Expand All @@ -56,12 +69,34 @@ fn modify_signers(
indexer_mnemonics: &[String],
chain_id: ChainId,
attestation_signers_map: &'static Mutex<HashMap<Address, AttestationSigner>>,
evicted_at: &'static Mutex<HashMap<Address, Instant>>,
grace_period: Duration,
allocations: &HashMap<Address, Allocation>,
dispute_manager: &Address,
) -> HashMap<Address, AttestationSigner> {
let mut signers = attestation_signers_map.lock().unwrap();
// Remove signers for allocations that are no longer active or recently closed
signers.retain(|id, _| allocations.contains_key(id));
let mut evicted = evicted_at.lock().unwrap();

// Retain signers that are active OR within the grace period post-eviction.
// This prevents premature eviction during network subgraph polling gaps.
signers.retain(|id, _| {
if allocations.contains_key(id) {
// Re-activated: clear tombstone
evicted.remove(id);
true
} else {
// Record eviction time on first absence; keep until grace period expires
let first_evicted = evicted.entry(*id).or_insert_with(Instant::now);
let keep = first_evicted.elapsed() < grace_period;
if !keep {
evicted.remove(id); // Clean up tombstone after expiry
}
keep
}
});

// Opportunistic tombstone cleanup to prevent unbounded growth
evicted.retain(|_, t| t.elapsed() < grace_period * 2);

// Create signers for new allocations
for (id, allocation) in allocations.iter() {
Expand Down Expand Up @@ -123,11 +158,13 @@ mod tests {
async fn test_attestation_signers_update_with_allocations() {
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS);
let grace_period = Duration::from_secs(3600);
let mut signers = attestation_signers(
allocations_rx,
vec![INDEXER_MNEMONIC.clone()],
1,
dispute_manager_rx,
grace_period,
);

// Test that an empty set of allocations leads to an empty set of signers
Expand All @@ -148,4 +185,108 @@ mod tests {
.any(|allocation_id| signer_allocation_id == allocation_id));
}
}

#[tokio::test]
async fn test_signer_retained_within_grace_period() {
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS);
// Use a long grace period to ensure signers are retained
let grace_period = Duration::from_secs(3600);
let mut signers = attestation_signers(
allocations_rx,
vec![INDEXER_MNEMONIC.clone()],
1,
dispute_manager_rx,
grace_period,
);

// Add allocations
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
signers.changed().await.unwrap();
let initial_count = signers.borrow().len();
assert_eq!(initial_count, INDEXER_ALLOCATIONS.len());

// Remove all allocations (simulating network subgraph polling gap)
allocations_tx.send(HashMap::new()).unwrap();
signers.changed().await.unwrap();

// Signers should be retained within grace period
let retained_signers = signers.borrow().clone();
assert_eq!(
retained_signers.len(),
initial_count,
"Signers should be retained within grace period"
);
}

#[tokio::test]
async fn test_signer_evicted_after_grace_period() {
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS);
// Use a very short grace period for testing
let grace_period = Duration::from_millis(10);
let mut signers = attestation_signers(
allocations_rx,
vec![INDEXER_MNEMONIC.clone()],
1,
dispute_manager_rx,
grace_period,
);

// Add allocations
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
signers.changed().await.unwrap();
assert!(!signers.borrow().is_empty());

// Remove all allocations
allocations_tx.send(HashMap::new()).unwrap();
signers.changed().await.unwrap();

// Wait for grace period to expire
tokio::time::sleep(Duration::from_millis(50)).await;

// Trigger another update to run the retain logic
allocations_tx.send(HashMap::new()).unwrap();
signers.changed().await.unwrap();

// Signers should be evicted after grace period
assert!(
signers.borrow().is_empty(),
"Signers should be evicted after grace period expires"
);
}

#[tokio::test]
async fn test_reactivated_allocation_clears_tombstone() {
let (allocations_tx, allocations_rx) = watch::channel(HashMap::new());
let (_, dispute_manager_rx) = watch::channel(DISPUTE_MANAGER_ADDRESS);
let grace_period = Duration::from_secs(3600);
let mut signers = attestation_signers(
allocations_rx,
vec![INDEXER_MNEMONIC.clone()],
1,
dispute_manager_rx,
grace_period,
);

// Add allocations
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
signers.changed().await.unwrap();
let initial_count = signers.borrow().len();

// Remove allocations (starts grace period)
allocations_tx.send(HashMap::new()).unwrap();
signers.changed().await.unwrap();

// Re-add allocations (should clear tombstone)
allocations_tx.send((*INDEXER_ALLOCATIONS).clone()).unwrap();
signers.changed().await.unwrap();

// Signers should still be present
assert_eq!(
signers.borrow().len(),
initial_count,
"Signers should be retained after re-activation"
);
}
}
3 changes: 3 additions & 0 deletions crates/service/src/middleware/attestation_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub async fn signer_middleware(

#[cfg(test)]
mod tests {
use std::time::Duration;

use axum::{body::Body, http::Request, middleware::from_fn_with_state, routing::get, Router};
use indexer_attestation::AttestationSigner;
use indexer_monitor::attestation_signers;
Expand All @@ -65,6 +67,7 @@ mod tests {
vec![INDEXER_MNEMONIC.clone()],
1,
dispute_manager_rx,
Duration::from_secs(3600),
);

let expected_signer = attestation_signers
Expand Down
9 changes: 9 additions & 0 deletions crates/service/src/service/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,19 @@ impl ServiceRouter {
// Maintain an up-to-date set of attestation signers, one for each
// allocation. Multiple mnemonics are tried to support allocations
// created with different operator keys.
// The grace period prevents premature signer eviction during network
// subgraph polling gaps.
let attestation_signer_grace_period = self
.network_subgraph
.as_ref()
.map(|(_, config)| config.recently_closed_allocation_buffer_secs)
.unwrap_or(std::time::Duration::from_secs(3600));
let attestation_signers = attestation_signers(
allocations.clone(),
operator_mnemonics.clone(),
self.blockchain.chain_id as u64,
dispute_manager,
attestation_signer_grace_period,
);

// Rate limits by allowing bursts of 10 requests and requiring 100ms of
Expand Down Expand Up @@ -274,6 +282,7 @@ impl ServiceRouter {
receipt_max_value,
allowed_data_services,
service_provider: self.indexer.indexer_address,
allocation_grace_period: attestation_signer_grace_period,
})
.await;

Expand Down
9 changes: 8 additions & 1 deletion crates/service/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ pub struct TapChecksConfig {
pub receipt_max_value: u128,
pub allowed_data_services: Option<Vec<Address>>,
pub service_provider: Address,
/// Grace period for allocation eligibility checks.
/// Receipts for allocations that were recently active will be accepted
/// within this period, bridging network subgraph polling gaps.
pub allocation_grace_period: Duration,
}

#[derive(Clone)]
Expand Down Expand Up @@ -125,7 +129,10 @@ pub enum AdapterError {
impl IndexerTapContext {
pub async fn get_checks(config: TapChecksConfig) -> Vec<ReceiptCheck<TapReceipt>> {
let mut checks: Vec<ReceiptCheck<TapReceipt>> = vec![
Arc::new(AllocationEligible::new(config.indexer_allocations)),
Arc::new(AllocationEligible::with_grace_period(
config.indexer_allocations,
config.allocation_grace_period,
)),
Arc::new(AllocationRedeemedCheck::new(
config.indexer_address,
config.network_subgraph,
Expand Down
Loading
Loading