diff --git a/crates/e2e-tests/src/e2e_flow.rs b/crates/e2e-tests/src/e2e_flow.rs index 9a658692e..576cedd33 100644 --- a/crates/e2e-tests/src/e2e_flow.rs +++ b/crates/e2e-tests/src/e2e_flow.rs @@ -288,6 +288,14 @@ mod tests { .as_ref() .expect("harness present when .with_guardian() is set"); assert!(harness.enclave().is_fully_initialized()); + futures::future::try_join_all( + networks + .hashi_network + .nodes() + .iter() + .map(|node| node.wait_for_local_limiter(Duration::from_secs(60))), + ) + .await?; } let deposit_amount_sats = 100_000u64; @@ -365,6 +373,25 @@ mod tests { ) .await?; + if with_guardian { + let guardian_state = networks + .guardian_harness + .as_ref() + .expect("harness present when .with_guardian() is set") + .enclave() + .state + .limiter_state() + .await + .expect("guardian limiter state present after a successful withdrawal"); + assert_eq!(guardian_state.next_seq, 1); + let local_state = hashi + .local_limiter() + .expect("local limiter present after bootstrap") + .snapshot() + .await; + assert_eq!(local_state, guardian_state); + } + info!("=== Bitcoin Withdrawal E2E Test{label} Passed ==="); Ok(()) } diff --git a/crates/e2e-tests/src/hashi_network.rs b/crates/e2e-tests/src/hashi_network.rs index dff4b5084..e793677e8 100644 --- a/crates/e2e-tests/src/hashi_network.rs +++ b/crates/e2e-tests/src/hashi_network.rs @@ -158,6 +158,21 @@ impl HashiNodeHandle { } } + pub async fn wait_for_local_limiter(&self, timeout: std::time::Duration) -> Result<()> { + tokio::time::timeout(timeout, self.wait_for_local_limiter_inner()) + .await + .map_err(|_| anyhow::anyhow!("local limiter bootstrap timed out after {:?}", timeout)) + } + + async fn wait_for_local_limiter_inner(&self) { + loop { + if self.hashi().local_limiter().is_some() { + return; + } + tokio::time::sleep(POLL_INTERVAL).await; + } + } + pub fn current_epoch(&self) -> Option { self.hashi() .onchain_state_opt() diff --git a/crates/hashi-guardian/src/enclave.rs b/crates/hashi-guardian/src/enclave.rs index 0ea84a51d..49ea09c25 100644 --- a/crates/hashi-guardian/src/enclave.rs +++ b/crates/hashi-guardian/src/enclave.rs @@ -310,6 +310,7 @@ impl EnclaveState { pub async fn consume_from_limiter( &self, + wid: u64, seq: u64, timestamp: u64, amount_sats: u64, @@ -324,7 +325,7 @@ impl EnclaveState { ) .await .map_err(|_| InvalidInputs("timed out waiting for rate limiter lock".into()))?; - guard.consume(seq, timestamp, amount_sats)?; + guard.consume(wid, seq, timestamp, amount_sats)?; Ok(LimiterGuard::new(guard)) } diff --git a/crates/hashi-guardian/src/withdraw.rs b/crates/hashi-guardian/src/withdraw.rs index 3bbcad79a..bb881a3af 100644 --- a/crates/hashi-guardian/src/withdraw.rs +++ b/crates/hashi-guardian/src/withdraw.rs @@ -99,9 +99,11 @@ async fn normal_withdrawal_inner( info!("Checking rate limits."); let consumed_amount_sats = request.utxos().external_out_amount().to_sat(); + let wid = *request.wid(); let limiter_guard = enclave .state .consume_from_limiter( + wid, request.seq(), request.timestamp_secs(), consumed_amount_sats, diff --git a/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto b/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto index 95b506611..affa4054b 100644 --- a/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto +++ b/crates/hashi-types/proto/sui/hashi/v1alpha/bridge_service.proto @@ -18,6 +18,9 @@ service BridgeService { rpc SignWithdrawalTxConstruction(SignWithdrawalTxConstructionRequest) returns (SignWithdrawalTxConstructionResponse); // Step 2b: Sign a bitcoin withdrawal transaction (MPC Schnorr). rpc SignWithdrawalTransaction(SignWithdrawalTransactionRequest) returns (SignWithdrawalTransactionResponse); + // Sign a guardian rate-limiting request so the leader can aggregate a + // committee certificate and forward it to the guardian after MPC quorum. + rpc SignGuardianWithdrawalRequest(SignGuardianWithdrawalRequestRequest) returns (SignGuardianWithdrawalRequestResponse); // Step 3: Sign the BLS certificate over the witness signatures for on-chain storage. rpc SignWithdrawalTxSigning(SignWithdrawalTxSigningRequest) returns (SignWithdrawalTxSigningResponse); // Step 4: Sign committee approval to confirm a processed withdrawal on-chain. @@ -165,3 +168,21 @@ message SignWithdrawalConfirmationRequest { message SignWithdrawalConfirmationResponse { MemberSignature member_signature = 1; } + +// The leader sends the withdrawal-transaction id along with the guardian-specific +// fields (timestamp, seq) so each validator can independently reconstruct and +// BLS-sign the same `StandardWithdrawalRequest`. +message SignGuardianWithdrawalRequestRequest { + // The id of the WithdrawalTransaction on Sui (32 bytes). + bytes withdrawal_txn_id = 1; + + // Timestamp in unix seconds (used for guardian rate limiting). + uint64 timestamp_secs = 2; + + // Monotonic sequence number (used by guardian for replay prevention). + uint64 seq = 3; +} + +message SignGuardianWithdrawalRequestResponse { + MemberSignature member_signature = 1; +} diff --git a/crates/hashi-types/src/guardian/limiter.rs b/crates/hashi-types/src/guardian/limiter.rs index 9dc3f88fb..927b4afa2 100644 --- a/crates/hashi-types/src/guardian/limiter.rs +++ b/crates/hashi-types/src/guardian/limiter.rs @@ -66,7 +66,13 @@ impl RateLimiter { /// Consume tokens from the bucket. Validates seq and timestamp ordering, /// refills based on elapsed time, then debits the requested amount. - pub fn consume(&mut self, seq: u64, timestamp: u64, amount_sats: u64) -> GuardianResult<()> { + pub fn consume( + &mut self, + _wid: u64, + seq: u64, + timestamp: u64, + amount_sats: u64, + ) -> GuardianResult<()> { if seq != self.state.next_seq { return Err(InvalidInputs(format!( "seq mismatch: expected {}, got {}", @@ -95,7 +101,6 @@ impl RateLimiter { return Err(RateLimitExceeded); } - // Snapshot for revert, then mutate. self.prev_state = self.state; self.state.last_updated_at = timestamp; self.state.num_tokens_available = capacity - amount_sats; @@ -131,18 +136,18 @@ mod test { fn test_basic() { let (config, state) = make_limiter(); let mut limiter = RateLimiter::new(config, state).unwrap(); - assert!(limiter.consume(0, 1, config.refill_rate).is_ok()); + assert!(limiter.consume(1, 0, 1, config.refill_rate).is_ok()); let target_amount = 1_000_000u64; let num_secs_required = target_amount.div_ceil(config.refill_rate); assert!( limiter - .consume(1, num_secs_required, target_amount) + .consume(2, 1, num_secs_required, target_amount) .is_err() ); assert!( limiter - .consume(1, 1 + num_secs_required, target_amount) + .consume(2, 1, 1 + num_secs_required, target_amount) .is_ok() ); } @@ -153,12 +158,12 @@ mod test { let mut limiter = RateLimiter::new(config, state).unwrap(); assert!( limiter - .consume(0, u64::MAX, config.max_bucket_capacity + 1) + .consume(1, 0, u64::MAX, config.max_bucket_capacity + 1) .is_err() ); assert!( limiter - .consume(0, u64::MAX, config.max_bucket_capacity) + .consume(1, 0, u64::MAX, config.max_bucket_capacity) .is_ok() ); } @@ -168,7 +173,7 @@ mod test { let (config, state) = make_limiter(); let mut limiter = RateLimiter::new(config, state).unwrap(); // Consume after refill, then revert — should restore original state. - limiter.consume(0, 100, 50_000).unwrap(); + limiter.consume(1, 0, 100, 50_000).unwrap(); assert_eq!(limiter.state().num_tokens_available, 50_000); // 100*1000 - 50_000 limiter.revert(); assert_eq!(limiter.state().num_tokens_available, 0); @@ -181,10 +186,10 @@ mod test { let (config, state) = make_limiter(); let mut limiter = RateLimiter::new(config, state).unwrap(); // Wrong seq. - assert!(limiter.consume(1, 0, 0).is_err()); + assert!(limiter.consume(1, 1, 0, 0).is_err()); // Advance state. - limiter.consume(0, 100, 1_000).unwrap(); + limiter.consume(1, 0, 100, 1_000).unwrap(); // Old timestamp. - assert!(limiter.consume(1, 50, 1_000).is_err()); + assert!(limiter.consume(2, 1, 50, 1_000).is_err()); } } diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin index 2407c17ba..f5332db7d 100644 Binary files a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin and b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.fds.bin differ diff --git a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs index 8e465f6b4..e1c540cef 100644 --- a/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs +++ b/crates/hashi-types/src/proto/generated/sui.hashi.v1alpha.rs @@ -155,6 +155,26 @@ pub struct SignWithdrawalConfirmationResponse { #[prost(message, optional, tag = "1")] pub member_signature: ::core::option::Option, } +/// The leader sends the withdrawal-transaction id along with the guardian-specific +/// fields (timestamp, seq) so each validator can independently reconstruct and +/// BLS-sign the same `StandardWithdrawalRequest`. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SignGuardianWithdrawalRequestRequest { + /// The id of the WithdrawalTransaction on Sui (32 bytes). + #[prost(bytes = "bytes", tag = "1")] + pub withdrawal_txn_id: ::prost::bytes::Bytes, + /// Timestamp in unix seconds (used for guardian rate limiting). + #[prost(uint64, tag = "2")] + pub timestamp_secs: u64, + /// Monotonic sequence number (used by guardian for replay prevention). + #[prost(uint64, tag = "3")] + pub seq: u64, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct SignGuardianWithdrawalRequestResponse { + #[prost(message, optional, tag = "1")] + pub member_signature: ::core::option::Option, +} /// Generated client implementations. pub mod bridge_service_client { #![allow( @@ -394,6 +414,37 @@ pub mod bridge_service_client { ); self.inner.unary(req, path, codec).await } + /// Sign a guardian rate-limiting request so the leader can aggregate a + /// committee certificate and forward it to the guardian after MPC quorum. + pub async fn sign_guardian_withdrawal_request( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sui.hashi.v1alpha.BridgeService/SignGuardianWithdrawalRequest", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "sui.hashi.v1alpha.BridgeService", + "SignGuardianWithdrawalRequest", + ), + ); + self.inner.unary(req, path, codec).await + } /// Step 3: Sign the BLS certificate over the witness signatures for on-chain storage. pub async fn sign_withdrawal_tx_signing( &mut self, @@ -510,6 +561,15 @@ pub mod bridge_service_server { tonic::Response, tonic::Status, >; + /// Sign a guardian rate-limiting request so the leader can aggregate a + /// committee certificate and forward it to the guardian after MPC quorum. + async fn sign_guardian_withdrawal_request( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Step 3: Sign the BLS certificate over the witness signatures for on-chain storage. async fn sign_withdrawal_tx_signing( &self, @@ -858,6 +918,60 @@ pub mod bridge_service_server { }; Box::pin(fut) } + "/sui.hashi.v1alpha.BridgeService/SignGuardianWithdrawalRequest" => { + #[allow(non_camel_case_types)] + struct SignGuardianWithdrawalRequestSvc( + pub Arc, + ); + impl< + T: BridgeService, + > tonic::server::UnaryService< + super::SignGuardianWithdrawalRequestRequest, + > for SignGuardianWithdrawalRequestSvc { + type Response = super::SignGuardianWithdrawalRequestResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + super::SignGuardianWithdrawalRequestRequest, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::sign_guardian_withdrawal_request( + &inner, + request, + ) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SignGuardianWithdrawalRequestSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } "/sui.hashi.v1alpha.BridgeService/SignWithdrawalTxSigning" => { #[allow(non_camel_case_types)] struct SignWithdrawalTxSigningSvc(pub Arc); diff --git a/crates/hashi/src/grpc/bridge_service.rs b/crates/hashi/src/grpc/bridge_service.rs index 762658edb..be6113ab3 100644 --- a/crates/hashi/src/grpc/bridge_service.rs +++ b/crates/hashi/src/grpc/bridge_service.rs @@ -19,6 +19,8 @@ use hashi_types::proto::GetServiceInfoRequest; use hashi_types::proto::GetServiceInfoResponse; use hashi_types::proto::SignDepositConfirmationRequest; use hashi_types::proto::SignDepositConfirmationResponse; +use hashi_types::proto::SignGuardianWithdrawalRequestRequest; +use hashi_types::proto::SignGuardianWithdrawalRequestResponse; use hashi_types::proto::SignWithdrawalConfirmationRequest; use hashi_types::proto::SignWithdrawalConfirmationResponse; use hashi_types::proto::SignWithdrawalRequestApprovalRequest; @@ -131,6 +133,43 @@ impl BridgeService for HttpService { })) } + /// Validate and BLS-sign a `StandardWithdrawalRequest` for the guardian. + #[tracing::instrument( + level = "info", + skip_all, + fields(withdrawal_txn_id = tracing::field::Empty, caller = tracing::field::Empty), + )] + async fn sign_guardian_withdrawal_request( + &self, + request: Request, + ) -> Result, Status> { + let caller = authenticate_caller(&request)?; + tracing::Span::current().record("caller", tracing::field::display(&caller)); + let req = request.get_ref(); + let withdrawal_txn_id = Address::from_bytes(&req.withdrawal_txn_id) + .map_err(|e| Status::invalid_argument(format!("invalid withdrawal_txn_id: {e}")))?; + tracing::Span::current().record( + "withdrawal_txn_id", + tracing::field::display(&withdrawal_txn_id), + ); + let member_signature = self + .inner + .validate_and_sign_guardian_withdrawal_request( + &withdrawal_txn_id, + req.timestamp_secs, + req.seq, + ) + .map_err(|e| Status::failed_precondition(e.to_string()))?; + tracing::info!( + seq = req.seq, + timestamp_secs = req.timestamp_secs, + "Signed guardian withdrawal request", + ); + Ok(Response::new(SignGuardianWithdrawalRequestResponse { + member_signature: Some(member_signature), + })) + } + #[tracing::instrument( level = "info", skip_all, diff --git a/crates/hashi/src/grpc/guardian_client.rs b/crates/hashi/src/grpc/guardian_client.rs index a640f80ec..1d4c3f829 100644 --- a/crates/hashi/src/grpc/guardian_client.rs +++ b/crates/hashi/src/grpc/guardian_client.rs @@ -47,4 +47,15 @@ impl GuardianClient { .await?; Ok(response.into_inner()) } + + pub async fn standard_withdrawal( + &self, + request: hashi_types::proto::SignedStandardWithdrawalRequest, + ) -> Result { + let response = self + .guardian_service_client() + .standard_withdrawal(request) + .await?; + Ok(response.into_inner()) + } } diff --git a/crates/hashi/src/leader/mod.rs b/crates/hashi/src/leader/mod.rs index 11c9cd1d4..5a655a332 100644 --- a/crates/hashi/src/leader/mod.rs +++ b/crates/hashi/src/leader/mod.rs @@ -35,6 +35,7 @@ use hashi_types::committee::MemberSignature; use hashi_types::committee::certificate_threshold; use hashi_types::guardian::bitcoin_utils; use hashi_types::proto::SignDepositConfirmationRequest; +use hashi_types::proto::SignGuardianWithdrawalRequestRequest; use hashi_types::proto::SignWithdrawalConfirmationRequest; use hashi_types::proto::SignWithdrawalRequestApprovalRequest; use hashi_types::proto::SignWithdrawalTransactionRequest; @@ -71,6 +72,9 @@ pub struct LeaderService { inflight_withdrawal_signings: HashSet
, withdrawal_broadcast_tasks: JoinSet<(Address, anyhow::Result<()>)>, inflight_withdrawal_broadcasts: HashSet
, + /// Withdrawal requests already warned about exceeding local limiter + /// capacity. Pruned each checkpoint to track only currently-pending ids. + stuck_withdrawal_warned: HashSet
, deposit_gc_task: Option>>, proposal_gc_task: Option>>, } @@ -91,6 +95,7 @@ impl LeaderService { inflight_withdrawal_signings: HashSet::new(), withdrawal_broadcast_tasks: JoinSet::new(), inflight_withdrawal_broadcasts: HashSet::new(), + stuck_withdrawal_warned: HashSet::new(), deposit_gc_task: None, proposal_gc_task: None, } @@ -140,7 +145,8 @@ impl LeaderService { } self.process_unapproved_withdrawal_requests(checkpoint_timestamp_ms); - self.process_approved_withdrawal_requests(checkpoint_timestamp_ms); + self.process_approved_withdrawal_requests(checkpoint_timestamp_ms) + .await; self.process_unsigned_withdrawal_txns(); self.process_signed_withdrawal_txns(); self.check_delete_proposals(checkpoint_timestamp_ms); @@ -851,7 +857,7 @@ impl LeaderService { // Step 2: Construct withdrawal tx for approved requests // ======================================================================== - fn process_approved_withdrawal_requests(&mut self, checkpoint_timestamp_ms: u64) { + async fn process_approved_withdrawal_requests(&mut self, checkpoint_timestamp_ms: u64) { debug!("Entering process_approved_withdrawal_requests"); if self.is_reconfiguring() { debug!("Reconfig in progress, skipping withdrawal commitment processing"); @@ -872,6 +878,12 @@ impl LeaderService { .collect(); approved.sort_by_key(|r| r.timestamp_ms); + // Drop stuck-warn entries for requests that are no longer pending, + // so a re-stuck request would warn again. + let pending_ids: HashSet
= approved.iter().map(|r| r.id).collect(); + self.stuck_withdrawal_warned + .retain(|id| pending_ids.contains(id)); + if self .withdrawal_commitment_retry_tracker .should_skip(checkpoint_timestamp_ms) @@ -887,11 +899,6 @@ impl LeaderService { return; } - // Collect all approved requests and process them as a single batch. - // The coin selection algorithm picks up to - // max_withdrawal_requests oldest requests from the slice. - let batch: Vec = approved.into_iter().collect(); - self.inner .metrics .leader_items_in_backoff @@ -901,10 +908,50 @@ impl LeaderService { .in_backoff_count(checkpoint_timestamp_ms) as i64, ); - if batch.is_empty() { + if approved.is_empty() { return; } + // Capacity-aware batching: take the longest timestamp-sorted prefix + // whose cumulative `btc_amount` fits the local limiter's currently + // available capacity. Anything dropped fires `at_capacity` as a + // third "process now" trigger so the leader doesn't sit on excess + // demand for the full batching window. + let (batch, at_capacity) = if let Some(limiter) = self.inner.local_limiter() { + let timestamp_secs = checkpoint_timestamp_ms / 1000; + let capacity = limiter.capacity_at(timestamp_secs).await; + let mut cumulative = 0u64; + let mut filtered: Vec = Vec::with_capacity(approved.len()); + for req in &approved { + let candidate = cumulative.saturating_add(req.btc_amount); + if candidate > capacity { + break; + } + cumulative = candidate; + filtered.push(req.clone()); + } + if filtered.is_empty() { + // Single request larger than the currently-available capacity. + // It may fit after refill, or it may exceed `max_bucket_capacity` + // entirely — either way, leave it queued and warn once so the + // operator notices a stuck head. + let stuck = &approved[0]; + if self.stuck_withdrawal_warned.insert(stuck.id) { + warn!( + request_id = %stuck.id, + btc_amount = stuck.btc_amount, + available = capacity, + "Withdrawal request exceeds local limiter capacity; staying in queue" + ); + } + return; + } + let at_capacity = filtered.len() < approved.len(); + (filtered, at_capacity) + } else { + (approved, false) + }; + let max_batch = self.inner.config.withdrawal_max_batch_size(); let delay_ms = self.inner.config.withdrawal_batching_delay_ms(); @@ -913,7 +960,7 @@ impl LeaderService { .first() .is_some_and(|r| checkpoint_timestamp_ms >= r.timestamp_ms + delay_ms); - if !batch_is_full && !oldest_has_waited { + if !batch_is_full && !oldest_has_waited && !at_capacity { debug!( "Holding {} approved request(s): oldest is {}ms old, \ waiting for {}ms delay or {} requests", @@ -1111,7 +1158,13 @@ impl LeaderService { self.inflight_withdrawal_signings .retain(|id| pending_ids.contains(id)); - let max_concurrent = self.inner.config.max_concurrent_leader_job_tasks(); + // Guardian rejects out-of-order `timestamp_ms`, so serialize + // hard-reserves to one in-flight when guardian is configured. + let max_concurrent = if self.inner.guardian_client().is_some() { + 1 + } else { + self.inner.config.max_concurrent_leader_job_tasks() + }; for txn in withdrawal_txns { if self.withdrawal_signing_tasks.len() >= max_concurrent { break; @@ -1211,7 +1264,23 @@ impl LeaderService { .map(|s| s.to_byte_array().to_vec()) .collect(); - // 3. Build the WithdrawalTxSigning and get BLS certificate via fan-out + // 3. Post-MPC guardian RPC for the enclave signature. Each node has + // already advanced its local limiter at MPC signing time; this call + // records the consume with the authoritative guardian. + if let (Some(guardian), Some(seq)) = (inner.guardian_client(), expected_limiter_seq) { + let timestamp_secs = txn.timestamp_ms / 1000; + Self::finalize_withdrawal_through_guardian( + &inner, + &txn, + &members, + guardian, + timestamp_secs, + seq, + ) + .await?; + } + + // 4. Build the WithdrawalTxSigning and get BLS certificate via fan-out let signed_message = WithdrawalTxSigning { withdrawal_id: txn.id, request_ids: txn.request_ids.clone(), @@ -1255,7 +1324,7 @@ impl LeaderService { let signed = aggregator.finish()?; - // 4. Submit sign_withdrawal to Sui (writes signatures on-chain). + // 5. Submit sign_withdrawal to Sui (writes signatures on-chain). // Broadcast + confirm happens via process_signed_withdrawal_txns on the next tick. Self::submit_sign_withdrawal( &inner, @@ -1280,6 +1349,11 @@ impl LeaderService { .inc(); })?; + // Hold inflight until the watcher sees on-chain signatures; otherwise + // the next checkpoint can respawn and double-advance the guardian seq + // (or push a duplicate sign_withdrawal without guardian). + Self::wait_until_signed_visible(&inner, &txn.id).await; + Ok(()) } @@ -1861,6 +1935,33 @@ impl LeaderService { .await } + async fn wait_until_signed_visible(inner: &Hashi, txn_id: &Address) { + const VISIBILITY_TIMEOUT: Duration = Duration::from_secs(30); + let mut checkpoint_rx = inner.onchain_state().subscribe_checkpoint(); + let wait = async { + loop { + let visible = inner + .onchain_state() + .withdrawal_txn(txn_id) + .is_some_and(|t| t.signatures.is_some()); + if visible { + return; + } + let _ = checkpoint_rx.changed().await; + } + }; + if tokio::time::timeout(VISIBILITY_TIMEOUT, wait) + .await + .is_err() + { + warn!( + withdrawal_txn_id = %txn_id, + "Timeout waiting for watcher to observe signed withdrawal; \ + a duplicate sign attempt may follow on the next checkpoint" + ); + } + } + async fn submit_confirm_withdrawal( inner: &Arc, withdrawal_txn_id: &Address, @@ -1876,6 +1977,169 @@ impl LeaderService { info!("Successfully confirmed withdrawal {:?}", withdrawal_txn_id); Ok(()) } + + // ======================================================================== + // Guardian: post-MPC enclave-signature RPC + // ======================================================================== + + #[tracing::instrument(level = "info", skip_all, fields(withdrawal_txn_id = %txn.id, seq))] + async fn finalize_withdrawal_through_guardian( + inner: &Arc, + txn: &WithdrawalTransaction, + members: &[CommitteeMember], + guardian: &crate::grpc::guardian_client::GuardianClient, + timestamp_secs: u64, + seq: u64, + ) -> anyhow::Result<()> { + let signed_request = + Self::collect_guardian_withdrawal_signatures(inner, txn, members, timestamp_secs, seq) + .await?; + + let proto_request = + hashi_types::guardian::proto_conversions::signed_standard_withdrawal_request_to_pb( + &signed_request, + ); + + match guardian.standard_withdrawal(proto_request).await { + Ok(response_pb) => { + if let Some(pubkey) = inner.guardian_signing_pubkey() { + let signed_response = hashi_types::guardian::GuardianSigned::< + hashi_types::guardian::StandardWithdrawalResponse, + >::try_from(response_pb) + .map_err(|e| { + anyhow::anyhow!("Failed to parse guardian withdrawal response: {e}") + })?; + signed_response.verify(pubkey).map_err(|e| { + anyhow::anyhow!("Guardian response signature verification failed: {e:?}") + })?; + info!(seq, "Guardian approved withdrawal"); + } else { + warn!( + seq, + "Guardian approved but signing pubkey unavailable; \ + skipping response verification" + ); + } + Ok(()) + } + Err(status) => { + let label = if status.message().contains("seq mismatch") { + "GuardianSeqMismatch" + } else if status.message().contains("Rate limit exceeded") { + warn!("Guardian rate-limited withdrawal, will retry later"); + "GuardianRateLimited" + } else { + error!("Guardian call failed: {}", status.message()); + "GuardianUnavailable" + }; + inner + .metrics + .leader_retries_total + .with_label_values(&["withdrawal_signing", label]) + .inc(); + anyhow::bail!("Guardian rejected withdrawal: {}", status.message()) + } + } + } + + async fn collect_guardian_withdrawal_signatures( + inner: &Arc, + txn: &WithdrawalTransaction, + members: &[CommitteeMember], + timestamp_secs: u64, + seq: u64, + ) -> anyhow::Result< + hashi_types::committee::SignedMessage, + > { + let guardian_request = + crate::withdrawals::build_guardian_withdrawal_request(inner, txn, timestamp_secs, seq)?; + + let committee = inner + .onchain_state() + .current_committee() + .expect("No current committee"); + let required_weight = certificate_threshold(committee.total_weight()); + + let proto_request = SignGuardianWithdrawalRequestRequest { + withdrawal_txn_id: txn.id.as_bytes().to_vec().into(), + timestamp_secs, + seq, + }; + + let mut sig_tasks = JoinSet::new(); + for member in members { + let inner = inner.clone(); + let proto_request = proto_request.clone(); + let member = member.clone(); + sig_tasks.spawn(async move { + Self::request_guardian_withdrawal_signature(&inner, proto_request, &member).await + }); + } + + let mut aggregator = BlsSignatureAggregator::new(&committee, guardian_request); + while let Some(result) = sig_tasks.join_next().await { + let Ok(Some(sig)) = result else { continue }; + if let Err(e) = aggregator.add_signature(sig) { + error!( + withdrawal_txn_id = %txn.id, + "Failed to add guardian withdrawal signature: {e}" + ); + } + if aggregator.weight() >= required_weight { + break; + } + } + + let weight = aggregator.weight(); + if weight < required_weight { + anyhow::bail!( + "Insufficient guardian withdrawal signatures: weight {weight} < {required_weight}" + ); + } + + Ok(aggregator.finish()?) + } + + #[tracing::instrument(level = "debug", skip_all, fields(validator = %member.validator_address()))] + async fn request_guardian_withdrawal_signature( + inner: &Arc, + proto_request: SignGuardianWithdrawalRequestRequest, + member: &CommitteeMember, + ) -> Option { + let validator_address = member.validator_address(); + trace!("Requesting guardian withdrawal signature"); + + let mut rpc_client = inner + .onchain_state() + .bridge_service_client(&validator_address) + .or_else(|| { + error!( + "Cannot find client for validator address: {:?}", + validator_address + ); + None + })?; + + let response = rpc_client + .sign_guardian_withdrawal_request(proto_request.clone()) + .await + .inspect_err(|e| { + error!("Failed to get guardian withdrawal signature from {validator_address}: {e}"); + }) + .ok()?; + + response + .into_inner() + .member_signature + .ok_or_else(|| anyhow::anyhow!("No member_signature in response")) + .and_then(parse_member_signature) + .inspect_err(|e| { + error!( + "Failed to parse guardian withdrawal member signature from {validator_address}: {e}" + ); + }) + .ok() + } } fn deposit_request_to_proto(req: &DepositRequest) -> SignDepositConfirmationRequest { diff --git a/crates/hashi/src/withdrawals.rs b/crates/hashi/src/withdrawals.rs index 697ca2ab0..d47b51d98 100644 --- a/crates/hashi/src/withdrawals.rs +++ b/crates/hashi/src/withdrawals.rs @@ -393,6 +393,27 @@ impl Hashi { self.sign_message_proto(&confirmation) } + // --- Guardian: validate and BLS-sign a `StandardWithdrawalRequest` --- + + #[tracing::instrument(level = "info", skip_all, fields(%withdrawal_txn_id, seq))] + pub fn validate_and_sign_guardian_withdrawal_request( + &self, + withdrawal_txn_id: &Address, + timestamp_secs: u64, + seq: u64, + ) -> anyhow::Result { + let txn = self + .onchain_state() + .withdrawal_txn(withdrawal_txn_id) + .ok_or_else(|| { + anyhow!("WithdrawalTransaction {withdrawal_txn_id} not found on-chain") + })?; + + let guardian_request = build_guardian_withdrawal_request(self, &txn, timestamp_secs, seq)?; + + self.sign_message_proto(&guardian_request) + } + // --- Step 3: Sign withdrawal (store witness signatures on-chain) --- #[tracing::instrument(level = "info", skip_all, fields(withdrawal_id = %message.withdrawal_id))] @@ -1239,3 +1260,93 @@ fn script_pubkey_from_raw_address(address_bytes: &[u8]) -> anyhow::Result anyhow::Result { + use hashi_types::guardian::bitcoin_utils::InputUTXO; + use hashi_types::guardian::bitcoin_utils::OutputUTXO; + use hashi_types::guardian::bitcoin_utils::TxUTXOs; + + let hashi_pubkey = hashi.get_hashi_pubkey()?; + let network = hashi.config.bitcoin_network(); + + // Inputs: per-UTXO taproot script-path artifacts, keyed by deposit pubkey. + let inputs = txn + .inputs + .iter() + .map(|utxo| { + let pubkey = hashi.deposit_pubkey(&hashi_pubkey, utxo.derivation_path.as_ref())?; + let (_, _, leaf_hash) = + bitcoin_utils::single_key_taproot_script_path_spend_artifacts(&pubkey); + let address = hashi.bitcoin_address_from_pubkey(&pubkey); + + let outpoint = bitcoin::OutPoint { + txid: utxo.id.txid.into(), + vout: utxo.id.vout, + }; + + InputUTXO::new( + outpoint, + Amount::from_sat(utxo.amount), + address.into_unchecked(), + leaf_hash, + network, + ) + .map_err(|e| anyhow!("Failed to build guardian InputUTXO: {e}")) + }) + .collect::>>()?; + + // First N outputs are `External` payouts; any trailing output is the + // `Internal` change back to the hashi root. + let all_outputs = txn.all_outputs(); + let num_requests = txn.request_ids.len(); + let outputs = all_outputs + .iter() + .enumerate() + .map(|(i, output)| { + if i < num_requests { + let script_pubkey = script_pubkey_from_raw_address(&output.bitcoin_address)?; + let address = BitcoinAddress::from_script(&script_pubkey, network) + .map_err(|e| anyhow!("Cannot derive address from output script: {e}"))?; + OutputUTXO::new_external( + address.into_unchecked(), + Amount::from_sat(output.amount), + network, + ) + .map_err(|e| anyhow!("Failed to build guardian external OutputUTXO: {e}")) + } else { + let derivation_path = [0u8; 32]; + Ok(OutputUTXO::new_internal( + derivation_path, + Amount::from_sat(output.amount), + )) + } + }) + .collect::>>()?; + + let utxos = TxUTXOs::new(inputs, outputs) + .map_err(|e| anyhow!("Failed to build guardian TxUTXOs: {e}"))?; + + let wid = compute_withdrawal_wid(&txn.request_ids); + + Ok(hashi_types::guardian::StandardWithdrawalRequest::new( + wid, + utxos, + timestamp_secs, + seq, + )) +} + +/// Deterministic 64-bit id for a withdrawal — leading bytes of Blake2b256 over +/// the BCS-encoded request ids. Stable across restarts and leader rotations. +pub fn compute_withdrawal_wid(request_ids: &[Address]) -> u64 { + let bytes = bcs::to_bytes(&request_ids).expect("serialization should succeed"); + let hash = Blake2b256::digest(&bytes); + u64::from_le_bytes(hash.digest[..8].try_into().unwrap()) +}