diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index 3937e0f7f8..b224c3af70 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -389,6 +389,23 @@ "Guard": { "type": "object", "properties": { + "actor_force_wake_pending_timeout_ms": { + "description": "Timeout sent with actor force-wake requests in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "int64" + }, + "actor_ready_timeout_ms": { + "description": "Timeout for waiting for an actor to become ready in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "enable_websocket_health_route": { "description": "Enables the internal websocket health route for debug and latency testing. This is intended for websocket ping/pong verification and should remain disabled in normal deployments.", "type": [ @@ -433,6 +450,24 @@ "format": "uint16", "minimum": 0.0 }, + "route_cache_ttl_ms": { + "description": "TTL for cached route lookups in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "route_timeout_ms": { + "description": "Timeout for route resolution in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "tcp_nodelay": { "description": "Enables TCP_NODELAY on accepted Guard sockets.", "type": [ diff --git a/engine/packages/config/src/config/guard.rs b/engine/packages/config/src/config/guard.rs index e9fb3cf52d..a410baffce 100644 --- a/engine/packages/config/src/config/guard.rs +++ b/engine/packages/config/src/config/guard.rs @@ -14,6 +14,14 @@ pub struct Guard { /// Enables the internal websocket health route for debug and latency testing. This is intended /// for websocket ping/pong verification and should remain disabled in normal deployments. pub enable_websocket_health_route: Option, + /// TTL for cached route lookups in milliseconds. + pub route_cache_ttl_ms: Option, + /// Timeout for route resolution in milliseconds. + pub route_timeout_ms: Option, + /// Timeout for waiting for an actor to become ready in milliseconds. + pub actor_ready_timeout_ms: Option, + /// Timeout sent with actor force-wake requests in milliseconds. + pub actor_force_wake_pending_timeout_ms: Option, /// Enable & configure HTTPS pub https: Option, /// Max HTTP request body size in bytes (first line of defense). @@ -37,6 +45,23 @@ impl Guard { self.enable_websocket_health_route.unwrap_or(false) } + pub fn route_cache_ttl(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.route_cache_ttl_ms.unwrap_or(60 * 10 * 1000)) + } + + pub fn route_timeout(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.route_timeout_ms.unwrap_or(15_000)) + } + + pub fn actor_ready_timeout(&self) -> std::time::Duration { + std::time::Duration::from_millis(self.actor_ready_timeout_ms.unwrap_or(10_000)) + } + + pub fn actor_force_wake_pending_timeout(&self) -> i64 { + self.actor_force_wake_pending_timeout_ms + .unwrap_or(60 * 1000) + } + pub fn http_max_request_body_size(&self) -> usize { self.http_max_request_body_size.unwrap_or(20 * 1024 * 1024) // 20 MiB } diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 68081dbb23..657771e6ac 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -31,7 +31,7 @@ use crate::RouteTarget; use crate::request_context::RequestContext; use crate::response_body::ResponseBody; use crate::route::{ - CacheKeyFn, DEFAULT_ROUTE_TIMEOUT, ResolveRouteOutput, RouteCache, RoutingFn, RoutingOutput, + CacheKeyFn, ResolveRouteOutput, RouteCache, RoutingFn, RoutingOutput, }; use crate::utils::{InFlightCounter, RateLimiter}; use crate::{ @@ -89,13 +89,14 @@ impl ProxyState { let client = Client::builder(TokioExecutor::new()) .pool_idle_timeout(Duration::from_secs(30)) .build(https_connector); + let route_cache_ttl = config.guard().route_cache_ttl(); Self { config, routing_fn, cache_key_fn, client, - route_cache: RouteCache::new(), + route_cache: RouteCache::new(route_cache_ttl), rate_limiters: Cache::builder() .max_capacity(10_000) .time_to_live(PROXY_STATE_CACHE_TTL) @@ -134,21 +135,22 @@ impl ProxyState { let res = if let Some(res) = cache_res { res } else { - // Not in cache, call routing function with a default timeout - // Default 15 seconds, routing functions should have their own internal timeouts that are shorter + // Not in cache, call routing function with a configured timeout. + // Routing functions should have their own internal timeouts that are shorter. + let route_timeout = self.config.guard().route_timeout(); tracing::debug!( hostname = %req_ctx.hostname, path = %req_ctx.path, cache_hit = false, - timeout_seconds = DEFAULT_ROUTE_TIMEOUT.as_secs(), + timeout_seconds = route_timeout.as_secs(), "Cache miss, calling routing function" ); - let routing_res = timeout(DEFAULT_ROUTE_TIMEOUT, (self.routing_fn)(req_ctx)) + let routing_res = timeout(route_timeout, (self.routing_fn)(req_ctx)) .await .map_err(|_| { errors::RequestTimeout { - timeout_seconds: DEFAULT_ROUTE_TIMEOUT.as_secs(), + timeout_seconds: route_timeout.as_secs(), } .build() })??; diff --git a/engine/packages/guard-core/src/route.rs b/engine/packages/guard-core/src/route.rs index bfbb769684..e12e7bca24 100644 --- a/engine/packages/guard-core/src/route.rs +++ b/engine/packages/guard-core/src/route.rs @@ -7,9 +7,6 @@ use crate::custom_serve::CustomServeTrait; use crate::metrics; use crate::request_context::RequestContext; -const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60 * 10); // 10 minutes -pub(crate) const DEFAULT_ROUTE_TIMEOUT: Duration = Duration::from_secs(15); - // Routing types #[derive(Clone, Debug)] pub struct RouteTarget { @@ -51,11 +48,11 @@ pub(crate) struct RouteCache { } impl RouteCache { - pub(crate) fn new() -> Self { + pub(crate) fn new(ttl: Duration) -> Self { Self { cache: Cache::builder() .max_capacity(10_000) - .time_to_live(ROUTE_CACHE_TTL) + .time_to_live(ttl) .build(), } } diff --git a/engine/packages/guard/src/routing/pegboard_gateway/mod.rs b/engine/packages/guard/src/routing/pegboard_gateway/mod.rs index 222c7b80e6..70e7c4ef2e 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway/mod.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway/mod.rs @@ -23,9 +23,6 @@ use crate::{ use cors::{CorsPreflight, set_non_preflight_cors}; use resolve_actor_query::resolve_query; -const ACTOR_FORCE_WAKE_PENDING_TIMEOUT: i64 = util::duration::seconds(60); -const ACTOR_READY_TIMEOUT: Duration = Duration::from_secs(10); - /// Time to wait before starting pool error checks const RUNNER_POOL_ERROR_CHECK_DELAY: Duration = Duration::from_secs(1); /// Interval between pool error checks @@ -436,7 +433,7 @@ async fn handle_actor_v2( } } // Ready timeout - _ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => { + _ = tokio::time::sleep(ctx.config().guard().actor_ready_timeout()) => { return Err(errors::ActorReadyTimeout { actor_id }.build()); } } @@ -480,7 +477,7 @@ async fn handle_actor_v1( ctx.signal(pegboard::workflows::actor::Wake { allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep { - pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT), + pending_timeout: Some(ctx.config().guard().actor_force_wake_pending_timeout()), }, }) .to_workflow_id(actor.workflow_id) @@ -516,7 +513,9 @@ async fn handle_actor_v1( let res = ctx.signal(pegboard::workflows::actor::Wake { allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep { - pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT), + pending_timeout: Some( + ctx.config().guard().actor_force_wake_pending_timeout(), + ), }, }) .to_workflow_id(actor.workflow_id) @@ -565,7 +564,7 @@ async fn handle_actor_v1( } } // Ready timeout - _ = tokio::time::sleep(ACTOR_READY_TIMEOUT) => { + _ = tokio::time::sleep(ctx.config().guard().actor_ready_timeout()) => { return Err(errors::ActorReadyTimeout { actor_id }.build()); } } diff --git a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts index 2748fde94e..c77d7b3cea 100644 --- a/examples/kitchen-sink/scripts/sqlite-memory-soak.ts +++ b/examples/kitchen-sink/scripts/sqlite-memory-soak.ts @@ -54,6 +54,11 @@ interface Args { postChurnWaitMs: number; postCleanupWaitMs: number; requestLifespanSeconds: number; + engineRouteTimeoutSeconds: number; + engineActorReadyTimeoutSeconds: number; + engineActorLifecycleTimeoutMs: number; + engineEnvoyPingTimeoutMs: number; + engineEnvoyLostThresholdMs: number; serverlessMaxStartPayloadBytes: number; outputDir: string; metricsToken: string; @@ -146,6 +151,16 @@ Options: --post-cleanup-wait-ms Final sample window after cleanup. Default: 5000. --request-lifespan-seconds Serverless request lifespan. Default: scheduled work plus startup and slow-tail margin. + --engine-route-timeout-seconds + Local engine route-resolution timeout. Default: 300. + --engine-actor-ready-timeout-seconds + Local engine actor-ready timeout. Default: 300. + --engine-actor-lifecycle-timeout-ms + Local engine actor allocation/start/stop timeout. Default: 300000. + --engine-envoy-ping-timeout-ms + Local engine envoy ping timeout. Default: 300000. + --engine-envoy-lost-threshold-ms + Local engine envoy lost threshold. Default: 300000. --serverless-max-start-payload-bytes Local /api/rivet/start body limit. Default: 8388608. --output-dir Output directory. Default: ${DEFAULT_OUTPUT_DIR}. @@ -348,6 +363,36 @@ function parseArgs(argv: string[]): Args { 5000, ), requestLifespanSeconds: 0, + engineRouteTimeoutSeconds: readNumber( + argv, + "--engine-route-timeout-seconds", + "SQLITE_MEMORY_SOAK_ENGINE_ROUTE_TIMEOUT_SECONDS", + 300, + ), + engineActorReadyTimeoutSeconds: readNumber( + argv, + "--engine-actor-ready-timeout-seconds", + "SQLITE_MEMORY_SOAK_ENGINE_ACTOR_READY_TIMEOUT_SECONDS", + 300, + ), + engineActorLifecycleTimeoutMs: readNumber( + argv, + "--engine-actor-lifecycle-timeout-ms", + "SQLITE_MEMORY_SOAK_ENGINE_ACTOR_LIFECYCLE_TIMEOUT_MS", + 300_000, + ), + engineEnvoyPingTimeoutMs: readNumber( + argv, + "--engine-envoy-ping-timeout-ms", + "SQLITE_MEMORY_SOAK_ENGINE_ENVOY_PING_TIMEOUT_MS", + 300_000, + ), + engineEnvoyLostThresholdMs: readNumber( + argv, + "--engine-envoy-lost-threshold-ms", + "SQLITE_MEMORY_SOAK_ENGINE_ENVOY_LOST_THRESHOLD_MS", + 300_000, + ), serverlessMaxStartPayloadBytes: readNumber( argv, "--serverless-max-start-payload-bytes", @@ -401,6 +446,20 @@ function parseArgs(argv: string[]): Args { ["--scan-rows", args.scanRows], ["--sample-interval-ms", args.sampleIntervalMs], ["--request-lifespan-seconds", args.requestLifespanSeconds], + ["--engine-route-timeout-seconds", args.engineRouteTimeoutSeconds], + [ + "--engine-actor-ready-timeout-seconds", + args.engineActorReadyTimeoutSeconds, + ], + [ + "--engine-actor-lifecycle-timeout-ms", + args.engineActorLifecycleTimeoutMs, + ], + ["--engine-envoy-ping-timeout-ms", args.engineEnvoyPingTimeoutMs], + [ + "--engine-envoy-lost-threshold-ms", + args.engineEnvoyLostThresholdMs, + ], ["--sleep-log-timeout-ms", args.sleepLogTimeoutMs], [ "--serverless-max-start-payload-bytes", @@ -546,6 +605,18 @@ async function startEngine(args: Args, runDir: string): Promise { }, }, }, + guard: { + route_timeout_ms: args.engineRouteTimeoutSeconds * 1000, + actor_ready_timeout_ms: + args.engineActorReadyTimeoutSeconds * 1000, + }, + pegboard: { + actor_allocation_threshold: args.engineActorLifecycleTimeoutMs, + actor_start_threshold: args.engineActorLifecycleTimeoutMs, + actor_stop_threshold: args.engineActorLifecycleTimeoutMs, + envoy_ping_timeout: args.engineEnvoyPingTimeoutMs, + envoy_lost_threshold: args.engineEnvoyLostThresholdMs, + }, }, null, 2, @@ -561,6 +632,10 @@ async function startEngine(args: Args, runDir: string): Promise { RIVET__METRICS__PORT: (guardPort + 10).toString(), RIVET__FILE_SYSTEM__PATH: join(dbRoot, "db"), _RIVET_METRICS_TOKEN: args.metricsToken, + RIVET__PEGBOARD__ENVOY_PING_TIMEOUT: + args.engineEnvoyPingTimeoutMs.toString(), + RIVET__PEGBOARD__ENVOY_LOST_THRESHOLD: + args.engineEnvoyLostThresholdMs.toString(), MALLOC_ARENA_MAX: process.env.MALLOC_ARENA_MAX ?? "2", MALLOC_TRIM_THRESHOLD_: process.env.MALLOC_TRIM_THRESHOLD_ ?? "131072", }; @@ -944,6 +1019,10 @@ function writeEvent(jsonlPath: string, event: unknown) { appendFileSync(jsonlPath, `${JSON.stringify(event)}\n`); } +function stringifyError(err: unknown): string { + return err instanceof Error ? err.message : String(err); +} + function logOffset(logPath: string): number { try { return statSync(logPath).size; @@ -1257,7 +1336,25 @@ async function runChurnActorDriver( const actorId = await handle.resolve(); const logStart = logOffset(server.logPath); const sleepStartedAt = performance.now(); - const response = await forceActorSleepViaApi(args, actorId); + let response: unknown; + try { + response = await forceActorSleepViaApi(args, actorId); + } catch (err) { + const error = stringifyError(err); + writeEvent(jsonlPath, { + kind: "actor_api_sleep_failed", + actorIndex, + key, + actorId, + durationMs: performance.now() - sleepStartedAt, + error, + timestamp: new Date().toISOString(), + }); + console.warn( + `actor sleep api failed actor=${actorIndex} actor_id=${actorId} error=${error}`, + ); + return; + } writeEvent(jsonlPath, { kind: "actor_api_sleep", actorIndex, @@ -1267,21 +1364,38 @@ async function runChurnActorDriver( response, timestamp: new Date().toISOString(), }); - const verified = await waitForActorSleepLog( - server, - actorId, - logStart, - args.sleepLogTimeoutMs, - ); - writeEvent(jsonlPath, { - kind: "actor_sleep_verified", - actorIndex, - key, - actorId, - log: verified.matched, - timestamp: new Date().toISOString(), - }); - console.log(`actor sleep verified actor=${actorIndex} actor_id=${actorId}`); + + try { + const verified = await waitForActorSleepLog( + server, + actorId, + logStart, + args.sleepLogTimeoutMs, + ); + writeEvent(jsonlPath, { + kind: "actor_sleep_verified", + actorIndex, + key, + actorId, + log: verified.matched, + timestamp: new Date().toISOString(), + }); + console.log(`actor sleep verified actor=${actorIndex} actor_id=${actorId}`); + } catch (err) { + const error = stringifyError(err); + writeEvent(jsonlPath, { + kind: "actor_sleep_unverified", + actorIndex, + key, + actorId, + timeoutMs: args.sleepLogTimeoutMs, + error, + timestamp: new Date().toISOString(), + }); + console.warn( + `actor sleep unverified actor=${actorIndex} actor_id=${actorId} error=${error}`, + ); + } } async function resetActorOnSchedule( @@ -1654,15 +1768,22 @@ function summarizeCycleVfs(jsonlPath: string): string | undefined { function summarizeActorSleeps(jsonlPath: string): string | undefined { let requested = 0; + let apiFailed = 0; let verified = 0; + let unverified = 0; for (const line of readFileSync(jsonlPath, "utf8").split("\n")) { if (!line) continue; const event = JSON.parse(line) as { kind?: string }; if (event.kind === "actor_api_sleep") requested++; + if (event.kind === "actor_api_sleep_failed") { + requested++; + apiFailed++; + } if (event.kind === "actor_sleep_verified") verified++; + if (event.kind === "actor_sleep_unverified") unverified++; } - if (requested === 0 && verified === 0) return undefined; - return `actor sleeps: api_requested=${requested} log_verified=${verified}`; + if (requested === 0 && verified === 0 && unverified === 0) return undefined; + return `actor sleeps: api_requested=${requested} api_failed=${apiFailed} log_verified=${verified} log_unverified=${unverified}`; } async function main(): Promise {