Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ Design constraints, invariants, and reference commands for the Rivet monorepo. F
**Always use versioned BARE (`vbare`) instead of raw `serde_bare` for any persisted or wire-format encoding unless explicitly told otherwise.** Raw `serde_bare::to_vec` / `from_slice` has no version header, so any future schema change forces hand-rolled `LegacyXxx` fallback structs. `vbare::OwnedVersionedData` plus a versioned `*.bare` schema is the standard pattern. Acceptable raw-bare exceptions: ephemeral in-memory encodings that never cross a process boundary or hit disk, and wire formats whose protocol version is coordinated out-of-band (e.g. an HTTP path like `/v{PROTOCOL_VERSION}/...` or another channel that pins both peers to one schema per call).

- Avoid raw `f64` fields in vbare protocol schemas that use hashable maps; generated Rust derives `Eq`/`Hash`, so encode floats as fixed bytes or an ordered wrapper.
- Version converters must manually map fields between versions; never convert by serializing one version and deserializing it as another.
- Version converters must manually map fields between versions; never use serialize-deserialize round trips such as `transcode_version` or `serde_bare::to_vec` plus `from_slice`.

When talking about "Rivet Actors" make sure to capitalize "Rivet Actor" as a proper noun and lowercase "actor" as a generic noun.

Expand Down
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

108 changes: 108 additions & 0 deletions engine/packages/engine/tests/common/freeze_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//! Minimal TCP forwarder with a `freeze` mode used by network-fault tests.
//!
//! Toxiproxy can stall traffic but always relays a peer's TCP close to the other side, which
//! defeats tests that need to observe one peer's behavior when it has no signal that the other
//! peer hung up. `FreezeProxy` is a single-purpose forwarder that supports a true black-hole
//! mode: while frozen, bytes are read from each peer and discarded, and an EOF from either peer
//! is held instead of being forwarded.

use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::{Context, Result};
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};

/// TCP forwarder that can be switched into a one-way "frozen" (black-hole) mode.
pub struct FreezeProxy {
// Local ephemeral address the proxy is accepting client connections on.
listen_addr: SocketAddr,
// Shared flag set by `freeze()`; read by every per-direction forwarding task.
frozen: Arc<AtomicBool>,
}

impl FreezeProxy {
pub async fn start(upstream: SocketAddr) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("failed to bind FreezeProxy listener")?;
let listen_addr = listener
.local_addr()
.context("failed to read FreezeProxy listen addr")?;
let frozen = Arc::new(AtomicBool::new(false));

tokio::spawn({
let frozen = frozen.clone();
async move {
loop {
let (client, _) = match listener.accept().await {
Ok(pair) => pair,
Err(err) => {
tracing::warn!(?err, "freeze proxy accept failed");
return;
}
};
let _ = client.set_nodelay(true);
let frozen = frozen.clone();
tokio::spawn(async move {
let server = match TcpStream::connect(upstream).await {
Ok(stream) => stream,
Err(err) => {
tracing::warn!(?err, %upstream, "freeze proxy upstream connect failed");
return;
}
};
let _ = server.set_nodelay(true);
let (client_r, client_w) = client.into_split();
let (server_r, server_w) = server.into_split();
tokio::spawn(forward(client_r, server_w, frozen.clone()));
tokio::spawn(forward(server_r, client_w, frozen));
});
}
}
});

Ok(Self {
listen_addr,
frozen,
})
}

pub fn endpoint(&self) -> String {
format!("http://{}", self.listen_addr)
}

/// Stops shuttling bytes between the two peers and starts swallowing EOFs so neither peer
/// learns that the other has hung up. Bytes already in flight before this call may still
/// reach the other side.
pub fn freeze(&self) {
self.frozen.store(true, Ordering::SeqCst);
}
}

