Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions ethexe/node-loader/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
collections::{BTreeMap, BTreeSet},
marker::PhantomData,
sync::Arc,
time::Duration,
};
use tokio::sync::{
RwLock,
Expand Down Expand Up @@ -117,6 +118,13 @@ type WorkerBatchFuture =

const MAX_MULTICALL_CALLDATA_BYTES: usize = 120 * 1024;

/// Per-batch watchdog limit applied around `run_batch_impl` inside `run_batch`.
///
/// If a batch runs longer than this (for example because a hung RPC call parks
/// the worker), the in-flight batch future is dropped and `run_batch` returns a
/// timeout error together with the RPC pool; `run_batch` itself does not retry
/// the batch. The limit is deliberately generous: code validation alone takes
/// ~14 s for 5 codes.
const BATCH_TIMEOUT: Duration = Duration::from_secs(180);
Comment thread
grishasobol marked this conversation as resolved.

/// Cadence of the "Pool heartbeat" log line emitted by the batch pool loop, so a
/// stalled pool is visible in `docker logs`.
///
/// Each heartbeat reports the completed and failed batch counters, the number of
/// in-flight batches and the shutdown flag. The timer skips missed ticks instead
/// of firing them back-to-back.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);

/// Converts an arbitrary salt buffer into the fixed 32-byte form expected by
/// Ethereum ABI bindings.
pub(crate) fn salt_to_h256(salt: &[u8]) -> H256 {
Expand Down Expand Up @@ -350,6 +358,11 @@ impl<Rng: CallGenRng> BatchPool<Rng> {
});
}

let mut heartbeat = tokio::time::interval(HEARTBEAT_INTERVAL);
heartbeat.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// Skip the immediate first tick: don't log a heartbeat before any batch starts.
heartbeat.tick().await;

while !batches.is_empty() {
let (worker_idx, rpc_pool, report) = tokio::select! {
Some(result) = batches.next() => result,
Expand All @@ -366,6 +379,16 @@ impl<Rng: CallGenRng> BatchPool<Rng> {
}
continue;
}
_ = heartbeat.tick() => {
tracing::info!(
completed = self.batch_stats.completed_batches,
failed = self.batch_stats.failed_batches,
in_flight = batches.len(),
shutting_down,
"Pool heartbeat"
);
continue;
}
};

self.rpc_pools[worker_idx] = Some(rpc_pool);
Expand Down Expand Up @@ -501,23 +524,37 @@ async fn run_batch(
) -> (EthexeRpcPool, Result<BatchRunReport>) {
let PreparedBatchWithSeed { seed, batch, .. } = batch;

let result = match run_batch_impl(
api,
&mut rpc_pool,
endpoint_idx,
batch,
send_message_multicall,
use_send_message_multicall,
rx,
mid_map,
let result = match tokio::time::timeout(
BATCH_TIMEOUT,
run_batch_impl(
api,
&mut rpc_pool,
endpoint_idx,
batch,
send_message_multicall,
use_send_message_multicall,
rx,
mid_map,
),
)
.await
{
Ok(report) => Ok(BatchRunReport::new(seed, report)),
Err(err) => {
Ok(Ok(report)) => Ok(BatchRunReport::new(seed, report)),
Ok(Err(err)) => {
tracing::warn!("Batch failed: {err:?}");
Err(err)
}
Err(_) => {
tracing::warn!(
seed,
timeout_secs = BATCH_TIMEOUT.as_secs(),
"Batch timed out, dropping it and rescheduling worker"
);
Err(anyhow::anyhow!(
"batch {seed} exceeded {:?} watchdog timeout",
BATCH_TIMEOUT
))
}
};
(rpc_pool, result)
}
Expand Down
28 changes: 23 additions & 5 deletions ethexe/node-loader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,24 @@ fn worker_mint_amount(override_amount: Option<u128>, policy: Option<&ValuePolicy
.max(MIN_POLICY_WORKER_MINT_AMOUNT)
}

/// Waits for the first shutdown request and reports which signal delivered it.
///
/// On Unix the future completes on whichever of SIGINT (Ctrl+C) or SIGTERM
/// arrives first; handling SIGTERM matters because without it `docker stop`
/// hard-kills the run before in-flight work is drained. On other platforms only
/// Ctrl+C is observed.
///
/// # Returns
///
/// A static, human-readable label for the received signal, suitable for logging.
///
/// # Errors
///
/// Returns the underlying I/O error if the SIGTERM listener cannot be installed
/// or if waiting for Ctrl+C fails.
async fn wait_for_shutdown_signal() -> std::io::Result<&'static str> {
    // Shared by both platform branches so the log label cannot drift apart.
    const INTERRUPT_LABEL: &str = "Ctrl+C / SIGINT";

    #[cfg(unix)]
    {
        use tokio::signal::unix::{SignalKind, signal};

        // Register the SIGTERM stream up front; a registration failure is
        // surfaced to the caller before any waiting happens.
        let mut terminate = signal(SignalKind::terminate())?;

        tokio::select! {
            interrupt = tokio::signal::ctrl_c() => {
                interrupt?;
                Ok(INTERRUPT_LABEL)
            }
            // Any completion of `recv()` is reported as SIGTERM.
            _ = terminate.recv() => Ok("SIGTERM"),
        }
    }
    #[cfg(not(unix))]
    {
        tokio::signal::ctrl_c().await?;
        Ok(INTERRUPT_LABEL)
    }
}

async fn run_load_runtime(
batch_pool: BatchPool<SmallRng>,
config: LoadRunConfig,
Expand All @@ -444,11 +462,11 @@ async fn run_load_runtime(
let (listener_shutdown_tx, listener_shutdown_rx) = tokio::sync::watch::channel(false);
let pool_task = batch_pool.run(config, pool_shutdown_rx);
let block_listener = utils::listen_blocks(tx, provider, listener_shutdown_rx);
let ctrl_c = tokio::signal::ctrl_c();
let shutdown_signal = wait_for_shutdown_signal();

tokio::pin!(pool_task);
tokio::pin!(block_listener);
tokio::pin!(ctrl_c);
tokio::pin!(shutdown_signal);

let mut interrupted = false;
let mut pool_result = None;
Expand All @@ -464,10 +482,10 @@ async fn run_load_runtime(
listener_result = Some(result);
let _ = pool_shutdown_tx.send(true);
}
signal = &mut ctrl_c, if !interrupted => {
signal?;
signal = &mut shutdown_signal, if !interrupted => {
let kind = signal?;
interrupted = true;
info!("Ctrl+C received; stopping new batches and draining in-flight work");
info!("{kind} received; stopping new batches and draining in-flight work");
let _ = pool_shutdown_tx.send(true);
}
}
Expand Down
Loading