diff --git a/engine/packages/depot-client/src/vfs.rs b/engine/packages/depot-client/src/vfs.rs index e447c4d2a6..77babe2cb2 100644 --- a/engine/packages/depot-client/src/vfs.rs +++ b/engine/packages/depot-client/src/vfs.rs @@ -303,6 +303,8 @@ pub trait SqliteVfsMetrics: Send + Sync { fn set_worker_queue_depth(&self, _depth: u64) {} + fn set_worker_active(&self, _active: bool) {} + fn record_worker_queue_overload(&self) {} fn observe_worker_command_duration(&self, _operation: &'static str, _duration_ns: u64) {} diff --git a/examples/kitchen-sink/src/server.ts b/examples/kitchen-sink/src/server.ts index 521c4ea6da..a1317e0841 100644 --- a/examples/kitchen-sink/src/server.ts +++ b/examples/kitchen-sink/src/server.ts @@ -58,16 +58,6 @@ async function memoryBreakdown(forceGc: boolean) { const heap = v8.getHeapStatistics(); const spaces = v8.getHeapSpaceStatistics(); const nativeNonV8Estimate = Math.max(0, memory.rss - heap.total_heap_size); - const diagnostics = - "diagnostics" in registry && - typeof registry.diagnostics === "function" - ? registry.diagnostics.bind(registry) - : undefined; - const registryDiagnostics = diagnostics - ? await diagnostics().catch((error: unknown) => ({ - error: error instanceof Error ? error.message : String(error), - })) - : { error: "registry diagnostics unavailable" }; return { pid: process.pid, @@ -103,7 +93,6 @@ async function memoryBreakdown(forceGc: boolean) { v8ExternalBytes: memory.external, nativeNonV8ResidentEstimateBytes: nativeNonV8Estimate, }, - registry: registryDiagnostics, resourceUsage: process.resourceUsage(), }; } @@ -126,6 +115,12 @@ app.get("/debug/memory", async (c) => { return c.json(await memoryBreakdown(forceGc)); }); +app.get("/health", () => registry.routes.health()); + +app.get("/metadata", () => registry.routes.metadata()); + +app.get("/metrics", (c) => registry.routes.prometheusMetrics(c.req.raw)); + app.post("/debug/heap-snapshot", (c) => { if (process.env.SQLITE_MEMORY_SOAK_DIAGNOSTICS !== "1") { return c.json({ error: "disabled" }, 404); diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs index 66867b00e0..490c9d9329 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/context.rs @@ -1364,6 +1364,44 @@ impl ActorContext { } fn configure_sleep_hooks(&self) { + let keep_awake_ctx = self.clone(); + self.0 + .sleep + .work + .keep_awake + .register_change_callback(Arc::new(move || { + keep_awake_ctx + .0 + .metrics + .set_keep_awake_active(keep_awake_ctx.sleep_keep_awake_count()); + })); + + let internal_keep_awake_metric_ctx = self.clone(); + self.0 + .sleep + .work + .internal_keep_awake + .register_change_callback(Arc::new(move || { + internal_keep_awake_metric_ctx + .0 + .metrics + .set_internal_keep_awake_active( + internal_keep_awake_metric_ctx.sleep_internal_keep_awake_count(), + ); + })); + + let shutdown_tasks_ctx = self.clone(); + self.0 + .sleep + .work + .shutdown_counter + .register_change_callback(Arc::new(move || { + shutdown_tasks_ctx + .0 + .metrics + .set_shutdown_tasks_active(shutdown_tasks_ctx.shutdown_task_count()); + })); + let internal_keep_awake_ctx = self.clone(); self.set_internal_keep_awake(Some(Arc::new(move |future| { let ctx = internal_keep_awake_ctx.clone(); diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs index c81dfe4af3..47dbba86b0 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/metrics.rs @@ -1,7 +1,9 @@ +use std::collections::BTreeMap; use std::fmt; use std::sync::{Arc, LazyLock}; use std::time::Duration; +use parking_lot::Mutex; use rivet_metrics::prometheus::{ CounterVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, }; @@ -9,7 +11,9 @@ use rivet_metrics::prometheus::{ use crate::actor::task_types::{ShutdownKind, StateMutationReason, UserTaskKind}; const ACTOR_LABELS: &[&str] = &["actor_name"]; +const INBOX_LABELS: &[&str] = &["actor_name", "inbox"]; const USER_TASK_LABELS: &[&str] = &["actor_name", "kind"]; +const WORK_LABELS: &[&str] = &["actor_name", "kind"]; const SHUTDOWN_LABELS: &[&str] = &["actor_name", "reason"]; const STATE_MUTATION_LABELS: &[&str] = &["actor_name", "reason"]; const DIRECT_SHUTDOWN_LABELS: &[&str] = &["actor_name", "subsystem", "operation"]; @@ -29,6 +33,7 @@ pub(crate) struct ActorMetrics { #[derive(Debug)] struct ActorMetricInner { labels: ActorMetricLabels, + state: Mutex, } #[derive(Debug)] @@ -36,14 +41,41 @@ struct ActorMetricLabels { actor_name: String, } +#[derive(Debug, Default)] +struct ActorMetricState { + queue_depth: i64, + active_connections: i64, + lifecycle_inbox_depth: i64, + dispatch_inbox_depth: i64, + lifecycle_event_inbox_depth: i64, + user_tasks_active: BTreeMap<&'static str, i64>, + http_requests_active: i64, + keep_awake_active: i64, + internal_keep_awake_active: i64, + shutdown_tasks_active: i64, + #[cfg(feature = "sqlite-local")] + sqlite_worker_queue_depth: i64, + #[cfg(feature = "sqlite-local")] + sqlite_workers_active: i64, +} + struct ActorMetricCollectors { actor_active_count: IntGaugeVec, + actor_started_total: IntCounterVec, + actor_stopped_total: IntCounterVec, create_state_duration_seconds: HistogramVec, create_vars_duration_seconds: HistogramVec, + queue_depth: IntGaugeVec, queue_messages_sent_total: IntCounterVec, queue_messages_received_total: IntCounterVec, + active_connections: IntGaugeVec, connections_total: IntCounterVec, + inbox_depth: IntGaugeVec, + user_tasks_active: IntGaugeVec, user_task_duration_seconds: HistogramVec, + http_requests_active: IntGaugeVec, + keep_awake_active: IntGaugeVec, + shutdown_tasks_active: IntGaugeVec, shutdown_wait_seconds: HistogramVec, shutdown_timeout_total: CounterVec, state_mutation_total: CounterVec, @@ -75,6 +107,10 @@ struct ActorMetricCollectors { #[cfg(feature = "sqlite-local")] sqlite_vfs_commit_duration_seconds_total: CounterVec, #[cfg(feature = "sqlite-local")] + sqlite_worker_queue_depth: IntGaugeVec, + #[cfg(feature = "sqlite-local")] + sqlite_workers_active: IntGaugeVec, + #[cfg(feature = "sqlite-local")] sqlite_worker_queue_overload_total: IntCounterVec, #[cfg(feature = "sqlite-local")] sqlite_worker_command_duration_seconds: HistogramVec, @@ -102,6 +138,22 @@ impl ActorMetricCollectors { ACTOR_LABELS, ) .expect("create actor_active_count gauge"); + let actor_started_total = IntCounterVec::new( + Opts::new( + "rivetkit_actor_started_total", + "total actors started in this process", + ), + ACTOR_LABELS, + ) + .expect("create actor_started_total counter"); + let actor_stopped_total = IntCounterVec::new( + Opts::new( + "rivetkit_actor_stopped_total", + "total actors stopped in this process", + ), + ACTOR_LABELS, + ) + .expect("create actor_stopped_total counter"); let create_state_duration_seconds = HistogramVec::new( HistogramOpts::new( "rivetkit_actor_create_state_duration_seconds", @@ -118,6 +170,11 @@ impl ActorMetricCollectors { ACTOR_LABELS, ) .expect("create actor_create_vars_duration_seconds histogram"); + let queue_depth = IntGaugeVec::new( + Opts::new("rivetkit_actor_queue_depth", "current actor queue depth"), + ACTOR_LABELS, + ) + .expect("create actor_queue_depth gauge"); let queue_messages_sent_total = IntCounterVec::new( Opts::new("rivetkit_actor_queue_messages_sent_total", "total queue messages sent"), ACTOR_LABELS, @@ -131,6 +188,14 @@ impl ActorMetricCollectors { ACTOR_LABELS, ) .expect("create actor_queue_messages_received_total counter"); + let active_connections = IntGaugeVec::new( + Opts::new( + "rivetkit_actor_connections_active", + "current active actor connections", + ), + ACTOR_LABELS, + ) + .expect("create actor_connections_active gauge"); let connections_total = IntCounterVec::new( Opts::new( "rivetkit_actor_connections_total", @@ -139,6 +204,16 @@ impl ActorMetricCollectors { ACTOR_LABELS, ) .expect("create actor_connections_total counter"); + let inbox_depth = IntGaugeVec::new( + Opts::new("rivetkit_actor_inbox_depth", "current actor inbox depth"), + INBOX_LABELS, + ) + .expect("create actor_inbox_depth gauge"); + let user_tasks_active = IntGaugeVec::new( + Opts::new("rivetkit_actor_user_tasks_active", "current active actor user tasks"), + USER_TASK_LABELS, + ) + .expect("create actor_user_tasks_active gauge"); let user_task_duration_seconds = HistogramVec::new( HistogramOpts::new( "rivetkit_actor_user_task_duration_seconds", @@ -147,6 +222,30 @@ impl ActorMetricCollectors { USER_TASK_LABELS, ) .expect("create actor_user_task_duration_seconds histogram"); + let http_requests_active = IntGaugeVec::new( + Opts::new( + "rivetkit_actor_http_requests_active", + "current actor-scoped HTTP requests", + ), + ACTOR_LABELS, + ) + .expect("create actor_http_requests_active gauge"); + let keep_awake_active = IntGaugeVec::new( + Opts::new( + "rivetkit_actor_keep_awake_active", + "current actor keep-awake work", + ), + WORK_LABELS, + ) + .expect("create actor_keep_awake_active gauge"); + let shutdown_tasks_active = IntGaugeVec::new( + Opts::new( + "rivetkit_actor_shutdown_tasks_active", + "current actor work draining during shutdown", + ), + ACTOR_LABELS, + ) + .expect("create actor_shutdown_tasks_active gauge"); let shutdown_wait_seconds = HistogramVec::new( HistogramOpts::new( "rivetkit_actor_shutdown_wait_seconds", @@ -298,6 +397,24 @@ impl ActorMetricCollectors { ) .expect("create actor_sqlite_vfs_commit_duration_seconds_total counter"); #[cfg(feature = "sqlite-local")] + let sqlite_worker_queue_depth = IntGaugeVec::new( + Opts::new( + "rivetkit_actor_sqlite_worker_queue_depth", + "current native SQLite worker SQL command queue depth", + ), + ACTOR_LABELS, + ) + .expect("create actor_sqlite_worker_queue_depth gauge"); + #[cfg(feature = "sqlite-local")] + let sqlite_workers_active = IntGaugeVec::new( + Opts::new( + "rivetkit_actor_sqlite_workers_active", + "current active native SQLite workers", + ), + ACTOR_LABELS, + ) + .expect("create actor_sqlite_workers_active gauge"); + #[cfg(feature = "sqlite-local")] let sqlite_worker_queue_overload_total = IntCounterVec::new( Opts::new( "rivetkit_actor_sqlite_worker_queue_overload_total", @@ -364,6 +481,8 @@ impl ActorMetricCollectors { .expect("create actor_sqlite_worker_unclean_close_total counter"); register_metric(&rivet_metrics::REGISTRY, actor_active_count.clone()); + register_metric(&rivet_metrics::REGISTRY, actor_started_total.clone()); + register_metric(&rivet_metrics::REGISTRY, actor_stopped_total.clone()); register_metric( &rivet_metrics::REGISTRY, create_state_duration_seconds.clone(), @@ -372,10 +491,17 @@ impl ActorMetricCollectors { &rivet_metrics::REGISTRY, create_vars_duration_seconds.clone(), ); + register_metric(&rivet_metrics::REGISTRY, queue_depth.clone()); register_metric(&rivet_metrics::REGISTRY, queue_messages_sent_total.clone()); register_metric(&rivet_metrics::REGISTRY, queue_messages_received_total.clone()); + register_metric(&rivet_metrics::REGISTRY, active_connections.clone()); register_metric(&rivet_metrics::REGISTRY, connections_total.clone()); + register_metric(&rivet_metrics::REGISTRY, inbox_depth.clone()); + register_metric(&rivet_metrics::REGISTRY, user_tasks_active.clone()); register_metric(&rivet_metrics::REGISTRY, user_task_duration_seconds.clone()); + register_metric(&rivet_metrics::REGISTRY, http_requests_active.clone()); + register_metric(&rivet_metrics::REGISTRY, keep_awake_active.clone()); + register_metric(&rivet_metrics::REGISTRY, shutdown_tasks_active.clone()); register_metric(&rivet_metrics::REGISTRY, shutdown_wait_seconds.clone()); register_metric(&rivet_metrics::REGISTRY, shutdown_timeout_total.clone()); register_metric(&rivet_metrics::REGISTRY, state_mutation_total.clone()); @@ -416,6 +542,8 @@ impl ActorMetricCollectors { &rivet_metrics::REGISTRY, sqlite_vfs_commit_duration_seconds_total.clone(), ); + register_metric(&rivet_metrics::REGISTRY, sqlite_worker_queue_depth.clone()); + register_metric(&rivet_metrics::REGISTRY, sqlite_workers_active.clone()); register_metric(&rivet_metrics::REGISTRY, sqlite_worker_queue_overload_total.clone()); register_metric( &rivet_metrics::REGISTRY, @@ -430,12 +558,21 @@ impl ActorMetricCollectors { Self { actor_active_count, + actor_started_total, + actor_stopped_total, create_state_duration_seconds, create_vars_duration_seconds, + queue_depth, queue_messages_sent_total, queue_messages_received_total, + active_connections, connections_total, + inbox_depth, + user_tasks_active, user_task_duration_seconds, + http_requests_active, + keep_awake_active, + shutdown_tasks_active, shutdown_wait_seconds, shutdown_timeout_total, state_mutation_total, @@ -467,6 +604,10 @@ impl ActorMetricCollectors { #[cfg(feature = "sqlite-local")] sqlite_vfs_commit_duration_seconds_total, #[cfg(feature = "sqlite-local")] + sqlite_worker_queue_depth, + #[cfg(feature = "sqlite-local")] + sqlite_workers_active, + #[cfg(feature = "sqlite-local")] sqlite_worker_queue_overload_total, #[cfg(feature = "sqlite-local")] sqlite_worker_command_duration_seconds, @@ -489,12 +630,20 @@ impl ActorMetrics { let labels = ActorMetricLabels { actor_name: actor_name.into(), }; - METRICS + let metrics = &*METRICS; + metrics .actor_active_count .with_label_values(&labels.as_label_values()) .inc(); + metrics + .actor_started_total + .with_label_values(&labels.as_label_values()) + .inc(); Self { - inner: Arc::new(ActorMetricInner { labels }), + inner: Arc::new(ActorMetricInner { + labels, + state: Mutex::new(ActorMetricState::default()), + }), } } @@ -520,7 +669,16 @@ impl ActorMetrics { .observe(duration.as_secs_f64()); } - pub(crate) fn set_queue_depth(&self, _depth: u32) {} + pub(crate) fn set_queue_depth(&self, depth: u32) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.queue_depth, + i64::from(depth), + &METRICS.queue_depth, + &labels, + ); + } pub(crate) fn add_queue_messages_sent(&self, count: u64) { METRICS @@ -536,7 +694,16 @@ impl ActorMetrics { .inc_by(count); } - pub(crate) fn set_active_connections(&self, _count: usize) {} + pub(crate) fn set_active_connections(&self, count: usize) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.active_connections, + usize_to_i64(count), + &METRICS.active_connections, + &labels, + ); + } pub(crate) fn inc_connections_total(&self) { METRICS @@ -545,22 +712,113 @@ impl ActorMetrics { .inc(); } - pub(crate) fn set_lifecycle_inbox_depth(&self, _depth: usize) {} + pub(crate) fn set_lifecycle_inbox_depth(&self, depth: usize) { + self.set_inbox_depth("lifecycle", depth); + } - pub(crate) fn set_dispatch_inbox_depth(&self, _depth: usize) {} + pub(crate) fn set_dispatch_inbox_depth(&self, depth: usize) { + self.set_inbox_depth("dispatch", depth); + } - pub(crate) fn set_lifecycle_event_inbox_depth(&self, _depth: usize) {} + pub(crate) fn set_lifecycle_event_inbox_depth(&self, depth: usize) { + self.set_inbox_depth("lifecycle_event", depth); + } - pub(crate) fn begin_user_task(&self, _kind: UserTaskKind) {} + fn set_inbox_depth(&self, inbox: &'static str, depth: usize) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + let current = match inbox { + "lifecycle" => &mut state.lifecycle_inbox_depth, + "dispatch" => &mut state.dispatch_inbox_depth, + "lifecycle_event" => &mut state.lifecycle_event_inbox_depth, + _ => unreachable!("unknown inbox metric label"), + }; + set_aggregated_gauge( + current, + usize_to_i64(depth), + &METRICS.inbox_depth, + &[labels[0], inbox], + ); + } + + pub(crate) fn begin_user_task(&self, kind: UserTaskKind) { + let labels = self.actor_labels(); + let kind = kind.as_metric_label(); + let mut state = self.inner.state.lock(); + let current = state.user_tasks_active.entry(kind).or_default(); + let next = (*current).saturating_add(1); + set_aggregated_gauge( + current, + next, + &METRICS.user_tasks_active, + &[labels[0], kind], + ); + } pub(crate) fn end_user_task(&self, kind: UserTaskKind, duration: Duration) { let labels = self.actor_labels(); + let kind = kind.as_metric_label(); + { + let mut state = self.inner.state.lock(); + let current = state.user_tasks_active.entry(kind).or_default(); + let next = (*current).saturating_sub(1); + set_aggregated_gauge( + current, + next, + &METRICS.user_tasks_active, + &[labels[0], kind], + ); + } METRICS .user_task_duration_seconds - .with_label_values(&[labels[0], kind.as_metric_label()]) + .with_label_values(&[labels[0], kind]) .observe(duration.as_secs_f64()); } + pub(crate) fn set_http_requests_active(&self, count: usize) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.http_requests_active, + usize_to_i64(count), + &METRICS.http_requests_active, + &labels, + ); + } + + pub(crate) fn set_keep_awake_active(&self, count: usize) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.keep_awake_active, + usize_to_i64(count), + &METRICS.keep_awake_active, + &[labels[0], "keep_awake"], + ); + } + + pub(crate) fn set_internal_keep_awake_active(&self, count: usize) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.internal_keep_awake_active, + usize_to_i64(count), + &METRICS.keep_awake_active, + &[labels[0], "internal_keep_awake"], + ); + } + + pub(crate) fn set_shutdown_tasks_active(&self, count: usize) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.shutdown_tasks_active, + usize_to_i64(count), + &METRICS.shutdown_tasks_active, + &labels, + ); + } + pub(crate) fn observe_shutdown_wait(&self, reason: ShutdownKind, duration: Duration) { let labels = self.actor_labels(); METRICS @@ -596,10 +854,95 @@ impl ActorMetrics { impl Drop for ActorMetricInner { fn drop(&mut self) { - METRICS + self.clear_aggregated_gauges(); + let metrics = &*METRICS; + metrics .actor_active_count .with_label_values(&self.labels.as_label_values()) .dec(); + metrics + .actor_stopped_total + .with_label_values(&self.labels.as_label_values()) + .inc(); + } +} + +impl ActorMetricInner { + fn clear_aggregated_gauges(&self) { + let labels = self.labels.as_label_values(); + let mut state = self.state.lock(); + set_aggregated_gauge(&mut state.queue_depth, 0, &METRICS.queue_depth, &labels); + set_aggregated_gauge( + &mut state.active_connections, + 0, + &METRICS.active_connections, + &labels, + ); + set_aggregated_gauge( + &mut state.lifecycle_inbox_depth, + 0, + &METRICS.inbox_depth, + &[labels[0], "lifecycle"], + ); + set_aggregated_gauge( + &mut state.dispatch_inbox_depth, + 0, + &METRICS.inbox_depth, + &[labels[0], "dispatch"], + ); + set_aggregated_gauge( + &mut state.lifecycle_event_inbox_depth, + 0, + &METRICS.inbox_depth, + &[labels[0], "lifecycle_event"], + ); + for (kind, current) in state.user_tasks_active.iter_mut() { + set_aggregated_gauge( + current, + 0, + &METRICS.user_tasks_active, + &[labels[0], *kind], + ); + } + set_aggregated_gauge( + &mut state.http_requests_active, + 0, + &METRICS.http_requests_active, + &labels, + ); + set_aggregated_gauge( + &mut state.keep_awake_active, + 0, + &METRICS.keep_awake_active, + &[labels[0], "keep_awake"], + ); + set_aggregated_gauge( + &mut state.internal_keep_awake_active, + 0, + &METRICS.keep_awake_active, + &[labels[0], "internal_keep_awake"], + ); + set_aggregated_gauge( + &mut state.shutdown_tasks_active, + 0, + &METRICS.shutdown_tasks_active, + &labels, + ); + #[cfg(feature = "sqlite-local")] + { + set_aggregated_gauge( + &mut state.sqlite_worker_queue_depth, + 0, + &METRICS.sqlite_worker_queue_depth, + &labels, + ); + set_aggregated_gauge( + &mut state.sqlite_workers_active, + 0, + &METRICS.sqlite_workers_active, + &labels, + ); + } } } @@ -707,7 +1050,27 @@ impl depot_client::vfs::SqliteVfsMetrics for ActorMetrics { .inc_by(ns_to_seconds(total_ns)); } - fn set_worker_queue_depth(&self, _depth: u64) {} + fn set_worker_queue_depth(&self, depth: u64) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.sqlite_worker_queue_depth, + u64_to_i64(depth), + &METRICS.sqlite_worker_queue_depth, + &labels, + ); + } + + fn set_worker_active(&self, active: bool) { + let labels = self.actor_labels(); + let mut state = self.inner.state.lock(); + set_aggregated_gauge( + &mut state.sqlite_workers_active, + if active { 1 } else { 0 }, + &METRICS.sqlite_workers_active, + &labels, + ); + } fn record_worker_queue_overload(&self) { METRICS @@ -774,6 +1137,28 @@ fn sqlite_worker_duration_buckets() -> Vec { ] } +fn set_aggregated_gauge( + current: &mut i64, + next: i64, + gauge: &IntGaugeVec, + labels: &[&str], +) { + let delta = next.saturating_sub(*current); + if delta != 0 { + gauge.with_label_values(labels).add(delta); + *current = next; + } +} + +fn usize_to_i64(value: usize) -> i64 { + i64::try_from(value).unwrap_or(i64::MAX) +} + +#[cfg(feature = "sqlite-local")] +fn u64_to_i64(value: u64) -> i64 { + i64::try_from(value).unwrap_or(i64::MAX) +} + impl ActorMetricLabels { fn as_label_values(&self) -> [&str; 1] { [self.actor_name.as_str()] diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs index 9144fc1648..dd07197d04 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/sleep.rs @@ -863,8 +863,14 @@ impl ActorContext { // re-evaluated when a request starts or completes. let ctx = self.clone(); counter.register_change_callback(Arc::new(move || { + ctx.0 + .metrics + .set_http_requests_active(ctx.active_http_request_count()); ctx.reset_sleep_timer(); })); + self.0 + .metrics + .set_http_requests_active(counter.load()); Some(counter) } } diff --git a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs index a401cefe2b..4c5326d5e5 100644 --- a/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs +++ b/rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs @@ -185,6 +185,9 @@ impl SqliteDb { )?; self.start_worker_failure_monitor(native_db.clone(), config); *self.db.lock() = Some(native_db); + if let Some(metrics) = self.vfs_metrics.as_ref() { + metrics.set_worker_active(true); + } Ok(()) } @@ -363,6 +366,9 @@ impl SqliteDb { if let Some(native_db) = native_db { let result = self.map_local_worker_result(native_db.close().await); self.abort_worker_failure_monitor(); + if let Some(metrics) = self.vfs_metrics.as_ref() { + metrics.set_worker_active(false); + } result?; } } diff --git a/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs b/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs index 8346464f47..c25da3c2c1 100644 --- a/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs +++ b/rivetkit-rust/packages/rivetkit-core/tests/metrics.rs @@ -9,6 +9,8 @@ mod moved_tests { use rivet_metrics::prometheus::{IntGauge, Opts, Registry}; + use crate::actor::task_types::UserTaskKind; + use super::*; use super::metrics_helpers::{metric_line_for_actor, render_global_metrics}; @@ -87,12 +89,85 @@ mod moved_tests { assert!(line.ends_with('0'), "actor should be inactive: {line}"); } + #[test] + fn actor_current_gauges_aggregate_by_actor_name() { + let actor_name = "counter-gauge-aggregate"; + let first = ActorMetrics::new(actor_name); + let second = ActorMetrics::new(actor_name); + + first.set_active_connections(2); + second.set_active_connections(3); + first.set_queue_depth(4); + second.set_queue_depth(5); + first.set_dispatch_inbox_depth(6); + second.set_dispatch_inbox_depth(7); + first.begin_user_task(UserTaskKind::Action); + second.begin_user_task(UserTaskKind::Action); + + let rendered = render_global_metrics(); + assert_metric_value(&rendered, "rivetkit_actor_connections_active", actor_name, "5"); + assert_metric_value(&rendered, "rivetkit_actor_queue_depth", actor_name, "9"); + assert_metric_value_with_label( + &rendered, + "rivetkit_actor_inbox_depth", + actor_name, + "inbox=\"dispatch\"", + "13", + ); + assert_metric_value_with_label( + &rendered, + "rivetkit_actor_user_tasks_active", + actor_name, + "kind=\"action\"", + "2", + ); + + first.set_active_connections(1); + first.end_user_task(UserTaskKind::Action, Duration::from_millis(1)); + drop(first); + + let rendered = render_global_metrics(); + assert_metric_value(&rendered, "rivetkit_actor_connections_active", actor_name, "3"); + assert_metric_value(&rendered, "rivetkit_actor_queue_depth", actor_name, "5"); + assert_metric_value_with_label( + &rendered, + "rivetkit_actor_user_tasks_active", + actor_name, + "kind=\"action\"", + "1", + ); + + drop(second); + + let rendered = render_global_metrics(); + assert_metric_value(&rendered, "rivetkit_actor_connections_active", actor_name, "0"); + assert_metric_value(&rendered, "rivetkit_actor_queue_depth", actor_name, "0"); + assert_metric_value_with_label( + &rendered, + "rivetkit_actor_user_tasks_active", + actor_name, + "kind=\"action\"", + "0", + ); + } + fn assert_metric_value(metrics: &str, name: &str, actor_name: &str, value: &str) { + assert_metric_value_with_label(metrics, name, actor_name, "", value); + } + + fn assert_metric_value_with_label( + metrics: &str, + name: &str, + actor_name: &str, + label: &str, + value: &str, + ) { let line = metrics .lines() .find(|line| { line.starts_with(name) && line.contains(&format!("actor_name=\"{actor_name}\"")) + && (label.is_empty() || line.contains(label)) }) .unwrap_or_else(|| panic!("{name} should render")); assert!(line.ends_with(value), "{name} should have value {value}: {line}");