diff --git a/crates/hashi-guardian/src/init.rs b/crates/hashi-guardian/src/init.rs
index d8b2ebc40..358530584 100644
--- a/crates/hashi-guardian/src/init.rs
+++ b/crates/hashi-guardian/src/init.rs
@@ -8,15 +8,26 @@ use hashi_types::guardian::crypto::combine_shares;
 use hashi_types::guardian::crypto::commit_share;
 use hashi_types::guardian::crypto::decrypt_share;
 use hashi_types::guardian::crypto::Share;
+use hashi_types::guardian::s3_utils::S3HourScopedDirectory;
+use hashi_types::guardian::time_utils::now_timestamp_secs;
 use hashi_types::guardian::InitLogMessage::OIAttestationUnsigned;
 use hashi_types::guardian::InitLogMessage::OIGuardianInfo;
 use hashi_types::guardian::InitLogMessage::PIEnclaveFullyInitialized;
 use hashi_types::guardian::InitLogMessage::PISuccess;
 use hashi_types::guardian::*;
+use std::collections::HashMap;
 use std::sync::Arc;
 use tracing::info;
+use tracing::warn;
 use GuardianError::*;
 
+/// How many hour-scoped directories back we scan when rehydrating the
+/// wid-keyed response cache from prior-session withdrawal logs. Matches
+/// the Object Lock retention window for withdrawal logs (see
+/// `S3_OBJECT_LOCK_DURATION_WITHDRAW`) so we never try to read a
+/// directory whose entries have expired out from under us.
+const REHYDRATE_LOG_HOURS: u64 = 2;
+
 /// Receives S3 API keys & share commitments.
 /// Returns an error for malformed requests / duplicate calls & panics for the rest.
 pub async fn operator_init(
@@ -178,6 +189,22 @@ pub async fn provisioner_init(
     if current_share_count >= THRESHOLD {
         let shares_vec: Vec<Share> = received_shares.iter().cloned().collect();
         finalize_init(&shares_vec, &enclave, request.into_state()).await;
+
+        // Rehydrate the wid-keyed response cache from prior-session
+        // withdrawal logs BEFORE we start serving withdrawals. Retries
+        // of prior-session wids will hit the cache and return the
+        // (re-signed) response instead of going through consume again
+        // — preventing double debits across a guardian restart. The
+        // KP-side rehydration of `LimiterState` already guarantees
+        // `next_seq` is correct; this closes the remaining gap.
+        if let Err(e) = rehydrate_response_cache(&enclave).await {
+            warn!(
+                error = %e,
+                "Response cache rehydration failed; proceeding with empty cache. \
+                 Retries of prior-session withdrawals may be rejected.",
+            );
+        }
+
         // Log to S3 indicating that withdrawals can be expected henceforth
         enclave
             .log_init(PIEnclaveFullyInitialized)
@@ -194,6 +221,129 @@ pub async fn provisioner_init(
     Ok(())
 }
 
+/// Read prior-session withdrawal success logs from S3 and repopulate the
+/// current session's wid-keyed response cache.
+///
+/// Each cached entry is re-signed with the new session's Ed25519 key,
+/// so clients that fetch the current guardian's signing pubkey and
+/// verify responses will accept them. Errors are non-fatal — a cache
+/// miss at most causes a double-debit on retry, which is the same
+/// behavior we had before this function existed.
+async fn rehydrate_response_cache(enclave: &Arc) -> GuardianResult<()> {
+    let logger = enclave.config.s3_logger()?;
+    let current_session_id = enclave.s3_session_id();
+    let start_secs = now_timestamp_secs().saturating_sub(REHYDRATE_LOG_HOURS * 60 * 60);
+
+    // Resolve the signing pubkey of each prior session we will read logs
+    // from. We ignore sessions whose attestation can't be fetched —
+    // logs we can't verify get skipped rather than trusted blindly.
+    let prior_session_pubkeys =
+        resolve_prior_session_pubkeys(logger, &current_session_id, start_secs).await?;
+    if prior_session_pubkeys.is_empty() {
+        info!("No prior sessions detected for response-cache rehydration");
+        return Ok(());
+    }
+
+    let mut rehydrated = 0usize;
+    for hour in 0..=REHYDRATE_LOG_HOURS {
+        let dir = S3HourScopedDirectory::new(S3_DIR_WITHDRAW, start_secs + hour * 60 * 60);
+        let logs = match logger.list_all_objects_in_dir::(&dir).await {
+            Ok(logs) => logs,
+            Err(e) => {
+                warn!(
+                    dir = %dir,
+                    error = %e,
+                    "Skipping withdrawal log directory during cache rehydration",
+                );
+                continue;
+            }
+        };
+        for log in logs {
+            let Some(pubkey) = prior_session_pubkeys.get(&log.session_id) else {
+                continue;
+            };
+            let verified = match log.verify(pubkey) {
+                Ok(v) => v,
+                Err(e) => {
+                    warn!(
+                        session_id = %pubkey_session(pubkey),
+                        error = ?e,
+                        "Skipping unverifiable withdrawal log during cache rehydration",
+                    );
+                    continue;
+                }
+            };
+            let LogMessage::Withdrawal(withdrawal_message) = verified.message else {
+                continue;
+            };
+            let WithdrawalLogMessage::Success {
+                request_data,
+                response,
+                ..
+            } = *withdrawal_message
+            else {
+                continue;
+            };
+            // Re-sign with the CURRENT session's key so clients that
+            // verify against the new guardian pubkey accept the response.
+            let signed = enclave.sign(response);
+            enclave.state.cache_response(request_data.wid, signed);
+            rehydrated += 1;
+        }
+    }
+
+    info!(
+        rehydrated,
+        "Rehydrated wid-keyed response cache from prior-session logs"
+    );
+    Ok(())
+}
+
+/// For every prior session whose heartbeats we can see in the recent
+/// window, fetch its OperatorInit attestation and return its Ed25519
+/// signing pubkey. Sessions whose attestation is unavailable are
+/// silently skipped (we'd be unable to verify their logs anyway).
+async fn resolve_prior_session_pubkeys(
+    logger: &S3Logger,
+    current_session_id: &str,
+    start_secs: u64,
+) -> GuardianResult<HashMap<String, GuardianPubKey>> {
+    let mut session_ids = std::collections::HashSet::new();
+    for hour in 0..=REHYDRATE_LOG_HOURS {
+        let dir = S3HourScopedDirectory::new(S3_DIR_HEARTBEAT, start_secs + hour * 60 * 60);
+        let logs = match logger.list_all_objects_in_dir::(&dir).await {
+            Ok(logs) => logs,
+            Err(_) => continue,
+        };
+        for log in logs {
+            if log.session_id != current_session_id {
+                session_ids.insert(log.session_id);
+            }
+        }
+    }
+
+    let mut out = HashMap::new();
+    for session_id in session_ids {
+        match logger.get_attestation(&session_id).await {
+            Ok((_attestation, pubkey)) => {
+                out.insert(session_id, pubkey);
+            }
+            Err(e) => {
+                warn!(
+                    session_id,
+                    error = %e,
+                    "Could not resolve prior session signing pubkey; logs from this session will be skipped",
+                );
+            }
+        }
+    }
+    Ok(out)
+}
+
+fn pubkey_session(pubkey: &GuardianPubKey) -> String {
+    session_id_from_signing_pubkey(pubkey)
+}
+
 /// Finalize the initialization process.
 /// Panics upon an error as the enclave's state is irrecoverable at this point.
 async fn finalize_init(
diff --git a/crates/hashi-guardian/src/test_utils.rs b/crates/hashi-guardian/src/test_utils.rs
index 4b1199005..1e05013ae 100644
--- a/crates/hashi-guardian/src/test_utils.rs
+++ b/crates/hashi-guardian/src/test_utils.rs
@@ -24,6 +24,7 @@ use std::sync::Arc;
 /// Mock S3 logger for use in API calls post operator_init,
 /// e.g., provisioner_init, withdrawals.
 pub fn mock_logger() -> S3Logger {
+    use aws_sdk_s3::operation::list_object_versions::ListObjectVersionsOutput;
     use aws_sdk_s3::operation::put_object::PutObjectOutput;
     use aws_sdk_s3::Client;
     use aws_smithy_mocks::mock;
@@ -35,7 +36,13 @@ pub fn mock_logger() -> S3Logger {
     // The `then_output` helper creates a "simple" rule that repeats indefinitely.
     let put_ok = mock!(Client::put_object).then_output(|| PutObjectOutput::builder().build());
 
-    let client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[&put_ok]);
+    // `provisioner_init` runs a post-finalize rehydration pass that lists
+    // prior-session withdrawal logs. Unit tests start against a fresh
+    // bucket (no prior sessions) — return an empty page.
+    let list_empty = mock!(Client::list_object_versions)
+        .then_output(|| ListObjectVersionsOutput::builder().build());
+
+    let client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[&put_ok, &list_empty]);
 
     let config = S3Config::mock_for_testing();
diff --git a/crates/hashi-guardian/src/withdraw.rs b/crates/hashi-guardian/src/withdraw.rs
index 4a0a2bb8d..e351187f1 100644
--- a/crates/hashi-guardian/src/withdraw.rs
+++ b/crates/hashi-guardian/src/withdraw.rs
@@ -44,11 +44,17 @@ pub async fn standard_withdrawal(
     match normal_withdrawal_inner(enclave.clone(), signed_request).await {
         Ok((txid, response, limiter_guard)) => {
             info!("Withdrawal {} processed successfully. Logging to S3.", wid);
+            // Snapshot the post-consume limiter state into the log so a
+            // future session can rehydrate the rate limiter from S3 alone.
+            // The LogRecord's signature attests to `response`, which is
+            // what future sessions re-sign when rehydrating the wid cache.
+            let limiter_state_post = limiter_guard.limiter_state();
             let msg = WithdrawalLogMessage::Success {
                 txid,
                 request_data: unsigned_request,
                 request_sign: request_signature,
                 response: response.clone(),
+                limiter_state_post: Some(limiter_state_post),
             };
             log_withdrawal_success(enclave.as_ref(), wid, msg, limiter_guard).await?;
             let signed_response = enclave.sign(response);
@@ -205,6 +211,13 @@ impl LimiterGuard {
         }
     }
 
+    /// Snapshot of the limiter state AFTER the successful consume this guard
+    /// wraps. Persisted into the withdrawal log so a subsequent session can
+    /// rehydrate the bucket state on boot.
+    pub fn limiter_state(&self) -> hashi_types::guardian::LimiterState {
+        *self.guard.state()
+    }
+
     /// Mark this withdrawal as successful. Prevents revert on drop.
     pub fn commit(mut self) {
         self.committed = true;
diff --git a/crates/hashi-monitor/src/kp/heartbeat_checks.rs b/crates/hashi-monitor/src/kp/heartbeat_checks.rs
index 6135974bd..19bc8ea7b 100644
--- a/crates/hashi-monitor/src/kp/heartbeat_checks.rs
+++ b/crates/hashi-monitor/src/kp/heartbeat_checks.rs
@@ -41,8 +41,7 @@ pub struct GuardianSessionInfo {
 ///
 /// Returns the selected live session id if all invariants pass.
 pub async fn kp_heartbeat_audit(s3_client: &S3Logger) -> anyhow::Result {
-    let recent_heartbeats = read_recent_heartbeats(s3_client).await?;
-    let summary = summarize_heartbeats_by_session(recent_heartbeats)?;
+    let summary = collect_recent_sessions(s3_client).await?;
     let now = now_unix_seconds();
     select_live_session(
         &summary,
@@ -52,6 +51,17 @@ pub async fn kp_heartbeat_audit(s3_client: &S3Logger) -> anyhow::Result
     )
 }
 
+/// Return all guardian sessions observed via heartbeats in the last ~2 hours
+/// with their first/last heartbeat timestamps. Used by restart rehydration
+/// to enumerate prior sessions whose withdrawal logs contribute to the
+/// authoritative LimiterState.
+pub async fn collect_recent_sessions(
+    s3_client: &S3Logger,
+) -> anyhow::Result<Vec<GuardianSessionInfo>> {
+    let recent_heartbeats = read_recent_heartbeats(s3_client).await?;
+    summarize_heartbeats_by_session(recent_heartbeats)
+}
+
 async fn read_recent_heartbeats(s3_client: &S3Logger) -> anyhow::Result> {
     // Read from the current and next hour-scoped prefixes to cover clock-boundary cases.
     let one_hour_ago = now_unix_seconds().saturating_sub(60 * 60);
diff --git a/crates/hashi-monitor/src/kp/mod.rs b/crates/hashi-monitor/src/kp/mod.rs
index a365e6656..1cf0f80a7 100644
--- a/crates/hashi-monitor/src/kp/mod.rs
+++ b/crates/hashi-monitor/src/kp/mod.rs
@@ -5,24 +5,37 @@ use hpke::Deserializable;
 mod config;
 mod heartbeat_checks;
 
+use crate::domain::now_unix_seconds;
 use crate::kp::config::GuardianConfig;
+use crate::rpc::guardian::GuardianLogDir;
+use crate::rpc::guardian::GuardianPollerCore;
 use anyhow::Context;
 use hashi_guardian::s3_logger::S3Logger;
 use hashi_types::guardian::EncPubKey;
 use hashi_types::guardian::GetGuardianInfoResponse;
 use hashi_types::guardian::GuardianInfo;
 use hashi_types::guardian::LimiterState;
+use hashi_types::guardian::LogMessage;
 use hashi_types::guardian::ProvisionerInitRequest;
 use hashi_types::guardian::ProvisionerInitState;
+use hashi_types::guardian::WithdrawalConfig;
+use hashi_types::guardian::WithdrawalLogMessage;
 use hashi_types::guardian::proto_conversions::provisioner_init_request_to_pb;
 use hashi_types::guardian::session_id_from_signing_pubkey;
 use hashi_types::guardian::verify_enclave_attestation;
 use hashi_types::proto as pb;
 use rand::thread_rng;
+use std::collections::HashSet;
 use tracing::info;
+use tracing::warn;
 
 pub use config::ProvisionerConfig;
 
+/// Read up to this many hours of withdrawal logs when rehydrating limiter
+/// state on a new session's ProvisionerInit. Matches the retention window
+/// for withdrawal logs so we never race Object Lock expiry on boot.
+const REHYDRATE_HISTORY_HOURS: u64 = 2;
+
 pub async fn run(cfg: ProvisionerConfig) -> anyhow::Result<()> {
     let s3_client = S3Logger::new_checked(&cfg.s3)
         .await
@@ -38,17 +51,17 @@ pub async fn run(cfg: ProvisionerConfig) -> anyhow::Result<()> {
     expected_guardian_config.ensure_matches_info(&guardian_info)?;
     info!(session_id, "init checks passed for selected session");
 
-    // TODO: replace mock limiter state with actual state from S3 logs.
+    // 3. Rehydrate LimiterState from prior-session withdrawal logs. The
+    // prior session already consumed from the bucket and advanced
+    // `next_seq`; if we initialized the new session at (max_capacity,
+    // 0, 0) we would let the bucket drift past the real cap.
     let committee = cfg.hashi_committee.try_into()?;
-    let mock_limiter_state = LimiterState {
-        num_tokens_available: cfg.withdrawal_config.max_bucket_capacity_sats,
-        last_updated_at: 0,
-        next_seq: 0,
-    };
+    let limiter_state =
+        rehydrate_limiter_state(&s3_client, &session_id, &cfg.withdrawal_config).await?;
     let state = ProvisionerInitState::new(
         committee,
         cfg.withdrawal_config,
-        mock_limiter_state,
+        limiter_state,
         cfg.hashi_btc_master_pubkey,
     )
     .map_err(|e| anyhow::anyhow!(e))?;
@@ -81,6 +94,118 @@ pub async fn run(cfg: ProvisionerConfig) -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Reconstruct the authoritative `LimiterState` for the new session from
+/// prior-session withdrawal logs in S3.
+///
+/// A prior session is any session whose heartbeats appear in the recent
+/// window but that is not the newly-provisioned `live_session_id`. Each
+/// successful withdrawal is logged with a `limiter_state_post` snapshot,
+/// so we pick the highest-seq snapshot across all prior sessions. If no
+/// prior success logs are found we fall back to a fresh max-capacity
+/// state (first-ever provisioning, or all prior logs expired).
+pub async fn rehydrate_limiter_state(
+    s3_client: &S3Logger,
+    live_session_id: &str,
+    withdrawal_config: &WithdrawalConfig,
+) -> anyhow::Result<LimiterState> {
+    let sessions = heartbeat_checks::collect_recent_sessions(s3_client).await?;
+    let prior_sessions: HashSet<String> = sessions
+        .into_iter()
+        .map(|s| s.session_id)
+        .filter(|id| id != live_session_id)
+        .collect();
+
+    if prior_sessions.is_empty() {
+        info!("No prior sessions found; starting fresh at max bucket capacity");
+        return Ok(fresh_limiter_state(withdrawal_config));
+    }
+
+    info!(
+        prior_session_count = prior_sessions.len(),
+        "Rehydrating LimiterState from prior session withdrawal logs",
+    );
+
+    let start = now_unix_seconds().saturating_sub(REHYDRATE_HISTORY_HOURS * 60 * 60);
+    let mut poller =
+        GuardianPollerCore::from_s3_client(s3_client.clone(), start, GuardianLogDir::Withdraw);
+
+    let mut best_seq: Option<u64> = None;
+    let mut best_state: Option<LimiterState> = None;
+
+    for _ in 0..=REHYDRATE_HISTORY_HOURS {
+        let logs = match poller.read_cur_dir().await {
+            Ok(logs) => logs,
+            Err(e) => {
+                // A missing directory (no withdrawals in that hour) is a
+                // normal condition — log and move on.
+                warn!(
+                    error = %e,
+                    "Skipping withdrawal log directory during rehydration",
+                );
+                poller.advance_cursor();
+                continue;
+            }
+        };
+
+        for log in logs {
+            if !prior_sessions.contains(&log.session_id) {
+                continue;
+            }
+            let LogMessage::Withdrawal(withdrawal_message) = log.message else {
+                continue;
+            };
+            let WithdrawalLogMessage::Success {
+                request_data,
+                limiter_state_post,
+                ..
+            } = *withdrawal_message
+            else {
+                continue;
+            };
+            let Some(snap) = limiter_state_post else {
+                // Old-format logs (pre-PR-5) have no snapshot; skip.
+                continue;
+            };
+            let seq = request_data.seq;
+            if best_seq.is_none_or(|s| seq > s) {
+                best_seq = Some(seq);
+                best_state = Some(snap);
+            }
+        }
+
+        poller.advance_cursor();
+    }
+
+    match best_state {
+        Some(state) => {
+            info!(
+                next_seq = state.next_seq,
+                last_updated_at = state.last_updated_at,
+                num_tokens_available = state.num_tokens_available,
+                "Rehydrated LimiterState from prior session logs",
+            );
+            Ok(state)
+        }
+        None => {
+            warn!(
+                "No prior withdrawal success logs found; starting fresh \
+                 at max bucket capacity. Retries of prior-session \
+                 withdrawals will be rejected rather than idempotently \
+                 served.",
+            );
+            Ok(fresh_limiter_state(withdrawal_config))
+        }
+    }
+}
+
+fn fresh_limiter_state(withdrawal_config: &WithdrawalConfig) -> LimiterState {
+    LimiterState {
+        num_tokens_available: withdrawal_config.max_bucket_capacity_sats,
+        last_updated_at: 0,
+        next_seq: 0,
+    }
+}
+
 /// Implements check B of IOP-225.
 pub async fn get_guardian_info_from_s3(
     s3_client: &S3Logger,
diff --git a/crates/hashi-types/src/guardian/limiter.rs b/crates/hashi-types/src/guardian/limiter.rs
index 3b8deef82..9bda00bb9 100644
--- a/crates/hashi-types/src/guardian/limiter.rs
+++ b/crates/hashi-types/src/guardian/limiter.rs
@@ -4,6 +4,7 @@ use super::GuardianError::InvalidInputs;
 use super::GuardianError::RateLimitExceeded;
 use super::GuardianResult;
 
+use serde::Deserialize;
 use serde::Serialize;
 
 use std::collections::HashMap;
@@ -14,7 +15,7 @@ use std::collections::HashMap;
 pub const SOFT_RESERVE_TTL_SECS: u64 = 5 * 60;
 
 /// Immutable configuration for the token bucket rate limiter.
-#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
+#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub struct LimiterConfig {
     /// Refill rate in sats per second.
     pub refill_rate: u64,
@@ -24,7 +25,7 @@ pub struct LimiterConfig {
 
 /// Serializable state for the token bucket rate limiter.
 /// Provisioners provide this when initializing the enclave.
-#[derive(Debug, Copy, Clone, PartialEq, Serialize)]
+#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub struct LimiterState {
     /// Available tokens in sats.
     pub num_tokens_available: u64,
diff --git a/crates/hashi-types/src/guardian/mod.rs b/crates/hashi-types/src/guardian/mod.rs
index 144b5e13f..bf08db2e5 100644
--- a/crates/hashi-types/src/guardian/mod.rs
+++ b/crates/hashi-types/src/guardian/mod.rs
@@ -57,7 +57,12 @@ use std::time::Duration;
 ///
 /// These are public so that external verifiers/monitors can apply the same expectations.
 pub const S3_OBJECT_LOCK_DURATION_INIT: Duration = Duration::from_secs(5 * 60);
-pub const S3_OBJECT_LOCK_DURATION_WITHDRAW: Duration = Duration::from_secs(5 * 60);
+/// Withdrawal logs are the authoritative state snapshot used by the next
+/// session's `ProvisionerInit` to rehydrate the rate limiter and idempotency
+/// cache. 60 minutes gives the KP a comfortable window to read the prior
+/// session's tail across a guardian restart without racing Object Lock
+/// expiry; longer than heartbeat retention on purpose.
+pub const S3_OBJECT_LOCK_DURATION_WITHDRAW: Duration = Duration::from_secs(60 * 60);
 pub const S3_OBJECT_LOCK_DURATION_HEARTBEAT: Duration = Duration::from_secs(5 * 60);
 
 /// S3 sub-prefixes used for guardian log streams.
@@ -268,6 +273,20 @@ pub enum WithdrawalLogMessage {
         request_data: StandardWithdrawalRequestWire,
         request_sign: CommitteeSignature,
         response: StandardWithdrawalResponse,
+        /// Committed rate-limiter state AFTER this withdrawal was applied.
+        /// Self-contained snapshot per log record — a key provisioner reading
+        /// the latest successful log reconstructs the authoritative limiter
+        /// state for the next session's `ProvisionerInit` without replaying
+        /// from genesis. `#[serde(default)]` keeps us compatible with logs
+        /// written before this field existed.
+        ///
+        /// The `response` field above is not stored with a per-session
+        /// signature of its own; on rehydration the new session re-signs it
+        /// with its own Ed25519 signing key. The LogRecord's own signature
+        /// attests to the `response` data's provenance, so external
+        /// verifiers don't need a separately-signed envelope here.
+        #[serde(default)]
+        limiter_state_post: Option<LimiterState>,
     },
     /// Immediate withdraw failure
     Failure {
@@ -894,6 +913,7 @@ mod tests {
                 request_data: request_data.into(),
                 request_sign,
                 response: GuardianSigned::<StandardWithdrawalResponse>::mock_for_testing().data,
+                limiter_state_post: None,
             })),
             &signing_key,
         );
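
The `#[serde(default)]` on `limiter_state_post` is the load-bearing compatibility detail in this change: prior-session logs written before the field existed must still deserialize during rehydration. Below is a minimal, self-contained sketch of that behavior, assuming simplified stand-in types (`SuccessRecord`, `LimiterStateSnapshot` are illustrative, not the crate's real types) and JSON purely for demonstration; the production log encoding may differ.

// Illustrative only: shows an old-format record (no snapshot key)
// deserializing to `None`, and a new-format record carrying the
// post-consume snapshot that a later session can start from.
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
struct LimiterStateSnapshot {
    num_tokens_available: u64,
    last_updated_at: u64,
    next_seq: u64,
}

#[derive(Debug, Serialize, Deserialize)]
struct SuccessRecord {
    txid: String,
    // Old-format records omit this key entirely; with `#[serde(default)]`
    // (and Option's usual handling) it comes back as `None` instead of
    // failing the whole deserialization.
    #[serde(default)]
    limiter_state_post: Option<LimiterStateSnapshot>,
}

fn main() -> serde_json::Result<()> {
    // A record written before the field existed still parses.
    let old: SuccessRecord = serde_json::from_str(r#"{"txid": "abc"}"#)?;
    assert_eq!(old.limiter_state_post, None);

    // A new-format record carries the post-consume snapshot.
    let new: SuccessRecord = serde_json::from_str(
        r#"{"txid": "def",
            "limiter_state_post": {"num_tokens_available": 90000,
                                   "last_updated_at": 1700000000,
                                   "next_seq": 7}}"#,
    )?;
    assert_eq!(new.limiter_state_post.unwrap().next_seq, 7);
    Ok(())
}

Keeping the snapshot optional rather than mandatory means sessions on either side of this rollout can read each other's withdrawal logs; records without a snapshot are simply skipped by the rehydration pass, as the diff above does.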