/// Copies bytes from `src` to `dst` until EOF or an I/O error.
///
/// While `frozen` is set, payload bytes are read and dropped — so the sender's
/// TCP window stays open and it cannot detect the dead link via back-pressure —
/// and an EOF is parked forever instead of dropping `dst`, so the other peer
/// never receives a FIN and keeps believing the connection is alive.
async fn forward(mut src: OwnedReadHalf, mut dst: OwnedWriteHalf, frozen: Arc<AtomicBool>) {
    let mut buf = [0u8; 8192];
    loop {
        let n = match src.read(&mut buf).await {
            Ok(n) => n,
            Err(_) => return,
        };
        let is_frozen = frozen.load(Ordering::SeqCst);
        if n == 0 {
            if is_frozen {
                // Hold `dst` open forever: no FIN is ever delivered to the peer.
                std::future::pending::<()>().await;
            }
            return;
        }
        if is_frozen {
            // Drain-and-discard while frozen.
            continue;
        }
        if dst.write_all(&buf[..n]).await.is_err() {
            return;
        }
    }
}
1 change: 1 addition & 0 deletions engine/packages/engine/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
pub mod actors;
pub mod api;
pub mod ctx;
pub mod freeze_proxy;
pub mod test_envoy;
pub mod test_helpers;
pub mod test_runner;
Expand Down
13 changes: 12 additions & 1 deletion engine/packages/engine/tests/common/test_envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,14 @@ impl Envoy {
}
}

/// Reports the current handle's ping-health status.
///
/// Returns `None` when no envoy handle exists (e.g. not yet connected or
/// already shut down); otherwise forwards the handle's own health check.
pub async fn is_ping_healthy(&self) -> Option<bool> {
self.handle
.lock()
.await
.as_ref()
.map(|handle| handle.is_ping_healthy())
}

