Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions rivetkit-rust/packages/rivetkit-core/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1364,58 +1364,71 @@ impl ActorContext {
}

fn configure_sleep_hooks(&self) {
let keep_awake_ctx = self.clone();
let keep_awake_ctx = self.downgrade();
self.0
.sleep
.work
.keep_awake
.register_change_callback(Arc::new(move || {
keep_awake_ctx
.0
.metrics
.set_keep_awake_active(keep_awake_ctx.sleep_keep_awake_count());
if let Some(ctx) = ActorContext::from_weak(&keep_awake_ctx) {
ctx.0
.metrics
.set_keep_awake_active(ctx.sleep_keep_awake_count());
}
}));

let internal_keep_awake_metric_ctx = self.clone();
let internal_keep_awake_metric_ctx = self.downgrade();
self.0
.sleep
.work
.internal_keep_awake
.register_change_callback(Arc::new(move || {
internal_keep_awake_metric_ctx
.0
.metrics
.set_internal_keep_awake_active(
internal_keep_awake_metric_ctx.sleep_internal_keep_awake_count(),
);
if let Some(ctx) = ActorContext::from_weak(&internal_keep_awake_metric_ctx) {
ctx.0
.metrics
.set_internal_keep_awake_active(ctx.sleep_internal_keep_awake_count());
}
}));

let shutdown_tasks_ctx = self.clone();
let shutdown_tasks_ctx = self.downgrade();
self.0
.sleep
.work
.shutdown_counter
.register_change_callback(Arc::new(move || {
shutdown_tasks_ctx
.0
.metrics
.set_shutdown_tasks_active(shutdown_tasks_ctx.shutdown_task_count());
if let Some(ctx) = ActorContext::from_weak(&shutdown_tasks_ctx) {
ctx.0
.metrics
.set_shutdown_tasks_active(ctx.shutdown_task_count());
}
}));

let internal_keep_awake_ctx = self.clone();
let internal_keep_awake_ctx = self.downgrade();
self.set_internal_keep_awake(Some(Arc::new(move |future| {
let ctx = internal_keep_awake_ctx.clone();
Box::pin(async move { ctx.internal_keep_awake_task(future).await })
let ctx = ActorContext::from_weak(&internal_keep_awake_ctx);
Box::pin(async move {
let Some(ctx) = ctx else {
return Err(ActorRuntime::NotConfigured {
component: "actor context".to_owned(),
}
.build());
};
ctx.internal_keep_awake_task(future).await
})
})));

let queue_ctx = self.clone();
let queue_ctx = self.downgrade();
self.set_wait_activity_callback(Some(Arc::new(move || {
queue_ctx.reset_sleep_timer();
if let Some(ctx) = ActorContext::from_weak(&queue_ctx) {
ctx.reset_sleep_timer();
}
})));

let queue_ctx = self.clone();
let queue_ctx = self.downgrade();
self.set_inspector_update_callback(Some(Arc::new(move |queue_size| {
queue_ctx.record_queue_updated(queue_size);
if let Some(ctx) = ActorContext::from_weak(&queue_ctx) {
ctx.record_queue_updated(queue_size);
}
})));
}

Expand Down
19 changes: 17 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::fmt;
use std::sync::{Arc, LazyLock};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use parking_lot::Mutex;
Expand Down Expand Up @@ -34,6 +35,7 @@ pub(crate) struct ActorMetrics {
struct ActorMetricInner {
labels: ActorMetricLabels,
state: Mutex<ActorMetricState>,
active: AtomicBool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -643,6 +645,7 @@ impl ActorMetrics {
inner: Arc::new(ActorMetricInner {
labels,
state: Mutex::new(ActorMetricState::default()),
active: AtomicBool::new(true),
}),
}
}
Expand Down Expand Up @@ -850,10 +853,24 @@ impl ActorMetrics {
.with_label_values(&[labels[0], subsystem, operation])
.inc();
}

pub(crate) fn record_actor_stopped(&self) {
self.inner.record_actor_stopped();
}
}

impl Drop for ActorMetricInner {
fn drop(&mut self) {
self.record_actor_stopped();
}
}

impl ActorMetricInner {
fn record_actor_stopped(&self) {
if !self.active.swap(false, Ordering::AcqRel) {
return;
}

self.clear_aggregated_gauges();
let metrics = &*METRICS;
metrics
Expand All @@ -865,9 +882,7 @@ impl Drop for ActorMetricInner {
.with_label_values(&self.labels.as_label_values())
.inc();
}
}

impl ActorMetricInner {
fn clear_aggregated_gauges(&self) {
let labels = self.labels.as_label_values();
let mut state = self.state.lock();
Expand Down
8 changes: 6 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,11 +387,13 @@ impl ActorTask {
Arc::clone(&inspector_attach_count),
inspector_overlay_tx.clone(),
);
let inspector_ctx = ctx.clone();
let inspector_ctx = ctx.downgrade();
let inspector_attach_count_for_hook = Arc::clone(&inspector_attach_count);
ctx.on_request_save(Box::new(move |_opts| {
if inspector_attach_count_for_hook.load(Ordering::SeqCst) > 0 {
inspector_ctx.notify_inspector_serialize_requested();
if let Some(ctx) = ActorContext::from_weak(&inspector_ctx) {
ctx.notify_inspector_serialize_requested();
}
}
}));
Self {
Expand Down Expand Up @@ -444,6 +446,7 @@ impl ActorTask {
let exit = self.run_live().await;
let LiveExit::Shutdown { reason } = exit else {
self.record_inbox_depths();
self.ctx.metrics().record_actor_stopped();
return Ok(());
};

Expand All @@ -457,6 +460,7 @@ impl ActorTask {
self.deliver_shutdown_reply(reason, &result);
self.transition_to(LifecycleState::Terminated);
self.record_inbox_depths();
self.ctx.metrics().record_actor_stopped();
result
}

Expand Down
4 changes: 2 additions & 2 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,15 +528,15 @@ impl CoreRegistry {
// TODO: Move into envoy-client since timing out has to do with protocol compliance
// Read threshold from protocol metadata, fall back to 30 min
let stop_threshold = handle
.get_protocol_metadata
.get_protocol_metadata()
.await
.map(|x| x.actor_stop_threshold)
.unwrap_or(30 * 60 * 1000);
// Bounded drain. If envoy cannot reach the engine (reconnect loop stuck),
// we fall back to immediate `Stop` rather than hanging indefinitely.
// The outer host (TS signal handler / Rust binary) is the backstop.
match timeout(
Duration::from_millis(SHUTDOWN_DRAIN_TIMEOUT as u64),
Duration::from_millis(stop_threshold as u64),
handle.shutdown_and_wait(false),
)
.await
Expand Down
12 changes: 12 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/tests/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,18 @@ mod moved_tests {
)
}

#[test]
fn request_save_hook_does_not_retain_actor_context() {
let ctx = ActorContext::new("actor-hook-drop", "task-hook-drop", Vec::new(), "local");
let weak = ctx.downgrade();
let task = new_task(ctx.clone());

drop(task);
drop(ctx);

assert!(weak.upgrade().is_none());
}

struct IdleEnvoyCallbacks;

impl EnvoyCallbacks for IdleEnvoyCallbacks {
Expand Down
Loading