From 87b43d68ea3e55954b332aeb764076078677202b Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 09:54:59 -0700 Subject: [PATCH 1/5] [guardian] post-MPC guardian RPC for enclave signature Aggregates a BLS cert over a `StandardWithdrawalRequest` after MPC signing quorum and forwards it to the guardian for the enclave signature. The local limiter has already been advanced by every committee member at MPC signing time (see PR #495), so this just records the consume with the authoritative guardian. - Proto: `SignGuardianWithdrawalRequest` message + RPC - bridge_service: `sign_guardian_withdrawal_request` handler - guardian_client: `standard_withdrawal` RPC - withdrawals: `validate_and_sign_guardian_withdrawal_request`, `build_guardian_withdrawal_request`, `compute_withdrawal_wid` - leader: post-MPC `finalize_withdrawal_through_guardian` + BLS fan-out helpers; `max_concurrent=1` when guardian is configured - guardian/limiter, hashi-guardian/{enclave,withdraw}: `wid` parameter for the guardian-side idempotency cache - e2e: assert local_state == guardian_state after withdrawal --- crates/e2e-tests/src/e2e_flow.rs | 19 ++ crates/hashi-guardian/src/enclave.rs | 3 +- crates/hashi-guardian/src/withdraw.rs | 2 + .../sui/hashi/v1alpha/bridge_service.proto | 21 ++ crates/hashi-types/src/guardian/limiter.rs | 27 ++- .../proto/generated/sui.hashi.v1alpha.fds.bin | Bin 14531 -> 14942 bytes .../src/proto/generated/sui.hashi.v1alpha.rs | 114 +++++++++++ crates/hashi/src/grpc/bridge_service.rs | 39 ++++ crates/hashi/src/grpc/guardian_client.rs | 11 + crates/hashi/src/leader/mod.rs | 192 +++++++++++++++++- crates/hashi/src/withdrawals.rs | 111 ++++++++++ 11 files changed, 524 insertions(+), 15 deletions(-) diff --git a/crates/e2e-tests/src/e2e_flow.rs b/crates/e2e-tests/src/e2e_flow.rs index 9a658692e..f3522cf8f 100644 --- a/crates/e2e-tests/src/e2e_flow.rs +++ b/crates/e2e-tests/src/e2e_flow.rs @@ -365,6 +365,25 @@ mod tests { ) .await?; + if with_guardian { + let guardian_state = networks + .guardian_harness + .as_ref() + .expect("harness present when .with_guardian() is set") + .enclave() + .state + .limiter_state() + .await + .expect("guardian limiter state present after a successful withdrawal"); + assert_eq!(guardian_state.next_seq, 1); + let local_state = hashi + .local_limiter() + .expect("local limiter present after bootstrap") + .snapshot() + .await; + assert_eq!(local_state, guardian_state); + } + info!("=== Bitcoin Withdrawal E2E Test{label} Passed ==="); Ok(()) } diff --git a/crates/hashi-guardian/src/enclave.rs b/crates/hashi-guardian/src/enclave.rs index 0ea84a51d..49ea09c25 100644 --- a/crates/hashi-guardian/src/enclave.rs +++ b/crates/hashi-guardian/src/enclave.rs @@ -310,6 +310,7 @@ impl EnclaveState { pub async fn consume_from_limiter( &self, + wid: u64, seq: u64, timestamp: u64, amount_sats: u64, @@ -324,7 +325,7 @@ impl EnclaveState { ) .await .map_err(|_| InvalidInputs("timed out waiting for rate limiter lock".into()))?; - guard.consume(seq, timestamp, amount_sats)?; + guard.consume(wid, seq, timestamp, amount_sats)?; Ok(LimiterGuard::new(guard)) } diff --git a/crates/hashi-guardian/src/withdraw.rs b/crates/hashi-guardian/src/withdraw.rs index 3bbcad79a..bb881a3af 100644 --- a/crates/hashi-guardian/src/withdraw.rs +++ b/crates/hashi-guardian/src/withdraw.rs @@ -99,9 +99,11 @@ async fn normal_withdrawal_inner( info!("Checking rate limits."); let consumed_amount_sats = request.utxos().external_out_amount().to_sat(); + let wid = *request.wid(); let limiter_guard = enclave .state .consume_from_limiter( + wid, request.seq(), request.timestamp_secs(), consumed_amount_sats, diff --git a/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto b/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto index 95b506611..affa4054b 100644 --- a/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto +++ b/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto @@ -18,6 +18,9 @@ service BridgeService { rpc SignWithdrawalTxConstruction(SignWithdrawalTxConstructionRequest) returns (SignWithdrawalTxConstructionResponse); // Step 2b: Sign a bitcoin withdrawal transaction (MPC Schnorr). rpc SignWithdrawalTransaction(SignWithdrawalTransactionRequest) returns (SignWithdrawalTransactionResponse); + // Sign a guardian rate-limiting request so the leader can aggregate a + // committee certificate and forward it to the guardian after MPC quorum. + rpc SignGuardianWithdrawalRequest(SignGuardianWithdrawalRequestRequest) returns (SignGuardianWithdrawalRequestResponse); // Step 3: Sign the BLS certificate over the witness signatures for on-chain storage. rpc SignWithdrawalTxSigning(SignWithdrawalTxSigningRequest) returns (SignWithdrawalTxSigningResponse); // Step 4: Sign committee approval to confirm a processed withdrawal on-chain. @@ -165,3 +168,21 @@ message SignWithdrawalConfirmationRequest { message SignWithdrawalConfirmationResponse { MemberSignature member_signature = 1; } + +// The leader sends the withdrawal-transaction id along with the guardian-specific +// fields (timestamp, seq) so each validator can independently reconstruct and +// BLS-sign the same `StandardWithdrawalRequest`. +message SignGuardianWithdrawalRequestRequest { + // The id of the WithdrawalTransaction on Sui (32 bytes). + bytes withdrawal_txn_id = 1; + + // Timestamp in unix seconds (used for guardian rate limiting). + uint64 timestamp_secs = 2; + + // Monotonic sequence number (used by guardian for replay prevention). + uint64 seq = 3; +} + +message SignGuardianWithdrawalRequestResponse { + MemberSignature member_signature = 1; +} diff --git a/crates/hashi-types/src/guardian/limiter.rs b/crates/hashi-types/src/guardian/limiter.rs index 9dc3f88fb..927b4afa2 100644 --- a/crates/hashi-types/src/guardian/limiter.rs +++ b/crates/hashi-types/src/guardian/limiter.rs @@ -66,7 +66,13 @@ impl RateLimiter { /// Consume tokens from the bucket. Validates seq and timestamp ordering, /// refills based on elapsed time, then debits the requested amount. - pub fn consume(&mut self, seq: u64, timestamp: u64, amount_sats: u64) -> GuardianResult<()> { + pub fn consume( + &mut self, + _wid: u64, + seq: u64, + timestamp: u64, + amount_sats: u64, + ) -> GuardianResult<()> { if seq != self.state.next_seq { return Err(InvalidInputs(format!( "seq mismatch: expected {}, got {}", @@ -95,7 +101,6 @@ impl RateLimiter { return Err(RateLimitExceeded); } - // Snapshot for revert, then mutate. self.prev_state = self.state; self.state.last_updated_at = timestamp; self.state.num_tokens_available = capacity - amount_sats; @@ -131,18 +136,18 @@ mod test { fn test_basic() { let (config, state) = make_limiter(); let mut limiter = RateLimiter::new(config, state).unwrap(); - assert!(limiter.consume(0, 1, config.refill_rate).is_ok()); + assert!(limiter.consume(1, 0, 1, config.refill_rate).is_ok()); let target_amount = 1_000_000u64; let num_secs_required = target_amount.div_ceil(config.refill_rate); assert!( limiter - .consume(1, num_secs_required, target_amount) + .consume(2, 1, num_secs_required, target_amount) .is_err() ); assert!( limiter - .consume(1, 1 + num_secs_required, target_amount) + .consume(2, 1, 1 + num_secs_required, target_amount) .is_ok() ); } @@ -153,12 +158,12 @@ mod test { let mut limiter = RateLimiter::new(config, state).unwrap(); assert!( limiter - .consume(0, u64::MAX, config.max_bucket_capacity + 1) + .consume(1, 0, u64::MAX, config.max_bucket_capacity + 1) .is_err() ); assert!( limiter - .consume(0, u64::MAX, config.max_bucket_capacity) + .consume(1, 0, u64::MAX, config.max_bucket_capacity) .is_ok() ); } @@ -168,7 +173,7 @@ mod test { let (config, state) = make_limiter(); let mut limiter = RateLimiter::new(config, state).unwrap(); // Consume after refill, then revert — should restore original state. - limiter.consume(0, 100, 50_000).unwrap(); + limiter.consume(1, 0, 100, 50_000).unwrap(); assert_eq!(limiter.state().num_tokens_available, 50_000); // 100*1000 - 50_000 limiter.revert(); assert_eq!(limiter.state().num_tokens_available, 0); @@ -181,10 +186,10 @@ mod test { let (config, state) = make_limiter(); let mut limiter = RateLimiter::new(config, state).unwrap(); // Wrong seq. - assert!(limiter.consume(1, 0, 0).is_err()); + assert!(limiter.consume(1, 1, 0, 0).is_err()); // Advance state. - limiter.consume(0, 100, 1_000).unwrap(); + limiter.consume(1, 0, 100, 1_000).unwrap(); // Old timestamp. - assert!(limiter.consume(1, 50, 1_000).is_err()); + assert!(limiter.consume(2, 1, 50, 1_000).is_err()); } } diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin index 2407c17bab0a6b455184383479407eebab1c6e61..f5332db7d856e4c976fdc7e8b2a296120fffdb41 100644 GIT binary patch delta 264 zcmX?Hc&~(+>%Z(q=7pR}O5KcHD#4lQdG4i&MJbtydEuEQ87W1H<%u~#sfDGf#U+#f za2Q7ksdDj^Waa|7iMa*w#i_}~5=;t=8Z1G)aM56(ppXC;b8%{+1T$C;#8fKdQpIn) vFJ}bPS&q%iIae_$Ph#Yf#j9G#eDXpbaV4CNg}PbF0;Eh5r?Sl+Jk>e?Q^#F) delta 26 icmcata=4J0>xtAx=7pThMmyLyujN|BwD}Ehkq!WmlL|xt diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs index 8e465f6b4..e1c540cef 100644 --- a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs +++ b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs @@ -155,6 +155,26 @@ pub struct SignWithdrawalConfirmationResponse { #[prost(message, optional, tag = "1")] pub member_signature: ::core::option::Option, } +/// The leader sends the withdrawal-transaction id along with the guardian-specific +/// fields (timestamp, seq) so each validator can independently reconstruct and +/// BLS-sign the same `StandardWithdrawalRequest`. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SignGuardianWithdrawalRequestRequest { + /// The id of the WithdrawalTransaction on Sui (32 bytes). + #[prost(bytes = "bytes", tag = "1")] + pub withdrawal_txn_id: ::prost::bytes::Bytes, + /// Timestamp in unix seconds (used for guardian rate limiting). + #[prost(uint64, tag = "2")] + pub timestamp_secs: u64, + /// Monotonic sequence number (used by guardian for replay prevention). + #[prost(uint64, tag = "3")] + pub seq: u64, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SignGuardianWithdrawalRequestResponse { + #[prost(message, optional, tag = "1")] + pub member_signature: ::core::option::Option, +} /// Generated client implementations. pub mod bridge_service_client { #![allow( @@ -394,6 +414,37 @@ pub mod bridge_service_client { ); self.inner.unary(req, path, codec).await } + /// Sign a guardian rate-limiting request so the leader can aggregate a + /// committee certificate and forward it to the guardian after MPC quorum. + pub async fn sign_guardian_withdrawal_request( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.hashi.v1alpha.BridgeService/SignGuardianWithdrawalRequest", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "sui.hashi.v1alpha.BridgeService", + "SignGuardianWithdrawalRequest", + ), + ); + self.inner.unary(req, path, codec).await + } /// Step 3: Sign the BLS certificate over the witness signatures for on-chain storage. pub async fn sign_withdrawal_tx_signing( &mut self, @@ -510,6 +561,15 @@ pub mod bridge_service_server { tonic::Response, tonic::Status, >; + /// Sign a guardian rate-limiting request so the leader can aggregate a + /// committee certificate and forward it to the guardian after MPC quorum. + async fn sign_guardian_withdrawal_request( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Step 3: Sign the BLS certificate over the witness signatures for on-chain storage. async fn sign_withdrawal_tx_signing( &self, @@ -858,6 +918,60 @@ pub mod bridge_service_server { }; Box::pin(fut) } + "/sui.hashi.v1alpha.BridgeService/SignGuardianWithdrawalRequest" => { + #[allow(non_camel_case_types)] + struct SignGuardianWithdrawalRequestSvc( + pub Arc, + ); + impl< + T: BridgeService, + > tonic::server::UnaryService< + super::SignGuardianWithdrawalRequestRequest, + > for SignGuardianWithdrawalRequestSvc { + type Response = super::SignGuardianWithdrawalRequestResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::SignGuardianWithdrawalRequestRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::sign_guardian_withdrawal_request( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SignGuardianWithdrawalRequestSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/sui.hashi.v1alpha.BridgeService/SignWithdrawalTxSigning" => { #[allow(non_camel_case_types)] struct SignWithdrawalTxSigningSvc(pub Arc); diff --git a/crates/hashi/src/grpc/bridge_service.rs b/crates/hashi/src/grpc/bridge_service.rs index 762658edb..be6113ab3 100644 --- a/crates/hashi/src/grpc/bridge_service.rs +++ b/crates/hashi/src/grpc/bridge_service.rs @@ -19,6 +19,8 @@ use hashi_types::proto::GetServiceInfoRequest; use hashi_types::proto::GetServiceInfoResponse; use hashi_types::proto::SignDepositConfirmationRequest; use hashi_types::proto::SignDepositConfirmationResponse; +use hashi_types::proto::SignGuardianWithdrawalRequestRequest; +use hashi_types::proto::SignGuardianWithdrawalRequestResponse; use hashi_types::proto::SignWithdrawalConfirmationRequest; use hashi_types::proto::SignWithdrawalConfirmationResponse; use hashi_types::proto::SignWithdrawalRequestApprovalRequest; @@ -131,6 +133,43 @@ impl BridgeService for HttpService { })) } + /// Validate and BLS-sign a `StandardWithdrawalRequest` for the guardian. + #[tracing::instrument( + level = "info", + skip_all, + fields(withdrawal_txn_id = tracing::field::Empty, caller = tracing::field::Empty), + )] + async fn sign_guardian_withdrawal_request( + &self, + request: Request, + ) -> Result, Status> { + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); + let req = request.get_ref(); + let withdrawal_txn_id = Address::from_bytes(&req.withdrawal_txn_id) + .map_err(|e| Status::invalid_argument(format!("invalid withdrawal_txn_id: {e}")))?; + tracing::Span::current().record( + "withdrawal_txn_id", + tracing::field::display(&withdrawal_txn_id), + ); + let member_signature = self + .inner + .validate_and_sign_guardian_withdrawal_request( + &withdrawal_txn_id, + req.timestamp_secs, + req.seq, + ) + .map_err(|e| Status::failed_precondition(e.to_string()))?; + tracing::info!( + seq = req.seq, + timestamp_secs = req.timestamp_secs, + "Signed guardian withdrawal request", + ); + Ok(Response::new(SignGuardianWithdrawalRequestResponse { + member_signature: Some(member_signature), + })) + } + #[tracing::instrument( level = "info", skip_all, diff --git a/crates/hashi/src/grpc/guardian_client.rs b/crates/hashi/src/grpc/guardian_client.rs index a640f80ec..1d4c3f829 100644 --- a/crates/hashi/src/grpc/guardian_client.rs +++ b/crates/hashi/src/grpc/guardian_client.rs @@ -47,4 +47,15 @@ impl GuardianClient { .await?; Ok(response.into_inner()) } + + pub async fn standard_withdrawal( + &self, + request: hashi_types::proto::SignedStandardWithdrawalRequest, + ) -> Result { + let response = self + .guardian_service_client() + .standard_withdrawal(request) + .await?; + Ok(response.into_inner()) + } } diff --git a/crates/hashi/src/leader/mod.rs b/crates/hashi/src/leader/mod.rs index 11c9cd1d4..c9cc53208 100644 --- a/crates/hashi/src/leader/mod.rs +++ b/crates/hashi/src/leader/mod.rs @@ -35,6 +35,7 @@ use hashi_types::committee::MemberSignature; use hashi_types::committee::certificate_threshold; use hashi_types::guardian::bitcoin_utils; use hashi_types::proto::SignDepositConfirmationRequest; +use hashi_types::proto::SignGuardianWithdrawalRequestRequest; use hashi_types::proto::SignWithdrawalConfirmationRequest; use hashi_types::proto::SignWithdrawalRequestApprovalRequest; use hashi_types::proto::SignWithdrawalTransactionRequest; @@ -1111,7 +1112,13 @@ impl LeaderService { self.inflight_withdrawal_signings .retain(|id| pending_ids.contains(id)); - let max_concurrent = self.inner.config.max_concurrent_leader_job_tasks(); + // Guardian rejects out-of-order `timestamp_ms`, so serialize + // hard-reserves to one in-flight when guardian is configured. + let max_concurrent = if self.inner.guardian_client().is_some() { + 1 + } else { + self.inner.config.max_concurrent_leader_job_tasks() + }; for txn in withdrawal_txns { if self.withdrawal_signing_tasks.len() >= max_concurrent { break; @@ -1211,7 +1218,23 @@ impl LeaderService { .map(|s| s.to_byte_array().to_vec()) .collect(); - // 3. Build the WithdrawalTxSigning and get BLS certificate via fan-out + // 3. Post-MPC guardian RPC for the enclave signature. Each node has + // already advanced its local limiter at MPC signing time; this call + // records the consume with the authoritative guardian. + if let (Some(guardian), Some(seq)) = (inner.guardian_client(), expected_limiter_seq) { + let timestamp_secs = txn.timestamp_ms / 1000; + Self::finalize_withdrawal_through_guardian( + &inner, + &txn, + &members, + guardian, + timestamp_secs, + seq, + ) + .await?; + } + + // 4. Build the WithdrawalTxSigning and get BLS certificate via fan-out let signed_message = WithdrawalTxSigning { withdrawal_id: txn.id, request_ids: txn.request_ids.clone(), @@ -1255,7 +1278,7 @@ impl LeaderService { let signed = aggregator.finish()?; - // 4. Submit sign_withdrawal to Sui (writes signatures on-chain). + // 5. Submit sign_withdrawal to Sui (writes signatures on-chain). // Broadcast + confirm happens via process_signed_withdrawal_txns on the next tick. Self::submit_sign_withdrawal( &inner, @@ -1876,6 +1899,169 @@ impl LeaderService { info!("Successfully confirmed withdrawal {:?}", withdrawal_txn_id); Ok(()) } + + // ======================================================================== + // Guardian: post-MPC enclave-signature RPC + // ======================================================================== + + #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %txn.id, seq))] + async fn finalize_withdrawal_through_guardian( + inner: &Arc, + txn: &WithdrawalTransaction, + members: &[CommitteeMember], + guardian: &crate::grpc::guardian_client::GuardianClient, + timestamp_secs: u64, + seq: u64, + ) -> anyhow::Result<()> { + let signed_request = + Self::collect_guardian_withdrawal_signatures(inner, txn, members, timestamp_secs, seq) + .await?; + + let proto_request = + hashi_types::guardian::proto_conversions::signed_standard_withdrawal_request_to_pb( + &signed_request, + ); + + match guardian.standard_withdrawal(proto_request).await { + Ok(response_pb) => { + if let Some(pubkey) = inner.guardian_signing_pubkey() { + let signed_response = hashi_types::guardian::GuardianSigned::< + hashi_types::guardian::StandardWithdrawalResponse, + >::try_from(response_pb) + .map_err(|e| { + anyhow::anyhow!("Failed to parse guardian withdrawal response: {e}") + })?; + signed_response.verify(pubkey).map_err(|e| { + anyhow::anyhow!("Guardian response signature verification failed: {e:?}") + })?; + info!(seq, "Guardian approved withdrawal"); + } else { + warn!( + seq, + "Guardian approved but signing pubkey unavailable; \ + skipping response verification" + ); + } + Ok(()) + } + Err(status) => { + let label = if status.message().contains("seq mismatch") { + "GuardianSeqMismatch" + } else if status.message().contains("Rate limit exceeded") { + warn!("Guardian rate-limited withdrawal, will retry later"); + "GuardianRateLimited" + } else { + error!("Guardian call failed: {}", status.message()); + "GuardianUnavailable" + }; + inner + .metrics + .leader_retries_total + .with_label_values(&["withdrawal_signing", label]) + .inc(); + anyhow::bail!("Guardian rejected withdrawal: {}", status.message()) + } + } + } + + async fn collect_guardian_withdrawal_signatures( + inner: &Arc, + txn: &WithdrawalTransaction, + members: &[CommitteeMember], + timestamp_secs: u64, + seq: u64, + ) -> anyhow::Result< + hashi_types::committee::SignedMessage, + > { + let guardian_request = + crate::withdrawals::build_guardian_withdrawal_request(inner, txn, timestamp_secs, seq)?; + + let committee = inner + .onchain_state() + .current_committee() + .expect("No current committee"); + let required_weight = certificate_threshold(committee.total_weight()); + + let proto_request = SignGuardianWithdrawalRequestRequest { + withdrawal_txn_id: txn.id.as_bytes().to_vec().into(), + timestamp_secs, + seq, + }; + + let mut sig_tasks = JoinSet::new(); + for member in members { + let inner = inner.clone(); + let proto_request = proto_request.clone(); + let member = member.clone(); + sig_tasks.spawn(async move { + Self::request_guardian_withdrawal_signature(&inner, proto_request, &member).await + }); + } + + let mut aggregator = BlsSignatureAggregator::new(&committee, guardian_request); + while let Some(result) = sig_tasks.join_next().await { + let Ok(Some(sig)) = result else { continue }; + if let Err(e) = aggregator.add_signature(sig) { + error!( + withdrawal_txn_id = %txn.id, + "Failed to add guardian withdrawal signature: {e}" + ); + } + if aggregator.weight() >= required_weight { + break; + } + } + + let weight = aggregator.weight(); + if weight < required_weight { + anyhow::bail!( + "Insufficient guardian withdrawal signatures: weight {weight} < {required_weight}" + ); + } + + Ok(aggregator.finish()?) + } + + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] + async fn request_guardian_withdrawal_signature( + inner: &Arc, + proto_request: SignGuardianWithdrawalRequestRequest, + member: &CommitteeMember, + ) -> Option { + let validator_address = member.validator_address(); + trace!("Requesting guardian withdrawal signature"); + + let mut rpc_client = inner + .onchain_state() + .bridge_service_client(&validator_address) + .or_else(|| { + error!( + "Cannot find client for validator address: {:?}", + validator_address + ); + None + })?; + + let response = rpc_client + .sign_guardian_withdrawal_request(proto_request.clone()) + .await + .inspect_err(|e| { + error!("Failed to get guardian withdrawal signature from {validator_address}: {e}"); + }) + .ok()?; + + response + .into_inner() + .member_signature + .ok_or_else(|| anyhow::anyhow!("No member_signature in response")) + .and_then(parse_member_signature) + .inspect_err(|e| { + error!( + "Failed to parse guardian withdrawal member signature from {validator_address}: {e}" + ); + }) + .ok() + } } fn deposit_request_to_proto(req: &DepositRequest) -> SignDepositConfirmationRequest { diff --git a/crates/hashi/src/withdrawals.rs b/crates/hashi/src/withdrawals.rs index 697ca2ab0..d47b51d98 100644 --- a/crates/hashi/src/withdrawals.rs +++ b/crates/hashi/src/withdrawals.rs @@ -393,6 +393,27 @@ impl Hashi { self.sign_message_proto(&confirmation) } + // --- Guardian: validate and BLS-sign a `StandardWithdrawalRequest` --- + + #[tracing::instrument(level = "info", skip_all, fields(%withdrawal_txn_id, seq))] + pub fn validate_and_sign_guardian_withdrawal_request( + &self, + withdrawal_txn_id: &Address, + timestamp_secs: u64, + seq: u64, + ) -> anyhow::Result { + let txn = self + .onchain_state() + .withdrawal_txn(withdrawal_txn_id) + .ok_or_else(|| { + anyhow!("WithdrawalTransaction {withdrawal_txn_id} not found on-chain") + })?; + + let guardian_request = build_guardian_withdrawal_request(self, &txn, timestamp_secs, seq)?; + + self.sign_message_proto(&guardian_request) + } + // --- Step 3: Sign withdrawal (store witness signatures on-chain) --- #[tracing::instrument(level = "info", skip_all, fields(withdrawal_id = %message.withdrawal_id))] @@ -1239,3 +1260,93 @@ fn script_pubkey_from_raw_address(address_bytes: &[u8]) -> anyhow::Result anyhow::Result { + use hashi_types::guardian::bitcoin_utils::InputUTXO; + use hashi_types::guardian::bitcoin_utils::OutputUTXO; + use hashi_types::guardian::bitcoin_utils::TxUTXOs; + + let hashi_pubkey = hashi.get_hashi_pubkey()?; + let network = hashi.config.bitcoin_network(); + + // Inputs: per-UTXO taproot script-path artifacts, keyed by deposit pubkey. + let inputs = txn + .inputs + .iter() + .map(|utxo| { + let pubkey = hashi.deposit_pubkey(&hashi_pubkey, utxo.derivation_path.as_ref())?; + let (_, _, leaf_hash) = + bitcoin_utils::single_key_taproot_script_path_spend_artifacts(&pubkey); + let address = hashi.bitcoin_address_from_pubkey(&pubkey); + + let outpoint = bitcoin::OutPoint { + txid: utxo.id.txid.into(), + vout: utxo.id.vout, + }; + + InputUTXO::new( + outpoint, + Amount::from_sat(utxo.amount), + address.into_unchecked(), + leaf_hash, + network, + ) + .map_err(|e| anyhow!("Failed to build guardian InputUTXO: {e}")) + }) + .collect::>>()?; + + // First N outputs are `External` payouts; any trailing output is the + // `Internal` change back to the hashi root. + let all_outputs = txn.all_outputs(); + let num_requests = txn.request_ids.len(); + let outputs = all_outputs + .iter() + .enumerate() + .map(|(i, output)| { + if i < num_requests { + let script_pubkey = script_pubkey_from_raw_address(&output.bitcoin_address)?; + let address = BitcoinAddress::from_script(&script_pubkey, network) + .map_err(|e| anyhow!("Cannot derive address from output script: {e}"))?; + OutputUTXO::new_external( + address.into_unchecked(), + Amount::from_sat(output.amount), + network, + ) + .map_err(|e| anyhow!("Failed to build guardian external OutputUTXO: {e}")) + } else { + let derivation_path = [0u8; 32]; + Ok(OutputUTXO::new_internal( + derivation_path, + Amount::from_sat(output.amount), + )) + } + }) + .collect::>>()?; + + let utxos = TxUTXOs::new(inputs, outputs) + .map_err(|e| anyhow!("Failed to build guardian TxUTXOs: {e}"))?; + + let wid = compute_withdrawal_wid(&txn.request_ids); + + Ok(hashi_types::guardian::StandardWithdrawalRequest::new( + wid, + utxos, + timestamp_secs, + seq, + )) +} + +/// Deterministic 64-bit id for a withdrawal — leading bytes of Blake2b256 over +/// the BCS-encoded request ids. Stable across restarts and leader rotations. +pub fn compute_withdrawal_wid(request_ids: &[Address]) -> u64 { + let bytes = bcs::to_bytes(&request_ids).expect("serialization should succeed"); + let hash = Blake2b256::digest(&bytes); + u64::from_le_bytes(hash.digest[..8].try_into().unwrap()) +} From 0a803c7a29e86f74800e88d2887de68945b83fc3 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 10:41:23 -0700 Subject: [PATCH 2/5] [guardian] capacity-aware batching at Step 2 Pre-filter the approved-withdrawal batch to the longest timestamp-sorted prefix whose cumulative `btc_amount` fits the local limiter's currently available capacity, and add a third "process now" trigger when that prefix is shorter than the full pending set. This short-circuits the batching window when accumulated demand has already filled the bucket and avoids broadcasting a Step 2 BLS round that #495's per-node MPC validation would just reject. A request larger than the currently-available capacity stays at the head of the queue (waiting for refill if possible, or stuck forever if it exceeds `max_bucket_capacity`). Warn once per request id so the operator notices a stuck head; the warn set is pruned each checkpoint to the still-pending ids. `process_approved_withdrawal_requests` becomes async since `LocalLimiter::capacity_at` holds a tokio mutex; the only call site is already in the leader's async loop. No-guardian deployments take the `else` branch and behave bit-for-bit as before. --- crates/hashi/src/leader/mod.rs | 64 +++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 9 deletions(-) diff --git a/crates/hashi/src/leader/mod.rs b/crates/hashi/src/leader/mod.rs index c9cc53208..0664dc78b 100644 --- a/crates/hashi/src/leader/mod.rs +++ b/crates/hashi/src/leader/mod.rs @@ -72,6 +72,9 @@ pub struct LeaderService { inflight_withdrawal_signings: HashSet
, withdrawal_broadcast_tasks: JoinSet<(Address, anyhow::Result<()>)>, inflight_withdrawal_broadcasts: HashSet
, + /// Withdrawal requests already warned about exceeding local limiter + /// capacity. Pruned each checkpoint to track only currently-pending ids. + stuck_withdrawal_warned: HashSet
, deposit_gc_task: Option>>, proposal_gc_task: Option>>, } @@ -92,6 +95,7 @@ impl LeaderService { inflight_withdrawal_signings: HashSet::new(), withdrawal_broadcast_tasks: JoinSet::new(), inflight_withdrawal_broadcasts: HashSet::new(), + stuck_withdrawal_warned: HashSet::new(), deposit_gc_task: None, proposal_gc_task: None, } @@ -141,7 +145,8 @@ impl LeaderService { } self.process_unapproved_withdrawal_requests(checkpoint_timestamp_ms); - self.process_approved_withdrawal_requests(checkpoint_timestamp_ms); + self.process_approved_withdrawal_requests(checkpoint_timestamp_ms) + .await; self.process_unsigned_withdrawal_txns(); self.process_signed_withdrawal_txns(); self.check_delete_proposals(checkpoint_timestamp_ms); @@ -852,7 +857,7 @@ impl LeaderService { // Step 2: Construct withdrawal tx for approved requests // ======================================================================== - fn process_approved_withdrawal_requests(&mut self, checkpoint_timestamp_ms: u64) { + async fn process_approved_withdrawal_requests(&mut self, checkpoint_timestamp_ms: u64) { debug!("Entering process_approved_withdrawal_requests"); if self.is_reconfiguring() { debug!("Reconfig in progress, skipping withdrawal commitment processing"); @@ -873,6 +878,12 @@ impl LeaderService { .collect(); approved.sort_by_key(|r| r.timestamp_ms); + // Drop stuck-warn entries for requests that are no longer pending, + // so a re-stuck request would warn again. + let pending_ids: HashSet
= approved.iter().map(|r| r.id).collect(); + self.stuck_withdrawal_warned + .retain(|id| pending_ids.contains(id)); + if self .withdrawal_commitment_retry_tracker .should_skip(checkpoint_timestamp_ms) @@ -888,11 +899,6 @@ impl LeaderService { return; } - // Collect all approved requests and process them as a single batch. - // The coin selection algorithm picks up to - // max_withdrawal_requests oldest requests from the slice. - let batch: Vec = approved.into_iter().collect(); - self.inner .metrics .leader_items_in_backoff @@ -902,10 +908,50 @@ impl LeaderService { .in_backoff_count(checkpoint_timestamp_ms) as i64, ); - if batch.is_empty() { + if approved.is_empty() { return; } + // Capacity-aware batching: take the longest timestamp-sorted prefix + // whose cumulative `btc_amount` fits the local limiter's currently + // available capacity. Anything dropped fires `at_capacity` as a + // third "process now" trigger so the leader doesn't sit on excess + // demand for the full batching window. + let (batch, at_capacity) = if let Some(limiter) = self.inner.local_limiter() { + let timestamp_secs = checkpoint_timestamp_ms / 1000; + let capacity = limiter.capacity_at(timestamp_secs).await; + let mut cumulative = 0u64; + let mut filtered: Vec = Vec::with_capacity(approved.len()); + for req in &approved { + let candidate = cumulative.saturating_add(req.btc_amount); + if candidate > capacity { + break; + } + cumulative = candidate; + filtered.push(req.clone()); + } + if filtered.is_empty() { + // Single request larger than the currently-available capacity. + // It may fit after refill, or it may exceed `max_bucket_capacity` + // entirely — either way, leave it queued and warn once so the + // operator notices a stuck head. + let stuck = &approved[0]; + if self.stuck_withdrawal_warned.insert(stuck.id) { + warn!( + request_id = %stuck.id, + btc_amount = stuck.btc_amount, + available = capacity, + "Withdrawal request exceeds local limiter capacity; staying in queue" + ); + } + return; + } + let at_capacity = filtered.len() < approved.len(); + (filtered, at_capacity) + } else { + (approved, false) + }; + let max_batch = self.inner.config.withdrawal_max_batch_size(); let delay_ms = self.inner.config.withdrawal_batching_delay_ms(); @@ -914,7 +960,7 @@ impl LeaderService { .first() .is_some_and(|r| checkpoint_timestamp_ms >= r.timestamp_ms + delay_ms); - if !batch_is_full && !oldest_has_waited { + if !batch_is_full && !oldest_has_waited && !at_capacity { debug!( "Holding {} approved request(s): oldest is {}ms old, \ waiting for {}ms delay or {} requests", From be247e5d3f9ecea975150495a4a9db75601a7333 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 16:09:29 -0700 Subject: [PATCH 3/5] [guardian] hold inflight until watcher sees signed withdrawal After submit_sign_withdrawal succeeds, wait for the watcher to observe the on-chain signatures before returning from process_unsigned_withdrawal_txn. Until this returns, the txn ID stays in inflight_withdrawal_signings, so the next checkpoint can't respawn the task and re-call the guardian RPC with seq+1, which would double-advance the authoritative limiter. Gated on guardian_client.is_some() so non-guardian deployments behave as before. --- crates/hashi/src/leader/mod.rs | 37 ++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/hashi/src/leader/mod.rs b/crates/hashi/src/leader/mod.rs index 0664dc78b..8b4269f9e 100644 --- a/crates/hashi/src/leader/mod.rs +++ b/crates/hashi/src/leader/mod.rs @@ -1349,6 +1349,13 @@ impl LeaderService { .inc(); })?; + // Hold the txn in inflight_withdrawal_signings until the watcher + // sees the on-chain signatures; otherwise the next checkpoint may + // respawn this task and double-advance the guardian seq. + if inner.guardian_client().is_some() { + Self::wait_for_signed_withdrawal_visible(&inner, &txn.id).await; + } + Ok(()) } @@ -1930,6 +1937,36 @@ impl LeaderService { .await } + async fn wait_for_signed_withdrawal_visible(inner: &Arc, txn_id: &Address) { + const VISIBILITY_TIMEOUT: Duration = Duration::from_secs(30); + let mut checkpoint_rx = inner.onchain_state().subscribe_checkpoint(); + let wait = async { + loop { + let visible = inner + .onchain_state() + .withdrawal_txns() + .iter() + .any(|p| p.id == *txn_id && p.signatures.is_some()); + if visible { + return; + } + if checkpoint_rx.changed().await.is_err() { + return; + } + } + }; + if tokio::time::timeout(VISIBILITY_TIMEOUT, wait) + .await + .is_err() + { + warn!( + withdrawal_txn_id = %txn_id, + "Timeout waiting for watcher to observe signed withdrawal; \ + a duplicate guardian RPC may follow on the next checkpoint" + ); + } + } + async fn submit_confirm_withdrawal( inner: &Arc, withdrawal_txn_id: &Address, From 074e9527be5095d80438cca023d6b69fb5716af7 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 16:09:35 -0700 Subject: [PATCH 4/5] [guardian] e2e: wait for every node's local limiter before withdrawing Local-limiter bootstrap is event-driven and per-node, so without an explicit wait the first MPC sign request can race with bootstrap on slow CI: a follower whose limiter is configured but a leader whose limiter isn't (or vice versa) bails on the validate_consume guard, MPC times out, and the leader retries. Add HashiNodeHandle::wait_for_local_limiter and call it after the guardian harness finalizes. --- crates/e2e-tests/src/e2e_flow.rs | 3 +++ crates/e2e-tests/src/hashi_network.rs | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/crates/e2e-tests/src/e2e_flow.rs b/crates/e2e-tests/src/e2e_flow.rs index f3522cf8f..cae13adf4 100644 --- a/crates/e2e-tests/src/e2e_flow.rs +++ b/crates/e2e-tests/src/e2e_flow.rs @@ -288,6 +288,9 @@ mod tests { .as_ref() .expect("harness present when .with_guardian() is set"); assert!(harness.enclave().is_fully_initialized()); + for node in networks.hashi_network.nodes() { + node.wait_for_local_limiter(Duration::from_secs(60)).await?; + } } let deposit_amount_sats = 100_000u64; diff --git a/crates/e2e-tests/src/hashi_network.rs b/crates/e2e-tests/src/hashi_network.rs index dff4b5084..c774404ef 100644 --- a/crates/e2e-tests/src/hashi_network.rs +++ b/crates/e2e-tests/src/hashi_network.rs @@ -158,6 +158,24 @@ impl HashiNodeHandle { } } + pub async fn wait_for_local_limiter(&self, timeout: std::time::Duration) -> Result<()> { + tokio::time::timeout(timeout, self.wait_for_local_limiter_inner()) + .await + .map_err(|_| { + anyhow::anyhow!("local limiter bootstrap timed out after {:?}", timeout) + })?; + Ok(()) + } + + async fn wait_for_local_limiter_inner(&self) { + loop { + if self.hashi().local_limiter().is_some() { + return; + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + pub fn current_epoch(&self) -> Option { self.hashi() .onchain_state_opt() From 8ff6a845ab1cf3a8846e914f9fda32d20cb345da Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 28 Apr 2026 16:30:31 -0700 Subject: [PATCH 5/5] [guardian] simplify visibility wait and parallelize bootstrap wait Tech-lead review feedback on the two previous commits: leader::wait_until_signed_visible (renamed from wait_for_signed_withdrawal_visible): - Drop the guardian_client gate at the call site. The same respawn race produces a duplicate sign_withdrawal submission without guardian, just with a different failure mode (noisy sui_tx_submissions_total). One unconditional wait covers both. - Take &Hashi instead of &Arc; the helper never clones. - Use the existing OnchainState::withdrawal_txn(id) getter instead of cloning the whole withdrawal_txns map and scanning it. - Use is_some_and for the visibility check. - Drop the redundant is_err() branch on the watch wakeup; loop body re-checks visibility on every iteration regardless. e2e: drop the ?; Ok(()) tail on wait_for_local_limiter and parallelize the per-node bootstrap waits with try_join_all. --- crates/e2e-tests/src/e2e_flow.rs | 11 ++++++++--- crates/e2e-tests/src/hashi_network.rs | 5 +---- crates/hashi/src/leader/mod.rs | 23 +++++++++-------------- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/crates/e2e-tests/src/e2e_flow.rs b/crates/e2e-tests/src/e2e_flow.rs index cae13adf4..576cedd33 100644 --- a/crates/e2e-tests/src/e2e_flow.rs +++ b/crates/e2e-tests/src/e2e_flow.rs @@ -288,9 +288,14 @@ mod tests { .as_ref() .expect("harness present when .with_guardian() is set"); assert!(harness.enclave().is_fully_initialized()); - for node in networks.hashi_network.nodes() { - node.wait_for_local_limiter(Duration::from_secs(60)).await?; - } + futures::future::try_join_all( + networks + .hashi_network + .nodes() + .iter() + .map(|node| node.wait_for_local_limiter(Duration::from_secs(60))), + ) + .await?; } let deposit_amount_sats = 100_000u64; diff --git a/crates/e2e-tests/src/hashi_network.rs b/crates/e2e-tests/src/hashi_network.rs index c774404ef..e793677e8 100644 --- a/crates/e2e-tests/src/hashi_network.rs +++ b/crates/e2e-tests/src/hashi_network.rs @@ -161,10 +161,7 @@ impl HashiNodeHandle { pub async fn wait_for_local_limiter(&self, timeout: std::time::Duration) -> Result<()> { tokio::time::timeout(timeout, self.wait_for_local_limiter_inner()) .await - .map_err(|_| { - anyhow::anyhow!("local limiter bootstrap timed out after {:?}", timeout) - })?; - Ok(()) + .map_err(|_| anyhow::anyhow!("local limiter bootstrap timed out after {:?}", timeout)) } async fn wait_for_local_limiter_inner(&self) { diff --git a/crates/hashi/src/leader/mod.rs b/crates/hashi/src/leader/mod.rs index 8b4269f9e..5a655a332 100644 --- a/crates/hashi/src/leader/mod.rs +++ b/crates/hashi/src/leader/mod.rs @@ -1349,12 +1349,10 @@ impl LeaderService { .inc(); })?; - // Hold the txn in inflight_withdrawal_signings until the watcher - // sees the on-chain signatures; otherwise the next checkpoint may - // respawn this task and double-advance the guardian seq. - if inner.guardian_client().is_some() { - Self::wait_for_signed_withdrawal_visible(&inner, &txn.id).await; - } + // Hold inflight until the watcher sees on-chain signatures; otherwise + // the next checkpoint can respawn and double-advance the guardian seq + // (or push a duplicate sign_withdrawal without guardian). + Self::wait_until_signed_visible(&inner, &txn.id).await; Ok(()) } @@ -1937,22 +1935,19 @@ impl LeaderService { .await } - async fn wait_for_signed_withdrawal_visible(inner: &Arc, txn_id: &Address) { + async fn wait_until_signed_visible(inner: &Hashi, txn_id: &Address) { const VISIBILITY_TIMEOUT: Duration = Duration::from_secs(30); let mut checkpoint_rx = inner.onchain_state().subscribe_checkpoint(); let wait = async { loop { let visible = inner .onchain_state() - .withdrawal_txns() - .iter() - .any(|p| p.id == *txn_id && p.signatures.is_some()); + .withdrawal_txn(txn_id) + .is_some_and(|t| t.signatures.is_some()); if visible { return; } - if checkpoint_rx.changed().await.is_err() { - return; - } + let _ = checkpoint_rx.changed().await; } }; if tokio::time::timeout(VISIBILITY_TIMEOUT, wait) @@ -1962,7 +1957,7 @@ impl LeaderService { warn!( withdrawal_txn_id = %txn_id, "Timeout waiting for watcher to observe signed withdrawal; \ - a duplicate guardian RPC may follow on the next checkpoint" + a duplicate sign attempt may follow on the next checkpoint" ); } }