diff --git a/CLAUDE.md b/CLAUDE.md index fbe394ca85..d2cfba7046 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -23,7 +23,7 @@ Design constraints, invariants, and reference commands for the Rivet monorepo. F **Always use versioned BARE (`vbare`) instead of raw `serde_bare` for any persisted or wire-format encoding unless explicitly told otherwise.** Raw `serde_bare::to_vec` / `from_slice` has no version header, so any future schema change forces hand-rolled `LegacyXxx` fallback structs. `vbare::OwnedVersionedData` plus a versioned `*.bare` schema is the standard pattern. Acceptable raw-bare exceptions: ephemeral in-memory encodings that never cross a process boundary or hit disk, and wire formats whose protocol version is coordinated out-of-band (e.g. an HTTP path like `/v{PROTOCOL_VERSION}/...` or another channel that pins both peers to one schema per call). - Avoid raw `f64` fields in vbare protocol schemas that use hashable maps; generated Rust derives `Eq`/`Hash`, so encode floats as fixed bytes or an ordered wrapper. -- Version converters must manually map fields between versions; never convert by serializing one version and deserializing it as another. +- Version converters must manually map fields between versions; never use serialize-deserialize round trips such as `transcode_version` or `serde_bare::to_vec` plus `from_slice`. When talking about "Rivet Actors" make sure to capitalize "Rivet Actor" as a proper noun and lowercase "actor" as a generic noun. diff --git a/Cargo.lock b/Cargo.lock index 51f898eb4f..78fb26e0ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1043,14 +1043,14 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-named-pipe", - "hyper-rustls", + "hyper-rustls 0.27.7", "hyper-util", "hyperlocal", "log", "pin-project-lite", - "rustls", + "rustls 0.23.29", "rustls-native-certs 0.8.3", - "rustls-pemfile", + "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", "serde_derive", @@ -2154,6 +2154,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + [[package]] name = "dyn-clone" version = "1.0.19" @@ -7767,17 +7773,6 @@ dependencies = [ "xattr", ] -[[package]] -name = "tokio-test" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545" -dependencies = [ - "futures-core", - "tokio", - "tokio-stream", -] - [[package]] name = "tokio-tungstenite" version = "0.26.2" @@ -9122,6 +9117,12 @@ dependencies = [ "rustix", ] +[[package]] +name = "xmlparser" +version = "0.13.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" + [[package]] name = "yaml-rust2" version = "0.8.1" diff --git a/engine/packages/engine/tests/common/freeze_proxy.rs b/engine/packages/engine/tests/common/freeze_proxy.rs new file mode 100644 index 0000000000..987947f9be --- /dev/null +++ b/engine/packages/engine/tests/common/freeze_proxy.rs @@ -0,0 +1,108 @@ +//! Minimal TCP forwarder with a `freeze` mode used by network-fault tests. +//! +//! Toxiproxy can stall traffic but always relays a peer's TCP close to the other side, which +//! defeats tests that need to observe one peer's behavior when it has no signal that the other +//! peer hung up. `FreezeProxy` is a single-purpose forwarder that supports a true black-hole +//! 
mode: while frozen, bytes are read from each peer and discarded, and an EOF from either peer
+//! is held instead of being forwarded.
+
+use std::net::SocketAddr;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use anyhow::{Context, Result};
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
+use tokio::net::{TcpListener, TcpStream};
+
+pub struct FreezeProxy {
+	listen_addr: SocketAddr,
+	frozen: Arc<AtomicBool>,
+}
+
+impl FreezeProxy {
+	pub async fn start(upstream: SocketAddr) -> Result<Self> {
+		let listener = TcpListener::bind("127.0.0.1:0")
+			.await
+			.context("failed to bind FreezeProxy listener")?;
+		let listen_addr = listener
+			.local_addr()
+			.context("failed to read FreezeProxy listen addr")?;
+		let frozen = Arc::new(AtomicBool::new(false));
+
+		tokio::spawn({
+			let frozen = frozen.clone();
+			async move {
+				loop {
+					let (client, _) = match listener.accept().await {
+						Ok(pair) => pair,
+						Err(err) => {
+							tracing::warn!(?err, "freeze proxy accept failed");
+							return;
+						}
+					};
+					let _ = client.set_nodelay(true);
+					let frozen = frozen.clone();
+					tokio::spawn(async move {
+						let server = match TcpStream::connect(upstream).await {
+							Ok(stream) => stream,
+							Err(err) => {
+								tracing::warn!(?err, %upstream, "freeze proxy upstream connect failed");
+								return;
+							}
+						};
+						let _ = server.set_nodelay(true);
+						let (client_r, client_w) = client.into_split();
+						let (server_r, server_w) = server.into_split();
+						tokio::spawn(forward(client_r, server_w, frozen.clone()));
+						tokio::spawn(forward(server_r, client_w, frozen));
+					});
+				}
+			}
+		});
+
+		Ok(Self {
+			listen_addr,
+			frozen,
+		})
+	}
+
+	pub fn endpoint(&self) -> String {
+		format!("http://{}", self.listen_addr)
+	}
+
+	/// Stops shuttling bytes between the two peers and starts swallowing EOFs so neither peer
+	/// learns that the other has hung up. Bytes already in flight before this call may still
+	/// reach the other side.
+	pub fn freeze(&self) {
+		self.frozen.store(true, Ordering::SeqCst);
+	}
+}
+
+async fn forward(mut src: OwnedReadHalf, mut dst: OwnedWriteHalf, frozen: Arc<AtomicBool>) {
+	let mut buf = vec![0u8; 8192];
+	loop {
+		match src.read(&mut buf).await {
+			Ok(0) => {
+				if frozen.load(Ordering::SeqCst) {
+					// Hold the destination open: the peer's own send/recv buffer keeps it
+					// believing the connection is alive, with no FIN ever delivered.
+					std::future::pending::<()>().await;
+				}
+				return;
+			}
+			Ok(n) => {
+				if frozen.load(Ordering::SeqCst) {
+					// Drain-and-discard so the sender's TCP window stays open and it does not
+					// notice the link is dead via back-pressure.
+ continue; + } + if dst.write_all(&buf[..n]).await.is_err() { + return; + } + } + Err(_) => return, + } + } +} diff --git a/engine/packages/engine/tests/common/mod.rs b/engine/packages/engine/tests/common/mod.rs index cfa8085eb6..e5d11ed25f 100644 --- a/engine/packages/engine/tests/common/mod.rs +++ b/engine/packages/engine/tests/common/mod.rs @@ -3,6 +3,7 @@ pub mod actors; pub mod api; pub mod ctx; +pub mod freeze_proxy; pub mod test_envoy; pub mod test_helpers; pub mod test_runner; diff --git a/engine/packages/engine/tests/common/test_envoy.rs b/engine/packages/engine/tests/common/test_envoy.rs index d1a9ad37a8..9c09ea7fc9 100644 --- a/engine/packages/engine/tests/common/test_envoy.rs +++ b/engine/packages/engine/tests/common/test_envoy.rs @@ -254,6 +254,14 @@ impl Envoy { } } + pub async fn is_ping_healthy(&self) -> Option { + self.handle + .lock() + .await + .as_ref() + .map(|handle| handle.is_ping_healthy()) + } + pub async fn shutdown(&self) { if let Some(handle) = self.handle.lock().await.take() { handle.shutdown_and_wait(false).await; @@ -299,7 +307,10 @@ impl TestEnvoyCallbacks { impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks { fn on_connect(&self, _handle: EnvoyHandle) { - let _ = self.inner.connection_tx.send(EnvoyConnectionEvent::Connected); + let _ = self + .inner + .connection_tx + .send(EnvoyConnectionEvent::Connected); } fn on_disconnect(&self, _handle: EnvoyHandle) { diff --git a/engine/packages/engine/tests/envoy/mod.rs b/engine/packages/engine/tests/envoy/mod.rs index 2ef4ab9bff..90032de246 100644 --- a/engine/packages/engine/tests/envoy/mod.rs +++ b/engine/packages/engine/tests/envoy/mod.rs @@ -11,5 +11,4 @@ pub mod api_actors_get_or_create; pub mod api_actors_list; pub mod api_actors_list_names; pub mod auth; -pub mod auth; pub mod network_faults; diff --git a/engine/packages/engine/tests/envoy/network_faults.rs b/engine/packages/engine/tests/envoy/network_faults.rs index 6392780701..b9888f8eed 100644 --- a/engine/packages/engine/tests/envoy/network_faults.rs +++ b/engine/packages/engine/tests/envoy/network_faults.rs @@ -5,10 +5,17 @@ use std::sync::{ use super::super::common; +const ENVOY_PING_HEALTHY_THRESHOLD: std::time::Duration = std::time::Duration::from_millis( + common::test_envoy::EnvoyHandle::PING_HEALTHY_THRESHOLD_MS as u64, +); +const ENVOY_PING_INTERVAL_MARGIN: std::time::Duration = std::time::Duration::from_secs(5); + #[test] fn envoy_reconnects_after_server_side_tcp_reset() { common::run( - common::TestOpts::new(1).with_timeout(90).with_network_faults(), + common::TestOpts::new(1) + .with_timeout(90) + .with_network_faults(), |ctx| async move { let dc = ctx.leader_dc(); let (namespace, _) = common::setup_test_namespace(dc).await; @@ -55,7 +62,7 @@ fn envoy_reconnects_after_server_side_tcp_reset() { wait_for_envoy_actor(&envoy, &actor_id).await; wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await; - let response = common::ping_actor_via_guard(dc, &actor_id).await; + let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await; assert_eq!(response["status"], "ok"); let mut disconnect = envoy.wait_for_next_connection_event( @@ -71,8 +78,9 @@ fn envoy_reconnects_after_server_side_tcp_reset() { // The ping task writes every few seconds in the test config. 
disconnect.wait().await; - let reconnect = envoy - .wait_for_next_connection_event(common::test_envoy::EnvoyConnectionEvent::Connected); + let reconnect = envoy.wait_for_next_connection_event( + common::test_envoy::EnvoyConnectionEvent::Connected, + ); envoy_proxy .clear_toxics() .await @@ -90,6 +98,108 @@ fn envoy_reconnects_after_server_side_tcp_reset() { ); } +#[test] +fn engine_closes_envoy_ws_after_ping_timeout_while_envoy_remains_unaware() { + common::run( + common::TestOpts::new(1).with_timeout(120), + |ctx| async move { + let dc = ctx.leader_dc(); + let (namespace, _) = common::setup_test_namespace(dc).await; + + // Stand up our own forwarder so we can simulate a true network partition. + // Toxiproxy can stall traffic but always relays a peer's TCP close to the other + // side, which would let envoy-client notice the engine has hung up. + let freeze_proxy = common::freeze_proxy::FreezeProxy::start( + std::net::SocketAddr::from(([127, 0, 0, 1], dc.guard_port())), + ) + .await + .expect("failed to start freeze proxy"); + + let envoy = common::setup_envoy(dc, &namespace, |builder| { + builder + .with_endpoint(freeze_proxy.endpoint()) + .with_actor_behavior("network-fault-actor", |_| { + Box::new( + common::test_envoy::CustomActorBuilder::new() + .on_start(|_| { + Box::pin(async { + Ok(common::test_envoy::ActorStartResult::Running) + }) + }) + .build(), + ) + }) + }) + .await; + + let res = common::create_actor( + dc.guard_port(), + &namespace, + "network-fault-actor", + envoy.pool_name(), + rivet_types::actors::CrashPolicy::Sleep, + ) + .await; + let actor_id = res.actor.actor_id.to_string(); + wait_for_envoy_actor(&envoy, &actor_id).await; + wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await; + + let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await; + assert_eq!(response["status"], "ok"); + + // Wait past the local health threshold before injecting the fault. A healthy + // connection must stay healthy because engine pings refresh envoy-client's + // receive timestamp. + tokio::time::sleep(ENVOY_PING_HEALTHY_THRESHOLD + ENVOY_PING_INTERVAL_MARGIN).await; + let healthy = envoy + .is_ping_healthy() + .await + .expect("envoy handle should exist"); + assert!( + healthy, + "envoy-client should remain healthy while engine pings are arriving" + ); + + // Subscribe before injecting the fault so we can assert no event slips through. + let mut disconnect = envoy.wait_for_next_connection_event( + common::test_envoy::EnvoyConnectionEvent::Disconnected, + ); + disconnect.assert_no_event(); + + // Black-hole the link in both directions. Bytes are read from both peers and + // discarded, and EOFs are swallowed so neither peer's TCP stack ever sees a FIN. + // The engine still keeps sending pings every few seconds (default 3s) but no pongs + // come back, so its application-level ping timeout (default 15s) will eventually + // fire and close the WebSocket. The envoy-client has no application-level + // liveness check of its own, so as long as its TCP socket stays open it continues + // to believe the connection is healthy. + freeze_proxy.freeze(); + + // Wait well past the engine's 15s ping timeout and the envoy-client's local ping + // health threshold. + tokio::time::sleep(ENVOY_PING_HEALTHY_THRESHOLD + ENVOY_PING_INTERVAL_MARGIN).await; + + // The envoy-client is still oblivious. The engine's close frame and TCP FIN never + // reach it because the freeze proxy is holding the link open from envoy-client's + // perspective. 
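+			// Because the Disconnected subscription above was taken before the freeze, this
+			// assertion also proves no close event slipped through in the window before the
+			// black-hole took effect.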
+ disconnect.assert_no_event(); + + // Even though the envoy-client thinks the WebSocket is alive, its own ping-tracker + // must report unhealthy because no engine ping arrived in the last 20s. This is + // the signal the rivetkit `/health` endpoint uses to ask its host to recycle the + // container. + let healthy = envoy + .is_ping_healthy() + .await + .expect("envoy handle should exist"); + assert!( + !healthy, + "envoy-client should report unhealthy after 20s without an engine ping" + ); + }, + ); +} + async fn wait_for_envoy_actor(envoy: &common::test_envoy::TestEnvoy, actor_id: &str) { common::wait_with_poll( std::time::Duration::from_secs(5), @@ -138,7 +248,9 @@ async fn ping_actor_via_gateway(guard_port: u16, actor_id: &str) -> serde_json:: .expect("failed to build reqwest client"); let response = client - .get(format!("http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping")) + .get(format!( + "http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping" + )) .send() .await .expect("failed to ping actor through gateway"); diff --git a/engine/packages/test-deps-docker/src/toxiproxy.rs b/engine/packages/test-deps-docker/src/toxiproxy.rs index f907dc2168..4392ea0fae 100644 --- a/engine/packages/test-deps-docker/src/toxiproxy.rs +++ b/engine/packages/test-deps-docker/src/toxiproxy.rs @@ -215,6 +215,19 @@ impl ToxiproxyProxy { .await } + pub async fn timeout_upstream(&self, timeout_ms: u64, toxicity: f32) -> Result<()> { + self.add_toxic( + "timeout-upstream", + "timeout", + ToxiproxyDirection::Upstream, + toxicity, + TimeoutAttributes { + timeout: timeout_ms, + }, + ) + .await + } + pub async fn bandwidth_downstream(&self, kbps: u64) -> Result<()> { self.add_toxic( "bandwidth-downstream", diff --git a/engine/sdks/rust/envoy-client/src/actor.rs b/engine/sdks/rust/envoy-client/src/actor.rs index 1a6cac63c5..e15f789c68 100644 --- a/engine/sdks/rust/envoy-client/src/actor.rs +++ b/engine/sdks/rust/envoy-client/src/actor.rs @@ -1661,6 +1661,7 @@ mod tests { )), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: std::sync::atomic::AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(crate::time::now_millis()), stopped_tx: tokio::sync::watch::channel(true).0, }); (shared, envoy_rx) diff --git a/engine/sdks/rust/envoy-client/src/connection/mod.rs b/engine/sdks/rust/envoy-client/src/connection/mod.rs index 73f9138f65..c2c066fb0b 100644 --- a/engine/sdks/rust/envoy-client/src/connection/mod.rs +++ b/engine/sdks/rust/envoy-client/src/connection/mod.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::Ordering; + use rivet_envoy_protocol as protocol; #[cfg(any( feature = "native-transport", @@ -83,6 +85,9 @@ async fn forward_to_envoy(shared: &SharedContext, message: protocol::ToEnvoy) { match message { protocol::ToEnvoy::ToEnvoyPing(ping) => { + shared + .last_ping_ts + .store(crate::time::now_millis(), Ordering::Release); ws_send( shared, protocol::ToRivet::ToRivetPong(protocol::ToRivetPong { ts: ping.ts }), diff --git a/engine/sdks/rust/envoy-client/src/context.rs b/engine/sdks/rust/envoy-client/src/context.rs index 06487bd708..d286e64a45 100644 --- a/engine/sdks/rust/envoy-client/src/context.rs +++ b/engine/sdks/rust/envoy-client/src/context.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::Mutex as StdMutex; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, AtomicI64}; use crate::async_counter::AsyncCounter; use rivet_envoy_protocol as protocol; @@ -32,6 +32,11 @@ pub struct SharedContext { pub ws_tx: 
Arc>>>, pub protocol_metadata: Arc>>, pub shutting_down: AtomicBool, + /// Epoch ms timestamp of the most recent ping packet received from the engine. Used by + /// `EnvoyHandle::is_ping_healthy` to surface a dead engine link to upstream health checks. + /// Initialized to the construction time so a freshly created envoy reports healthy until + /// its first ping arrives or the threshold elapses without one. + pub last_ping_ts: AtomicI64, // Latched signal fired by `envoy_loop` after its cleanup block completes. // Waiters observing `true` are guaranteed that the loop has exited and // every pending KV/SQLite request has been resolved (with `EnvoyShutdownError` diff --git a/engine/sdks/rust/envoy-client/src/envoy.rs b/engine/sdks/rust/envoy-client/src/envoy.rs index 0218471b8b..0e3cf80494 100644 --- a/engine/sdks/rust/envoy-client/src/envoy.rs +++ b/engine/sdks/rust/envoy-client/src/envoy.rs @@ -307,6 +307,7 @@ fn start_envoy_sync_inner(config: EnvoyConfig) -> EnvoyHandle { ws_tx: Arc::new(tokio::sync::Mutex::new(None)), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: std::sync::atomic::AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(crate::time::now_millis()), stopped_tx, }); diff --git a/engine/sdks/rust/envoy-client/src/events.rs b/engine/sdks/rust/envoy-client/src/events.rs index bc7c034c94..b3fcbdf7a0 100644 --- a/engine/sdks/rust/envoy-client/src/events.rs +++ b/engine/sdks/rust/envoy-client/src/events.rs @@ -172,6 +172,7 @@ mod tests { )), protocol_metadata: Arc::new(tokio::sync::Mutex::new(None)), shutting_down: std::sync::atomic::AtomicBool::new(false), + last_ping_ts: std::sync::atomic::AtomicI64::new(crate::time::now_millis()), stopped_tx: tokio::sync::watch::channel(true).0, }); let handle = EnvoyHandle { diff --git a/engine/sdks/rust/envoy-client/src/handle.rs b/engine/sdks/rust/envoy-client/src/handle.rs index d724b449d0..1d1b1ee9cc 100644 --- a/engine/sdks/rust/envoy-client/src/handle.rs +++ b/engine/sdks/rust/envoy-client/src/handle.rs @@ -75,6 +75,17 @@ impl EnvoyHandle { self.shared.protocol_metadata.lock().await.clone() } + /// Threshold for `is_ping_healthy`. + pub const PING_HEALTHY_THRESHOLD_MS: i64 = 20_000; + + /// True when the engine sent a ping within `PING_HEALTHY_THRESHOLD_MS`. Returns false once + /// the engine link has been silently dead long enough that an upstream health check should + /// treat this envoy as unhealthy and recycle it. 
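+	///
+	/// A caller wiring this into an HTTP health probe might use it roughly like this
+	/// (sketch only; `handle` is an `EnvoyHandle` and the status codes are illustrative):
+	///
+	/// ```ignore
+	/// let status = if handle.is_ping_healthy() { 200 } else { 503 };
+	/// ```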
+ pub fn is_ping_healthy(&self) -> bool { + let last = self.shared.last_ping_ts.load(Ordering::Acquire); + crate::time::now_millis() - last < Self::PING_HEALTHY_THRESHOLD_MS + } + pub fn get_envoy_key(&self) -> &str { &self.shared.envoy_key } diff --git a/engine/sdks/rust/envoy-client/src/lib.rs b/engine/sdks/rust/envoy-client/src/lib.rs index 3126f06573..5a225112d6 100644 --- a/engine/sdks/rust/envoy-client/src/lib.rs +++ b/engine/sdks/rust/envoy-client/src/lib.rs @@ -16,6 +16,21 @@ pub(crate) mod time { pub use std::time::Instant; #[cfg(target_arch = "wasm32")] pub use web_time::Instant; + + pub fn now_millis() -> i64 { + #[cfg(not(target_arch = "wasm32"))] + { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system clock should be after UNIX epoch") + .as_millis() as i64 + } + + #[cfg(target_arch = "wasm32")] + { + js_sys::Date::now() as i64 + } + } } pub mod tunnel; pub mod utils; diff --git a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs index b947b6c44e..284fc6c166 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/serverless.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/serverless.rs @@ -247,14 +247,37 @@ impl CoreServerlessRuntime { "text/plain; charset=utf-8", "This is a RivetKit server.\n\nLearn more at https://rivet.dev", )), - ("GET", "/health") => Ok(json_response( - StatusCode::OK, - json!({ - "status": "ok", - "runtime": "rivetkit", - "version": self.settings.package_version, - }), - )), + ("GET", "/health") => { + // Report unhealthy when an envoy is currently running but its link to the + // engine has gone silent. A 503 is the conventional "recycle me" signal for + // container hosts (Cloud Run, k8s, etc.) running behind an HTTP health probe. + let envoy_unhealthy = { + let guard = self.envoy.lock().await; + guard + .as_ref() + .map(|handle| !handle.is_ping_healthy()) + .unwrap_or(false) + }; + if envoy_unhealthy { + Ok(json_response( + StatusCode::SERVICE_UNAVAILABLE, + json!({ + "status": "engine_ping_stale", + "runtime": "rivetkit", + "version": self.settings.package_version, + }), + )) + } else { + Ok(json_response( + StatusCode::OK, + json!({ + "status": "ok", + "runtime": "rivetkit", + "version": self.settings.package_version, + }), + )) + } + } ("GET", "/metadata") => Ok(self.metadata_response()), ("GET", "/start") | ("POST", "/start") => self.start_response(req).await, ("OPTIONS", _) => Ok(bytes_response(