Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 150 additions & 0 deletions crates/hashi-guardian/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,26 @@ use hashi_types::guardian::crypto::combine_shares;
use hashi_types::guardian::crypto::commit_share;
use hashi_types::guardian::crypto::decrypt_share;
use hashi_types::guardian::crypto::Share;
use hashi_types::guardian::s3_utils::S3HourScopedDirectory;
use hashi_types::guardian::time_utils::now_timestamp_secs;
use hashi_types::guardian::InitLogMessage::OIAttestationUnsigned;
use hashi_types::guardian::InitLogMessage::OIGuardianInfo;
use hashi_types::guardian::InitLogMessage::PIEnclaveFullyInitialized;
use hashi_types::guardian::InitLogMessage::PISuccess;
use hashi_types::guardian::*;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
use GuardianError::*;

/// How many hour-scoped directories back we scan when rehydrating the
/// wid-keyed response cache from prior-session withdrawal logs. Matches
/// the Object Lock retention window for withdrawal logs (see
/// `S3_OBJECT_LOCK_DURATION_WITHDRAW`) so we never try to read a
/// directory whose entries have expired out from under us.
/// Note: scans cover `0..=REHYDRATE_LOG_HOURS` directories, i.e. the
/// window start hour through the current hour inclusive.
const REHYDRATE_LOG_HOURS: u64 = 2;

