diff --git a/Cargo.lock b/Cargo.lock index 794b520a5a..51f898eb4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -977,7 +977,7 @@ version = "0.72.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f72209734318d0b619a5e0f5129918b848c416e122a3c4ce054e03cb87b726f" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cexpr", "clang-sys", "itertools 0.13.0", @@ -989,6 +989,12 @@ dependencies = [ "syn 2.0.104", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.10.0" @@ -1020,6 +1026,56 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119771309b95163ec7aaf79810da82f7cd0599c19722d48b9c03894dca833966" +[[package]] +name = "bollard" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97ccca1260af6a459d75994ad5acc1651bcabcbdbc41467cc9786519ab854c30" +dependencies = [ + "base64 0.22.1", + "bollard-stubs", + "bytes", + "futures-core", + "futures-util", + "hex", + "home", + "http 1.3.1", + "http-body-util", + "hyper 1.6.0", + "hyper-named-pipe", + "hyper-rustls", + "hyper-util", + "hyperlocal", + "log", + "pin-project-lite", + "rustls", + "rustls-native-certs 0.8.3", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_derive", + "serde_json", + "serde_repr", + "serde_urlencoded", + "thiserror 2.0.12", + "tokio", + "tokio-util", + "tower-service", + "url", + "winapi", +] + +[[package]] +name = "bollard-stubs" +version = "1.47.1-rc.27.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f179cfbddb6e77a5472703d4b30436bff32929c0aa8a9008ecf23d1d3cdd0da" +dependencies = [ + "serde", + "serde_repr", + "serde_with", +] + [[package]] name = "bstr" version = "1.12.0" @@ -2088,10 +2144,15 @@ dependencies = [ ] [[package]] -name = "dunce" -version = "1.0.5" +name = "docker_credential" +version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +checksum = "1d89dfcba45b4afad7450a99b39e751590463e45c04728cf555d36bb66940de8" +dependencies = [ + "base64 0.21.7", + "serde", + "serde_json", +] [[package]] name = "dyn-clone" @@ -2257,6 +2318,17 @@ version = "3.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59" +[[package]] +name = "etcetera" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c7b13d0780cb82722fd59f6f57f925e143427e4a75313a6c77243bf5326ae6" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.59.0", +] + [[package]] name = "event-listener" version = "5.4.0" @@ -2340,6 +2412,17 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" +[[package]] +name = "filetime" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f98844151eee8917efc50bd9e8318cb963ae8b297431495d3f758616ea5c57db" +dependencies = [ + "cfg-if", + "libc", + "libredox", +] + [[package]] name = "findshlibs" version = "0.10.2" @@ -2959,6 +3042,21 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-named-pipe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" +dependencies = [ + "hex", + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", + "winapi", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -3047,6 +3145,21 @@ dependencies = [ "windows-registry", ] +[[package]] +name = "hyperlocal" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" +dependencies = [ + "hex", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "iana-time-zone" version = "0.1.63" @@ -3246,7 +3359,7 @@ version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "libc", ] @@ -3382,8 +3495,9 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4488594b9328dee448adb906d8b126d9b7deb7cf5c22161ee591610bb1be83c0" dependencies = [ - "bitflags", + "bitflags 2.10.0", "libc", + "redox_syscall 0.5.15", ] [[package]] @@ -3683,7 +3797,7 @@ version = "2.16.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55740c4ae1d8696773c78fdafd5d0e5fe9bc9f1b071c7ba493ba5c413a9184f3" dependencies = [ - "bitflags", + "bitflags 2.10.0", "ctor", "napi-derive", "napi-sys", @@ -3758,7 +3872,7 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "cfg_aliases", "libc", @@ -3770,7 +3884,7 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "cfg_aliases", "libc", @@ -3931,7 +4045,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4160,11 +4274,36 @@ checksum = "bc838d2a56b5b1a6c25f55575dfc605fabb63bb2365f6c2353ef9159aa69e4a5" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.15", "smallvec", "windows-targets 0.52.6", ] +[[package]] +name = "parse-display" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "914a1c2265c98e2446911282c6ac86d8524f495792c38c5bd884f80499c7538a" +dependencies = [ + "parse-display-derive", + "regex", + "regex-syntax", +] + +[[package]] +name = "parse-display-derive" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ae7800a4c974efd12df917266338e79a7a74415173caf7e70aa0a0707345281" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "regex-syntax", + "structmeta", + "syn 2.0.104", +] + [[package]] name = "paste" version = "1.0.15" @@ -4935,7 +5074,7 @@ version = "11.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -4944,13 +5083,22 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a357793950651c4ed0f3f52338f53b2f809f32d83a07f72909fa13e4c6c1e3" +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e8af0dde094006011e6a740d4879319439489813bd0bcdc7d821beaeeff48ec" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -5850,7 +5998,11 @@ version = "2.3.0-rc.5" dependencies = [ "anyhow", "portpicker", + "reqwest", "rivet-config", + "serde", + "serde_json", + "testcontainers", "tokio", "tokio-postgres", "tracing", @@ -6146,7 +6298,7 @@ version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ - "bitflags", + "bitflags 2.10.0", "fallible-iterator 0.3.0", "fallible-streaming-iterator", "hashlink 0.9.1", @@ -6196,7 +6348,7 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ - "bitflags", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys", @@ -6340,7 +6492,7 @@ version = "15.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ee1e066dc922e513bda599c6ccb5f3bb2b0ea5870a579448f2622993f0a9a2f" dependencies = [ - "bitflags", + "bitflags 2.10.0", "cfg-if", "clipboard-win", "fd-lock", @@ -6494,7 +6646,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "core-foundation-sys", "libc", @@ -6507,7 +6659,7 @@ version = "3.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d17b898a6d6948c3a8ee4372c17cb384f90d2e6e912ef00895b14fd7ab54ec38" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.10.1", "core-foundation-sys", "libc", @@ -6639,7 +6791,7 @@ version = "0.45.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "428f780866a613142dcc81b7f8551ae4d1c056f4df22b6d7ddd9154a9974eb03" dependencies = [ - "bitflags", + "bitflags 2.10.0", "sentry-backtrace", "sentry-core", "tracing-core", @@ -7082,6 +7234,29 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "structmeta" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329" +dependencies = [ + "proc-macro2", + "quote", + "structmeta-derive", + "syn 2.0.104", +] + +[[package]] +name = "structmeta-derive" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "strum" version = "0.26.3" @@ -7172,7 +7347,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags", + "bitflags 2.10.0", "core-foundation 0.9.4", "system-configuration-sys", ] @@ -7301,6 +7476,35 @@ dependencies = [ "uuid", ] +[[package]] +name = "testcontainers" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23bb7577dca13ad86a78e8271ef5d322f37229ec83b8d98da6d996c588a1ddb1" +dependencies = [ + "async-trait", + "bollard", + "bollard-stubs", + "bytes", + "docker_credential", + "either", + "etcetera", + "futures", + "log", + "memchr", + "parse-display", + "pin-project-lite", + "serde", + "serde_json", + "serde_with", + "thiserror 2.0.12", + "tokio", + "tokio-stream", + "tokio-tar", + "tokio-util", + "url", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -7548,6 +7752,32 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tar" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5714c010ca3e5c27114c1cdeb9d14641ace49874aa5626d7149e47aedace75" +dependencies = [ + "filetime", + "futures-core", + "libc", + "redox_syscall 0.3.5", + "tokio", + "tokio-stream", + "xattr", +] + +[[package]] +name = "tokio-test" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545" +dependencies = [ + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-tungstenite" version = "0.26.2" @@ -7671,7 +7901,7 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adc82fd73de2a9722ac5da747f12383d2bfdb93591ee6c58486e0097890f05f2" dependencies = [ - "bitflags", + "bitflags 2.10.0", "bytes", "futures-util", "http 1.3.1", @@ -8396,7 +8626,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" dependencies = [ - "redox_syscall", + "redox_syscall 0.5.15", "wasite", "web-sys", ] @@ -8861,7 +9091,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags", + "bitflags 2.10.0", ] [[package]] @@ -8883,10 +9113,14 @@ dependencies = [ ] [[package]] -name = "xmlparser" -version = "0.13.6" +name = "xattr" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] [[package]] name = "yaml-rust2" diff --git a/engine/packages/engine/tests/common/ctx.rs b/engine/packages/engine/tests/common/ctx.rs index d9de9b2cdd..0c0cb02625 100644 --- a/engine/packages/engine/tests/common/ctx.rs +++ b/engine/packages/engine/tests/common/ctx.rs @@ -8,6 +8,7 @@ pub struct TestOpts { pub timeout_secs: u64, pub pegboard_outbound: bool, pub auth_admin_token: Option, + pub network_faults: bool, } impl TestOpts { @@ -17,6 +18,7 @@ impl TestOpts { timeout_secs: 10, pegboard_outbound: false, auth_admin_token: None, + network_faults: false, } } @@ -34,6 +36,11 @@ impl TestOpts { self.auth_admin_token = Some(token.into()); self } + + pub fn with_network_faults(mut self) -> Self { + self.network_faults = true; + self + } } impl Default for TestOpts { @@ -43,6 +50,7 @@ impl Default for TestOpts { timeout_secs: 10, pegboard_outbound: false, auth_admin_token: None, + network_faults: false, } } } @@ -50,6 +58,7 @@ impl Default for TestOpts { pub struct TestCtx { dcs: Vec, pub opts: TestOpts, + network_faults: Option, } pub struct TestDatacenter { @@ -99,7 +108,17 @@ impl TestCtx { futures_util::future::try_join_all(setup_futures).await?; dcs.sort_by_key(|dc| dc.config.dc_label()); - Ok(Self { dcs, opts }) + let network_faults = if opts.network_faults { + Some(rivet_test_deps::ToxiproxyTestServer::start().await?) + } else { + None + }; + + Ok(Self { + dcs, + opts, + network_faults, + }) } async fn setup_instance( @@ -204,6 +223,12 @@ impl TestCtx { .unwrap_or_else(|| panic!("No datacenter found with label {}", label)) } + pub fn network_faults(&self) -> &rivet_test_deps::ToxiproxyTestServer { + self.network_faults + .as_ref() + .expect("Network faults were not enabled. Use TestOpts::with_network_faults().") + } + pub async fn shutdown(self) { tracing::info!("shutting down multi-DC test context"); for dc in self.dcs { diff --git a/engine/packages/engine/tests/common/test_envoy.rs b/engine/packages/engine/tests/common/test_envoy.rs index ef4e74c263..d1a9ad37a8 100644 --- a/engine/packages/engine/tests/common/test_envoy.rs +++ b/engine/packages/engine/tests/common/test_envoy.rs @@ -20,6 +20,46 @@ type ActorFactory = Arc Box + Send + Sync> pub type TestEnvoy = Envoy; +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum EnvoyConnectionEvent { + Connected, + Disconnected, +} + +pub struct EnvoyConnectionEventWaiter { + rx: broadcast::Receiver, + expected: EnvoyConnectionEvent, + timeout: std::time::Duration, +} + +impl EnvoyConnectionEventWaiter { + pub fn assert_no_event(&mut self) { + match self.rx.try_recv() { + Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {} + Ok(event) => panic!("unexpected Envoy connection event before fault: {event:?}"), + Err(tokio::sync::broadcast::error::TryRecvError::Lagged(count)) => { + panic!("missed {count} Envoy connection events before fault") + } + Err(err) => panic!("Envoy connection event channel closed: {err}"), + } + } + + pub async fn wait(mut self) { + tokio::time::timeout(self.timeout, async { + loop { + match self.rx.recv().await { + Ok(event) if event == self.expected => break, + Ok(_) => {} + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(err) => panic!("Envoy connection event channel closed: {err}"), + } + } + }) + .await + .expect("timed out waiting for Envoy connection event"); + } +} + #[derive(Clone)] pub struct EnvoyConfig { endpoint: String, @@ -119,6 +159,7 @@ impl EnvoyBuilder { actor_factories: self.actor_factories, actors: tokio::sync::Mutex::new(HashMap::new()), lifecycle_tx, + connection_tx: broadcast::channel(100).0, }), handle: tokio::sync::Mutex::new(None), envoy_key: uuid::Uuid::new_v4().to_string(), @@ -130,6 +171,7 @@ struct EnvoyInner { actor_factories: HashMap, actors: tokio::sync::Mutex>>, lifecycle_tx: broadcast::Sender, + connection_tx: broadcast::Sender, } pub struct Envoy { @@ -197,6 +239,21 @@ impl Envoy { self.inner.lifecycle_tx.subscribe() } + pub fn subscribe_connection_events(&self) -> broadcast::Receiver { + self.inner.connection_tx.subscribe() + } + + pub fn wait_for_next_connection_event( + &self, + expected: EnvoyConnectionEvent, + ) -> EnvoyConnectionEventWaiter { + EnvoyConnectionEventWaiter { + rx: self.subscribe_connection_events(), + expected, + timeout: std::time::Duration::from_secs(20), + } + } + pub async fn shutdown(&self) { if let Some(handle) = self.handle.lock().await.take() { handle.shutdown_and_wait(false).await; @@ -241,6 +298,17 @@ impl TestEnvoyCallbacks { } impl rivet_test_envoy::EnvoyCallbacks for TestEnvoyCallbacks { + fn on_connect(&self, _handle: EnvoyHandle) { + let _ = self.inner.connection_tx.send(EnvoyConnectionEvent::Connected); + } + + fn on_disconnect(&self, _handle: EnvoyHandle) { + let _ = self + .inner + .connection_tx + .send(EnvoyConnectionEvent::Disconnected); + } + fn on_actor_start( &self, handle: EnvoyHandle, @@ -527,6 +595,7 @@ pub struct TestEnvoyBuilder { namespace: String, pool_name: String, version: u32, + endpoint: Option, actor_factories: HashMap, } @@ -536,6 +605,7 @@ impl TestEnvoyBuilder { namespace: namespace.to_string(), pool_name: "test-envoy".to_string(), version: 1, + endpoint: None, actor_factories: HashMap::new(), } } @@ -550,6 +620,11 @@ impl TestEnvoyBuilder { self } + pub fn with_endpoint(mut self, endpoint: impl Into) -> Self { + self.endpoint = Some(endpoint.into()); + self + } + pub fn with_actor_behavior(mut self, actor_name: &str, factory: F) -> Self where F: Fn(ActorConfig) -> Box + Send + Sync + 'static, @@ -561,7 +636,10 @@ impl TestEnvoyBuilder { pub async fn build(self, dc: &super::TestDatacenter) -> Result { let config = EnvoyConfig::builder() - .endpoint(format!("http://127.0.0.1:{}", dc.guard_port())) + .endpoint( + self.endpoint + .unwrap_or_else(|| format!("http://127.0.0.1:{}", dc.guard_port())), + ) .token("dev") .namespace(&self.namespace) .pool_name(&self.pool_name) diff --git a/engine/packages/engine/tests/common/test_helpers.rs b/engine/packages/engine/tests/common/test_helpers.rs index 45afdf3173..ca19c1719b 100644 --- a/engine/packages/engine/tests/common/test_helpers.rs +++ b/engine/packages/engine/tests/common/test_helpers.rs @@ -1,9 +1,31 @@ -use std::{collections::HashMap, str::FromStr}; +use std::{collections::HashMap, future::Future, str::FromStr}; use serde_json::json; use super::TestDatacenter; +pub async fn wait_with_poll( + timeout: std::time::Duration, + poll_interval: std::time::Duration, + mut check: F, +) -> Option +where + F: FnMut() -> Fut, + Fut: Future>, +{ + tokio::time::timeout(timeout, async { + loop { + if let Some(value) = check().await { + break value; + } + + tokio::time::sleep(poll_interval).await; + } + }) + .await + .ok() +} + // Namespace helpers pub async fn setup_test_namespace(leader_dc: &TestDatacenter) -> (String, rivet_util::Id) { let random_suffix = rand::random::(); diff --git a/engine/packages/engine/tests/envoy/mod.rs b/engine/packages/engine/tests/envoy/mod.rs index 27d8532804..2ef4ab9bff 100644 --- a/engine/packages/engine/tests/envoy/mod.rs +++ b/engine/packages/engine/tests/envoy/mod.rs @@ -11,3 +11,5 @@ pub mod api_actors_get_or_create; pub mod api_actors_list; pub mod api_actors_list_names; pub mod auth; +pub mod auth; +pub mod network_faults; diff --git a/engine/packages/engine/tests/envoy/network_faults.rs b/engine/packages/engine/tests/envoy/network_faults.rs new file mode 100644 index 0000000000..6392780701 --- /dev/null +++ b/engine/packages/engine/tests/envoy/network_faults.rs @@ -0,0 +1,156 @@ +use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, +}; + +use super::super::common; + +#[test] +fn envoy_reconnects_after_server_side_tcp_reset() { + common::run( + common::TestOpts::new(1).with_timeout(90).with_network_faults(), + |ctx| async move { + let dc = ctx.leader_dc(); + let (namespace, _) = common::setup_test_namespace(dc).await; + let envoy_proxy = ctx + .network_faults() + .proxy( + "envoy-connect", + std::net::SocketAddr::from(([127, 0, 0, 1], dc.guard_port())), + ) + .await + .expect("failed to create Envoy Toxiproxy proxy"); + + let start_count = Arc::new(AtomicUsize::new(0)); + let actor_start_count = start_count.clone(); + let envoy = common::setup_envoy(dc, &namespace, |builder| { + builder + .with_endpoint(envoy_proxy.endpoint()) + .with_actor_behavior("network-fault-actor", move |_| { + let actor_start_count = actor_start_count.clone(); + Box::new( + common::test_envoy::CustomActorBuilder::new() + .on_start(move |_| { + let actor_start_count = actor_start_count.clone(); + Box::pin(async move { + actor_start_count.fetch_add(1, Ordering::SeqCst); + Ok(common::test_envoy::ActorStartResult::Running) + }) + }) + .build(), + ) + }) + }) + .await; + + let res = common::create_actor( + dc.guard_port(), + &namespace, + "network-fault-actor", + envoy.pool_name(), + rivet_types::actors::CrashPolicy::Sleep, + ) + .await; + let actor_id = res.actor.actor_id.to_string(); + wait_for_envoy_actor(&envoy, &actor_id).await; + wait_for_connectable(dc.guard_port(), &namespace, &actor_id).await; + + let response = common::ping_actor_via_guard(dc, &actor_id).await; + assert_eq!(response["status"], "ok"); + + let mut disconnect = envoy.wait_for_next_connection_event( + common::test_envoy::EnvoyConnectionEvent::Disconnected, + ); + disconnect.assert_no_event(); + envoy_proxy + .reset_downstream() + .await + .expect("failed to inject downstream TCP reset"); + + // The reset toxic applies when the engine next writes to the Envoy control WebSocket. + // The ping task writes every few seconds in the test config. + disconnect.wait().await; + + let reconnect = envoy + .wait_for_next_connection_event(common::test_envoy::EnvoyConnectionEvent::Connected); + envoy_proxy + .clear_toxics() + .await + .expect("failed to clear downstream TCP reset"); + reconnect.wait().await; + + let response = ping_actor_via_gateway(dc.guard_port(), &actor_id).await; + assert_eq!(response["status"], "ok"); + assert_eq!( + start_count.load(Ordering::SeqCst), + 1, + "Envoy reconnect should not replay the actor start command" + ); + }, + ); +} + +async fn wait_for_envoy_actor(envoy: &common::test_envoy::TestEnvoy, actor_id: &str) { + common::wait_with_poll( + std::time::Duration::from_secs(5), + std::time::Duration::from_millis(50), + || async { + if envoy.has_actor(actor_id).await { + Some(()) + } else { + None + } + }, + ) + .await + .expect("envoy should receive actor"); +} + +async fn wait_for_connectable( + guard_port: u16, + namespace: &str, + actor_id: &str, +) -> rivet_types::actors::Actor { + common::wait_with_poll( + std::time::Duration::from_secs(10), + std::time::Duration::from_millis(50), + || async { + let actor = common::try_get_actor(guard_port, actor_id, namespace) + .await + .expect("failed to get actor") + .expect("actor should exist"); + + if actor.connectable_ts.is_some() { + Some(actor) + } else { + None + } + }, + ) + .await + .expect("actor should become connectable") +} + +async fn ping_actor_via_gateway(guard_port: u16, actor_id: &str) -> serde_json::Value { + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(2)) + .build() + .expect("failed to build reqwest client"); + + let response = client + .get(format!("http://127.0.0.1:{guard_port}/gateway/{actor_id}/ping")) + .send() + .await + .expect("failed to ping actor through gateway"); + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + panic!("failed to ping actor through gateway: {status}: {text}"); + } + + response + .json() + .await + .expect("failed to decode gateway ping response") +} diff --git a/engine/packages/test-deps-docker/Cargo.toml b/engine/packages/test-deps-docker/Cargo.toml index ae95f7a5b2..8cf8487ff6 100644 --- a/engine/packages/test-deps-docker/Cargo.toml +++ b/engine/packages/test-deps-docker/Cargo.toml @@ -7,8 +7,12 @@ license.workspace = true [dependencies] anyhow.workspace = true portpicker.workspace = true +reqwest.workspace = true rivet-config.workspace = true +serde.workspace = true +serde_json.workspace = true +testcontainers.workspace = true tokio.workspace = true tokio-postgres.workspace = true tracing.workspace = true -uuid.workspace = true \ No newline at end of file +uuid.workspace = true diff --git a/engine/packages/test-deps-docker/src/lib.rs b/engine/packages/test-deps-docker/src/lib.rs index 644e57ad53..f79bf318df 100644 --- a/engine/packages/test-deps-docker/src/lib.rs +++ b/engine/packages/test-deps-docker/src/lib.rs @@ -3,9 +3,11 @@ use tokio::process::Command; mod database; mod pubsub; +mod toxiproxy; pub use database::*; pub use pubsub::*; +pub use toxiproxy::*; #[derive(Debug, Clone)] pub struct DockerRunConfig { diff --git a/engine/packages/test-deps-docker/src/toxiproxy.rs b/engine/packages/test-deps-docker/src/toxiproxy.rs new file mode 100644 index 0000000000..f907dc2168 --- /dev/null +++ b/engine/packages/test-deps-docker/src/toxiproxy.rs @@ -0,0 +1,350 @@ +use std::net::SocketAddr; + +use anyhow::{Context, Result}; +use serde::Serialize; +use testcontainers::{ + GenericImage, ImageExt, + core::{ContainerAsync, Host, IntoContainerPort}, + runners::AsyncRunner, +}; + +const ADMIN_PORT: u16 = 8474; +const DEFAULT_PROXY_PORT: u16 = 8666; + +pub struct ToxiproxyTestServer { + _container: ContainerAsync, + client: reqwest::Client, + admin_base_url: String, + proxy_port: u16, +} + +#[derive(Clone)] +pub struct ToxiproxyProxy { + client: reqwest::Client, + admin_base_url: String, + name: String, + listen_addr: SocketAddr, +} + +#[derive(Serialize)] +struct CreateProxyRequest<'a> { + name: &'a str, + listen: String, + upstream: String, + enabled: bool, +} + +#[derive(Serialize)] +struct UpdateProxyRequest { + enabled: bool, +} + +#[derive(Serialize)] +struct CreateToxicRequest<'a, T> +where + T: Serialize, +{ + name: &'a str, + #[serde(rename = "type")] + toxic_type: &'a str, + stream: &'a str, + toxicity: f32, + attributes: T, +} + +#[derive(Serialize)] +struct ResetPeerAttributes { + timeout: u64, +} + +#[derive(Serialize)] +struct LatencyAttributes { + latency: u64, + jitter: u64, +} + +#[derive(Serialize)] +struct TimeoutAttributes { + timeout: u64, +} + +#[derive(Serialize)] +struct BandwidthAttributes { + rate: u64, +} + +#[derive(Clone, Copy)] +pub enum ToxiproxyDirection { + Upstream, + Downstream, +} + +impl ToxiproxyDirection { + fn as_str(self) -> &'static str { + match self { + ToxiproxyDirection::Upstream => "upstream", + ToxiproxyDirection::Downstream => "downstream", + } + } +} + +impl ToxiproxyTestServer { + pub async fn start() -> Result { + let image = GenericImage::new("ghcr.io/shopify/toxiproxy", "2.12.0") + .with_exposed_port(ADMIN_PORT.tcp()) + .with_exposed_port(DEFAULT_PROXY_PORT.tcp()) + .with_host("host.docker.internal", Host::HostGateway); + + let container = image.start().await.context("failed to start Toxiproxy")?; + let admin_port = container + .get_host_port_ipv4(ADMIN_PORT.tcp()) + .await + .context("failed to get Toxiproxy admin port")?; + let proxy_port = container + .get_host_port_ipv4(DEFAULT_PROXY_PORT.tcp()) + .await + .context("failed to get Toxiproxy proxy port")?; + let admin_base_url = format!("http://127.0.0.1:{admin_port}"); + let client = reqwest::Client::new(); + + wait_for_admin(&client, &admin_base_url).await?; + + Ok(Self { + _container: container, + client, + admin_base_url, + proxy_port, + }) + } + + pub async fn proxy(&self, name: &str, upstream: SocketAddr) -> Result { + let request = CreateProxyRequest { + name, + listen: format!("0.0.0.0:{}", DEFAULT_PROXY_PORT), + upstream: format!("host.docker.internal:{}", upstream.port()), + enabled: true, + }; + + let response = self + .client + .post(format!("{}/proxies", self.admin_base_url)) + .json(&request) + .send() + .await + .context("failed to create Toxiproxy proxy")?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("failed to create Toxiproxy proxy: {status}: {text}"); + } + + Ok(ToxiproxyProxy { + client: self.client.clone(), + admin_base_url: self.admin_base_url.clone(), + name: name.to_string(), + listen_addr: SocketAddr::from(([127, 0, 0, 1], self.proxy_port)), + }) + } +} + +impl ToxiproxyProxy { + pub fn listen_addr(&self) -> SocketAddr { + self.listen_addr + } + + pub fn endpoint(&self) -> String { + format!("http://{}", self.listen_addr) + } + + pub async fn enable(&self) -> Result<()> { + self.update_enabled(true).await + } + + pub async fn disable(&self) -> Result<()> { + self.update_enabled(false).await + } + + pub async fn reset_downstream(&self) -> Result<()> { + self.reset_peer(ToxiproxyDirection::Downstream, 0, 1.0) + .await + } + + pub async fn reset_peer( + &self, + direction: ToxiproxyDirection, + timeout_ms: u64, + toxicity: f32, + ) -> Result<()> { + self.add_toxic( + "reset-peer", + "reset_peer", + direction, + toxicity, + ResetPeerAttributes { + timeout: timeout_ms, + }, + ) + .await + } + + pub async fn latency_downstream(&self, latency_ms: u64, jitter_ms: u64) -> Result<()> { + self.add_toxic( + "latency-downstream", + "latency", + ToxiproxyDirection::Downstream, + 1.0, + LatencyAttributes { + latency: latency_ms, + jitter: jitter_ms, + }, + ) + .await + } + + pub async fn timeout_downstream(&self, timeout_ms: u64, toxicity: f32) -> Result<()> { + self.add_toxic( + "timeout-downstream", + "timeout", + ToxiproxyDirection::Downstream, + toxicity, + TimeoutAttributes { + timeout: timeout_ms, + }, + ) + .await + } + + pub async fn bandwidth_downstream(&self, kbps: u64) -> Result<()> { + self.add_toxic( + "bandwidth-downstream", + "bandwidth", + ToxiproxyDirection::Downstream, + 1.0, + BandwidthAttributes { rate: kbps }, + ) + .await + } + + pub async fn clear_toxics(&self) -> Result<()> { + let response = self + .client + .get(format!( + "{}/proxies/{}/toxics", + self.admin_base_url, self.name + )) + .send() + .await + .context("failed to list Toxiproxy toxics")?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("failed to list Toxiproxy toxics: {status}: {text}"); + } + + let toxics: Vec = response + .json() + .await + .context("failed to decode Toxiproxy toxics")?; + + for toxic in toxics { + let Some(name) = toxic.get("name").and_then(|value| value.as_str()) else { + continue; + }; + let response = self + .client + .delete(format!( + "{}/proxies/{}/toxics/{}", + self.admin_base_url, self.name, name + )) + .send() + .await + .with_context(|| format!("failed to delete Toxiproxy toxic {name}"))?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("failed to delete Toxiproxy toxic {name}: {status}: {text}"); + } + } + + Ok(()) + } + + async fn update_enabled(&self, enabled: bool) -> Result<()> { + let response = self + .client + .post(format!("{}/proxies/{}", self.admin_base_url, self.name)) + .json(&UpdateProxyRequest { enabled }) + .send() + .await + .context("failed to update Toxiproxy proxy")?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("failed to update Toxiproxy proxy: {status}: {text}"); + } + + Ok(()) + } + + async fn add_toxic( + &self, + name: &str, + toxic_type: &str, + direction: ToxiproxyDirection, + toxicity: f32, + attributes: T, + ) -> Result<()> + where + T: Serialize, + { + let response = self + .client + .post(format!( + "{}/proxies/{}/toxics", + self.admin_base_url, self.name + )) + .json(&CreateToxicRequest { + name, + toxic_type, + stream: direction.as_str(), + toxicity, + attributes, + }) + .send() + .await + .with_context(|| format!("failed to add Toxiproxy toxic {name}"))?; + + if !response.status().is_success() { + let status = response.status(); + let text = response.text().await.unwrap_or_default(); + anyhow::bail!("failed to add Toxiproxy toxic {name}: {status}: {text}"); + } + + Ok(()) + } +} + +async fn wait_for_admin(client: &reqwest::Client, admin_base_url: &str) -> Result<()> { + let start = std::time::Instant::now(); + let timeout = std::time::Duration::from_secs(15); + + loop { + let ready = match client.get(format!("{admin_base_url}/version")).send().await { + Ok(response) => response.status().is_success(), + Err(_) => false, + }; + if ready { + return Ok(()); + } + + if start.elapsed() > timeout { + anyhow::bail!("timed out waiting for Toxiproxy admin API"); + } + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } +} diff --git a/engine/sdks/rust/envoy-client/src/config.rs b/engine/sdks/rust/envoy-client/src/config.rs index 6ac5d129ce..3c6800aa9b 100644 --- a/engine/sdks/rust/envoy-client/src/config.rs +++ b/engine/sdks/rust/envoy-client/src/config.rs @@ -99,6 +99,10 @@ impl ActorStopHandle { /// Callbacks that the consumer of the envoy client must implement. pub trait EnvoyCallbacks: Send + Sync + 'static { + fn on_connect(&self, _handle: EnvoyHandle) {} + + fn on_disconnect(&self, _handle: EnvoyHandle) {} + fn on_actor_start( &self, handle: EnvoyHandle, diff --git a/engine/sdks/rust/envoy-client/src/connection/native.rs b/engine/sdks/rust/envoy-client/src/connection/native.rs index 2c95d2e261..59d76e587d 100644 --- a/engine/sdks/rust/envoy-client/src/connection/native.rs +++ b/engine/sdks/rust/envoy-client/src/connection/native.rs @@ -10,6 +10,7 @@ use vbare::OwnedVersionedData; use crate::context::{SharedContext, WsTxMessage}; use crate::envoy::ToEnvoyMessage; +use crate::handle::EnvoyHandle; use crate::utils::{BackoffOptions, calculate_backoff, parse_ws_close_reason}; const STABLE_CONNECTION_MS: u64 = 60_000; @@ -117,6 +118,10 @@ async fn single_connection( has_token = shared.config.token.is_some(), "websocket connected" ); + shared + .config + .callbacks + .on_connect(EnvoyHandle::from_shared(shared.clone())); // Spawn write task let shared2 = shared.clone(); @@ -196,6 +201,10 @@ async fn single_connection( *guard = None; } write_handle.abort(); + shared + .config + .callbacks + .on_disconnect(EnvoyHandle::from_shared(shared.clone())); Ok(result) }