diff --git a/crates/hashi-guardian/src/enclave.rs b/crates/hashi-guardian/src/enclave.rs
index f44f2d916..0ea84a51d 100644
--- a/crates/hashi-guardian/src/enclave.rs
+++ b/crates/hashi-guardian/src/enclave.rs
@@ -327,6 +327,16 @@ impl EnclaveState {
         guard.consume(seq, timestamp, amount_sats)?;
         Ok(LimiterGuard::new(guard))
     }
+
+    /// Returns a copy of the rate limiter's current state, or `None` when
+    /// `rate_limiter` has not been set. Awaits the limiter lock.
+    pub async fn limiter_state(&self) -> Option {
+        let limiter = self.rate_limiter.get()?;
+        Some(*limiter.lock().await.state())
+    }
+
+    /// Returns a copy of the rate limiter's configuration, or `None` when
+    /// `rate_limiter` has not been set. Awaits the limiter lock.
+    pub async fn limiter_config(&self) -> Option {
+        let limiter = self.rate_limiter.get()?;
+        Some(*limiter.lock().await.config())
+    }
 }
 
 impl Enclave {
diff --git a/crates/hashi-guardian/src/getters.rs b/crates/hashi-guardian/src/getters.rs
index 65320a6ad..3e213f8c5 100644
--- a/crates/hashi-guardian/src/getters.rs
+++ b/crates/hashi-guardian/src/getters.rs
@@ -27,10 +27,14 @@ pub async fn get_guardian_info(enclave: Arc) -> GuardianResult,
+    /// Current rate limiter state (if initialized).
+    pub limiter_state: Option,
+    /// Immutable limiter configuration (if initialized).
+    pub limiter_config: Option,
 }
 
 /// TODO: Add network?
diff --git a/crates/hashi-types/src/guardian/proto_conversions.rs b/crates/hashi-types/src/guardian/proto_conversions.rs
index 8b73da531..7f4166e53 100644
--- a/crates/hashi-types/src/guardian/proto_conversions.rs
+++ b/crates/hashi-types/src/guardian/proto_conversions.rs
@@ -21,6 +21,7 @@ use super::GuardianSigned;
 use super::HashiCommittee;
 use super::HashiCommitteeMember;
 use super::HashiSigned;
+use super::LimiterConfig;
 use super::LimiterState;
 use super::OperatorInitRequest;
 use super::ProvisionerInitRequest;
@@ -200,10 +201,15 @@ impl TryFrom for GetGuardianInfoResponse {
         let signed_info_pb = resp.signed_info.ok_or_else(|| missing("signed_info"))?;
         let signed_info = pb_to_signed_guardian_info(signed_info_pb)?;
 
+        let limiter_state = resp.limiter_state.map(pb_to_limiter_state).transpose()?;
+        let limiter_config = resp.limiter_config.map(pb_to_limiter_config).transpose()?;
+
         Ok(GetGuardianInfoResponse {
             attestation: attestation.to_vec(),
             signing_pub_key,
             signed_info,
+            limiter_state,
+            limiter_config,
         })
     }
 }
@@ -335,6 +341,8 @@ pub fn get_guardian_info_response_to_pb(r: GetGuardianInfoResponse) -> pb::GetGu
         attestation: Some(r.attestation.into()),
         signing_pub_key: Some(r.signing_pub_key.to_bytes().to_vec().into()),
         signed_info: Some(signed_guardian_info_to_pb(r.signed_info)),
+        limiter_state: r.limiter_state.map(limiter_state_to_pb),
+        limiter_config: r.limiter_config.map(limiter_config_to_pb),
     }
 }
 
@@ -659,6 +667,26 @@ fn limiter_state_to_pb(state: LimiterState) -> pb::LimiterState {
     }
 }
 