/// Receives S3 API keys & share commitments.
/// Returns an error for malformed requests / dup call & panics for the rest.
pub async fn operator_init(
Expand Down Expand Up @@ -178,6 +189,22 @@ pub async fn provisioner_init(
if current_share_count >= THRESHOLD {
let shares_vec: Vec<Share> = received_shares.iter().cloned().collect();
finalize_init(&shares_vec, &enclave, request.into_state()).await;

// Rehydrate the wid-keyed response cache from prior-session
// withdrawal logs BEFORE we start serving withdrawals. Retries
// of prior-session wids will hit the cache and return the
// (re-signed) response instead of going through consume again
// — preventing double debits across a guardian restart. The
// KP-side rehydration of `LimiterState` already guarantees
// `next_seq` is correct; this closes the remaining gap.
if let Err(e) = rehydrate_response_cache(&enclave).await {
warn!(
error = %e,
"Response cache rehydration failed; proceeding with empty cache. \
Retries of prior-session withdrawals may be rejected.",
);
}

// Log to S3 indicating that withdrawals can be expected henceforth
enclave
.log_init(PIEnclaveFullyInitialized)
Expand All @@ -194,6 +221,129 @@ pub async fn provisioner_init(
Ok(())
}

/// Repopulate the current session's wid-keyed response cache from
/// withdrawal-success logs written to S3 by earlier sessions.
///
/// Every recovered response is re-signed under the new session's
/// Ed25519 key, so clients that fetch the current guardian's signing
/// pubkey and verify responses will accept them. Failures here are
/// deliberately soft — a missed entry at most causes a double debit on
/// retry, which is exactly the pre-rehydration behavior.
async fn rehydrate_response_cache(enclave: &Arc<Enclave>) -> GuardianResult<()> {
    let s3 = enclave.config.s3_logger()?;
    let session_id = enclave.s3_session_id();
    let window_start = now_timestamp_secs().saturating_sub(REHYDRATE_LOG_HOURS * 60 * 60);

    // Map each prior session we can attest to its signing pubkey.
    // Sessions whose attestation can't be fetched are skipped rather
    // than trusted blindly — we couldn't verify their logs anyway.
    let pubkeys_by_session =
        resolve_prior_session_pubkeys(s3, &session_id, window_start).await?;
    if pubkeys_by_session.is_empty() {
        info!("No prior sessions detected for response-cache rehydration");
        return Ok(());
    }

    let mut restored = 0usize;
    for offset in 0..=REHYDRATE_LOG_HOURS {
        let dir = S3HourScopedDirectory::new(S3_DIR_WITHDRAW, window_start + offset * 60 * 60);
        let records = match s3.list_all_objects_in_dir::<LogRecord>(&dir).await {
            Ok(records) => records,
            Err(e) => {
                warn!(
                    dir = %dir,
                    error = %e,
                    "Skipping withdrawal log directory during cache rehydration",
                );
                continue;
            }
        };
        for record in records {
            // Only logs from a resolvable prior session are considered.
            let Some(pubkey) = pubkeys_by_session.get(&record.session_id) else {
                continue;
            };
            let verified = match record.verify(pubkey) {
                Ok(v) => v,
                Err(e) => {
                    warn!(
                        session_id = %pubkey_session(pubkey),
                        error = ?e,
                        "Skipping unverifiable withdrawal log during cache rehydration",
                    );
                    continue;
                }
            };
            // Only successful withdrawals carry a cacheable response.
            if let LogMessage::Withdrawal(withdrawal) = verified.message {
                if let WithdrawalLogMessage::Success {
                    request_data,
                    response,
                    ..
                } = *withdrawal
                {
                    // Re-sign with the CURRENT session's key so clients
                    // verifying against the new guardian pubkey accept it.
                    enclave
                        .state
                        .cache_response(request_data.wid, enclave.sign(response));
                    restored += 1;
                }
            }
        }
    }

    info!(
        rehydrated = restored,
        "Rehydrated wid-keyed response cache from prior-session logs"
    );
    Ok(())
}

/// Map each prior guardian session visible in the recent heartbeat
/// window to its Ed25519 signing pubkey, taken from the session's
/// OperatorInit attestation. Sessions whose attestation cannot be
/// fetched are dropped (with a warning) — their logs could not be
/// verified anyway, so they are skipped rather than trusted.
async fn resolve_prior_session_pubkeys(
    logger: &S3Logger,
    current_session_id: &str,
    start_secs: u64,
) -> GuardianResult<HashMap<String, GuardianPubKey>> {
    // Collect the distinct non-current session ids seen in heartbeat
    // directories across the rehydration window. Unreadable directories
    // are ignored — same effect as seeing no heartbeats there.
    let mut seen = std::collections::HashSet::new();
    for offset in 0..=REHYDRATE_LOG_HOURS {
        let dir = S3HourScopedDirectory::new(S3_DIR_HEARTBEAT, start_secs + offset * 60 * 60);
        if let Ok(records) = logger.list_all_objects_in_dir::<LogRecord>(&dir).await {
            seen.extend(
                records
                    .into_iter()
                    .map(|record| record.session_id)
                    .filter(|id| id.as_str() != current_session_id),
            );
        }
    }

    // Resolve each candidate session to its signing pubkey.
    let mut pubkeys = HashMap::new();
    for session_id in seen {
        match logger.get_attestation(&session_id).await {
            Ok((_attestation, pubkey)) => {
                pubkeys.insert(session_id, pubkey);
            }
            Err(e) => {
                warn!(
                    session_id,
                    error = %e,
                    "Could not resolve prior session signing pubkey; logs from this session will be skipped",
                );
            }
        }
    }
    Ok(pubkeys)
}

/// Derive the session-id string for a guardian signing pubkey.
/// Used only to attach readable session context to log lines.
fn pubkey_session(pubkey: &GuardianPubKey) -> String {
    session_id_from_signing_pubkey(pubkey)
}

/// Finalize the initialization process.
/// Panics upon an error as the enclaves state is irrecoverable at this point.
async fn finalize_init(
Expand Down
9 changes: 8 additions & 1 deletion crates/hashi-guardian/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;
/// Mock S3 logger for use in API calls post operator_init,
/// e.g., provisioner_init, withdrawals.
pub fn mock_logger() -> S3Logger {
use aws_sdk_s3::operation::list_object_versions::ListObjectVersionsOutput;
use aws_sdk_s3::operation::put_object::PutObjectOutput;
use aws_sdk_s3::Client;
use aws_smithy_mocks::mock;
Expand All @@ -35,7 +36,13 @@ pub fn mock_logger() -> S3Logger {
// The `then_output` helper creates a "simple" rule that repeats indefinitely.
let put_ok = mock!(Client::put_object).then_output(|| PutObjectOutput::builder().build());

let client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[&put_ok]);
// `provisioner_init` runs a post-finalize rehydration pass that lists
// prior-session withdrawal logs. Unit tests start against a fresh
// bucket (no prior sessions) — return an empty page.
let list_empty = mock!(Client::list_object_versions)
.then_output(|| ListObjectVersionsOutput::builder().build());

let client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[&put_ok, &list_empty]);

let config = S3Config::mock_for_testing();

Expand Down
13 changes: 13 additions & 0 deletions crates/hashi-guardian/src/withdraw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,17 @@ pub async fn standard_withdrawal(
match normal_withdrawal_inner(enclave.clone(), signed_request).await {
Ok((txid, response, limiter_guard)) => {
info!("Withdrawal {} processed successfully. Logging to S3.", wid);
// Snapshot the post-consume limiter state into the log so a
// future session can rehydrate the rate limiter from S3 alone.
// The LogRecord's signature attests to `response`, which is
// what future sessions re-sign when rehydrating the wid cache.
let limiter_state_post = limiter_guard.limiter_state();
let msg = WithdrawalLogMessage::Success {
txid,
request_data: unsigned_request,
request_sign: request_signature,
response: response.clone(),
limiter_state_post: Some(limiter_state_post),
};
log_withdrawal_success(enclave.as_ref(), wid, msg, limiter_guard).await?;
let signed_response = enclave.sign(response);
Expand Down Expand Up @@ -205,6 +211,13 @@ impl LimiterGuard {
}
}

/// Rate-limiter state AFTER the successful consume wrapped by this
/// guard. Written into the withdrawal success log so a later session
/// can rebuild its bucket state from S3 alone on boot.
pub fn limiter_state(&self) -> hashi_types::guardian::LimiterState {
    // `state()` borrows; deref-copy it out so the caller owns a snapshot.
    let post_consume = self.guard.state();
    *post_consume
}

/// Mark this withdrawal as successful. Prevents revert on drop.
pub fn commit(mut self) {
self.committed = true;
Expand Down
14 changes: 12 additions & 2 deletions crates/hashi-monitor/src/kp/heartbeat_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ pub struct GuardianSessionInfo {
///
/// Returns the selected live session id if all invariants pass.
pub async fn kp_heartbeat_audit(s3_client: &S3Logger) -> anyhow::Result<String> {
let recent_heartbeats = read_recent_heartbeats(s3_client).await?;
let summary = summarize_heartbeats_by_session(recent_heartbeats)?;
let summary = collect_recent_sessions(s3_client).await?;
let now = now_unix_seconds();
select_live_session(
&summary,
Expand All @@ -52,6 +51,17 @@ pub async fn kp_heartbeat_audit(s3_client: &S3Logger) -> anyhow::Result<String>
)
}

/// Enumerate every guardian session observed via heartbeats in the last
/// ~2 hours, including each session's first/last heartbeat timestamps.
/// Restart rehydration uses this to find the prior sessions whose
/// withdrawal logs feed the authoritative LimiterState.
pub async fn collect_recent_sessions(
    s3_client: &S3Logger,
) -> anyhow::Result<Vec<GuardianSessionInfo>> {
    let heartbeats = read_recent_heartbeats(s3_client).await?;
    summarize_heartbeats_by_session(heartbeats)
}

async fn read_recent_heartbeats(s3_client: &S3Logger) -> anyhow::Result<Vec<VerifiedLogRecord>> {
// Read from the current and next hour-scoped prefixes to cover clock-boundary cases.
let one_hour_ago = now_unix_seconds().saturating_sub(60 * 60);
Expand Down
Loading
Loading