From 50153d66443f69d329cc5fd3bcb8f03ff816403b Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Thu, 23 Apr 2026 07:01:19 -0700 Subject: [PATCH 1/8] [guardian] add local limiter emulator with bootstrap + reconciliation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a `LocalLimiter` that mirrors the guardian's token-bucket state so each hashi node can project capacity and pick the next `seq` without a round-trip. Observational in this PR — no withdrawal-flow change yet; the rewiring lands in the next PR. - `crates/hashi/src/guardian_limiter.rs` — `LocalLimiter` with `validate_consume` / `apply_consume` / `capacity_at` / `snap_to` under a `tokio::Mutex`, plus 8 unit tests. - `Hashi::start` — fetch `GetGuardianInfo` once at startup, cache the Ed25519 signing pubkey, seed the `LocalLimiter`. Best-effort; the node still starts when the guardian is unreachable. - `start_limiter_reconcile_service` — 30 s background task. Covers late bootstrap, non-leader drift, and leader rotation. Snap only when the guardian is strictly ahead (`next_seq` or `last_updated_at` strictly greater) to avoid moving local state backwards on a stale guardian snapshot. Also refreshes `guardian_signing_pubkey` if bootstrap missed it. - `GetGuardianInfoResponse` extended with `LimiterState` + `LimiterConfig` so one RPC seeds the full emulator. --- crates/hashi-guardian/src/enclave.rs | 10 + crates/hashi-guardian/src/getters.rs | 4 + .../sui/hashi/v1alpha/guardian_service.proto | 15 ++ crates/hashi-types/src/guardian/mod.rs | 4 + .../src/guardian/proto_conversions.rs | 28 +++ crates/hashi-types/src/guardian/test_utils.rs | 2 + .../proto/generated/sui.hashi.v1alpha.fds.bin | Bin 14108 -> 14450 bytes .../src/proto/generated/sui.hashi.v1alpha.rs | 16 ++ crates/hashi/src/grpc/guardian_client.rs | 10 + crates/hashi/src/guardian_limiter.rs | 233 ++++++++++++++++++ crates/hashi/src/lib.rs | 184 +++++++++++++- 11 files changed, 505 insertions(+), 1 deletion(-) create mode 100644 crates/hashi/src/guardian_limiter.rs diff --git a/crates/hashi-guardian/src/enclave.rs b/crates/hashi-guardian/src/enclave.rs index f44f2d916..0ea84a51d 100644 --- a/crates/hashi-guardian/src/enclave.rs +++ b/crates/hashi-guardian/src/enclave.rs @@ -327,6 +327,16 @@ impl EnclaveState { guard.consume(seq, timestamp, amount_sats)?; Ok(LimiterGuard::new(guard)) } + + pub async fn limiter_state(&self) -> Option { + let limiter = self.rate_limiter.get()?; + Some(*limiter.lock().await.state()) + } + + pub async fn limiter_config(&self) -> Option { + let limiter = self.rate_limiter.get()?; + Some(*limiter.lock().await.config()) + } } impl Enclave { diff --git a/crates/hashi-guardian/src/getters.rs b/crates/hashi-guardian/src/getters.rs index 65320a6ad..3e213f8c5 100644 --- a/crates/hashi-guardian/src/getters.rs +++ b/crates/hashi-guardian/src/getters.rs @@ -27,10 +27,14 @@ pub async fn get_guardian_info(enclave: Arc) -> GuardianResult, + /// Current rate limiter state (if initialized). + pub limiter_state: Option, + /// Immutable limiter configuration (if initialized). + pub limiter_config: Option, } /// TODO: Add network? diff --git a/crates/hashi-types/src/guardian/proto_conversions.rs b/crates/hashi-types/src/guardian/proto_conversions.rs index 8b73da531..7f4166e53 100644 --- a/crates/hashi-types/src/guardian/proto_conversions.rs +++ b/crates/hashi-types/src/guardian/proto_conversions.rs @@ -21,6 +21,7 @@ use super::GuardianSigned; use super::HashiCommittee; use super::HashiCommitteeMember; use super::HashiSigned; +use super::LimiterConfig; use super::LimiterState; use super::OperatorInitRequest; use super::ProvisionerInitRequest; @@ -200,10 +201,15 @@ impl TryFrom for GetGuardianInfoResponse { let signed_info_pb = resp.signed_info.ok_or_else(|| missing("signed_info"))?; let signed_info = pb_to_signed_guardian_info(signed_info_pb)?; + let limiter_state = resp.limiter_state.map(pb_to_limiter_state).transpose()?; + let limiter_config = resp.limiter_config.map(pb_to_limiter_config).transpose()?; + Ok(GetGuardianInfoResponse { attestation: attestation.to_vec(), signing_pub_key, signed_info, + limiter_state, + limiter_config, }) } } @@ -335,6 +341,8 @@ pub fn get_guardian_info_response_to_pb(r: GetGuardianInfoResponse) -> pb::GetGu attestation: Some(r.attestation.into()), signing_pub_key: Some(r.signing_pub_key.to_bytes().to_vec().into()), signed_info: Some(signed_guardian_info_to_pb(r.signed_info)), + limiter_state: r.limiter_state.map(limiter_state_to_pb), + limiter_config: r.limiter_config.map(limiter_config_to_pb), } } @@ -659,6 +667,26 @@ fn limiter_state_to_pb(state: LimiterState) -> pb::LimiterState { } } +fn pb_to_limiter_config(cfg: pb::LimiterConfig) -> GuardianResult { + let refill_rate = cfg + .refill_rate_sats_per_sec + .ok_or_else(|| missing("refill_rate_sats_per_sec"))?; + let max_bucket_capacity = cfg + .max_bucket_capacity_sats + .ok_or_else(|| missing("max_bucket_capacity_sats"))?; + Ok(LimiterConfig { + refill_rate, + max_bucket_capacity, + }) +} + +fn limiter_config_to_pb(cfg: LimiterConfig) -> pb::LimiterConfig { + pb::LimiterConfig { + refill_rate_sats_per_sec: Some(cfg.refill_rate), + max_bucket_capacity_sats: Some(cfg.max_bucket_capacity), + } +} + fn pb_to_hashi_committee(c: pb::Committee) -> GuardianResult { let epoch = c.epoch.ok_or_else(|| missing("epoch"))?; diff --git a/crates/hashi-types/src/guardian/test_utils.rs b/crates/hashi-types/src/guardian/test_utils.rs index eae27f219..41e4aeec9 100644 --- a/crates/hashi-types/src/guardian/test_utils.rs +++ b/crates/hashi-types/src/guardian/test_utils.rs @@ -85,6 +85,8 @@ impl GetGuardianInfoResponse { attestation: "abcd".as_bytes().to_vec(), signing_pub_key, signed_info: GuardianSigned::new(info, &signing_key, 1234), + limiter_state: None, + limiter_config: None, } } } diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin index 97006669616fefa72176fc2504ecf641c0e3145d..7c8bff75398fdab60b5c0e55b4fd6428a14462cd 100644 GIT binary patch delta 171 zcmbP}_o-lmI4{#2&B>CyGnrm6P2R%0RY1svi#I1TH?t(QD89HPv1IZFC7H?hC8VT; z+`0H*63O{_X_@H~tO|@8+(rtM16hQDO4vo6!NNf>{SfZvOuh{~f=3v+czqzoLWCzL tNXoJ^0u5mCV3^z@X~xb3W;0GcC@DSpi=-8^B$xQ)gYx2=jip~o0{{, + /// Current rate limiter state (if initialized). + #[prost(message, optional, tag = "4")] + pub limiter_state: ::core::option::Option, + /// Immutable limiter configuration (if initialized). + #[prost(message, optional, tag = "5")] + pub limiter_config: ::core::option::Option, } /// Guardian-signed wrapper around `GuardianInfoData`. #[derive(Clone, PartialEq, ::prost::Message)] @@ -1180,6 +1186,16 @@ pub struct LimiterState { #[prost(uint64, optional, tag = "3")] pub next_seq: ::core::option::Option, } +/// Immutable limiter configuration. +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LimiterConfig { + /// Token refill rate in sats per second. + #[prost(uint64, optional, tag = "1")] + pub refill_rate_sats_per_sec: ::core::option::Option, + /// Maximum bucket capacity in sats. + #[prost(uint64, optional, tag = "2")] + pub max_bucket_capacity_sats: ::core::option::Option, +} #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct ProvisionerInitResponse {} /// Hashi-signed wrapper for the withdrawal request. diff --git a/crates/hashi/src/grpc/guardian_client.rs b/crates/hashi/src/grpc/guardian_client.rs index 6f55bd2dd..a640f80ec 100644 --- a/crates/hashi/src/grpc/guardian_client.rs +++ b/crates/hashi/src/grpc/guardian_client.rs @@ -37,4 +37,14 @@ impl GuardianClient { pub fn guardian_service_client(&self) -> GuardianServiceClient { GuardianServiceClient::new(self.channel.clone()) } + + pub async fn get_guardian_info( + &self, + ) -> Result { + let response = self + .guardian_service_client() + .get_guardian_info(hashi_types::proto::GetGuardianInfoRequest {}) + .await?; + Ok(response.into_inner()) + } } diff --git a/crates/hashi/src/guardian_limiter.rs b/crates/hashi/src/guardian_limiter.rs new file mode 100644 index 000000000..fd1544e4d --- /dev/null +++ b/crates/hashi/src/guardian_limiter.rs @@ -0,0 +1,233 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Local emulator of the guardian's token-bucket rate limiter, used by +//! coin selection to project capacity and by the leader to pick the next +//! `seq` without a guardian round-trip. The guardian remains authoritative; +//! `snap_to` syncs from it on any drift. + +use hashi_types::guardian::LimiterConfig; +use hashi_types::guardian::LimiterState; +use std::fmt; +use tokio::sync::Mutex; + +pub struct LocalLimiter { + config: LimiterConfig, + state: Mutex, +} + +#[derive(Debug, thiserror::Error)] +pub enum LocalLimiterError { + #[error("stale timestamp: local last_updated_at={local_last}, incoming={incoming}")] + StaleTimestamp { local_last: u64, incoming: u64 }, + + #[error("insufficient capacity: needed {needed}, available {available}")] + InsufficientCapacity { needed: u64, available: u64 }, + + #[error("seq mismatch: expected {expected}, applied {applied}")] + SeqMismatch { expected: u64, applied: u64 }, +} + +impl fmt::Debug for LocalLimiter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("LocalLimiter") + .field("config", &self.config) + .finish_non_exhaustive() + } +} + +impl LocalLimiter { + pub fn new(config: LimiterConfig, state: LimiterState) -> Self { + Self { + config, + state: Mutex::new(state), + } + } + + pub fn config(&self) -> &LimiterConfig { + &self.config + } + + pub async fn snapshot(&self) -> LimiterState { + *self.state.lock().await + } + + /// Capacity projected to `timestamp_secs`, clamped to the config ceiling. + pub async fn capacity_at(&self, timestamp_secs: u64) -> u64 { + let state = *self.state.lock().await; + project_capacity(&self.config, &state, timestamp_secs) + } + + pub async fn next_seq(&self) -> u64 { + self.state.lock().await.next_seq + } + + /// Check whether a consume for `(timestamp_secs, amount_sats)` would + /// be accepted, and return the `seq` to submit. Does not mutate state; + /// the caller must call `apply_consume` once the guardian accepts. + pub async fn validate_consume( + &self, + timestamp_secs: u64, + amount_sats: u64, + ) -> Result { + let state = *self.state.lock().await; + if timestamp_secs < state.last_updated_at { + return Err(LocalLimiterError::StaleTimestamp { + local_last: state.last_updated_at, + incoming: timestamp_secs, + }); + } + let capacity = project_capacity(&self.config, &state, timestamp_secs); + if capacity < amount_sats { + return Err(LocalLimiterError::InsufficientCapacity { + needed: amount_sats, + available: capacity, + }); + } + Ok(state.next_seq) + } + + /// Mirror an accepted consume into local state. On any error the state + /// is left untouched and the caller should `snap_to` the guardian. + pub async fn apply_consume( + &self, + applied_seq: u64, + timestamp_secs: u64, + amount_sats: u64, + ) -> Result<(), LocalLimiterError> { + let mut guard = self.state.lock().await; + if applied_seq != guard.next_seq { + return Err(LocalLimiterError::SeqMismatch { + expected: guard.next_seq, + applied: applied_seq, + }); + } + if timestamp_secs < guard.last_updated_at { + return Err(LocalLimiterError::StaleTimestamp { + local_last: guard.last_updated_at, + incoming: timestamp_secs, + }); + } + let capacity = project_capacity(&self.config, &guard, timestamp_secs); + if capacity < amount_sats { + return Err(LocalLimiterError::InsufficientCapacity { + needed: amount_sats, + available: capacity, + }); + } + guard.num_tokens_available = capacity - amount_sats; + guard.last_updated_at = timestamp_secs; + guard.next_seq += 1; + Ok(()) + } + + /// Replace local state with the guardian's authoritative snapshot. + pub async fn snap_to(&self, state: LimiterState) { + *self.state.lock().await = state; + } +} + +fn project_capacity(config: &LimiterConfig, state: &LimiterState, timestamp_secs: u64) -> u64 { + let elapsed = timestamp_secs.saturating_sub(state.last_updated_at); + let refilled = elapsed.saturating_mul(config.refill_rate); + state + .num_tokens_available + .saturating_add(refilled) + .min(config.max_bucket_capacity) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_limiter( + num_tokens_available: u64, + last_updated_at: u64, + next_seq: u64, + ) -> LocalLimiter { + let config = LimiterConfig { + refill_rate: 1_000, + max_bucket_capacity: 2_000_000, + }; + let state = LimiterState { + num_tokens_available, + last_updated_at, + next_seq, + }; + LocalLimiter::new(config, state) + } + + #[tokio::test] + async fn test_validate_consume_happy_path() { + let limiter = make_limiter(0, 0, 7); + // 100 seconds of refill => 100k headroom. + let seq = limiter.validate_consume(100, 80_000).await.unwrap(); + assert_eq!(seq, 7, "validate returns the currently-next seq"); + } + + #[tokio::test] + async fn test_validate_rejects_stale_timestamp() { + let limiter = make_limiter(0, 100, 0); + let err = limiter.validate_consume(50, 0).await.unwrap_err(); + assert!(matches!(err, LocalLimiterError::StaleTimestamp { .. })); + } + + #[tokio::test] + async fn test_validate_rejects_over_capacity() { + let limiter = make_limiter(0, 0, 0); + // Only 10k refilled; ask for 50k. + let err = limiter.validate_consume(10, 50_000).await.unwrap_err(); + match err { + LocalLimiterError::InsufficientCapacity { needed, available } => { + assert_eq!(needed, 50_000); + assert_eq!(available, 10_000); + } + other => panic!("unexpected error: {other:?}"), + } + } + + #[tokio::test] + async fn test_apply_bumps_seq_and_updates_last_updated_at() { + let limiter = make_limiter(0, 0, 42); + let seq = limiter.validate_consume(100, 80_000).await.unwrap(); + limiter.apply_consume(seq, 100, 80_000).await.unwrap(); + let snap = limiter.snapshot().await; + assert_eq!(snap.next_seq, 43); + assert_eq!(snap.last_updated_at, 100); + assert_eq!(snap.num_tokens_available, 20_000); + } + + #[tokio::test] + async fn test_apply_rejects_seq_mismatch() { + let limiter = make_limiter(0, 0, 0); + let err = limiter.apply_consume(5, 100, 1_000).await.unwrap_err(); + assert!(matches!(err, LocalLimiterError::SeqMismatch { .. })); + } + + #[tokio::test] + async fn test_capacity_at_refills_linearly_and_clamps_to_ceiling() { + let limiter = make_limiter(100_000, 10, 0); + // refill 1000 sats/sec; elapsed 5s; base 100k; refill 5k => 105k. + assert_eq!(limiter.capacity_at(15).await, 105_000); + // far-future => clamped at max_bucket_capacity 2_000_000. + assert_eq!(limiter.capacity_at(u64::MAX).await, 2_000_000); + } + + #[tokio::test] + async fn test_snap_to_replaces_state() { + let limiter = make_limiter(1_000, 10, 3); + let new_state = LimiterState { + num_tokens_available: 500_000, + last_updated_at: 999, + next_seq: 42, + }; + limiter.snap_to(new_state).await; + assert_eq!(limiter.snapshot().await, new_state); + } + + #[tokio::test] + async fn test_next_seq_matches_snapshot() { + let limiter = make_limiter(0, 0, 11); + assert_eq!(limiter.next_seq().await, 11); + } +} diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs index 454a5da60..2f8973558 100644 --- a/crates/hashi/src/lib.rs +++ b/crates/hashi/src/lib.rs @@ -18,6 +18,7 @@ pub mod constants; pub mod db; pub mod deposits; pub mod grpc; +pub mod guardian_limiter; pub mod leader; pub mod metrics; pub mod mpc; @@ -50,6 +51,12 @@ pub struct Hashi { btc_monitor: OnceLock, screener_client: OnceLock>, guardian_client: OnceLock>, + guardian_signing_pubkey: OnceLock>, + /// Local emulator of the guardian rate limiter. Populated at startup + /// from the same `GetGuardianInfo` response that caches + /// `guardian_signing_pubkey`; if the guardian wasn't ready then, the + /// reconciliation task fills it in on its first successful poll. + local_limiter: RwLock>>, /// Reconfig completion signatures by epoch. reconfig_signatures: RwLock>>, } @@ -72,6 +79,8 @@ impl Hashi { btc_monitor: OnceLock::new(), screener_client: OnceLock::new(), guardian_client: OnceLock::new(), + guardian_signing_pubkey: OnceLock::new(), + local_limiter: RwLock::new(None), reconfig_signatures: RwLock::new(HashMap::new()), })) } @@ -97,6 +106,8 @@ impl Hashi { btc_monitor: OnceLock::new(), screener_client: OnceLock::new(), guardian_client: OnceLock::new(), + guardian_signing_pubkey: OnceLock::new(), + local_limiter: RwLock::new(None), reconfig_signatures: RwLock::new(HashMap::new()), })) } @@ -192,6 +203,25 @@ impl Hashi { self.guardian_client.get().and_then(|opt| opt.as_ref()) } + pub fn guardian_signing_pubkey(&self) -> Option<&hashi_types::guardian::GuardianPubKey> { + self.guardian_signing_pubkey + .get() + .and_then(|opt| opt.as_ref()) + } + + /// Local emulator of the guardian rate limiter. `None` when no + /// guardian is configured or the guardian hadn't initialized a limiter + /// by the time the reconciliation task polled it. + pub fn local_limiter(&self) -> Option> { + self.local_limiter.read().unwrap().clone() + } + + /// Install a local limiter. Used by the reconciliation task when + /// bootstrap at startup missed the guardian (late-start case). + pub(crate) fn install_local_limiter(&self, limiter: Arc) { + *self.local_limiter.write().unwrap() = Some(limiter); + } + async fn initialize_onchain_state(&self) -> anyhow::Result { let (onchain_state, service) = onchain::OnchainState::new( self.config.sui_rpc.as_deref().unwrap(), @@ -397,6 +427,59 @@ impl Hashi { match grpc::guardian_client::GuardianClient::new(endpoint) { Ok(client) => { tracing::info!("Guardian client configured for {}", client.endpoint()); + // Best-effort fetch at startup. We cache two things: + // 1. the guardian's Ed25519 signing pubkey (so we can + // verify `StandardWithdrawalResponse` signatures), and + // 2. the limiter state + config, which seed the local + // emulator that drives coin-selection capacity checks + // and picks the next `seq` for hard reserves. + // If either piece is missing (guardian not bootstrapped yet + // or network hiccup), the reconciliation task spawned below + // will fill it in on its first successful poll. + match client.get_guardian_info().await { + Ok(info_pb) => { + match hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) + { + Ok(info) => { + tracing::info!( + "Guardian signing pubkey cached for response verification" + ); + let _ = self + .guardian_signing_pubkey + .set(Some(info.signing_pub_key)); + if let (Some(state), Some(config)) = + (info.limiter_state, info.limiter_config) + { + tracing::info!( + ?state, + ?config, + "Local guardian limiter seeded from GetGuardianInfo", + ); + *self.local_limiter.write().unwrap() = Some(Arc::new( + guardian_limiter::LocalLimiter::new(config, state), + )); + } else { + tracing::info!( + "Guardian has no limiter yet; local emulator will \ + initialize from the reconciliation task" + ); + } + } + Err(e) => { + tracing::warn!( + "Failed to parse guardian info: {e}; \ + response signature verification disabled" + ); + } + } + } + Err(e) => { + tracing::warn!( + "Failed to fetch guardian info: {e}; \ + response signature verification disabled" + ); + } + } Some(client) } Err(e) => { @@ -466,17 +549,116 @@ impl Hashi { let (_http_addr, http_service) = grpc::HttpService::new(self.clone()).start().await; let leader_service = leader::LeaderService::new(self.clone()).start(); let mpc_service = mpc_service.start(); + let limiter_reconcile_service = self.clone().start_limiter_reconcile_service(); let service = Service::new() .merge(onchain_service) .merge(btc_monitor_service) .merge(http_service) .merge(leader_service) - .merge(mpc_service); + .merge(mpc_service) + .merge(limiter_reconcile_service); Ok(service) } + /// Spawn a background task that periodically re-fetches the guardian's + /// limiter state and snaps the local emulator onto the authoritative + /// view. Covers three cases: + /// + /// 1. **Late bootstrap**: if the guardian wasn't initialized when + /// `Hashi::start()` ran, the task installs the local limiter on its + /// first successful poll. + /// 2. **Non-leader drift**: non-leader nodes never talk to the guardian + /// on the withdrawal hot path, so they'd otherwise see stale state. + /// When they later become leader, they'd submit with a bad seq / + /// timestamp. Periodic reconciliation keeps them fresh. + /// 3. **Leader rotation**: a new leader's local state is at most + /// `LIMITER_RECONCILE_INTERVAL` stale; the first guardian rejection + /// also triggers an immediate snap. + fn start_limiter_reconcile_service(self: Arc) -> Service { + const LIMITER_RECONCILE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30); + Service::new().spawn_aborting(async move { + if self.guardian_client().is_none() { + return Ok(()); + } + let mut ticker = tokio::time::interval(LIMITER_RECONCILE_INTERVAL); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + // First tick fires immediately; skip it so we don't overlap the + // bootstrap fetch that already ran in Hashi::start. + ticker.tick().await; + loop { + ticker.tick().await; + let Some(client) = self.guardian_client() else { + return Ok(()); + }; + let info_pb = match client.get_guardian_info().await { + Ok(info) => info, + Err(e) => { + tracing::debug!("limiter reconcile: GetGuardianInfo failed: {e}"); + continue; + } + }; + let info = match hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) { + Ok(info) => info, + Err(e) => { + tracing::debug!("limiter reconcile: parse failed: {e:?}"); + continue; + } + }; + // Cache the guardian signing pubkey if bootstrap missed it + // (e.g. the guardian wasn't reachable when `Hashi::start` + // ran). `OnceLock::set` returns Err when already populated + // — that's the steady-state happy path, ignore it. + let _ = self.guardian_signing_pubkey.set(Some(info.signing_pub_key)); + let (Some(state), Some(config)) = (info.limiter_state, info.limiter_config) else { + tracing::debug!("limiter reconcile: guardian has no limiter yet"); + continue; + }; + match self.local_limiter() { + Some(limiter) => { + let local = limiter.snapshot().await; + // Only snap when the guardian is strictly ahead. + // A "same next_seq but guardian's last_updated_at + // later" delta means the guardian observed a later + // timestamp without consuming — extremely rare but + // still ahead. If local is ahead (e.g. a guardian + // rehydrate from a stale S3 snapshot), don't clobber + // — warn and wait for the guardian to catch up. + let guardian_ahead = state.next_seq > local.next_seq + || (state.next_seq == local.next_seq + && state.last_updated_at > local.last_updated_at); + if guardian_ahead { + tracing::info!( + ?local, + guardian = ?state, + "limiter reconcile: snapping local state to guardian" + ); + limiter.snap_to(state).await; + } else if local.next_seq > state.next_seq { + tracing::warn!( + ?local, + guardian = ?state, + "limiter reconcile: local ahead of guardian; not snapping \ + (possible guardian rehydrate from a stale snapshot)" + ); + } + } + None => { + tracing::info!( + ?state, + ?config, + "limiter reconcile: installing local limiter from first successful poll" + ); + self.install_local_limiter(Arc::new(guardian_limiter::LocalLimiter::new( + config, state, + ))); + } + } + } + }) + } + pub(crate) fn is_in_current_committee(&self) -> bool { let address = match self.config.validator_address() { Ok(a) => a, From fbe95d229536213d680ab5475f301a7203c0f901 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Thu, 23 Apr 2026 11:44:25 -0700 Subject: [PATCH 2/8] [guardian] swap periodic reconcile for event-driven bootstrap retry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In the simplest case the 30 s reconciliation loop has nothing to do — the leader stays in sync via `apply_consume` after every successful guardian RPC, the next PR's error-path snaps from the guardian unconditionally on any RPC rejection, and the wasted-MPC-round window at leader promotion is better covered by an on-demand snap at the false→true leader edge (landing alongside the withdrawal wiring). Keep the bootstrap half of the sync story; replace the recurring reconcile with a short-lived retry. - `try_seed_guardian_state` — idempotent helper that writes each field at most once (`OnceLock::set` for the pubkey, `is_none()` guard on the `local_limiter` slot). Called synchronously in `Hashi::start` so the happy path is seeded before any other service starts. - `start_guardian_bootstrap_retry` — short-lived task that fast-exits when no guardian is configured or state is already seeded, else retries with bounded exponential backoff (1 s → 30 s) until the first success, then returns. - Delete `start_limiter_reconcile_service` + `install_local_limiter`. The reconcile's "strictly ahead" clause had a dead branch anyway: `RateLimiter::consume` moves `last_updated_at` and `next_seq` in lockstep, and `revert` rolls both back, so they cannot diverge. No proto, guardian-side, or `guardian_limiter.rs` changes. Non-guardian baseline is untouched. --- crates/hashi/src/lib.rs | 225 +++++++++++++--------------------------- 1 file changed, 71 insertions(+), 154 deletions(-) diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs index 2f8973558..9611c4f83 100644 --- a/crates/hashi/src/lib.rs +++ b/crates/hashi/src/lib.rs @@ -52,15 +52,17 @@ pub struct Hashi { screener_client: OnceLock>, guardian_client: OnceLock>, guardian_signing_pubkey: OnceLock>, - /// Local emulator of the guardian rate limiter. Populated at startup - /// from the same `GetGuardianInfo` response that caches - /// `guardian_signing_pubkey`; if the guardian wasn't ready then, the - /// reconciliation task fills it in on its first successful poll. local_limiter: RwLock>>, /// Reconfig completion signatures by epoch. reconfig_signatures: RwLock>>, } +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +enum SeedOutcome { + FullySeeded, + NotReady, +} + impl Hashi { pub fn new(server_version: ServerVersion, config: config::Config) -> anyhow::Result> { init_crypto_provider(); @@ -209,19 +211,10 @@ impl Hashi { .and_then(|opt| opt.as_ref()) } - /// Local emulator of the guardian rate limiter. `None` when no - /// guardian is configured or the guardian hadn't initialized a limiter - /// by the time the reconciliation task polled it. pub fn local_limiter(&self) -> Option> { self.local_limiter.read().unwrap().clone() } - /// Install a local limiter. Used by the reconciliation task when - /// bootstrap at startup missed the guardian (late-start case). - pub(crate) fn install_local_limiter(&self, limiter: Arc) { - *self.local_limiter.write().unwrap() = Some(limiter); - } - async fn initialize_onchain_state(&self) -> anyhow::Result { let (onchain_state, service) = onchain::OnchainState::new( self.config.sui_rpc.as_deref().unwrap(), @@ -427,59 +420,6 @@ impl Hashi { match grpc::guardian_client::GuardianClient::new(endpoint) { Ok(client) => { tracing::info!("Guardian client configured for {}", client.endpoint()); - // Best-effort fetch at startup. We cache two things: - // 1. the guardian's Ed25519 signing pubkey (so we can - // verify `StandardWithdrawalResponse` signatures), and - // 2. the limiter state + config, which seed the local - // emulator that drives coin-selection capacity checks - // and picks the next `seq` for hard reserves. - // If either piece is missing (guardian not bootstrapped yet - // or network hiccup), the reconciliation task spawned below - // will fill it in on its first successful poll. - match client.get_guardian_info().await { - Ok(info_pb) => { - match hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) - { - Ok(info) => { - tracing::info!( - "Guardian signing pubkey cached for response verification" - ); - let _ = self - .guardian_signing_pubkey - .set(Some(info.signing_pub_key)); - if let (Some(state), Some(config)) = - (info.limiter_state, info.limiter_config) - { - tracing::info!( - ?state, - ?config, - "Local guardian limiter seeded from GetGuardianInfo", - ); - *self.local_limiter.write().unwrap() = Some(Arc::new( - guardian_limiter::LocalLimiter::new(config, state), - )); - } else { - tracing::info!( - "Guardian has no limiter yet; local emulator will \ - initialize from the reconciliation task" - ); - } - } - Err(e) => { - tracing::warn!( - "Failed to parse guardian info: {e}; \ - response signature verification disabled" - ); - } - } - } - Err(e) => { - tracing::warn!( - "Failed to fetch guardian info: {e}; \ - response signature verification disabled" - ); - } - } Some(client) } Err(e) => { @@ -500,6 +440,12 @@ impl Hashi { .set(guardian) .map_err(|_| anyhow!("Guardian client already initialized"))?; + // Seed guardian signing pubkey and local limiter before any other + // service starts. The background retry below covers the unhappy path. + if self.guardian_client().is_some() { + let _ = self.try_seed_guardian_state().await; + } + // Verify Sui RPC is on the expected chain before loading any state. self.verify_sui_chain_id().await?; @@ -549,7 +495,7 @@ impl Hashi { let (_http_addr, http_service) = grpc::HttpService::new(self.clone()).start().await; let leader_service = leader::LeaderService::new(self.clone()).start(); let mpc_service = mpc_service.start(); - let limiter_reconcile_service = self.clone().start_limiter_reconcile_service(); + let bootstrap_retry_service = self.clone().start_guardian_bootstrap_retry(); let service = Service::new() .merge(onchain_service) @@ -557,104 +503,75 @@ impl Hashi { .merge(http_service) .merge(leader_service) .merge(mpc_service) - .merge(limiter_reconcile_service); + .merge(bootstrap_retry_service); Ok(service) } - /// Spawn a background task that periodically re-fetches the guardian's - /// limiter state and snaps the local emulator onto the authoritative - /// view. Covers three cases: - /// - /// 1. **Late bootstrap**: if the guardian wasn't initialized when - /// `Hashi::start()` ran, the task installs the local limiter on its - /// first successful poll. - /// 2. **Non-leader drift**: non-leader nodes never talk to the guardian - /// on the withdrawal hot path, so they'd otherwise see stale state. - /// When they later become leader, they'd submit with a bad seq / - /// timestamp. Periodic reconciliation keeps them fresh. - /// 3. **Leader rotation**: a new leader's local state is at most - /// `LIMITER_RECONCILE_INTERVAL` stale; the first guardian rejection - /// also triggers an immediate snap. - fn start_limiter_reconcile_service(self: Arc) -> Service { - const LIMITER_RECONCILE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30); + /// Fetch `GetGuardianInfo` once and seed `guardian_signing_pubkey` and + /// `local_limiter` as far as the response allows. Idempotent. + async fn try_seed_guardian_state(&self) -> SeedOutcome { + let Some(client) = self.guardian_client() else { + return SeedOutcome::NotReady; + }; + let info_pb = match client.get_guardian_info().await { + Ok(info) => info, + Err(e) => { + tracing::debug!("guardian bootstrap: GetGuardianInfo failed: {e}"); + return SeedOutcome::NotReady; + } + }; + let info = match hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) { + Ok(info) => info, + Err(e) => { + tracing::warn!("guardian bootstrap: parse failed: {e:?}"); + return SeedOutcome::NotReady; + } + }; + let _ = self.guardian_signing_pubkey.set(Some(info.signing_pub_key)); + let (Some(state), Some(config)) = (info.limiter_state, info.limiter_config) else { + tracing::debug!("guardian bootstrap: guardian has no limiter yet"); + return SeedOutcome::NotReady; + }; + let mut slot = self.local_limiter.write().unwrap(); + if slot.is_none() { + tracing::info!( + ?state, + ?config, + "Local guardian limiter seeded from GetGuardianInfo", + ); + *slot = Some(Arc::new(guardian_limiter::LocalLimiter::new(config, state))); + } + SeedOutcome::FullySeeded + } + + /// Retry `try_seed_guardian_state` with bounded exponential backoff + /// until the first success, then exit. No-op when state is already + /// seeded or no guardian is configured. + fn start_guardian_bootstrap_retry(self: Arc) -> Service { Service::new().spawn_aborting(async move { if self.guardian_client().is_none() { return Ok(()); } - let mut ticker = tokio::time::interval(LIMITER_RECONCILE_INTERVAL); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - // First tick fires immediately; skip it so we don't overlap the - // bootstrap fetch that already ran in Hashi::start. - ticker.tick().await; + if self.guardian_signing_pubkey().is_some() && self.local_limiter().is_some() { + return Ok(()); + } + tracing::info!( + "Guardian not fully seeded at startup; retrying in background with \ + exponential backoff" + ); + const MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(30); + let mut delay = std::time::Duration::from_secs(1); loop { - ticker.tick().await; - let Some(client) = self.guardian_client() else { + tokio::time::sleep(delay).await; + if matches!( + self.try_seed_guardian_state().await, + SeedOutcome::FullySeeded + ) { + tracing::info!("Guardian bootstrap complete via retry task"); return Ok(()); - }; - let info_pb = match client.get_guardian_info().await { - Ok(info) => info, - Err(e) => { - tracing::debug!("limiter reconcile: GetGuardianInfo failed: {e}"); - continue; - } - }; - let info = match hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) { - Ok(info) => info, - Err(e) => { - tracing::debug!("limiter reconcile: parse failed: {e:?}"); - continue; - } - }; - // Cache the guardian signing pubkey if bootstrap missed it - // (e.g. the guardian wasn't reachable when `Hashi::start` - // ran). `OnceLock::set` returns Err when already populated - // — that's the steady-state happy path, ignore it. - let _ = self.guardian_signing_pubkey.set(Some(info.signing_pub_key)); - let (Some(state), Some(config)) = (info.limiter_state, info.limiter_config) else { - tracing::debug!("limiter reconcile: guardian has no limiter yet"); - continue; - }; - match self.local_limiter() { - Some(limiter) => { - let local = limiter.snapshot().await; - // Only snap when the guardian is strictly ahead. - // A "same next_seq but guardian's last_updated_at - // later" delta means the guardian observed a later - // timestamp without consuming — extremely rare but - // still ahead. If local is ahead (e.g. a guardian - // rehydrate from a stale S3 snapshot), don't clobber - // — warn and wait for the guardian to catch up. - let guardian_ahead = state.next_seq > local.next_seq - || (state.next_seq == local.next_seq - && state.last_updated_at > local.last_updated_at); - if guardian_ahead { - tracing::info!( - ?local, - guardian = ?state, - "limiter reconcile: snapping local state to guardian" - ); - limiter.snap_to(state).await; - } else if local.next_seq > state.next_seq { - tracing::warn!( - ?local, - guardian = ?state, - "limiter reconcile: local ahead of guardian; not snapping \ - (possible guardian rehydrate from a stale snapshot)" - ); - } - } - None => { - tracing::info!( - ?state, - ?config, - "limiter reconcile: installing local limiter from first successful poll" - ); - self.install_local_limiter(Arc::new(guardian_limiter::LocalLimiter::new( - config, state, - ))); - } } + delay = (delay * 2).min(MAX_DELAY); } }) } From 6a8027a7039ad65318cd9e0d5252936ce98cc9d7 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 08:17:02 -0700 Subject: [PATCH 3/8] [guardian] replace SeedOutcome enum with bool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The two-variant enum was a glorified bool — the retry loop only checked for FullySeeded. Switch to bool to drop the indirection. --- crates/hashi/src/lib.rs | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs index 9611c4f83..a67023be5 100644 --- a/crates/hashi/src/lib.rs +++ b/crates/hashi/src/lib.rs @@ -57,12 +57,6 @@ pub struct Hashi { reconfig_signatures: RwLock>>, } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum SeedOutcome { - FullySeeded, - NotReady, -} - impl Hashi { pub fn new(server_version: ServerVersion, config: config::Config) -> anyhow::Result> { init_crypto_provider(); @@ -509,29 +503,31 @@ impl Hashi { } /// Fetch `GetGuardianInfo` once and seed `guardian_signing_pubkey` and - /// `local_limiter` as far as the response allows. Idempotent. - async fn try_seed_guardian_state(&self) -> SeedOutcome { + /// `local_limiter` as far as the response allows. Idempotent. Returns + /// `true` once the local limiter slot is populated; the retry loop uses + /// this as its exit signal. + async fn try_seed_guardian_state(&self) -> bool { let Some(client) = self.guardian_client() else { - return SeedOutcome::NotReady; + return false; }; let info_pb = match client.get_guardian_info().await { Ok(info) => info, Err(e) => { tracing::debug!("guardian bootstrap: GetGuardianInfo failed: {e}"); - return SeedOutcome::NotReady; + return false; } }; let info = match hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) { Ok(info) => info, Err(e) => { tracing::warn!("guardian bootstrap: parse failed: {e:?}"); - return SeedOutcome::NotReady; + return false; } }; let _ = self.guardian_signing_pubkey.set(Some(info.signing_pub_key)); let (Some(state), Some(config)) = (info.limiter_state, info.limiter_config) else { tracing::debug!("guardian bootstrap: guardian has no limiter yet"); - return SeedOutcome::NotReady; + return false; }; let mut slot = self.local_limiter.write().unwrap(); if slot.is_none() { @@ -542,7 +538,7 @@ impl Hashi { ); *slot = Some(Arc::new(guardian_limiter::LocalLimiter::new(config, state))); } - SeedOutcome::FullySeeded + true } /// Retry `try_seed_guardian_state` with bounded exponential backoff @@ -564,10 +560,7 @@ impl Hashi { let mut delay = std::time::Duration::from_secs(1); loop { tokio::time::sleep(delay).await; - if matches!( - self.try_seed_guardian_state().await, - SeedOutcome::FullySeeded - ) { + if self.try_seed_guardian_state().await { tracing::info!("Guardian bootstrap complete via retry task"); return Ok(()); } From b3c5783457af4b7e34ac0d942bf973305e1bb237 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 08:21:40 -0700 Subject: [PATCH 4/8] [guardian] fold the upfront seed into the background bootstrap loop The synchronous seed in start() was best-effort (return ignored) and duplicated the retry task. Drop it and let the background task own seeding, with the first attempt firing at delay = 0. No caller relies on guardian state being populated before start() returns. --- crates/hashi/src/lib.rs | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs index a67023be5..df1ae7ddb 100644 --- a/crates/hashi/src/lib.rs +++ b/crates/hashi/src/lib.rs @@ -434,12 +434,6 @@ impl Hashi { .set(guardian) .map_err(|_| anyhow!("Guardian client already initialized"))?; - // Seed guardian signing pubkey and local limiter before any other - // service starts. The background retry below covers the unhappy path. - if self.guardian_client().is_some() { - let _ = self.try_seed_guardian_state().await; - } - // Verify Sui RPC is on the expected chain before loading any state. self.verify_sui_chain_id().await?; @@ -489,7 +483,7 @@ impl Hashi { let (_http_addr, http_service) = grpc::HttpService::new(self.clone()).start().await; let leader_service = leader::LeaderService::new(self.clone()).start(); let mpc_service = mpc_service.start(); - let bootstrap_retry_service = self.clone().start_guardian_bootstrap_retry(); + let guardian_bootstrap_service = self.clone().start_guardian_bootstrap(); let service = Service::new() .merge(onchain_service) @@ -497,7 +491,7 @@ impl Hashi { .merge(http_service) .merge(leader_service) .merge(mpc_service) - .merge(bootstrap_retry_service); + .merge(guardian_bootstrap_service); Ok(service) } @@ -541,30 +535,27 @@ impl Hashi { true } - /// Retry `try_seed_guardian_state` with bounded exponential backoff - /// until the first success, then exit. No-op when state is already - /// seeded or no guardian is configured. - fn start_guardian_bootstrap_retry(self: Arc) -> Service { + /// Drive `try_seed_guardian_state` with bounded exponential backoff + /// until the first success, then exit. The first attempt fires + /// immediately (delay = 0). No-op when no guardian is configured. + fn start_guardian_bootstrap(self: Arc) -> Service { Service::new().spawn_aborting(async move { if self.guardian_client().is_none() { return Ok(()); } - if self.guardian_signing_pubkey().is_some() && self.local_limiter().is_some() { - return Ok(()); - } - tracing::info!( - "Guardian not fully seeded at startup; retrying in background with \ - exponential backoff" - ); const MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(30); - let mut delay = std::time::Duration::from_secs(1); + let mut delay = std::time::Duration::ZERO; loop { tokio::time::sleep(delay).await; if self.try_seed_guardian_state().await { - tracing::info!("Guardian bootstrap complete via retry task"); + tracing::info!("Guardian bootstrap complete"); return Ok(()); } - delay = (delay * 2).min(MAX_DELAY); + delay = if delay.is_zero() { + std::time::Duration::from_secs(1) + } else { + (delay * 2).min(MAX_DELAY) + }; } }) } From c5604f946d6624c86f65a93448a301f28f4d2cae Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 08:30:10 -0700 Subject: [PATCH 5/8] [guardian] use backon for the bootstrap retry loop Replace the hand-rolled exponential-backoff loop with backon's ExponentialBuilder + Retryable, matching the pattern already used in crates/hashi/src/communication/timeout_and_retry.rs. Also bump the GetGuardianInfo failure log from debug to warn so operators see unreachable-guardian failures at the default log level. --- crates/hashi/src/lib.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs index df1ae7ddb..93065f852 100644 --- a/crates/hashi/src/lib.rs +++ b/crates/hashi/src/lib.rs @@ -507,7 +507,7 @@ impl Hashi { let info_pb = match client.get_guardian_info().await { Ok(info) => info, Err(e) => { - tracing::debug!("guardian bootstrap: GetGuardianInfo failed: {e}"); + tracing::warn!("guardian bootstrap: GetGuardianInfo failed: {e}"); return false; } }; @@ -537,26 +537,28 @@ impl Hashi { /// Drive `try_seed_guardian_state` with bounded exponential backoff /// until the first success, then exit. The first attempt fires - /// immediately (delay = 0). No-op when no guardian is configured. + /// immediately. No-op when no guardian is configured. fn start_guardian_bootstrap(self: Arc) -> Service { + use backon::Retryable; Service::new().spawn_aborting(async move { if self.guardian_client().is_none() { return Ok(()); } - const MAX_DELAY: std::time::Duration = std::time::Duration::from_secs(30); - let mut delay = std::time::Duration::ZERO; - loop { - tokio::time::sleep(delay).await; + let policy = backon::ExponentialBuilder::default() + .with_min_delay(std::time::Duration::from_secs(1)) + .with_max_delay(std::time::Duration::from_secs(30)) + .without_max_times(); + let _ = (|| async { if self.try_seed_guardian_state().await { - tracing::info!("Guardian bootstrap complete"); - return Ok(()); - } - delay = if delay.is_zero() { - std::time::Duration::from_secs(1) + Ok::<(), ()>(()) } else { - (delay * 2).min(MAX_DELAY) - }; - } + Err(()) + } + }) + .retry(policy) + .await; + tracing::info!("Guardian bootstrap complete"); + Ok(()) }) } From 0c87e0758b1b7464256e652d191e9135053bed85 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 08:33:39 -0700 Subject: [PATCH 6/8] [guardian] drop doc comments on guardian bootstrap helpers Match the file's convention of short or no doc comments. Function names and bodies carry the meaning here. --- crates/hashi/src/lib.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs index 93065f852..301ab9c6f 100644 --- a/crates/hashi/src/lib.rs +++ b/crates/hashi/src/lib.rs @@ -496,10 +496,6 @@ impl Hashi { Ok(service) } - /// Fetch `GetGuardianInfo` once and seed `guardian_signing_pubkey` and - /// `local_limiter` as far as the response allows. Idempotent. Returns - /// `true` once the local limiter slot is populated; the retry loop uses - /// this as its exit signal. async fn try_seed_guardian_state(&self) -> bool { let Some(client) = self.guardian_client() else { return false; @@ -535,9 +531,6 @@ impl Hashi { true } - /// Drive `try_seed_guardian_state` with bounded exponential backoff - /// until the first success, then exit. The first attempt fires - /// immediately. No-op when no guardian is configured. fn start_guardian_bootstrap(self: Arc) -> Service { use backon::Retryable; Service::new().spawn_aborting(async move { From ce05a79f87a6c9d5d6fea44f51c9d09317919537 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 09:11:56 -0700 Subject: [PATCH 7/8] [guardian] drop snap_to and trim local limiter comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit snap_to was a recovery primitive the new design doesn't need — the local limiter is bootstrapped from the guardian at startup and advanced only on accepted consumes, with no force-resync path. Drop the function, its test, and the references in module and apply_consume docs. Also trim docstrings that just restated method names and inline test arithmetic that re-derived constants from the test setup. --- crates/hashi/src/guardian_limiter.rs | 39 +++++----------------------- 1 file changed, 7 insertions(+), 32 deletions(-) diff --git a/crates/hashi/src/guardian_limiter.rs b/crates/hashi/src/guardian_limiter.rs index fd1544e4d..5ae4c5a46 100644 --- a/crates/hashi/src/guardian_limiter.rs +++ b/crates/hashi/src/guardian_limiter.rs @@ -1,10 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -//! Local emulator of the guardian's token-bucket rate limiter, used by -//! coin selection to project capacity and by the leader to pick the next -//! `seq` without a guardian round-trip. The guardian remains authoritative; -//! `snap_to` syncs from it on any drift. +//! Local emulator of the guardian's token-bucket limiter. Bootstrapped +//! from the guardian at startup and advanced on every accepted consume. use hashi_types::guardian::LimiterConfig; use hashi_types::guardian::LimiterState; @@ -52,7 +50,6 @@ impl LocalLimiter { *self.state.lock().await } - /// Capacity projected to `timestamp_secs`, clamped to the config ceiling. pub async fn capacity_at(&self, timestamp_secs: u64) -> u64 { let state = *self.state.lock().await; project_capacity(&self.config, &state, timestamp_secs) @@ -62,9 +59,8 @@ impl LocalLimiter { self.state.lock().await.next_seq } - /// Check whether a consume for `(timestamp_secs, amount_sats)` would - /// be accepted, and return the `seq` to submit. Does not mutate state; - /// the caller must call `apply_consume` once the guardian accepts. + /// Returns the `seq` the caller should submit to the guardian. Does + /// not mutate state — call `apply_consume` once the guardian accepts. pub async fn validate_consume( &self, timestamp_secs: u64, @@ -87,8 +83,8 @@ impl LocalLimiter { Ok(state.next_seq) } - /// Mirror an accepted consume into local state. On any error the state - /// is left untouched and the caller should `snap_to` the guardian. + /// Advance local state to match an accepted consume. State is left + /// untouched on error. pub async fn apply_consume( &self, applied_seq: u64, @@ -120,11 +116,6 @@ impl LocalLimiter { guard.next_seq += 1; Ok(()) } - - /// Replace local state with the guardian's authoritative snapshot. - pub async fn snap_to(&self, state: LimiterState) { - *self.state.lock().await = state; - } } fn project_capacity(config: &LimiterConfig, state: &LimiterState, timestamp_secs: u64) -> u64 { @@ -160,9 +151,8 @@ mod tests { #[tokio::test] async fn test_validate_consume_happy_path() { let limiter = make_limiter(0, 0, 7); - // 100 seconds of refill => 100k headroom. let seq = limiter.validate_consume(100, 80_000).await.unwrap(); - assert_eq!(seq, 7, "validate returns the currently-next seq"); + assert_eq!(seq, 7); } #[tokio::test] @@ -175,7 +165,6 @@ mod tests { #[tokio::test] async fn test_validate_rejects_over_capacity() { let limiter = make_limiter(0, 0, 0); - // Only 10k refilled; ask for 50k. let err = limiter.validate_consume(10, 50_000).await.unwrap_err(); match err { LocalLimiterError::InsufficientCapacity { needed, available } => { @@ -207,24 +196,10 @@ mod tests { #[tokio::test] async fn test_capacity_at_refills_linearly_and_clamps_to_ceiling() { let limiter = make_limiter(100_000, 10, 0); - // refill 1000 sats/sec; elapsed 5s; base 100k; refill 5k => 105k. assert_eq!(limiter.capacity_at(15).await, 105_000); - // far-future => clamped at max_bucket_capacity 2_000_000. assert_eq!(limiter.capacity_at(u64::MAX).await, 2_000_000); } - #[tokio::test] - async fn test_snap_to_replaces_state() { - let limiter = make_limiter(1_000, 10, 3); - let new_state = LimiterState { - num_tokens_available: 500_000, - last_updated_at: 999, - next_seq: 42, - }; - limiter.snap_to(new_state).await; - assert_eq!(limiter.snapshot().await, new_state); - } - #[tokio::test] async fn test_next_seq_matches_snapshot() { let limiter = make_limiter(0, 0, 11); From 4bf13f8c752c524568e4d08d850de2625c128ce3 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 10:03:29 -0700 Subject: [PATCH 8/8] [guardian] take expected_seq as input on validate_consume Each committee member will validate against a leader-supplied seq at MPC signing time, so `validate_consume` becomes a pure check rather than a peek-and-return. Returns `()` on success; the seq mismatch case shares the existing `SeqMismatch` variant (with `local`/`incoming` field names that read naturally for both validate and apply). --- crates/hashi/src/guardian_limiter.rs | 46 +++++++++++++++++++--------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/crates/hashi/src/guardian_limiter.rs b/crates/hashi/src/guardian_limiter.rs index 5ae4c5a46..78abcbcf4 100644 --- a/crates/hashi/src/guardian_limiter.rs +++ b/crates/hashi/src/guardian_limiter.rs @@ -22,8 +22,8 @@ pub enum LocalLimiterError { #[error("insufficient capacity: needed {needed}, available {available}")] InsufficientCapacity { needed: u64, available: u64 }, - #[error("seq mismatch: expected {expected}, applied {applied}")] - SeqMismatch { expected: u64, applied: u64 }, + #[error("seq mismatch: local={local}, incoming={incoming}")] + SeqMismatch { local: u64, incoming: u64 }, } impl fmt::Debug for LocalLimiter { @@ -59,14 +59,20 @@ impl LocalLimiter { self.state.lock().await.next_seq } - /// Returns the `seq` the caller should submit to the guardian. Does - /// not mutate state — call `apply_consume` once the guardian accepts. + /// Validate a consume; does not mutate state. pub async fn validate_consume( &self, + expected_seq: u64, timestamp_secs: u64, amount_sats: u64, - ) -> Result { + ) -> Result<(), LocalLimiterError> { let state = *self.state.lock().await; + if expected_seq != state.next_seq { + return Err(LocalLimiterError::SeqMismatch { + local: state.next_seq, + incoming: expected_seq, + }); + } if timestamp_secs < state.last_updated_at { return Err(LocalLimiterError::StaleTimestamp { local_last: state.last_updated_at, @@ -80,7 +86,7 @@ impl LocalLimiter { available: capacity, }); } - Ok(state.next_seq) + Ok(()) } /// Advance local state to match an accepted consume. State is left @@ -94,8 +100,8 @@ impl LocalLimiter { let mut guard = self.state.lock().await; if applied_seq != guard.next_seq { return Err(LocalLimiterError::SeqMismatch { - expected: guard.next_seq, - applied: applied_seq, + local: guard.next_seq, + incoming: applied_seq, }); } if timestamp_secs < guard.last_updated_at { @@ -151,21 +157,33 @@ mod tests { #[tokio::test] async fn test_validate_consume_happy_path() { let limiter = make_limiter(0, 0, 7); - let seq = limiter.validate_consume(100, 80_000).await.unwrap(); - assert_eq!(seq, 7); + limiter.validate_consume(7, 100, 80_000).await.unwrap(); + } + + #[tokio::test] + async fn test_validate_rejects_seq_mismatch() { + let limiter = make_limiter(100_000, 0, 5); + let err = limiter.validate_consume(7, 100, 1_000).await.unwrap_err(); + match err { + LocalLimiterError::SeqMismatch { local, incoming } => { + assert_eq!(local, 5); + assert_eq!(incoming, 7); + } + other => panic!("unexpected error: {other:?}"), + } } #[tokio::test] async fn test_validate_rejects_stale_timestamp() { let limiter = make_limiter(0, 100, 0); - let err = limiter.validate_consume(50, 0).await.unwrap_err(); + let err = limiter.validate_consume(0, 50, 0).await.unwrap_err(); assert!(matches!(err, LocalLimiterError::StaleTimestamp { .. })); } #[tokio::test] async fn test_validate_rejects_over_capacity() { let limiter = make_limiter(0, 0, 0); - let err = limiter.validate_consume(10, 50_000).await.unwrap_err(); + let err = limiter.validate_consume(0, 10, 50_000).await.unwrap_err(); match err { LocalLimiterError::InsufficientCapacity { needed, available } => { assert_eq!(needed, 50_000); @@ -178,8 +196,8 @@ mod tests { #[tokio::test] async fn test_apply_bumps_seq_and_updates_last_updated_at() { let limiter = make_limiter(0, 0, 42); - let seq = limiter.validate_consume(100, 80_000).await.unwrap(); - limiter.apply_consume(seq, 100, 80_000).await.unwrap(); + limiter.validate_consume(42, 100, 80_000).await.unwrap(); + limiter.apply_consume(42, 100, 80_000).await.unwrap(); let snap = limiter.snapshot().await; assert_eq!(snap.next_seq, 43); assert_eq!(snap.last_updated_at, 100);