diff --git a/ethexe/node-loader/src/batch.rs b/ethexe/node-loader/src/batch.rs index 1992564b572..3ab6ecb9e5a 100644 --- a/ethexe/node-loader/src/batch.rs +++ b/ethexe/node-loader/src/batch.rs @@ -36,6 +36,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, marker::PhantomData, sync::Arc, + time::Duration, }; use tokio::sync::{ RwLock, @@ -117,6 +118,13 @@ type WorkerBatchFuture = const MAX_MULTICALL_CALLDATA_BYTES: usize = 120 * 1024; +/// Per-batch watchdog: drop the batch and reschedule its worker if a hung RPC +/// call parks the worker. Generous: code validation alone takes ~14 s for 5 codes. +const BATCH_TIMEOUT: Duration = Duration::from_secs(180); + +/// Cadence of pool-progress heartbeats so a stalled pool is visible in `docker logs`. +const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10); + /// Converts an arbitrary salt buffer into the fixed 32-byte form expected by /// Ethereum ABI bindings. pub(crate) fn salt_to_h256(salt: &[u8]) -> H256 { @@ -350,6 +358,11 @@ impl BatchPool { }); } + let mut heartbeat = tokio::time::interval(HEARTBEAT_INTERVAL); + heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + // Skip the immediate first tick: don't log a heartbeat before any batch starts. + heartbeat.tick().await; + while !batches.is_empty() { let (worker_idx, rpc_pool, report) = tokio::select! { Some(result) = batches.next() => result, @@ -366,6 +379,16 @@ impl BatchPool { } continue; } + _ = heartbeat.tick() => { + tracing::info!( + completed = self.batch_stats.completed_batches, + failed = self.batch_stats.failed_batches, + in_flight = batches.len(), + shutting_down, + "Pool heartbeat" + ); + continue; + } }; self.rpc_pools[worker_idx] = Some(rpc_pool); @@ -501,23 +524,37 @@ async fn run_batch( ) -> (EthexeRpcPool, Result) { let PreparedBatchWithSeed { seed, batch, .. 
} = batch; - let result = match run_batch_impl( - api, - &mut rpc_pool, - endpoint_idx, - batch, - send_message_multicall, - use_send_message_multicall, - rx, - mid_map, + let result = match tokio::time::timeout( + BATCH_TIMEOUT, + run_batch_impl( + api, + &mut rpc_pool, + endpoint_idx, + batch, + send_message_multicall, + use_send_message_multicall, + rx, + mid_map, + ), ) .await { - Ok(report) => Ok(BatchRunReport::new(seed, report)), - Err(err) => { + Ok(Ok(report)) => Ok(BatchRunReport::new(seed, report)), + Ok(Err(err)) => { tracing::warn!("Batch failed: {err:?}"); Err(err) } + Err(_) => { + tracing::warn!( + seed, + timeout_secs = BATCH_TIMEOUT.as_secs(), + "Batch timed out, dropping it and rescheduling worker" + ); + Err(anyhow::anyhow!( + "batch {seed} exceeded {:?} watchdog timeout", + BATCH_TIMEOUT + )) + } }; (rpc_pool, result) } diff --git a/ethexe/node-loader/src/main.rs b/ethexe/node-loader/src/main.rs index 2889aae66ed..4010ba59a60 100644 --- a/ethexe/node-loader/src/main.rs +++ b/ethexe/node-loader/src/main.rs @@ -434,6 +434,24 @@ fn worker_mint_amount(override_amount: Option, policy: Option<&ValuePolicy .max(MIN_POLICY_WORKER_MINT_AMOUNT) } +/// Resolves on the first SIGINT or SIGTERM. Without a SIGTERM handler, `docker stop` +/// kills the run before in-flight work can drain. +async fn wait_for_shutdown_signal() -> std::io::Result<&'static str> { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + let mut sigterm = signal(SignalKind::terminate())?; + tokio::select! 
{ + result = tokio::signal::ctrl_c() => result.map(|()| "Ctrl+C / SIGINT"), + _ = sigterm.recv() => Ok("SIGTERM"), + } + } + #[cfg(not(unix))] + { + tokio::signal::ctrl_c().await.map(|()| "Ctrl+C / SIGINT") + } +} + async fn run_load_runtime( batch_pool: BatchPool, config: LoadRunConfig, @@ -444,11 +462,11 @@ async fn run_load_runtime( let (listener_shutdown_tx, listener_shutdown_rx) = tokio::sync::watch::channel(false); let pool_task = batch_pool.run(config, pool_shutdown_rx); let block_listener = utils::listen_blocks(tx, provider, listener_shutdown_rx); - let ctrl_c = tokio::signal::ctrl_c(); + let shutdown_signal = wait_for_shutdown_signal(); tokio::pin!(pool_task); tokio::pin!(block_listener); - tokio::pin!(ctrl_c); + tokio::pin!(shutdown_signal); let mut interrupted = false; let mut pool_result = None; @@ -464,10 +482,10 @@ async fn run_load_runtime( listener_result = Some(result); let _ = pool_shutdown_tx.send(true); } - signal = &mut ctrl_c, if !interrupted => { - signal?; + signal = &mut shutdown_signal, if !interrupted => { + let kind = signal?; interrupted = true; - info!("Ctrl+C received; stopping new batches and draining in-flight work"); + info!("{kind} received; stopping new batches and draining in-flight work"); let _ = pool_shutdown_tx.send(true); } }