diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index c7e70713f1..d71894936c 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -2159,11 +2159,14 @@ impl ActorTask { // only meant to stop the previous generation. self.ctx.reset_abort_signal_for_start(); self.ctx.clear_sleep_requested(); + } + self.ctx + .set_started(matches!( + lifecycle, + LifecycleState::Started | LifecycleState::SleepGrace + )); } - self.ctx - .set_started(matches!(lifecycle, LifecycleState::Started)); } -} fn shutdown_reason_label(reason: ShutdownKind) -> &'static str { match reason { diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index 497d6cf5a0..f74387f4e0 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -103,20 +103,23 @@ type ActiveActorInstance = Arc; enum ActorInstanceState { Active(ActiveActorInstance), - Stopping(ActiveActorInstance), + Stopping { + instance: ActiveActorInstance, + reason: ShutdownKind, + }, } impl ActorInstanceState { fn instance(&self) -> ActiveActorInstance { match self { - Self::Active(instance) | Self::Stopping(instance) => instance.clone(), + Self::Active(instance) | Self::Stopping { instance, .. } => instance.clone(), } } fn active_instance(&self) -> Option { match self { Self::Active(instance) => Some(instance.clone()), - Self::Stopping(_) => None, + Self::Stopping { .. } => None, } } } @@ -675,19 +678,20 @@ impl RegistryDispatcher { .remove_async(&request.actor_id.clone()) .await .map(|(_, pending_stop)| pending_stop); - if let Some(pending_stop) = pending_stop { - let actor_id = request.actor_id.clone(); - if matches!( - map_envoy_stop_reason(&pending_stop.reason), - ShutdownKind::Destroy - ) { - instance.ctx.mark_destroy_requested(); - } - self.set_actor_instance_state( - actor_id.clone(), - ActorInstanceState::Stopping(instance.clone()), - ) - .await; + if let Some(pending_stop) = pending_stop { + let actor_id = request.actor_id.clone(); + let stop_reason = map_envoy_stop_reason(&pending_stop.reason); + if matches!(stop_reason, ShutdownKind::Destroy) { + instance.ctx.mark_destroy_requested(); + } + self.set_actor_instance_state( + actor_id.clone(), + ActorInstanceState::Stopping { + instance: instance.clone(), + reason: stop_reason, + }, + ) + .await; let _ = self .starting_instances .remove_async(&request.actor_id.clone()) @@ -753,15 +757,22 @@ impl RegistryDispatcher { } } - async fn transition_actor_to_stopping(&self, actor_id: &str) -> Option { - match self.actor_instances.entry_async(actor_id.to_owned()).await { - SccEntry::Occupied(mut entry) => { - let instance = entry.get().instance(); - if matches!(entry.get(), ActorInstanceState::Active(_)) { - entry.insert(ActorInstanceState::Stopping(instance.clone())); - } else { - instance - .ctx + async fn transition_actor_to_stopping( + &self, + actor_id: &str, + reason: ShutdownKind, + ) -> Option { + match self.actor_instances.entry_async(actor_id.to_owned()).await { + SccEntry::Occupied(mut entry) => { + let instance = entry.get().instance(); + if matches!(entry.get(), ActorInstanceState::Active(_)) { + entry.insert(ActorInstanceState::Stopping { + instance: instance.clone(), + reason, + }); + } else { + instance + .ctx .warn_work_sent_to_stopping_instance("stop_actor"); } Some(instance) @@ -776,10 +787,12 @@ impl RegistryDispatcher { async fn remove_stopping_actor_instance(&self, actor_id: &str, expected: &ActiveActorInstance) { match self.actor_instances.entry_async(actor_id.to_owned()).await { SccEntry::Occupied(entry) => { - let should_remove = match entry.get() { - ActorInstanceState::Stopping(instance) => Arc::ptr_eq(instance, expected), - ActorInstanceState::Active(_) => false, - }; + let should_remove = match entry.get() { + ActorInstanceState::Stopping { instance, .. } => { + Arc::ptr_eq(instance, expected) + } + ActorInstanceState::Active(_) => false, + }; if should_remove { let _ = entry.remove_entry(); } @@ -793,44 +806,49 @@ impl RegistryDispatcher { async fn active_actor(&self, actor_id: &str) -> Result> { if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await { match instance.get() { - ActorInstanceState::Active(instance) => { - let instance = instance.clone(); - if instance.ctx.started() { - if instance.ctx.destroy_requested() || instance.ctx.sleep_requested() { - instance - .ctx - .warn_work_sent_to_stopping_instance("active_actor"); - return Err(if instance.ctx.destroy_requested() { - ActorLifecycleError::Destroying.build() - } else { - ActorLifecycleError::Stopping.build() - }); + ActorInstanceState::Active(instance) => { + let instance = instance.clone(); + if instance.ctx.started() { + if instance.ctx.destroy_requested() { + instance + .ctx + .warn_work_sent_to_stopping_instance("active_actor"); + return Err(ActorLifecycleError::Destroying.build()); + } + return Ok(instance); } - return Ok(instance); - } instance .ctx .warn_work_sent_to_stopping_instance("active_actor"); - return Err(if instance.ctx.destroy_requested() { - ActorLifecycleError::Destroying.build() - } else { - ActorLifecycleError::Starting.build() - }); - } - ActorInstanceState::Stopping(instance) => { - let instance = instance.clone(); - instance - .ctx - .warn_work_sent_to_stopping_instance("active_actor"); - return Err(if instance.ctx.destroy_requested() { - ActorLifecycleError::Destroying.build() - } else { - ActorLifecycleError::Stopping.build() - }); + return Err(if instance.ctx.destroy_requested() { + ActorLifecycleError::Destroying.build() + } else if instance.ctx.sleep_requested() { + ActorLifecycleError::Stopping.build() + } else { + ActorLifecycleError::Starting.build() + }); + } + ActorInstanceState::Stopping { instance, reason } => { + let instance = instance.clone(); + match reason { + ShutdownKind::Sleep if instance.ctx.started() => return Ok(instance), + ShutdownKind::Sleep => { + instance + .ctx + .warn_work_sent_to_stopping_instance("active_actor"); + return Err(ActorLifecycleError::Stopping.build()); + } + ShutdownKind::Destroy => { + instance + .ctx + .warn_work_sent_to_stopping_instance("active_actor"); + return Err(ActorLifecycleError::Destroying.build()); + } + } + } } } - } tracing::warn!(actor_id, "actor instance not found"); Err(ActorRuntime::NotFound { @@ -865,9 +883,13 @@ impl RegistryDispatcher { return Ok(()); } - let instance = match self.transition_actor_to_stopping(actor_id).await { - Some(instance) => instance, - None => { + let task_stop_reason = map_envoy_stop_reason(&reason); + let instance = match self + .transition_actor_to_stopping(actor_id, task_stop_reason) + .await + { + Some(instance) => instance, + None => { let _ = self .pending_stops .insert_async( @@ -881,8 +903,8 @@ impl RegistryDispatcher { return Ok(()); } }; - let result = self - .shutdown_started_instance(actor_id, instance.clone(), reason, stop_handle) + let result = self + .shutdown_started_instance(actor_id, instance.clone(), reason, stop_handle) .await; self.remove_stopping_actor_instance(actor_id, &instance) .await; diff --git a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-sleep-db.test.ts b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-sleep-db.test.ts index d62f03aff0..09dc1bbe70 100644 --- a/rivetkit-typescript/packages/rivetkit/tests/driver/actor-sleep-db.test.ts +++ b/rivetkit-typescript/packages/rivetkit/tests/driver/actor-sleep-db.test.ts @@ -428,8 +428,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => { expect(events).toContain("sleep-end"); }); - // TODO(#4705): Root-cause handle action dispatch ordering during sleep shutdown and re-enable this coverage. - test.skip("action via handle during sleep shutdown is not queued", async (c) => { + test("action via handle during sleep shutdown is not queued", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const handle = client.sleepWithDbAction.getOrCreate([ @@ -647,8 +646,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => { ); }); - // TODO(#4705): Root-cause connection action dispatch ordering during sleep shutdown and re-enable this coverage. - test.skip("action via WebSocket connection during sleep shutdown is not queued", async (c) => { + test("action via WebSocket connection during sleep shutdown is not queued", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); const handle = client.sleepWithDbAction.getOrCreate([ @@ -698,8 +696,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => { expect(events).not.toContain("ws-during-sleep"); } }); - // TODO(#4705): Root-cause new connection behavior during sleep shutdown and re-enable this coverage. - test.skip("new connections rejected during sleep shutdown", async (c) => { + test("new connections rejected during sleep shutdown", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); // The sleepWithDbAction actor has a 500ms delay in @@ -744,8 +741,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => { await secondConn.dispose(); }); - // TODO(#4705): Root-cause raw WebSocket admission during sleep shutdown and re-enable this coverage. - test.skip("new raw WebSocket during sleep shutdown is rejected or queued", async (c) => { + test("new raw WebSocket during sleep shutdown is rejected or queued", async (c) => { const { client } = await setupDriverTest(c, driverTestConfig); // The sleepWithRawWs actor has a 500ms delay in onSleep.