+/// Converts a protobuf `LimiterConfig` into the domain [`LimiterConfig`].
+///
+/// Both protobuf fields are optional on the wire; an unset field is reported
+/// through `missing` with the field's name rather than defaulted.
+fn pb_to_limiter_config(cfg: pb::LimiterConfig) -> GuardianResult<LimiterConfig> {
+    let refill_rate = cfg
+        .refill_rate_sats_per_sec
+        .ok_or_else(|| missing("refill_rate_sats_per_sec"))?;
+    let max_bucket_capacity = cfg
+        .max_bucket_capacity_sats
+        .ok_or_else(|| missing("max_bucket_capacity_sats"))?;
+    Ok(LimiterConfig {
+        refill_rate,
+        max_bucket_capacity,
+    })
+}
+
+/// Converts the domain [`LimiterConfig`] into its protobuf form, setting both
+/// optional fields.
+fn limiter_config_to_pb(cfg: LimiterConfig) -> pb::LimiterConfig {
+    pb::LimiterConfig {
+        refill_rate_sats_per_sec: Some(cfg.refill_rate),
+        max_bucket_capacity_sats: Some(cfg.max_bucket_capacity),
+    }
+}
+
 fn pb_to_hashi_committee(c: pb::Committee) -> GuardianResult {
     let epoch = c.epoch.ok_or_else(|| missing("epoch"))?;
 
diff --git a/crates/hashi-types/src/guardian/test_utils.rs b/crates/hashi-types/src/guardian/test_utils.rs
index eae27f219..41e4aeec9 100644
--- a/crates/hashi-types/src/guardian/test_utils.rs
+++ b/crates/hashi-types/src/guardian/test_utils.rs
@@ -85,6 +85,8 @@ impl GetGuardianInfoResponse {
             attestation: "abcd".as_bytes().to_vec(),
             signing_pub_key,
             signed_info: GuardianSigned::new(info, &signing_key, 1234),
+            limiter_state: None,
+            limiter_config: None,
         }
     }
 }
diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin
index 970066696..7c8bff753 100644
Binary files a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin and b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin differ
diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs
index 8cc4ed798..927aeb176 100644
--- a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs
+++ b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs
@@ -1008,6 +1008,12 @@ pub struct GetGuardianInfoResponse {
     /// Signed guardian info (includes server version, encryption pubkey, and optional S3/bucket info).
     #[prost(message, optional, tag = "3")]
     pub signed_info: ::core::option::Option,
+    /// Current rate limiter state (if initialized).
+    #[prost(message, optional, tag = "4")]
+    pub limiter_state: ::core::option::Option<LimiterState>,
+    /// Immutable limiter configuration (if initialized).
+    #[prost(message, optional, tag = "5")]
+    pub limiter_config: ::core::option::Option<LimiterConfig>,
 }
 /// Guardian-signed wrapper around `GuardianInfoData`.