pub async fn shutdown(&self) {
if let Some(handle) = self.handle.lock().await.take() {
handle.shutdown_and_wait(false).await;
Expand Down Expand Up @@ -299,7 +307,10 @@ impl TestEnvoyCallbacks {

impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks {
fn on_connect(&self, _handle: EnvoyHandle) {
// Best-effort notification: the receiver may already be dropped by the
// test, so a send error is deliberately ignored.
let _ = self
.inner
.connection_tx
.send(EnvoyConnectionEvent::Connected);
}

fn on_disconnect(&self, _handle: EnvoyHandle) {
Expand Down
1 change: 0 additions & 1 deletion engine/packages/engine/tests/envoy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ pub mod api_actors_get_or_create;
pub mod api_actors_list;
pub mod api_actors_list_names;
pub mod auth;
pub mod auth;
pub mod network_faults;
122 changes: 117 additions & 5 deletions engine/packages/engine/tests/envoy/network_faults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,17 @@ use std::sync::{

use super::super::common;

// Mirror of the envoy handle's ping-health threshold (declared in milliseconds
// as `PING_HEALTHY_THRESHOLD_MS`) as a `Duration` for use in test sleeps.
const ENVOY_PING_HEALTHY_THRESHOLD: std::time::Duration = std::time::Duration::from_millis(
common::test_envoy::EnvoyHandle::PING_HEALTHY_THRESHOLD_MS as u64,
);
// Extra slack added on top of the threshold so that ping-interval boundaries
// cannot make the timing assertions flaky.
const ENVOY_PING_INTERVAL_MARGIN: std::time::Duration = std::time::Duration::from_secs(5);

#[test]
fn envoy_reconnects_after_server_side_tcp_reset() {
common::run(
common::TestOpts::new(1).with_timeout(90).with_network_faults(),
common::TestOpts::new(1)
.with_timeout(90)
.with_network_faults(),
|ctx| async move {
let dc = ctx.leader_dc();
let (namespace, _) = common::setup_test_namespace(dc).await;
Expand Down Expand Up @@ -55,7 +62,7 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
wait_for_envoy_actor(&envoy, &actor_id).await;
wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await;

let response = common::ping_actor_via_guard(dc, &actor_id).await;
let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await;
assert_eq!(response["status"], "ok");

let mut disconnect = envoy.wait_for_next_connection_event(
Expand All @@ -71,8 +78,9 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
// The ping task writes every few seconds in the test config.
disconnect.wait().await;

let reconnect = envoy
.wait_for_next_connection_event(common::test_envoy::EnvoyConnectionEvent::Connected);
let reconnect = envoy.wait_for_next_connection_event(
common::test_envoy::EnvoyConnectionEvent::Connected,
);
envoy_proxy
.clear_toxics()
.await
Expand All @@ -90,6 +98,108 @@ fn envoy_reconnects_after_server_side_tcp_reset() {
);
}

/// Simulates a true network black-hole (no FIN/RST ever delivered) between the
/// envoy-client and the engine, then asserts two things: the envoy-client never
/// observes a disconnect event (the engine's close never reaches it), yet its
/// own ping-tracker reports unhealthy because engine pings stopped arriving —
/// the signal used to recycle the container.
#[test]
fn engine_closes_envoy_ws_after_ping_timeout_while_envoy_remains_unaware() {
common::run(
common::TestOpts::new(1).with_timeout(120),
|ctx| async move {
let dc = ctx.leader_dc();
let (namespace, _) = common::setup_test_namespace(dc).await;

// Stand up our own forwarder so we can simulate a true network partition.
// Toxiproxy can stall traffic but always relays a peer's TCP close to the other
// side, which would let envoy-client notice the engine has hung up.
let freeze_proxy = common::freeze_proxy::FreezeProxy::start(
std::net::SocketAddr::from(([127, 0, 0, 1], dc.guard_port())),
)
.await
.expect("failed to start freeze proxy");

// Point the envoy-client at the proxy rather than directly at guard, so the
// fault can be injected between them later.
let envoy = common::setup_envoy(dc, &namespace, |builder| {
builder
.with_endpoint(freeze_proxy.endpoint())
.with_actor_behavior("network-fault-actor", |_| {
Box::new(
common::test_envoy::CustomActorBuilder::new()
.on_start(|_| {
Box::pin(async {
Ok(common::test_envoy::ActorStartResult::Running)
})
})
.build(),
)
})
})
.await;

let res = common::create_actor(
dc.guard_port(),
&namespace,
"network-fault-actor",
envoy.pool_name(),
rivet_types::actors::CrashPolicy::Sleep,
)
.await;
let actor_id = res.actor.actor_id.to_string();
wait_for_envoy_actor(&envoy, &actor_id).await;
wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await;

// Sanity check: the path through gateway to the actor works before the fault.
let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await;
assert_eq!(response["status"], "ok");

// Wait past the local health threshold before injecting the fault. A healthy
// connection must stay healthy because engine pings refresh envoy-client's
// receive timestamp.
tokio::time::sleep(ENVOY_PING_HEALTHY_THRESHOLD + ENVOY_PING_INTERVAL_MARGIN).await;
let healthy = envoy
.is_ping_healthy()
.await
.expect("envoy handle should exist");
assert!(
healthy,
"envoy-client should remain healthy while engine pings are arriving"
);

// Subscribe before injecting the fault so we can assert no event slips through.
let mut disconnect = envoy.wait_for_next_connection_event(
common::test_envoy::EnvoyConnectionEvent::Disconnected,
);
disconnect.assert_no_event();

// Black-hole the link in both directions. Bytes are read from both peers and
// discarded, and EOFs are swallowed so neither peer's TCP stack ever sees a FIN.
// The engine still keeps sending pings every few seconds (default 3s) but no pongs
// come back, so its application-level ping timeout (default 15s) will eventually
// fire and close the WebSocket. The envoy-client has no application-level
// liveness check of its own, so as long as its TCP socket stays open it continues
// to believe the connection is healthy.
freeze_proxy.freeze();

// Wait well past the engine's 15s ping timeout and the envoy-client's local ping
// health threshold.
tokio::time::sleep(ENVOY_PING_HEALTHY_THRESHOLD + ENVOY_PING_INTERVAL_MARGIN).await;

// The envoy-client is still oblivious. The engine's close frame and TCP FIN never
// reach it because the freeze proxy is holding the link open from envoy-client's
// perspective.
disconnect.assert_no_event();

// Even though the envoy-client thinks the WebSocket is alive, its own ping-tracker
// must report unhealthy because no engine ping arrived in the last 20s. This is
// the signal the rivetkit `/health` endpoint uses to ask its host to recycle the
// container.
let healthy = envoy
.is_ping_healthy()
.await
.expect("envoy handle should exist");
assert!(
!healthy,
"envoy-client should report unhealthy after 20s without an engine ping"
);
},
);
}

async fn wait_for_envoy_actor(envoy: &common::test_envoy::TestEnvoy, actor_id: &str) {
common::wait_with_poll(
std::time::Duration::from_secs(5),
Expand Down Expand Up @@ -138,7 +248,9 @@ async fn ping_actor_via_gateway(guard_port: u16, actor_id: &str) -> serde_json::
.expect("failed to build reqwest client");

let response = client
.get(format!("http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping"))
.get(format!(
"http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping"
))
.send()
.await
.expect("failed to ping actor through gateway");
Expand Down
Loading
Loading