diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs index 490c9d9329..c64ab373b6 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs @@ -1364,58 +1364,71 @@ impl ActorContext { } fn configure_sleep_hooks(&self) { - let keep_awake_ctx = self.clone(); + let keep_awake_ctx = self.downgrade(); self.0 .sleep .work .keep_awake .register_change_callback(Arc::new(move || { - keep_awake_ctx - .0 - .metrics - .set_keep_awake_active(keep_awake_ctx.sleep_keep_awake_count()); + if let Some(ctx) = ActorContext::from_weak(&keep_awake_ctx) { + ctx.0 + .metrics + .set_keep_awake_active(ctx.sleep_keep_awake_count()); + } })); - let internal_keep_awake_metric_ctx = self.clone(); + let internal_keep_awake_metric_ctx = self.downgrade(); self.0 .sleep .work .internal_keep_awake .register_change_callback(Arc::new(move || { - internal_keep_awake_metric_ctx - .0 - .metrics - .set_internal_keep_awake_active( - internal_keep_awake_metric_ctx.sleep_internal_keep_awake_count(), - ); + if let Some(ctx) = ActorContext::from_weak(&internal_keep_awake_metric_ctx) { + ctx.0 + .metrics + .set_internal_keep_awake_active(ctx.sleep_internal_keep_awake_count()); + } })); - let shutdown_tasks_ctx = self.clone(); + let shutdown_tasks_ctx = self.downgrade(); self.0 .sleep .work .shutdown_counter .register_change_callback(Arc::new(move || { - shutdown_tasks_ctx - .0 - .metrics - .set_shutdown_tasks_active(shutdown_tasks_ctx.shutdown_task_count()); + if let Some(ctx) = ActorContext::from_weak(&shutdown_tasks_ctx) { + ctx.0 + .metrics + .set_shutdown_tasks_active(ctx.shutdown_task_count()); + } })); - let internal_keep_awake_ctx = self.clone(); + let internal_keep_awake_ctx = self.downgrade(); self.set_internal_keep_awake(Some(Arc::new(move |future| { - let ctx = internal_keep_awake_ctx.clone(); - Box::pin(async move { ctx.internal_keep_awake_task(future).await }) + let ctx = ActorContext::from_weak(&internal_keep_awake_ctx); + Box::pin(async move { + let Some(ctx) = ctx else { + return Err(ActorRuntime::NotConfigured { + component: "actor context".to_owned(), + } + .build()); + }; + ctx.internal_keep_awake_task(future).await + }) }))); - let queue_ctx = self.clone(); + let queue_ctx = self.downgrade(); self.set_wait_activity_callback(Some(Arc::new(move || { - queue_ctx.reset_sleep_timer(); + if let Some(ctx) = ActorContext::from_weak(&queue_ctx) { + ctx.reset_sleep_timer(); + } }))); - let queue_ctx = self.clone(); + let queue_ctx = self.downgrade(); self.set_inspector_update_callback(Some(Arc::new(move |queue_size| { - queue_ctx.record_queue_updated(queue_size); + if let Some(ctx) = ActorContext::from_weak(&queue_ctx) { + ctx.record_queue_updated(queue_size); + } }))); } diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs index 47dbba86b0..9cc70d68d4 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs @@ -1,6 +1,7 @@ use std::collections::BTreeMap; use std::fmt; use std::sync::{Arc, LazyLock}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use parking_lot::Mutex; @@ -34,6 +35,7 @@ pub(crate) struct ActorMetrics { struct ActorMetricInner { labels: ActorMetricLabels, state: Mutex, + active: AtomicBool, } #[derive(Debug)] @@ -643,6 +645,7 @@ impl ActorMetrics { inner: Arc::new(ActorMetricInner { labels, state: Mutex::new(ActorMetricState::default()), + active: AtomicBool::new(true), }), } } @@ -850,10 +853,24 @@ impl ActorMetrics { .with_label_values(&[labels[0], subsystem, operation]) .inc(); } + + pub(crate) fn record_actor_stopped(&self) { + self.inner.record_actor_stopped(); + } } impl Drop for ActorMetricInner { fn drop(&mut self) { + self.record_actor_stopped(); + } +} + +impl ActorMetricInner { + fn record_actor_stopped(&self) { + if !self.active.swap(false, Ordering::AcqRel) { + return; + } + self.clear_aggregated_gauges(); let metrics = &*METRICS; metrics @@ -865,9 +882,7 @@ impl Drop for ActorMetricInner { .with_label_values(&self.labels.as_label_values()) .inc(); } -} -impl ActorMetricInner { fn clear_aggregated_gauges(&self) { let labels = self.labels.as_label_values(); let mut state = self.state.lock(); diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs index 2f771e5c28..db3ae4b5f5 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs @@ -387,11 +387,13 @@ impl ActorTask { Arc::clone(&inspector_attach_count), inspector_overlay_tx.clone(), ); - let inspector_ctx = ctx.clone(); + let inspector_ctx = ctx.downgrade(); let inspector_attach_count_for_hook = Arc::clone(&inspector_attach_count); ctx.on_request_save(Box::new(move |_opts| { if inspector_attach_count_for_hook.load(Ordering::SeqCst) > 0 { - inspector_ctx.notify_inspector_serialize_requested(); + if let Some(ctx) = ActorContext::from_weak(&inspector_ctx) { + ctx.notify_inspector_serialize_requested(); + } } })); Self { @@ -444,6 +446,7 @@ impl ActorTask { let exit = self.run_live().await; let LiveExit::Shutdown { reason } = exit else { self.record_inbox_depths(); + self.ctx.metrics().record_actor_stopped(); return Ok(()); }; @@ -457,6 +460,7 @@ impl ActorTask { self.deliver_shutdown_reply(reason, &result); self.transition_to(LifecycleState::Terminated); self.record_inbox_depths(); + self.ctx.metrics().record_actor_stopped(); result } diff --git a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs index 95dfa03814..59e7f95ea4 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs @@ -528,7 +528,7 @@ impl CoreRegistry { // TODO: Move into envoy-client since timing out has to do with protocol compliance // Read threshold from protocol metadata, fall back to 30 min let stop_threshold = handle - .get_protocol_metadata + .get_protocol_metadata() .await .map(|x| x.actor_stop_threshold) .unwrap_or(30 * 60 * 1000); @@ -536,7 +536,7 @@ impl CoreRegistry { // we fall back to immediate `Stop` rather than hanging indefinitely. // The outer host (TS signal handler / Rust binary) is the backstop. match timeout( - Duration::from_millis(SHUTDOWN_DRAIN_TIMEOUT as u64), + Duration::from_millis(stop_threshold as u64), handle.shutdown_and_wait(false), ) .await diff --git a/rivetkit-rust/packages/rivetkit-core/tests/task.rs b/rivetkit-rust/packages/rivetkit-core/tests/task.rs index 56fa349052..2a48d31128 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/task.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/task.rs @@ -174,6 +174,18 @@ mod moved_tests { ) } + #[test] + fn request_save_hook_does_not_retain_actor_context() { + let ctx = ActorContext::new("actor-hook-drop", "task-hook-drop", Vec::new(), "local"); + let weak = ctx.downgrade(); + let task = new_task(ctx.clone()); + + drop(task); + drop(ctx); + + assert!(weak.upgrade().is_none()); + } + struct IdleEnvoyCallbacks; impl EnvoyCallbacks for IdleEnvoyCallbacks {