#[derive(Clone, PartialEq, ::prost::Message)] @@ -1180,6 +1186,16 @@ pub struct LimiterState { #[prost(uint64, optional, tag = "3")] pub next_seq: ::core::option::Option, } +/// Immutable limiter configuration. +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LimiterConfig { + /// Token refill rate in sats per second. + #[prost(uint64, optional, tag = "1")] + pub refill_rate_sats_per_sec: ::core::option::Option, + /// Maximum bucket capacity in sats. + #[prost(uint64, optional, tag = "2")] + pub max_bucket_capacity_sats: ::core::option::Option, +} #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] pub struct ProvisionerInitResponse {} /// Hashi-signed wrapper for the withdrawal request. diff --git a/crates/hashi/src/grpc/guardian_client.rs b/crates/hashi/src/grpc/guardian_client.rs index 6f55bd2dd..a640f80ec 100644 --- a/crates/hashi/src/grpc/guardian_client.rs +++ b/crates/hashi/src/grpc/guardian_client.rs @@ -37,4 +37,14 @@ impl GuardianClient { pub fn guardian_service_client(&self) -> GuardianServiceClient { GuardianServiceClient::new(self.channel.clone()) } + + pub async fn get_guardian_info( + &self, + ) -> Result { + let response = self + .guardian_service_client() + .get_guardian_info(hashi_types::proto::GetGuardianInfoRequest {}) + .await?; + Ok(response.into_inner()) + } } diff --git a/crates/hashi/src/guardian_limiter.rs b/crates/hashi/src/guardian_limiter.rs new file mode 100644 index 000000000..78abcbcf4 --- /dev/null +++ b/crates/hashi/src/guardian_limiter.rs @@ -0,0 +1,226 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +//! Local emulator of the guardian's token-bucket limiter. Bootstrapped +//! from the guardian at startup and advanced on every accepted consume. 
+
+use hashi_types::guardian::LimiterConfig;
+use hashi_types::guardian::LimiterState;
+use std::fmt;
+use tokio::sync::Mutex;
+
+/// In-process copy of the guardian's token-bucket limiter.
+///
+/// The configuration is fixed at construction. The bucket state lives behind
+/// an async mutex and only changes through [`LocalLimiter::apply_consume`].
+pub struct LocalLimiter {
+    config: LimiterConfig,
+    state: Mutex<LimiterState>,
+}
+
+/// Reasons the local limiter rejects a consume.
+#[derive(Debug, thiserror::Error)]
+pub enum LocalLimiterError {
+    /// The consume's timestamp is earlier than the last recorded update.
+    #[error("stale timestamp: local last_updated_at={local_last}, incoming={incoming}")]
+    StaleTimestamp { local_last: u64, incoming: u64 },
+
+    /// The bucket, refilled up to the consume's timestamp, holds less than
+    /// the requested amount.
+    #[error("insufficient capacity: needed {needed}, available {available}")]
+    InsufficientCapacity { needed: u64, available: u64 },
+
+    /// The consume's sequence number is not the next one expected locally.
+    #[error("seq mismatch: local={local}, incoming={incoming}")]
+    SeqMismatch { local: u64, incoming: u64 },
+}
+
+impl fmt::Debug for LocalLimiter {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        // Only the config is printed; the mutex-guarded state is left out,
+        // which `finish_non_exhaustive` signals in the output.
+        f.debug_struct("LocalLimiter")
+            .field("config", &self.config)
+            .finish_non_exhaustive()
+    }
+}
+
+impl LocalLimiter {
+    /// Creates a limiter from a configuration and a starting state.
+    pub fn new(config: LimiterConfig, state: LimiterState) -> Self {
+        Self {
+            config,
+            state: Mutex::new(state),
+        }
+    }
+
+    /// Returns the configuration this limiter was built with.
+    pub fn config(&self) -> &LimiterConfig {
+        &self.config
+    }
+
+    /// Returns a copy of the current state.
+    pub async fn snapshot(&self) -> LimiterState {
+        *self.state.lock().await
+    }
+
+    /// Returns the number of tokens the bucket would hold at `timestamp_secs`,
+    /// without changing state. A timestamp earlier than the last update adds
+    /// no refill.
+    pub async fn capacity_at(&self, timestamp_secs: u64) -> u64 {
+        let state = *self.state.lock().await;
+        project_capacity(&self.config, &state, timestamp_secs)
+    }
+
+    /// Returns the sequence number the next consume must carry.
+    pub async fn next_seq(&self) -> u64 {
+        self.state.lock().await.next_seq
+    }
+
+    /// Validate a consume; does not mutate state.
+    ///
+    /// The check runs against a copy of the state taken at call time, and the
+    /// lock is released before returning, so the result is a point-in-time
+    /// answer. [`LocalLimiter::apply_consume`] repeats the same checks under
+    /// the lock.
+    ///
+    /// # Errors
+    ///
+    /// Returns, in this order of precedence, `SeqMismatch`, `StaleTimestamp`
+    /// or `InsufficientCapacity`.
+    pub async fn validate_consume(
+        &self,
+        expected_seq: u64,
+        timestamp_secs: u64,
+        amount_sats: u64,
+    ) -> Result<(), LocalLimiterError> {
+        let state = *self.state.lock().await;
+        check_consume(
+            &self.config,
+            &state,
+            expected_seq,
+            timestamp_secs,
+            amount_sats,
+        )?;
+        Ok(())
+    }
+
+    /// Advance local state to match an accepted consume. State is left
+    /// untouched on error.
+    ///
+    /// On success the bucket holds the projected capacity minus `amount_sats`,
+    /// `last_updated_at` becomes `timestamp_secs`, and `next_seq` is
+    /// incremented by one.
+    ///
+    /// # Errors
+    ///
+    /// Same conditions and precedence as [`LocalLimiter::validate_consume`].
+    pub async fn apply_consume(
+        &self,
+        applied_seq: u64,
+        timestamp_secs: u64,
+        amount_sats: u64,
+    ) -> Result<(), LocalLimiterError> {
+        let mut guard = self.state.lock().await;
+        // All checks run before the first write, so an error leaves the
+        // guarded state exactly as it was.
+        let capacity = check_consume(
+            &self.config,
+            &guard,
+            applied_seq,
+            timestamp_secs,
+            amount_sats,
+        )?;
+        // `check_consume` guarantees `capacity >= amount_sats`.
+        guard.num_tokens_available = capacity - amount_sats;
+        guard.last_updated_at = timestamp_secs;
+        guard.next_seq += 1;
+        Ok(())
+    }
+}
+
+/// Runs the checks shared by `validate_consume` and `apply_consume` against
+/// `state`: sequence number first, then timestamp, then capacity.
+///
+/// Returns the capacity projected to `timestamp_secs` so the caller can derive
+/// the post-consume balance without recomputing it.
+fn check_consume(
+    config: &LimiterConfig,
+    state: &LimiterState,
+    seq: u64,
+    timestamp_secs: u64,
+    amount_sats: u64,
+) -> Result<u64, LocalLimiterError> {
+    if seq != state.next_seq {
+        return Err(LocalLimiterError::SeqMismatch {
+            local: state.next_seq,
+            incoming: seq,
+        });
+    }
+    if timestamp_secs < state.last_updated_at {
+        return Err(LocalLimiterError::StaleTimestamp {
+            local_last: state.last_updated_at,
+            incoming: timestamp_secs,
+        });
+    }
+    let capacity = project_capacity(config, state, timestamp_secs);
+    if capacity < amount_sats {
+        return Err(LocalLimiterError::InsufficientCapacity {
+            needed: amount_sats,
+            available: capacity,
+        });
+    }
+    Ok(capacity)
+}
+
+/// Projects the bucket's token count forward to `timestamp_secs`.
+///
+/// Adds `refill_rate` tokens per elapsed second since `last_updated_at` and
+/// clamps the total to `max_bucket_capacity`. Every step saturates, so a
+/// timestamp in the past adds nothing and a huge one cannot overflow.
+fn project_capacity(config: &LimiterConfig, state: &LimiterState, timestamp_secs: u64) -> u64 {
+    let elapsed = timestamp_secs.saturating_sub(state.last_updated_at);
+    let refilled = elapsed.saturating_mul(config.refill_rate);
+    state
+        .num_tokens_available
+        .saturating_add(refilled)
+        .min(config.max_bucket_capacity)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Builds a limiter refilling 1_000 tokens per second up to 2_000_000,
+    /// starting from the given state.
+    fn make_limiter(
+        num_tokens_available: u64,
+        last_updated_at: u64,
+        next_seq: u64,
+    ) -> LocalLimiter {
+        let config = LimiterConfig {
+            refill_rate: 1_000,
+            max_bucket_capacity: 2_000_000,
+        };
+        let state = LimiterState {
+            num_tokens_available,
+            last_updated_at,
+            next_seq,
+        };
+        LocalLimiter::new(config, state)
+    }
+
+    #[tokio::test]
+    async fn test_validate_consume_happy_path() {
+        let limiter = make_limiter(0, 0, 7);
+        limiter.validate_consume(7, 100, 80_000).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_validate_rejects_seq_mismatch() {
+        let limiter = make_limiter(100_000, 0, 5);
+        let err = limiter.validate_consume(7, 100, 1_000).await.unwrap_err();
+        match err {
+            LocalLimiterError::SeqMismatch { local, incoming } => {
+                assert_eq!(local, 5);
+                assert_eq!(incoming, 7);
+            }
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_validate_rejects_stale_timestamp() {
+        let limiter = make_limiter(0, 100, 0);
+        let err = limiter.validate_consume(0, 50, 0).await.unwrap_err();
+        assert!(matches!(err, LocalLimiterError::StaleTimestamp { .. }));
+    }
+
+    #[tokio::test]
+    async fn test_validate_rejects_over_capacity() {
+        let limiter = make_limiter(0, 0, 0);
+        let err = limiter.validate_consume(0, 10, 50_000).await.unwrap_err();
+        match err {
+            LocalLimiterError::InsufficientCapacity { needed, available } => {
+                assert_eq!(needed, 50_000);
+                assert_eq!(available, 10_000);
+            }
+            other => panic!("unexpected error: {other:?}"),
+        }
+    }
+
+    #[tokio::test]
+    async fn test_apply_bumps_seq_and_updates_last_updated_at() {
+        let limiter = make_limiter(0, 0, 42);
+        limiter.validate_consume(42, 100, 80_000).await.unwrap();
+        limiter.apply_consume(42, 100, 80_000).await.unwrap();
+        let snap = limiter.snapshot().await;
+        assert_eq!(snap.next_seq, 43);
+        assert_eq!(snap.last_updated_at, 100);
+        assert_eq!(snap.num_tokens_available, 20_000);
+    }
+
+    #[tokio::test]
+    async fn test_apply_rejects_seq_mismatch() {
+        let limiter = make_limiter(0, 0, 0);
+        let err = limiter.apply_consume(5, 100, 1_000).await.unwrap_err();
+        assert!(matches!(err, LocalLimiterError::SeqMismatch { .. }));
+        // A rejected apply must leave the state exactly as it was.
+        let snap = limiter.snapshot().await;
+        assert_eq!(snap.next_seq, 0);
+        assert_eq!(snap.last_updated_at, 0);
+        assert_eq!(snap.num_tokens_available, 0);
+    }
+
+    #[tokio::test]
+    async fn test_capacity_at_refills_linearly_and_clamps_to_ceiling() {
+        let limiter = make_limiter(100_000, 10, 0);
+        assert_eq!(limiter.capacity_at(15).await, 105_000);
+        assert_eq!(limiter.capacity_at(u64::MAX).await, 2_000_000);
+    }
+
+    #[tokio::test]
+    async fn test_next_seq_matches_snapshot() {
+        let limiter = make_limiter(0, 0, 11);
+        assert_eq!(limiter.next_seq().await, 11);
+        assert_eq!(limiter.next_seq().await, limiter.snapshot().await.next_seq);
+    }
+}
diff --git a/crates/hashi/src/lib.rs b/crates/hashi/src/lib.rs
index 454a5da60..301ab9c6f 100644
--- a/crates/hashi/src/lib.rs
+++ b/crates/hashi/src/lib.rs
@@ -18,6 +18,7 @@ pub mod constants;
 pub mod db;
 pub mod deposits;
 pub mod grpc;
+pub mod guardian_limiter;
 pub mod leader;
 pub mod metrics;
 pub mod mpc;
@@ -50,6 +51,8 @@ pub struct Hashi {
     btc_monitor: OnceLock,
     screener_client: OnceLock>,
     guardian_client: OnceLock>,
+    guardian_signing_pubkey: OnceLock<Option<hashi_types::guardian::GuardianPubKey>>,
+    local_limiter: RwLock<Option<Arc<guardian_limiter::LocalLimiter>>>,
     /// Reconfig completion signatures by epoch.
reconfig_signatures: RwLock>>, } @@ -72,6 +75,8 @@ impl Hashi { btc_monitor: OnceLock::new(), screener_client: OnceLock::new(), guardian_client: OnceLock::new(), + guardian_signing_pubkey: OnceLock::new(), + local_limiter: RwLock::new(None), reconfig_signatures: RwLock::new(HashMap::new()), })) } @@ -97,6 +102,8 @@ impl Hashi { btc_monitor: OnceLock::new(), screener_client: OnceLock::new(), guardian_client: OnceLock::new(), + guardian_signing_pubkey: OnceLock::new(), + local_limiter: RwLock::new(None), reconfig_signatures: RwLock::new(HashMap::new()), })) } @@ -192,6 +199,16 @@ impl Hashi { self.guardian_client.get().and_then(|opt| opt.as_ref()) } + pub fn guardian_signing_pubkey(&self) -> Option<&hashi_types::guardian::GuardianPubKey> { + self.guardian_signing_pubkey + .get() + .and_then(|opt| opt.as_ref()) + } + + pub fn local_limiter(&self) -> Option> { + self.local_limiter.read().unwrap().clone() + } + async fn initialize_onchain_state(&self) -> anyhow::Result { let (onchain_state, service) = onchain::OnchainState::new( self.config.sui_rpc.as_deref().unwrap(), @@ -466,17 +483,78 @@ impl Hashi { let (_http_addr, http_service) = grpc::HttpService::new(self.clone()).start().await; let leader_service = leader::LeaderService::new(self.clone()).start(); let mpc_service = mpc_service.start(); + let guardian_bootstrap_service = self.clone().start_guardian_bootstrap(); let service = Service::new() .merge(onchain_service) .merge(btc_monitor_service) .merge(http_service) .merge(leader_service) - .merge(mpc_service); + .merge(mpc_service) + .merge(guardian_bootstrap_service); Ok(service) } + async fn try_seed_guardian_state(&self) -> bool { + let Some(client) = self.guardian_client() else { + return false; + }; + let info_pb = match client.get_guardian_info().await { + Ok(info) => info, + Err(e) => { + tracing::warn!("guardian bootstrap: GetGuardianInfo failed: {e}"); + return false; + } + }; + let info = match 
hashi_types::guardian::GetGuardianInfoResponse::try_from(info_pb) { + Ok(info) => info, + Err(e) => { + tracing::warn!("guardian bootstrap: parse failed: {e:?}"); + return false; + } + }; + let _ = self.guardian_signing_pubkey.set(Some(info.signing_pub_key)); + let (Some(state), Some(config)) = (info.limiter_state, info.limiter_config) else { + tracing::debug!("guardian bootstrap: guardian has no limiter yet"); + return false; + }; + let mut slot = self.local_limiter.write().unwrap(); + if slot.is_none() { + tracing::info!( + ?state, + ?config, + "Local guardian limiter seeded from GetGuardianInfo", + ); + *slot = Some(Arc::new(guardian_limiter::LocalLimiter::new(config, state))); + } + true + } + + fn start_guardian_bootstrap(self: Arc) -> Service { + use backon::Retryable; + Service::new().spawn_aborting(async move { + if self.guardian_client().is_none() { + return Ok(()); + } + let policy = backon::ExponentialBuilder::default() + .with_min_delay(std::time::Duration::from_secs(1)) + .with_max_delay(std::time::Duration::from_secs(30)) + .without_max_times(); + let _ = (|| async { + if self.try_seed_guardian_state().await { + Ok::<(), ()>(()) + } else { + Err(()) + } + }) + .retry(policy) + .await; + tracing::info!("Guardian bootstrap complete"); + Ok(()) + }) + } + pub(crate) fn is_in_current_committee(&self) -> bool { let address = match self.config.validator_address() { Ok(a) => a,